服务治理:常用限流算法总结
一、为什么会有限流限流,看字面意思,限制流动。
为什么要限制流动?
比如高速公路出现了事故,交警会对高速路车辆的进入进行指挥和限制。
发生了一些意外情况,才可能要限制流动。等恢复正常情况后,就解除限制。不可能无缘无故的限制流动,毕竟限制会影响正常系统运行。
在举一个例子:
足球馆看足球比赛,足球馆的场地大小是固定的,座位数是固定的,能容纳看球人数总量是有限的。如果超过足球馆容量最大承载,会导致场内拥挤,这样会发生2个问题:一个是导致大家行动不便,一个可能会发生意想不到的事故。
那怎么办?球票。
一个足球场出售的球票是有限制的,一共卖多少张票是有一定数量额度。
系统容量有限,如果超过了系统的负荷,那么就需要做一些限制措施,避免系统运行时出现异常情况。
那在计算机系统中,为什么要限流?
同理,访问计算机系统时或者是计算机系统本身出现了一些异常情况。比如流量过大,系统处理不过来。比如系统升级等等情况。
计算机系统容量是有限的,内存大小,CPU 处理数据的速度,都是有限的,不可能无限大。如果超过了一定的阙值,系统就会出现异常,甚至宕机。
现在微服务架构比较流行,各种服务比较多,服务之间调用频繁。
如果访问一个 API 服务时,超过了这个服务能提供的最大访问能力,服务会崩溃,那就要对这个服务进行保护,避免服务因访问过大导致服务不可用,不仅影响自己服务,也可能影响其它相关服务。
采用什么方法保护服务呢?限流就是保护方法之一。
在 IT 高并发系统中,处于对系统的保护,需要对系统进行限流。
二、IT 系统中的限流上面已经介绍了 IT 系统中的一些限流问题。
下面来看看对使用限流的一些具体情况描述。
在互联网世界里,一根一根的网线把整个世界连接起来,那么网络里面传输的数据
流动起来就形成了网络流。TCP 里就有限制流量的算法-滑动窗口算法。
在微服务系统里的 API 接口中,对接口做限制,保护接口安全,保证系统稳定。
对接口访问请求,怎么描述接口请求情况?
一般用每秒请求数(request per second),并发请求数等,来描述对接口的请求情况。所以限制也是对每秒请求数进行限制。
还有平常使用的连接池技术,也可以理解为限流思想的一种,把连接数限制在一个数量上。把固定数量的连接放入“池子”中,很形象的说法。当然也是复用减少损耗。
三、常用限流算法常用的限流算法,一般有 4 种:
- 计数器
- 滑动窗口
- 漏桶
- 令牌桶
计数器算法:
在固定窗口内对请求进行计数,然后与设置的最大请求数进行比较,如果超过了最大值,就进行限流。到达了一个固定时间窗口终点,将计数器清零,重新开始计数。
计数器算法又叫 固定窗口算法-Fixed Window。
举个例子,比如在微服务中有一个接口,限制调用次数: 1 分钟内最大调用次数为 30。
根据描述,这个算法为:
设置最大请求数 MaxRequest = 30,窗口时间 WindowTime = 60 秒,
还有一个计时开始时间 BeginTime , 请求计数 Counter。
用 Go 写一个 demo,不过为了测试方便,把 MaxRequest 设置为 10,WindowTime 设置为 3 秒,程序如下:
package main
import (
"fmt"
"sync"
"time"
)
const (
MAXREQUEST = 10 // 限制最大请求数
WINDOWTIME = 3 * time.Second // 最大窗口时间
)
type limit struct {
beginTime time.Time
counter int
mu sync.Mutex
}
func (limit *limit) apiLimit() bool {
limit.mu.Lock()
defer limit.mu.Unlock()
nowTime := time.Now()
if nowTime.Sub(limit.beginTime) >= WINDOWTIME {
limit.beginTime = nowTime
limit.counter = 0
}
if limit.counter > MAXREQUEST {
return false
}
limit.counter++
fmt.Println("counter: ", limit.counter)
return true
}
func main() {
var wg sync.WaitGroup
var limit limit
for i := 0; i < 15; i++ {
wg.Add(1)
fmt.Println("req start:", i, time.Now())
go func(i int) {
if limit.apiLimit() {
fmt.Println("req counter: ", i, time.Now())
}
wg.Done()
}(i)
time.Sleep(150 * time.Millisecond)
}
wg.Wait()
}
-
算法优点:实现简单
-
算法缺点:
1.计数器算法有一个”临界时间点“问题。
比如限制 1 分钟内最大请求为 30 个。在 21:30:59 秒到达 30 个,然后 21:31:01 秒(临界时间)又瞬间到达 30 个,虽然两个时间窗内请求都符合限流要求,但在两个窗口临界时间 2 秒内集中了 60 次请求,超过了规定值 30。全局从速率来看 30/60s=0.5,而现在 60/2s =30,远超 0.5,这对系统来说可能就无法承受了。在这个 2 秒内不做限流,就可能会把我们的应用搞崩溃。
它无法应对两个时间窗口临界时间内的突发流量。
2.如果请求速度太快,会丢掉一些请求。
怎么解决“临界时间点”问题,看下面滑动窗口算法。
滑动窗口算法滑动窗口算法(Sliding Window)部分解决了计数器算法(固定时间窗口算法)“时间临界点” 的问题。
有的人还会把滑动窗口算法细分:滑动窗口日志(sliding window log) 和 滑动窗口计数(sliding window counter)。
滑动窗口计数滑动窗口算法:
在计数器算法中,把大时间窗口在进一步划分为更细小的时间窗口格子,随着时间向前移动,大时间窗每次向前移动一个小格子,而不是大时间窗向前移动。每个小格子都有自己独立计数器,小格子会记录每个请求到达的时间点。
最终统计比较:
- 比较小格子内请求数:(大时间窗口内规定最大请求数 / N个小格子) > 小格子时间窗内总请求数
举个例子:
把 1 分钟时间窗在划分为 6 个小格子时间窗,每个小格子 10 秒。每过 10 秒钟,时间窗口向右滑动一小格。每一个小格
都有自己独立的计数器 counter。下面图1到图2:
Go 例子 slidingwindow.go:
这个例子把最大请求数设置为300,最大时间窗时间设置为 30秒,小格子时间窗设置为 1秒,便于程序演示。
package main
import (
"fmt"
"sync"
"time"
)
const (
MAXREQUEST = 300 // 限制最大请求数
WINDOWTIME = 30 * time.Second // 最大窗口时间
)
type SlidingWindow struct {
smallWindowTime int64 // 小窗口时间大小
smallWindowNum int64 // 小窗口总数
smallWindowCap int // 小窗口请求容量
counters map[int64]int // 小窗口计数器
mu sync.Mutex // 锁
}
func NewSlidingWindow(smallWindowTime time.Duration) (*SlidingWindow, error) {
num := int64(WINDOWTIME / smallWindowTime) // 小窗口总数
return &SlidingWindow{
smallWindowTime: int64(smallWindowTime),
smallWindowNum: num,
smallWindowCap: MAXREQUEST / int(num),
counters: make(map[int64]int),
}, nil
}
func (sw *SlidingWindow) ReqLimit() bool {
sw.mu.Lock()
sw.mu.Unlock()
// 获取当前小格子窗口所在的时间值
curSmallWindowTime := time.Now().Unix()
// 计算当前小格子窗口起始时间
beginTime := curSmallWindowTime - sw.smallWindowTime*(sw.smallWindowNum-1)
// 计算当前小格子窗口请求总数
var count int
for sWindowTime, counter := range sw.counters { // 遍历计数器
if sWindowTime < beginTime { // 判断不是当前小格子
delete(sw.counters, sWindowTime)
} else {
count += counter // 当前小格子窗口计数器累加
}
}
// 当前小格子请求到达请求限制,请求失败,返回 false
if count >= sw.smallWindowCap {
return false
}
// 没有到达请求上限,当前小格子窗口计数器+1,请求成功
sw.counters[curSmallWindowTime]++
return true
}
func main() {
var wg sync.WaitGroup
sw, _ := NewSlidingWindow(1 * time.Second)
fmt.Println("num:", sw.smallWindowNum, "cap:", sw.smallWindowCap)
for i := 0; i < 15; i++ {
wg.Add(1)
fmt.Println("req start:", i, time.Now())
go func(i int) {
if sw.ReqLimit() {
fmt.Println("req counter: ", time.Now())
}
wg.Done()
}(i)
time.Sleep(200 * time.Millisecond)
}
wg.Wait()
}
滑动窗口算法是怎么解决“临界时间点”问题?
还是用上面计数器的例子:比如限制 1 分钟内最大请求为 30 个。
在 21:30:59 秒到达 30 个请求,它落在上图2中灰色小格子中,然后 21:31:01 秒又瞬间到达 30 个,它会落在图2橘色小格子中。而当时间到达 21:31:01 时,时间窗要向右移动一小格(如上图2箭头所示),此时大时间窗内的总请求数为 60,超过了规定的最大请求数 30 个,这时就能检测出超过了请求阙值从而触发限流。
为什么说是部分解决“临界时间点”,或者说它的缺点?
这个看划分小格子的时间大小了。比如说上面例子小格子时间是 10 秒,如果瞬间流量是微秒呢?可能又会超过限制。那划分更细时间单位。理论上流量到达时间也可以更细。
这个又咋办?
多层次限流,同一个接口设置多条限流规则。比如 1 分钟 30 个,100ms 2 个。
与计数器算法(固定时间窗口算法)区别:
漏桶算法计数器其实是一个固定时间窗口,它只有一格,比较大的一格时间,计数器算法是按照一大格时间窗向前移动。滑动窗口算法是按照一小格时间向前移动。固定窗口可以说是滑动窗口的一种特殊情况。
滑动时间窗口小格子划分的时间越细,向前移动就越平滑。
先看下面一张图:
这个图很形象的把漏桶算法表示出来了:
(a)图:有一个控制开关的水龙头,下面有一个桶用来装水,桶下面有一个放水的洞。把请求比作水,水来了就先放到桶里,然后按照一定
的速率放出水。水龙头放水过快,桶里的水满了就会溢出。表现为请求就是多出的请求丢掉。(b)图:把 (a) 图在进一步算法化,把漏桶算法形象表示出来。
流入的请求速率是不确定,请求可以是任意速率流入桶中,流出的请求则是按照固定速率流出。把流入桶中的请求计数(桶的当前水位),当请求超过桶的容量(最高水位)时,桶溢出丢弃这部分请求。
有的人形象把它叫作流量“整形”,因为不管你流入有多快,流出都是固定速率。
一个 Go demo:
package main
import (
"fmt"
"sync"
"time"
)
const (
MAXREQUEST = 5
)
type LeakyBucket struct {
capacity int // 桶的容量 - 最高水位
currentReqNum int // 桶中当前请求数量 - 当前水位
lastTime time.Time // 桶中上次请求时间 - 上次放水时间
rate int // 桶中流出请求的速率,每秒流出多少请求,水流速度/秒
mu sync.Mutex // 锁
}
func NewLeakyBucket(rate int) *LeakyBucket {
return &LeakyBucket{
capacity: MAXREQUEST, //容量
lastTime: time.Now(),
rate: rate,
}
}
func (lb *LeakyBucket) ReqLimit() bool {
lb.mu.Lock()
lb.mu.Unlock()
now := time.Now()
// 计算距离上次放水时间间隔
gap := now.Sub(lb.lastTime)
fmt.Println("gap:", gap)
if gap >= time.Second {
// gap 这段时间流出的请求数=gap时间 * 每秒流出速率
out := int(gap/time.Second) * lb.rate
// 计算当前桶中请求数
lb.currentReqNum = maxInt(0, lb.currentReqNum-out)
lb.lastTime = now
}
// 桶中的当前请求数大于桶容量,请求失败
if lb.currentReqNum >= lb.capacity {
return false
}
// 若没超过桶容量,桶中请求量+1,返回true
lb.currentReqNum++
fmt.Println("curReqNum:", lb.currentReqNum)
return true
}
func maxInt(a, b int) int {
if a > b {
return a
}
return b
}
// 测试
func main() {
var wg sync.WaitGroup
lb := NewLeakyBucket(1)
fmt.Println("cap:", lb.capacity)
for i := 0; i < 15; i++ {
wg.Add(1)
fmt.Println("req start:", i, time.Now())
go func(i int) {
if lb.ReqLimit() {
fmt.Println("req counter: ", time.Now())
}
wg.Done()
}(i)
time.Sleep(10 * time.Millisecond)
}
wg.Wait()
}
缺点:
令牌桶算法这个漏桶算法能保护系统,但是有大量请求时还是会丢弃很多请求,导致请求失败数高。
上面漏桶算法流入速度不稳定,流出速度是稳定的。
漏桶算法是直接把请求放入到桶里,令牌桶算法,一看名字,放入桶中的是令牌,然后请求获取令牌成功才能往下执行,否则丢弃请求。
令牌总数超过桶容量,就丢弃。令牌我们可以匀速生产,所以流入桶中令牌是稳定的。
因为令牌是自己生产的,所以生产令牌的快慢可以控制,那是不是接受对应的请求可以快也可以慢,这样就能够应对突发流量。流量大,生产令牌就快点。能不能应对无限大突发流量?当然不行,资源是有限,对桶的最大流量也要进行限制。
令牌桶算法如下图所示:
总结下令牌桶算法几个关键参数:
1.令牌桶的容量
2.令牌生产的速率,比如每秒生产多少个令牌
3.最大限流量,最大请求的容量,这个关系到令牌桶里的令牌总数
Go demo:
package main
import (
"fmt"
"sync"
"time"
)
const (
MAXREQUEST = 5
)
type TokenBucket struct {
capacity int // 桶的容量
currentTokenNum int // 当前桶中tokent总量
lastTime time.Time // 上次发放 tokent 时间
rate int // 流出(发放) tokent 速率, token/s - 每秒发放多少个token
mu sync.Mutex
}
func NewTokenBucket(rate int) *TokenBucket {
return &TokenBucket{
capacity: MAXREQUEST,
lastTime: time.Now(),
rate: rate,
}
}
func (tb *TokenBucket) ReqLimit() bool {
tb.mu.Lock()
tb.mu.Unlock()
now := time.Now()
// 距离上次发放令牌间隔时间
gap := now.Sub(tb.lastTime)
fmt.Println("gap: ", gap)
if gap >= time.Second {
// 这段时间发放的token数 = gap时间(秒为单位) * 每秒发放token速率
in := int(gap/time.Second) * tb.rate
// 这段时间桶中的token总量
// tb.currentTokenNum+in : 当前已有的token总量+发放的token树
// 当前桶中的token数量当然不能超过桶的最大容量
tb.currentTokenNum = minInt(tb.capacity, tb.currentTokenNum+in)
tb.lastTime = now
}
// 没有token,请求失败
if tb.currentTokenNum <= 0 {
return false
}
// 如果有token,当前 token 数量-1,也就是发放token,请求成功
tb.currentTokenNum--
fmt.Println("curTokenNum:", tb.currentTokenNum)
return true
}
func minInt(a, b int) int {
if a < b {
return a
}
return b
}
// 测试
func main() {
var wg sync.WaitGroup
tb := NewTokenBucket(3)
fmt.Println("cap:", tb.capacity)
for i := 0; i < 15; i++ {
wg.Add(1)
fmt.Println("req start:", i, time.Now())
go func(i int) {
if tb.ReqLimit() {
fmt.Println("req counter: ", time.Now())
}
wg.Done()
}(i)
time.Sleep(1 * time.Second)
}
wg.Wait()
}
桶中是可以存储令牌的,这就使得令牌桶算法支持一定程度突发大流量。这一点来说,对用户访问比较友好。
四、参考- https://www.cnblogs.com/liqiangchn/p/14253924.html