My goal is to safely consume an API without getting 429 Too Many Requests errors, tripping DDoS protection, or exceeding the system's limit on open file descriptors. To achieve that I wrote this simple wrapper around http.Client using the worker pool pattern:
package client

import (
    "errors"
    "net/http"
    "time"
)

// job carries one request plus the channels on which a worker
// reports the outcome back to the caller of Do.
type job struct {
    req   *http.Request
    respc chan *http.Response
    errc  chan error
}

type client chan job

// NewClient wraps HTTPClient so that at most MaxParallelConnections
// requests are in flight at once and new requests are dispatched at
// MaxRequestsPerSecond.
func NewClient(
    HTTPClient *http.Client,
    MaxRequestsPerSecond float64,
    MaxParallelConnections int,
) (*client, error) {
    if MaxRequestsPerSecond <= 0 {
        return nil, errors.New(
            "client.NewClient: invalid MaxRequestsPerSecond",
        )
    }
    if MaxParallelConnections <= 0 {
        return nil, errors.New(
            "client.NewClient: invalid MaxParallelConnections",
        )
    }
    tasks := make(chan job)
    go func() {
        jobs := make(chan job)
        defer close(jobs)
        // Worker pool: each goroutine handles one connection at a time.
        for i := 0; i < MaxParallelConnections; i++ {
            go func() {
                for j := range jobs {
                    resp, err := HTTPClient.Do(j.req)
                    if err != nil {
                        j.errc <- err
                        continue
                    }
                    j.respc <- resp
                }
            }()
        }
        // Dispatcher: forward each task to the pool, then sleep to
        // enforce the requests-per-second cap.
        for task := range tasks {
            jobs <- task
            time.Sleep(
                time.Duration(
                    float64(time.Second) / MaxRequestsPerSecond,
                ),
            )
        }
    }()
    return (*client)(&tasks), nil
}

// Close stops the dispatcher; calling Do afterwards panics.
func (c *client) Close() {
    close(*c)
}

// Do sends the request through the pool and blocks until a
// response or an error comes back.
func (c *client) Do(req *http.Request) (*http.Response, error) {
    respc := make(chan *http.Response)
    errc := make(chan error)
    *c <- job{req, respc, errc}
    select {
    case resp := <-respc:
        return resp, nil
    case err := <-errc:
        return nil, err
    }
}
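For reference, a caller would use it roughly like this (a minimal sketch; the import path, URL, and limits are placeholders):

package main

import (
    "io"
    "log"
    "net/http"

    "example.com/client" // placeholder import path for the package above
)

func main() {
    // 5 requests/sec, at most 10 parallel connections (placeholder values).
    c, err := client.NewClient(http.DefaultClient, 5, 10)
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    req, err := http.NewRequest(http.MethodGet, "https://example.com/api", nil)
    if err != nil {
        log.Fatal(err)
    }
    resp, err := c.Do(req)
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("got %d bytes", len(body))
}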
I already see comments like "extract X into a separate function" coming, but please make them more detailed.
Should I make the channels buffered? If so, how big should they be, or should the consumer provide the size?
Should I make MaxParallelConnections a uint and ditch the corresponding check? Could the problem be solved with a different approach altogether?
I am pretty new to Go, so any and all additional input is very welcome.
1 Answer
You can use a rate limiter to limit the number of requests processed. Think of the limiter as something that makes the program wait on the line where the limiter is called until its conditions are satisfied, and then continue.
You would change this section:
for i := 0; i < MaxParallelConnections; i++ {
    go func() {
        for j := range jobs {
            resp, err := HTTPClient.Do(j.req)
            if err != nil {
                j.errc <- err
                continue
            }
            j.respc <- resp
        }
    }()
}
Create the limiter with a context (rate is golang.org/x/time/rate; you will also need to import context):

throttlerCtx := context.Background()
throttler := rate.NewLimiter(25, 50) // refill tokens at 25/sec, allow bursts of up to 50 requests at once
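For intuition, here is a tiny standalone sketch (the numbers are made up for illustration): with a burst of 3, the first three Wait calls return immediately, and the remaining ones are released at 2 per second:

package main

import (
    "context"
    "log"

    "golang.org/x/time/rate"
)

func main() {
    limiter := rate.NewLimiter(2, 3) // refill 2 tokens/sec, burst of 3
    for i := 0; i < 6; i++ {
        if err := limiter.Wait(context.Background()); err != nil {
            log.Fatal(err)
        }
        log.Println("request", i) // first 3 fire at once, the rest ~500ms apart
    }
}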
So you could initialise it like:

throttlerCtx := context.Background()
throttler := rate.NewLimiter(rate.Limit(MaxParallelConnections), MaxParallelConnections) // think carefully here about how you'd like it to be limited

(Note that the int needs an explicit rate.Limit conversion for the first argument, and since that argument is the sustained rate rather than the parallelism, rate.Limit(MaxRequestsPerSecond) may be closer to what you want there.)
Then remove the for loop over MaxParallelConnections, leaving a single worker goroutine:

throttlerCtx := context.Background()
throttler := rate.NewLimiter(1, 1) // refill 1 token/sec with a burst of 1: at most one request per second
go func() {
    for j := range jobs {
        // Wait blocks until the limiter releases a token,
        // i.e. at most one request is started per second.
        err := throttler.Wait(throttlerCtx)
        if err != nil {
            j.errc <- err
            continue
        }
        resp, err := HTTPClient.Do(j.req)
        if err != nil {
            j.errc <- err
            continue
        }
        j.respc <- resp
    }
}()
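If you still want MaxParallelConnections concurrent requests, an alternative is to keep the worker pool and have every worker wait on a single shared limiter. A sketch of what the dispatch goroutine in NewClient could look like under that approach (it assumes the same context and golang.org/x/time/rate imports):

throttlerCtx := context.Background()
// Sustained rate from MaxRequestsPerSecond, burst of 1; all workers
// share this limiter, so the pool caps parallelism while the limiter
// owns the pacing.
throttler := rate.NewLimiter(rate.Limit(MaxRequestsPerSecond), 1)
tasks := make(chan job)
go func() {
    jobs := make(chan job)
    defer close(jobs)
    for i := 0; i < MaxParallelConnections; i++ {
        go func() {
            for j := range jobs {
                // Each worker blocks here until a token is available.
                if err := throttler.Wait(throttlerCtx); err != nil {
                    j.errc <- err
                    continue
                }
                resp, err := HTTPClient.Do(j.req)
                if err != nil {
                    j.errc <- err
                    continue
                }
                j.respc <- resp
            }
        }()
    }
    for task := range tasks {
        jobs <- task // no Sleep needed; the limiter paces the workers
    }
}()

This keeps Do and Close unchanged and drops the time.Sleep pacing entirely.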