当前位置 : 主页 > 编程语言 > java >

Spring Cloud Alibaba——Sentinel 滑动窗口流量统计

来源:互联网 收集:自由互联 发布时间:2023-02-04
前言 Sentinel的核心功能之一是流量统计,例如我们常用的指标QPS,当前线程数等。之前已经大致提到了提供数据统计功能的Slot(StatisticSlot),StatisticSlot在Sentinel的整个体系中扮演了一

前言

Sentinel的核心功能之一是流量统计,例如我们常用的指标QPS,当前线程数等。之前已经大致提到了提供数据统计功能的Slot(StatisticSlot),StatisticSlot在Sentinel的整个体系中扮演了一个非常重要的角色,后续的一系列操作(限流,熔断)等都依赖于StatisticSlot所统计出的数据。

 

本文所要讨论的重点就是StatisticSlot是如何做的流量统计?

 

其实在常用限流算法:https://www.cnblogs.com/liran123/p/13467089.html中,有一个算法滑动窗口限流,该算法的滑动窗口原理其实跟Sentinel所提供的流量统计原理是一样的,都是基于时间窗口的滑动统计。

问题思考

  • 1、StatisticSlot主要职责是流量统计(为后面插槽链的流控和降级做准备),这些流量是如何统计出来的?

  • 2、统计出来的流量在流控中是如何使用的?

StatisticSlot请求流量统计

以StatisticSlot为切入点,追踪请求流量统计调用链。

1、入口

