一、workQueue分类
在Informer最后将资源对象已经写入到事件回调函数中,此后我们直接处理这些数据即可,但是我们使用golang中的chanel来处理会存在处理效率低,存在数据大并发量,数据积压等其他异常情况,为此client-go单独将workqueue提出来,作为公共组件,不仅可以在Kubernetes内部使用,还可以供Client-go使用,用户侧可以通过Workqueue相关方法进行更加灵活的队列处理,如失败重试,延迟入队,限速控制等,实现非阻塞异步逻辑处理。
WorkQueue 支持 3 种队列,并提供了 3 种接口,不同队列实现可应对不同的使用场景,分别介绍如下。
- Interface:通用队FIFO 队列接口,先进先出队列,并支持去重机制。
- DelayingInterface:延迟队列接口,基于 Interface 接口封装,延迟一段时间后再将元素存入队列。
- RateLimitingInterface:限速队列接口,基于 DelayingInterface 接口封装,支持元素存入队列时进行速率限制。
从图中可以看到,workqueue.RateLimitingInterface 集成了 DelayingInterface,DelayingInterface 集成了 Interface,最终由 rateLimitingType 进行实现,提供了 rateLimit 限速、delay 延时入队(由优先级队列通过小顶堆实现)、queue 队列处理 三大核心能力,另外,在代码中可看到 K8s 实现了三种 RateLimiter:BucketRateLimiter、ItemExponentialFailureRateLimiter、ItemFastSlowRateLimiter。
二、workQueue特点
WorkQueue 称为工作队列,Kubernetes 的 WorkQueue 队列与普通 FIFO(先进先出,First-In, First-Out)队列相比,实现略显复杂,它的主要功能在于标记和去重,并支持如下特性。
- 有序:按照添加顺序处理元素(item)。
- 去重:相同元素在同一时间不会被重复处理,例如一个元素在处理之前被添加了多次,它只会被处理一次。
- 并发性:多生产者和多消费者。
- 标记机制:支持标记功能,标记一个元素是否被处理,也允许元素在处理时重新排队。
- 通知机制:ShutDown 方法通过信号量通知队列不再接收新的元素,并通知 metric goroutine 退出。
- 延迟:支持延迟队列,延迟一段时间后再将元素存入队列。
- 限速:支持限速队列,元素存入队列时进行速率限制。限制一个元素被重新排队(Reenqueued)的次数。
- Metric:支持 metric 监控指标,可用于 Prometheus 监控。
三、普通FIFO队列
数据结构及方法
普通FIFO队列,先进先出,支持去重。
queue实际存储元素的地方,是 slice 结构,用于保证元素有序;
dirty用于去重机制,是map结构,保证一个元素被处理之前哪怕其被添加了多次(并发情况下),但也只会被处理一次;
processing用于标记机制,是map结构,标记一个元素是否正在被处理;
Add方法,添加一个元素到queue中,首先会判断队列是否关闭,其次会判断元素是否在dirty中,如果在则直接返回实现去重,不在则添加,然后判断元素是否在processing中,如果存在则直接返回,如果不存在则将其添加进queue中;
Get方法,从queue队列头部弹出一个元素放到processing中,并从dirty中移除;
Done方法,标记元素已处理完成,从processing中移除,并判断元素是否还在dirty中,如果在则将其重新添加至queue队尾;
FIFO队列存储过程
通过 Add 方法往 FIFO 队列中分别插入 1、2、3 三个元素,此时队列中的 queue 和 dirty 分别存有 1、2、3 元素,processing为空;然后通过 Get 方法获取最先进入的1元素,此时1 元素被放入 processing中,queue 和 dirty 剩余有 2、3 元素,表示1元素正在被处理;最后,当我们处理完 1 元素时通过 Done 方法标记该元素已经被处理完成,此时将1元素从 processing中移除。
高并发下如何保证一个元素哪怕其被添加了多次,但也只会被处理一次?
元素在processing中正被处理:
在并发场景下,假设 goroutine A 通过 Get 方法获取 1 元素,1 元素被添加到 processing 中并从queue和dirty中移除;同一时间,goroutine B 通过 Add 方法插入另一个 1 元素,此时在 processing 中已经存在相同的元素,所以后面的 1 元素并不会被直接添加到 queue 字段中,而是仅添加到dirty中;在 goroutine A 通过 Done 方法标记1袁术被处理完成并从processing删除后,检测到dirty 字段中存有 1 元素,则将 1 元素追加到 queue 字段中的尾部。
元素在queue和dirty中还未放入processing:
在并发场景下,假设 goroutine A 通过Add方法出入1袁术到queue和dirty中;同一时间,goroutine B 通过 Add 方法插入另一个 1 元素,此时在dirty中已经存在相同的元素,会直接返回。
四、延迟队列
数据结构及方法
延迟队列,其原理是延迟一段时间后再将元素插入 FIFO 队列,防止hot-loop。继承了普通FIFO队列的通用接口,在其基础上主要新增了waitingForAddCh字段和AddAfter、waitingLoop方法。
waitingForAddCh:其默认初始大小为 1000,通过 AddAfter 方法插入元素时,是非阻塞状态的,只有当插入的元素大于或等于 1000 时,延迟队列才会处于阻塞状态。waitingForAddCh 字段中的数据通过新启 goroutine 运行的 waitingLoop 函数持久运行。
延迟队列存储过程
将元素 1 放入 waitingForAddCh 字段中,通过 waitingLoop 函数消费元素1数据。当元素1的延迟时间还没到则添加到优先级队列,如果延迟时间到了则添加到 FIFO 队列中。同时会不断遍历优先队列中的元素,判断延迟时间是否到达,到达则从优先队列删除并添加到普通FIFO队列中进行正常处理。其中优先级队列实现利用了golang底层库heap实现,堆顶元素为延迟时间最近的元素。
五、限速队列
数据结构及方法
限速队列,继承了延迟队列的通用接口,并结合限速器利用延迟队列的特性延迟某个元素的插入时间,从而实现对元素入队有一定速率限制,新增有AddRateLimited(获取到限速器延迟时间,然后加入到延迟队列)、Forget、NumRequeues 3个方法。
限速器主要包括三个函数When获取某个元素应该等待的时间,Forget释放某个元素不再监测,NumRequeues返回元素入队列的次数。
限速队列存储过程
将元素 1 通过限速器计算出延迟时间,然后放入延迟队列中
限速器的实现
RateLimiter主要有四种类型,主要行为表现在当某一事件元素失败后,等待时间的计算规则不一致:
- - BucketRateLimiter令牌桶算法,可以处理尖峰流量,实现平滑限流,可以控制在一定频率来执行;
- - ItemExponentialFailureRateLimiter将失败次数作为指数,失败次数增大,速率限制呈指数级增长,但其最大值不会超过 maxDelay;
- - ItemFastSlowRateLimiter根据失败次数限速时间间隔先小后大;
- - MaxOfRateLimiter包含多个限流器,并返回所有RateLimiter的最坏值。
默认的DefaultControllerRateLimiter是MaxOfRateLimiter取最坏值,其中并包括两个限流器,
BucketRateLimiter限流器是一个令牌桶算法处理尖峰流量,实现平滑限流,令牌桶大小是100,生成令牌速度是10QPS,拿令牌是没有速度限制的;
ItemExponentialFailureRateLimiter:等待时间=min(1000s,5ms*2^n),与失败次数n呈指数关系;
备注:k8s controller的重试等待时间是5ms*2的n次方,n是重试次数,但是不超过1000s,并且还有一个令牌桶算法处理尖峰流量实现平滑限流,所以流量暴增时等待时间可能比1000s更长。