分布式出站 http 速率限制器

我有一个微服务架构应用程序,其中多个服务轮询外部 API。外部 API 的速率限制为每分钟 600 个请求。如何让所有实例一起保持在共享 600 速率限制以下?

Google只给我带来了3个解决方案,最有希望的是:

  • myntra/golimit是这三个中最有前途的,但我实际上不知道如何设置它。

  • wallstreetcn/rate似乎仅在达到限制时才拒绝(我的应用程序需要等到它可以发出请求)并且funcEvery中的函数rate.NewLimiter似乎是不同的导入/依赖项,我无法弄清楚它是什么

  • manavo/go-rate-limiter有一个“软”限制,显然,这可能会让我超过限制。有些端点如果几秒钟内无法访问我并不介意,但其他端点请求应该尽可能有效。

目前我有一个业余解决方案。下面的代码允许我设置每分钟的限制,并且它在请求之间休眠以将请求分散到一分钟内。此客户端速率限制是针对每个实例的,因此我必须对 600 个请求除以实例数量进行硬编码。

var semaphore = make(chan struct{}, 5)

var rate = make(chan struct{}, 10)


func init(){

    // leaky bucket

    go func() {

        ticker := time.NewTicker(100 * time.Millisecond)

        defer ticker.Stop()

        for range ticker.C {

            _, ok := <-rate

            // if this isn't going to run indefinitely, signal

            // this to return by closing the rate channel.

            if !ok {

                return

            }

        }

}()

以及发出 http API 请求的函数内部。


rate <- struct{}{}


    // check the concurrency semaphore

    semaphore <- struct{}{}

    defer func() {

        <-semaphore

}()

如何让所有实例一起保持在共享 600 速率限制以下?


首选项: - 基于密钥的速率限制计数器,因此可以设置多个计数器。- 在设定的持续时间内分散请求,以便在前 30 秒内而不是在整分钟持续时间内发送 600 个请求。


HUWWW
浏览 107回答 2
2回答

拉风的咖菲猫

我无法与您找到的库交谈,但漏桶速率限制器非常简单。您需要某种共享事务存储。每个存储桶(或速率限制器)只是一个整数和一个时间值。该整数是特定时间桶中的滴数。每次必须应用速率限制时,减去自上次更新以来泄漏的滴数,然后加一,然后检查滴数是否在桶的容量范围内。我们正在使用 Redis 来完成这类事情。要在 Redis 中实现此事务性,需要一个脚本(。例如,在 SQL 数据库中,aSELECT FOR UPDATE后跟一条语句可以实现相同的效果。UPDATE这是我们的 Redis 脚本:-- replicate_commands allows us to use the TIME command. We depend on accurate-- (and reasonably consistent) timestamps. Multiple clients may have-- inacceptable clock drift.redis.replicate_commands()local rate = tonumber(ARGV[1]) -- how many drops leak away in one secondlocal cap = tonumber(ARGV[2]) -- how many drops fit in the bucketlocal now, _ = unpack(redis.call('TIME'))-- A bucket is represented by a hash with two keys, n and t. n is the number of-- drops in the bucket at time t (seconds since epoch).local xs = redis.call('HMGET', KEYS[1], 'n', 't')local n = tonumber(xs[1])local t = tonumber(xs[2])if type(n) ~= "number" or type(t) ~= "number" then    -- The bucket doesn't exist yet (n and t are false), or someone messed with    -- our hash values. Either way, pretend the bucket is empty.    n, t = 0, nowend-- remove drops that leaked since tn = n - (now-t)*rateif n < 0 then    n = 0end-- add one drop if it fitsif n < cap then    n = n + 1else    n = capendredis.call('HMSET', KEYS[1], 'n', n, 't', now)redis.call('EXPIRE', KEYS[1], math.floor(n/rate) + 1)return n每秒 10 滴的调用示例,容量为 10 滴:EVALSHA <SHA_IN_HEX> 1 rate-limit:my-bucket 10 10 该脚本返回桶中的滴数。如果该数字等于容量,您可以短暂休眠并重试,或者完全拒绝该请求,具体取决于您的要求。请注意,脚本永远不会返回大于容量的值,因此在您的情况下恢复时间不会超过十分之一秒。这可能不是您所需要的,因为您正在尝试匹配第三方速率限制器。也就是说,您可能可以接受桶溢出,从而导致突发请求后恢复时间更长。

慕森王

如果你想要一个全局速率限制器,你需要一个地方来维护分布式状态,比如zookeeper。通常,我们不想支付管理费用。或者,您可以设置转发代理,在其中进行速率限制。
打开App,查看更多内容
随时随地看视频慕课网APP