@Spi(order = Constants.ORDER_STATISTIC_SLOT) public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> { @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { try { // Do some checking. //next(下一个)节点调用Entry方法 //先进行后续的check,包括规则的check,黑白名单check fireEntry(context, resourceWrapper, node, count, prioritized, args); // 如果能通过SlotChain中后面的Slot的entry方法,说明没有被限流或降级 // 统计默认qps 线程数, 当前线程数加1 node.increaseThreadNum(); //通过的请求加上count node.addPassRequest(count); // 元节点通过请求数和当前线程(LongAdder curThreadNum)计数器加1 if (context.getCurEntry().getOriginNode() != null) { // 根据来源统计qps 线程数 context.getCurEntry().getOriginNode().increaseThreadNum(); context.getCurEntry().getOriginNode().addPassRequest(count); } // 入口节点通过请求数和当前线程(LongAdder curThreadNum)计数器加1 if (resourceWrapper.getEntryType() == EntryType.IN) { // 统计入口 qps 线程数 Constants.ENTRY_NODE.increaseThreadNum(); Constants.ENTRY_NODE.addPassRequest(count); } // Handle pass event with registered entry callback handlers. // 注册的扩展点的数据统计 for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); } } catch (PriorityWaitException ex) { node.increaseThreadNum(); if (context.getCurEntry().getOriginNode() != null) { // Add count for origin node. context.getCurEntry().getOriginNode().increaseThreadNum(); } if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseThreadNum(); } // Handle pass event with registered entry callback handlers. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); } } catch (BlockException e) { // Blocked, set block exception to current entry. context.getCurEntry().setBlockError(e); // Add block count. node.increaseBlockQps(count); if (context.getCurEntry().getOriginNode() != null) { context.getCurEntry().getOriginNode().increaseBlockQps(count); } if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseBlockQps(count); } // Handle block event with registered entry callback handlers. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onBlocked(e, context, resourceWrapper, node, count, args); } throw e; } catch (Throwable e) { // Unexpected internal error, set error to current entry. context.getCurEntry().setError(e); throw e; } } }

两个维度进行统计:

  • 1、一个是统计线程数通过StatisticNode#curThreadNum递增来完成,curThreadNum为AtomicInteger类型。
  • 2、另外一个是递增请求数量。

可以看到StatisticSlot主要统计了两个维度的数据

  • 1、线程数
  • 2、请求数(QPS)

对于线程数的统计比较简单,通过内部维护一个LongAdder来进行当前线程数的统计,每进入一个请求加1,每释放一个请求减1,从而得到当前的线程数

 

对于请求数QPS的统计则相对比较复杂,其中有用到滑动窗口原理(也是本文的重点),下面根据源码来深入的分析

DefaultNode和StatisticNode

public class DefaultNode extends StatisticNode { @Override public void addPassRequest(int count) { // 调用父类(StatisticNode)来进行统计 super.addPassRequest(count); // 根据clusterNode 汇总统计(背后也是调用父类StatisticNode) this.clusterNode.addPassRequest(count); } }

最终都是调用了父类StatisticNode的addPassRequest方法

public class StatisticNode implements Node { //按秒统计,分成两个窗口,每个窗口500ms,用来统计QPS private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL); //按分钟统计,分成60个窗口,每个窗口 1000ms private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false); @Override public void addPassRequest(int count) { rollingCounterInSecond.addPass(count); rollingCounterInMinute.addPass(count); } }

代码比较简单,可以知道内部是调用了ArrayMetric的addPass方法来统计的,并且统计了两种不同时间维度的数据(秒级和分钟级)

ArrayMetric

public class ArrayMetric implements Metric { private final LeapArray<MetricBucket> data; public ArrayMetric(int sampleCount, int intervalInMs) { this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs); } public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) { if (enableOccupy) { this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs); } else { this.data = new BucketLeapArray(sampleCount, intervalInMs); } } @Override public void addPass(int count) { // 1、获取当前窗口 WindowWrap<MetricBucket> wrap = data.currentWindow(); // 2、当前窗口加1 wrap.value().addPass(count); } }

ArrayMetric其实也是一个包装类,内部通过实例化LeapArray的对应实现类,来实现具体的统计逻辑,LeapArray是一个抽象类,OccupiableBucketLeapArray和BucketLeapArray都是其具体的实现类

 

OccupiableBucketLeapArray在1.5版本之后才被引入,主要是为了解决一些高优先级的请求在限流触发的时候也能通过(通过占用未来时间窗口的名额来实现) 也是默认使用的LeapArray实现类

 

而统计的逻辑也比较清楚,分成了两步:

  • 1、定位到当前窗口
  • 2、获取到当前窗口WindowWrap的MetricBucket并执行addPass逻辑

这里我们先看下第二步中的MetricBucket类,看看它做了哪些事情

MetricBucket

public class MetricBucket { //存放当前窗口各种类型的统计值(类型包括 PASS BLOCK EXCEPTION 等) private final LongAdder[] counters; public MetricBucket() { MetricEvent[] events = MetricEvent.values(); this.counters = new LongAdder[events.length]; for (MetricEvent event : events) { counters[event.ordinal()] = new LongAdder(); } initMinRt(); } // 统计pass数 public void addPass(int n) { add(MetricEvent.PASS, n); } // 统计可占用的pass数 public void addOccupiedPass(int n) { add(MetricEvent.OCCUPIED_PASS, n); } // 统计异常数 public void addException(int n) { add(MetricEvent.EXCEPTION, n); } // 统计block数 public void addBlock(int n) { add(MetricEvent.BLOCK, n); } }

MetricBucket通过定义了一个LongAdder数组来存储不同类型的流量统计值,具体的类型则都定义在MetricEvent枚举中。

 

执行addPass方法对应LongAdder数组索引下表为0的值递增

 

下面再来看下data.currentWindow()的内部逻辑

OccupiableBucketLeapArray

OccupiableBucketLeapArray继承了抽象类LeapArray,核心逻辑也是在LeapArray中

public abstract class LeapArray<T> { //时间窗口大小 单位ms protected int windowLengthInMs; //切分的窗口数 protected int sampleCount; //统计的时间间隔 intervalInMs = windowLengthInMs * sampleCount protected int intervalInMs; private double intervalInSecond; //窗口数组 数组大小 = sampleCount protected final AtomicReferenceArray<WindowWrap<T>> array; //update lock 更新窗口时需要上锁 private final ReentrantLock updateLock = new ReentrantLock(); /** * @param sampleCount 需要划分的窗口数 * * @param intervalInMs 间隔的统计时间 */ public LeapArray(int sampleCount, int intervalInMs) { AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount); AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive"); AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided"); this.windowLengthInMs = intervalInMs / sampleCount; this.intervalInMs = intervalInMs; this.intervalInSecond = intervalInMs / 1000.0; this.sampleCount = sampleCount; this.array = new AtomicReferenceArray<>(sampleCount); } /** * 获取当前窗口 */ public WindowWrap<T> currentWindow() { return currentWindow(TimeUtil.currentTimeMillis()); } }

以上需要着重理解的是几个参数的含义:

  • sampleCount:定义的窗口的数。
  • intervalInMs:统计的时间间隔。
  • windowLengthInMs:每个窗口的时间大小 = intervalInMs / sampleCount。

sampleCount 比较好理解,就是需要定义几个窗口(默认秒级统计维度的话是两个窗口),intervalInMs 指的就是我们需要统计的时间间隔,例如我们统计QPS的话那就是1000ms,windowLengthInMs 指的每个窗口的大小,是由intervalInMs除以sampleCount得来。

时间窗口大小和统计区间可以自定义,以默认进行分析。:

  • 统计区间:intervalInMs为1秒
  • 滑动时间窗口大小:windowLengthInMs为500毫秒
  • 采样窗口数量:默认2个 sampleCount=intervalInMs/windowLengthInMs=2
  • 采样数据数组:数据大小默认2 使用数组来封装(array)数组大小与采样窗口数量相同 采样数据数组下标:
long timeId = timeMillis / windowLengthInMs; idx=(int)(timeId % array.length()) long timeId = timeMillis / windowLengthInMs

小结:随着时间(time)的向前推进,采样数据下标idx也在不断切换(由于2个窗口在0和1之间切换);根据下标进而获取采样数据,通过比较当前时间与采样数据中的窗口开始时间,确定当前时间是否属于该滑动窗口以及该采样数据的窗口是否过期;通过不断重置与更新采样数据的值实现统计数据的动态变化。

 

理解了上诉几个参数的含义后,我们直接进入到LeapArray的currentWindow(long time)方法中去看看具体的实现:

public abstract class LeapArray<T> { //时间窗口大小 单位ms protected int windowLengthInMs; //切分的窗口数 protected int sampleCount; //统计的时间间隔 intervalInMs = windowLengthInMs * sampleCount protected int intervalInMs; private double intervalInSecond; //窗口数组 数组大小 = sampleCount protected final AtomicReferenceArray<WindowWrap<T>> array; //update lock 更新窗口时需要上锁 private final ReentrantLock updateLock = new ReentrantLock(); public WindowWrap<T> currentWindow(long timeMillis) { if (timeMillis < 0) { return null; } // 计算数组array对应的下标 int idx = calculateTimeIdx(timeMillis); // 计算窗口开始时间(剔除余数) long windowStart = calculateWindowStart(timeMillis); /* * 根据下脚标在环形数组中获取滑动窗口(桶) * * (1) 如果桶不存在则创建新的桶,并通过CAS将新桶赋值到数组下标位。 * (2) 如果获取到的桶不为空,并且桶的开始时间等于刚刚算出来的时间,那么返回当前获取到的桶。 * (3) 如果获取到的桶不为空,并且桶的开始时间小于刚刚算出来的开始时间,那么说明这个桶是上一圈用过的桶,重置当前桶 * (4) 如果获取到的桶不为空,并且桶的开始时间大于刚刚算出来的开始时间,理论上不应该出现这种情况。 */ // 一直运行直到返回窗口 while (true) { // 获取数据对应的采样数据 WindowWrap<T> old = array.get(idx); // 没有窗口统计数据 if (old == null) { // 构造窗口 WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); // cas 操作,确保只初始化一次 if (array.compareAndSet(idx, null, window)) { // Successfully updated, return the created bucket. return window; } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } // 取出的窗口的开始时间和本次请求计算出的窗口开始时间一致,命中 } else if (windowStart == old.windowStart()) { // 处于同一个时间窗口 return old; // 本次请求计算出的窗口开始时间大于取出的窗口,说明取出的窗口过期了 } else if (windowStart > old.windowStart()) { // 时间已经推进到下一个窗口原来窗口过期 // 尝试获取更新锁 if (updateLock.tryLock()) { try { // 重置时间窗口 // 成功则更新,重置窗口开始时间为本次计算出的窗口开始时间,计数器重置为 0 return resetWindowTo(old, windowStart); } finally { // 解锁 updateLock.unlock(); } } else { // 获取锁失败,让其他线程取更新 Thread.yield(); } } else if (windowStart < old.windowStart()) { // 正常情况不会进入该分支(机器时钟回拨等异常情况) // Should not go through here, as the provided time is already behind. return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); } } } /** * 根据当前时间戳计算当前所属的窗口数组索引下标 */ private int calculateTimeIdx(/*@Valid*/ long timeMillis) { long timeId = timeMillis / windowLengthInMs; // Calculate current index so we can map the timestamp to the leap array. return (int)(timeId % array.length()); } /** * 计算当前窗口的开始时间戳 */ protected long calculateWindowStart(/*@Valid*/ long timeMillis) { return timeMillis - timeMillis % windowLengthInMs; } }

上面的方法就是整个滑动窗口逻辑的核心代码,注释其实也写的比较清晰了,简单概括下可以分为以下几步:

  • 1、根据当前时间戳 和 窗口数组大小 获取到当前的窗口数组索引下标idx,如果窗口数是2,那其实idx只有两种值(0 或 1)

  • 2、根据当前时间戳(windowStart) 计算得到当前窗口的开始时间戳值。通过calculateWindowStart计算来得到,这个方法还蛮有意思的,通过当前时间戳和窗口时间大小取余来得到 与当前窗口开始时间的 偏移量。

  • 3、然后就是根据上面得到的两个值 来获取当前时间窗口,这里其实又分为三种情况:

    • A、当前窗口为空还未创建,则初始化一个。
    • B、当前窗口的开始时间和上面计算出的窗口开始时间(windowStart)一致,表明当前窗口还未过期,直接返回当前窗口。
    • C、当前窗口的开始时间 小于 上面计算出的窗口(windowStart)开始时间,表明当前窗口已过期,需要替换当前窗口。

参考: https://www.cnblogs.com/taromilk/p/11751009.html

https://blog.csdn.net/gaoliang1719/article/details/106566923

https://www.cnblogs.com/magexi/p/13124870.html

网友评论