一文聊聊go語言中的限流漏桶和令牌桶庫-每日短訊

          來源:php中文網 | 2023-02-04 12:05:49 |

          本篇文章帶大家聊聊go語言中的限流漏桶和令牌桶庫,介紹令牌桶和漏桶的實現原理以及在實際項目中簡單應用。


          (資料圖片)

          為什么需要限流中間件?

          在大數據量高并發訪問時,經常會出現服務或接口面對大量的請求而導致數據庫崩潰的情況,甚至引發連鎖反映導致整個系統崩潰?;蛘哂腥藧阂夤艟W站,大量的無用請求出現會導致緩存穿透的情況出現。使用限流中間件可以在短時間內對請求進行限制數量,起到降級的作用,從而保障了網站的安全性。

          應對大量并發請求的策略?

          使用消息中間件進行統一限制(降速)

          使用限流方案將多余請求返回(限流)

          升級服務器

          緩存(但仍然有緩存穿透等危險)

          等等

          可以看出在代碼已經無法提升的情況下,只能去提升硬件水平?;蛘吒膭蛹軜嬙偌右粚?!也可以使用消息中間件統一處理。而結合看來,限流方案是一種既不需要大幅改動也不需要高額開銷的策略。

          常見的限流方案

          令牌桶算法

          漏桶算法

          滑動窗口算法

          等等

          漏桶

          引入ratelimit庫

          庫函數源代碼

          // New returns a Limiter that will limit to the given RPS. func New(rate int, opts ...Option) Limiter {     return newAtomicBased(rate, opts...) }  // newAtomicBased returns a new atomic based limiter. func newAtomicBased(rate int, opts ...Option) *atomicLimiter {     // TODO consider moving config building to the implementation     // independent code.     config := buildConfig(opts)     perRequest := config.per / time.Duration(rate)     l := &atomicLimiter{         perRequest: perRequest,         maxSlack:   -1 * time.Duration(config.slack) * perRequest,         clock:      config.clock,     }      initialState := state{         last:     time.Time{},         sleepFor: 0,     }     atomic.StorePointer(&l.state, unsafe.Pointer(&initialState))     return l }

          該函數使用了函數選項模式多個結構體對象進行初始化

          根據傳入的值來初始化一個桶結構體 rateint傳參 。

          初始化過程中包括了

          每一滴水需要的時間 perquest = config.per / time.Duration(rate)maxSlack寬松度(寬松度為負值)-1 * time.Duration(config.slack) * perRequest松緊度是用來規范等待時間的
          // Clock is the minimum necessary interface to instantiate a rate limiter with // a clock or mock clock, compatible with clocks created using // github.com/andres-erbsen/clock. type Clock interface {    Now() time.Time    Sleep(time.Duration) }

          同時還需要結構體Clock來記錄當前請求的時間now和此刻的請求所需要花費等待的時間sleep

          type state struct {    last     time.Time    sleepFor time.Duration }

          state主要用來記錄上次執行的時間以及當前執行請求需要花費等待的時間(作為中間狀態記錄)

          最重要的Take邏輯

          func (t *atomicLimiter) Take() time.Time {    var (       newState state       taken    bool       interval time.Duration    )    for !taken {       now := t.clock.Now()        previousStatePointer := atomic.LoadPointer(&t.state)       oldState := (*state)(previousStatePointer)        newState = state{          last:     now,          sleepFor: oldState.sleepFor,       }       if oldState.last.IsZero() {          taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))          continue       }       // 計算是否需要進行等待取水操作       newState.sleepFor += t.perRequest(每兩滴水之間的間隔時間) - now.Sub(oldState.last)(當前時間與上次取水時間的間隔)                // 如果等待取水時間特別小,就需要松緊度進行維護       if newState.sleepFor < t.maxSlack {          newState.sleepFor = t.maxSlack       }        // 如果等待時間大于0,就進行更新       if newState.sleepFor > 0 {          newState.last = newState.last.Add(newState.sleepFor)          interval, newState.sleepFor = newState.sleepFor, 0       }       taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))    }    t.clock.Sleep(interval)    // 最后返回需要等待的時間     return newState.last }

          實現一個Take方法

          該Take方法會進行原子性操作(可以理解為加鎖和解鎖),在大量并發請求下仍可以保證正常使用。

          記錄下當前的時間 now := t.clock.Now()

          oldState.last.IsZero()判斷是不是第一次取水,如果是就直接將state結構體中的值進行返回。而這個結構體中初始化了上次執行時間,如果是第一次取水就作為當前時間直接傳參。

          如果 newState.sleepFor非常小,就會出現問題,因此需要借助寬松度,一旦這個最小值比寬松度小,就用寬松度對取水時間進行維護。

          如果newState.sleepFor > 0就直接更新結構體中上次執行時間newState.last = newState.last.Add(newState.sleepFor)并記錄需要等待的時間interval, newState.sleepFor = newState.sleepFor, 0。

          如果允許取水和等待操作,那就說明沒有發生并發競爭的情況,就模擬睡眠時間t.clock.Sleep(interval)。然后將取水的目標時間進行返回,由服務端代碼來判斷是否打回響應或者等待該時間后繼續響應。

          t.clock.Sleep(interval)

          func (c *clock) Sleep(d time.Duration) { time.Sleep(d) }

          實際上在一個請求來的時候,限流器就會進行睡眠對應的時間,并在睡眠后將最新取水時間返回。

          實際應用(使用Gin框架)

          func ratelimit1() func(ctx *gin.Context) {     r1 := rate1.New(100)     return func(ctx *gin.Context) {         now := time.Now()         //  Take 返回的是一個 time.Duration的時間         if r1.Take().Sub(now) > 0 {             // 返回的時間比當前的時間還大,說明需要進行等待             // 如果需要等待, 就 time.Sleep(r1.Take().Sub(now())) 然后放行             // 如果不需要等待請求時間,就直接進行Abort 然后返回             response(ctx, http.StatusRequestTimeout, "rate1 limit...")             fmt.Println("rate1 limit...")             ctx.Abort()             return         }         // 放行         ctx.Next()     } }

          這里你可以進行選擇是否返回。因為Take一定會執行sleep函數,所以當執行take結束后表示當前請求已經接到了水。當前演示使用第一種情況。

          如果你的業務要求響應不允許進行等待。那么可以在該請求接完水之后然后,如上例。

          如果你的業務允許響應等待,那么該請求等待對應的接水時間后進行下一步。具體代碼就是將if中的內容直接忽略。(建議使用)

          測試代碼

          這里定義了一個響應函數和一個handler函數方便測試

          func response(c *gin.Context, code int, info any) {    c.JSON(code, info) }  func pingHandler(c *gin.Context) {    response(c, 200, "ping ok~") }

          執行go test -run=Run -v先開啟一個web服務

          func TestRun(t *testing.T) {    r := gin.Default()     r.GET("/ping1", ratelimit1(), pingHandler)    r.GET("/ping2", ratelimit2(), helloHandler)     _ = r.Run(":4399") }

          使用接口壓力測試工具go-wrk進行測試->tsliwowicz/go-wrk: go-wrk)

          在golang引入install版本可以直接通過go install github.com/tsliwowicz/go-wrk@latest下載

          使用幫助

          Usage: go-wrk <options> <url>    Options:     -H       Header to add to each request (you can define multiple -H flags) (Default )     -M       HTTP method (Default GET)     -T       Socket/request timeout in ms (Default 1000)     -body    request body string or @filename (Default )     -c       Number of goroutines to use (concurrent connections) (Default 10)     -ca      CA file to verify peer against (SSL/TLS) (Default )     -cert    CA certificate file to verify peer against (SSL/TLS) (Default )     -d       Duration of test in seconds (Default 10)     -f       Playback file name (Default <empty>)     -help    Print help (Default false)     -host    Host Header (Default )     -http    Use HTTP/2 (Default true)     -key     Private key file name (SSL/TLS (Default )     -no-c    Disable Compression - Prevents sending the "Accept-Encoding: gzip" header (Default false)     -no-ka   Disable KeepAlive - prevents re-use of TCP connections between different HTTP requests (Default false)     -no-vr   Skip verifying SSL certificate of the server (Default false)     -redir   Allow Redirects (Default false)     -v       Print version details (Default false)

          -t 8個線程 -c 400個連接 -n 模擬100次請求 -d 替換-n 表示連接時間

          輸入go-wrk -t=8 -c=400 -n=100 http://127.0.0.1:4399/ping1

          可以稍微等待一下水流積攢(壓測速度過快)。

          可以看出,89個請求全部返回。也就是說在一段請求高峰期,不會有請求進行響應。因此我認為既然內部已經睡眠,那么就也就應該對請求放行處理。

          令牌桶

          引入ratelimit

          go get -u github.com/juju/ratelimit

          初始化

          // NewBucket returns a new token bucket that fills at the // rate of one token every fillInterval, up to the given // maximum capacity. Both arguments must be // positive. The bucket is initially full. func NewBucket(fillInterval time.Duration, capacity int64) *Bucket {    return NewBucketWithClock(fillInterval, capacity, nil) }  // NewBucketWithClock is identical to NewBucket but injects a testable clock // interface. func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket {    return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock) }

          進行Bucket桶的初始化。

          func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket {    if clock == nil {       clock = realClock{}    }     // 填充速率    if fillInterval <= 0 {       panic("token bucket fill interval is not > 0")    }     // 最大令牌容量    if capacity <= 0 {       panic("token bucket capacity is not > 0")    }     // 單次令牌生成量    if quantum <= 0 {       panic("token bucket quantum is not > 0")    }    return &Bucket{       clock:           clock,       startTime:       clock.Now(),       latestTick:      0,       fillInterval:    fillInterval,       capacity:        capacity,       quantum:         quantum,       availableTokens: capacity,    } }

          令牌桶初始化過程,初始化結構體 fillInterval(填充速率) cap(最大令牌量) quannum(每次令牌生成量)。

          如果三個變量有一個小于或者等于0的話直接進行報錯返回。在最開始就將當前令牌數初始化為最大容量。

          調用

          // TakeAvailable takes up to count immediately available tokens from the // bucket. It returns the number of tokens removed, or zero if there are // no available tokens. It does not block. func (tb *Bucket) TakeAvailable(count int64) int64 {    tb.mu.Lock()    defer tb.mu.Unlock()    return tb.takeAvailable(tb.clock.Now(), count) }

          調用TakeAvailable函數,傳入參數為需要取出的令牌數量,返回參數是實際能夠取出的令牌數量。

          內部實現

          func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {    // 如果需要取出的令牌數小于等于零,那么就返回0個令牌     if count <= 0 {       return 0    }     // 根據時間對當前桶中令牌數進行計算    tb.adjustavailableTokens(tb.currentTick(now))     // 計算之后的令牌總數小于等于0,說明當前令牌不足取出,那么就直接返回0個令牌    if tb.availableTokens <= 0 {       return 0    }     // 如果當前存儲的令牌數量多于請求數量,那么就返回取出令牌數    if count > tb.availableTokens {       count = tb.availableTokens    }     // 調整令牌數    tb.availableTokens -= count    return count }

          如果需要取出的令牌數小于等于零,那么就返回0個令牌

          根據時間對當前桶中令牌數進行計算

          計算之后的令牌總數小于等于0,說明當前令牌不足取出,那么就直接返回0個令牌

          如果當前存儲的令牌數量多于請求數量,那么就返回取出令牌數

          調整令牌數

          調整令牌

          func (tb *Bucket) adjustavailableTokens(tick int64) {    lastTick := tb.latestTick    tb.latestTick = tick     // 如果當前令牌數大于最大等于容量,直接返回最大容量    if tb.availableTokens >= tb.capacity {       return    }     // 當前令牌數 += (當前時間 - 上次取出令牌數的時間) * quannum(每次生成令牌量)    tb.availableTokens += (tick - lastTick) * tb.quantum     // 如果當前令牌數大于最大等于容量, 將當前令牌數 = 最大容量 然后返回 當前令牌數    if tb.availableTokens > tb.capacity {       tb.availableTokens = tb.capacity    }    return }

          如果當前令牌數大于最大等于容量,直接返回最大容量

          當前令牌數 += (當前時間 - 上次取出令牌數的時間) * quannum(每次生成令牌量)

          如果當前令牌數大于最大等于容量, 將當前令牌數 = 最大容量 然后返回 當前令牌數

          實現原理

          加鎖 defer解鎖

          判斷count(想要取出的令牌數) 是否小于等于 0,如果是直接返回 0

          調用函數adjustTokens獲取可用的令牌數量

          如果當前可以取出的令牌數小于等于0 直接返回 0

          如果當前可以取出的令牌數小于當前想要取出的令牌數(count) count = 當前可以取出的令牌數

          當前的令牌數 -= 取出的令牌數 (count)

          返回 count(可以取出的令牌數)

          額外介紹

          take函數,能夠返回等待時間和布爾值,允許欠賬,沒有令牌也可以取出。

          func (tb *Bucket) Take(count int64) time.Duration

          takeMaxDuration函數,可以根據最大等待時間來進行判斷。

          func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool)

          因為他們內部的實現都基于令牌調整,我這里不做過多介紹,如果感興趣可以自行研究一下。

          測試

          func ratelimit2() func(ctx *gin.Context) {     // 生成速率 最大容量     r2 := rate2.NewBucket(time.Second, 200)     return func(ctx *gin.Context) {         //r2.Take() // 允許欠賬,令牌不夠也可以接收請求         if r2.TakeAvailable(1) == 1 {             // 如果想要取出1個令牌并且能夠取出,就放行             ctx.Next()             return         }         response(ctx, http.StatusRequestTimeout, "rate2 limit...")         ctx.Abort()         return     } }

          壓測速度過于快速,在實際過程中可以根據調整令牌生成速率來進行具體限流!

          小結

          令牌桶可以允許自己判斷請求是否繼續,內部不會進行睡眠操作。而漏桶需要進行睡眠,并沒有提供方法讓程序員進行判斷是否放行。

          以上就是一文聊聊go語言中的限流漏桶和令牌桶庫的詳細內容,更多請關注php中文網其它相關文章!

          關鍵詞: Golang go語言

          国产精品国产亚洲区艳妇糸列短篇 | 亚洲成人激情在线| 中文日韩亚洲欧美制服| 亚洲人成电影网站色| 亚洲综合av一区二区三区 | 亚洲AV性色在线观看| 亚洲第一区精品日韩在线播放| 国产亚洲日韩在线三区| 亚洲Aⅴ无码专区在线观看q| 亚洲国产成人爱av在线播放| 中文字幕日韩亚洲| 亚洲视频一区在线| 亚洲精品无码久久久久久| 久久精品国产精品亚洲| 亚洲成av人在线观看网站 | 久久精品国产亚洲av四虎| 亚洲AV无码久久久久网站蜜桃| 国产亚洲一卡2卡3卡4卡新区 | 亚洲伦理一区二区| 亚洲人配人种jizz| 亚洲区小说区图片区| 久久精品亚洲一区二区三区浴池 | 亚洲AⅤ无码一区二区三区在线| 亚洲Av综合色区无码专区桃色| 亚洲一区二区三区高清视频| 精品亚洲福利一区二区| 亚洲AV无码专区国产乱码4SE| 亚洲大成色www永久网址| 亚洲一级特黄无码片| 亚洲最大在线视频| 久久亚洲精品无码网站| 亚洲国产精品va在线播放 | 亚洲精品综合久久中文字幕| 亚洲另类无码一区二区三区| 国产亚洲成AV人片在线观黄桃| 亚洲欧洲另类春色校园网站| 国外亚洲成AV人片在线观看 | 亚洲日本久久一区二区va| 久久久久久久亚洲精品| 亚洲视频免费在线播放| 2022中文字字幕久亚洲|