猿问

如何同时从速率受限的 API 端点进行提取?

我不能把我的头缠绕在这个问题上。我有一个需要从中提取数据的服务,该服务的速率限制为每秒5个请求,即使在使用包并在每个请求之前设置和调用它时,它有时仍会达到速率限制,我需要退出发送请求。我可能与另一个可能干扰请求预算的服务竞争,因此我想更好地处理它。x/rate/limitrate.Limiter(5,1)


我的问题是我需要解决这个问题,我一次处理5个请求,但是当一个请求达到速率限制时,下一个请求也是如此,服务器有时会增加我在发送另一个请求之前必须等待的时间。因此,如果有5个请求发出,如果一个达到速率限制,则其余请求也达到速率限制并且会卡住的可能性更大。


如何有效地解决此问题?我需要通过将受速率限制的请求反馈给工作人员来重新处理这些请求。当我达到速率限制时,我正在尝试停止所有工作线程,在给定的延迟后退,然后继续处理请求。


以下是我拥有的一些模拟代码示例:


package main


import (

    "context"

    "log"

    "net/http"

    "strconv"

    "sync"

    "time"


    "golang.org/x/time/rate"

)


// Rate-limit => 5 req/s


const (

    workers = 5

)


func main() {

    ctx, cancel := context.WithCancel(context.Background())


    // Mock function to grab all the serials to use in upcoming requests.

    serials, err := getAllSerials(ctx)

    if err != nil {

        panic(err)

    }


    // Set up for concurrent processing.

    jobC := make(chan string)            // job queue

    delayC := make(chan int)             // channel to receive delay

    resultC := make(chan *http.Response) // channel for results


    var wg *sync.WaitGroup


    // Set up rate limiter.

    limiter := rate.NewLimiter(5, 1)


    for i := 0; i < workers; i++ {

        wg.Add(1)

        go func() {

            defer wg.Done()


            for s := range jobC {

                limiter.Wait(ctx)

                res, err := doSomeRequest(s)

                if err != nil {

                    // Handle error.

                    log.Println(err)

                }


我注意到的是,当4/5请求达到速率限制时,将成功休眠和延迟(所有所有速率限制请求的总时间和时间,其中它只需要是最新的,因为它将具有新的总持续时间等待),但是当所有5个请求都达到速率限制时, 工人被卡住了,没有从频道读取。backOffProcessbackOffProcess


实现这一目标的更好方法是什么?


海绵宝宝撒
浏览 132回答 1
1回答

拉莫斯之舞

我真的不明白为什么你的被处在一个单独的goroutine中。我认为每个工作进程都应该在执行任务之前退缩(如果需要的话)。我看到它是这样的:backOffProcess&nbsp;backOffUntil := time.Now()&nbsp;backOffMutex := sync.Mutex{}&nbsp;go func() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; defer wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; for s := range jobC {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <-time.After(time.Until(backOffUntil))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; limiter.Wait(ctx)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; res, err := doSomeRequest(s)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // Handle error.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; log.Println(err)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // Handle rate limit.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if res.StatusCode == 429 {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; delay, _ := strconv.Atoi(res.Header.Get("Retry-After"))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; log.Println("rate limit hit, backing off")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // Back off.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; newbackOffUntil := time.Now().Add(time.Second * delay)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; backOffMutex.Lock()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if newbackOffUntil.Unix() > backOffUntil.Unix() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; backOffUntil = newbackOffUntil&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; backOffMutex.Unlock()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // Put serial back into job queue.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; jobC <- s&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; resultC <- res&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }()
随时随地看视频慕课网APP

相关分类

Go
我要回答