当前位置 : 主页 > 编程语言 > java >

Spring Cloud Alibaba——Sentinel Slot

来源:互联网 收集:自由互联 发布时间:2023-02-04
slot概述 在 Sentinel 里面,所有的资源都对应一个资源名称(resourceName),每次资源调用都会创建一个 Entry 对象。Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用

slot概述

在 Sentinel 里面,所有的资源都对应一个资源名称(resourceName),每次资源调用都会创建一个 Entry 对象。Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用 SphU API 显式创建。Entry 创建的时候,同时也会创建一系列功能插槽(slot chain),这些插槽有不同的职责,例如:

  • NodeSelectorSlot:负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级。

  • ClusterBuilderSlot:则用于存储资源的统计信息以及调用者信息,例如该资源的 RT、QPS、thread count 等等,这些信息将用作为多维度限流,降级的依据。

  • LogSlot:则用于记录用于记录块异常,为故障排除提供具体的日志。

  • StatisticSlot:则用于记录、统计不同纬度的 runtime 指标监控信息。

  • AuthoritySlot:则根据配置的黑白名单和调用来源信息,来做黑白名单控制。

  • SystemSlot:则通过系统的状态,例如 load1 等,来控制总的入口流量。

  • FlowSlot:则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制。

  • DegradeSlot:则通过统计信息以及预设的规则,来做熔断降级。

下面是关系结构图:

solt的基本逻辑及代码演示

每个Slot执行完业务逻辑处理后,会调用fireEntry()方法,该方法将会触发下一个节点的entry方法,下一个节点又会调用他的fireEntry,以此类推直到最后一个Slot,由此就形成了sentinel的责任链。

 

工作流概述:

根据slot 的基本实现processorSlot的实现类讲一下slot 的基本结构

先看看顶层接口ProcessorSlot

public interface ProcessorSlot<T> { //开始入口 void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized, Object... args) throws Throwable; //finish意味着结束 void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable; //退出插槽 void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args); //退出插槽结束 void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args); }

这个接口有4个方法,entry,fireEntry,exit,fireExit

ProcessorSlot 的抽象实现 AbstractLinkedProcessorSlot

public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> { private AbstractLinkedProcessorSlot<?> next = null; @Override public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable { //当业务执行完毕后,如果还有下一个slot if (next != null) { next.transformEntry(context, resourceWrapper, obj, count, prioritized, args); } } @SuppressWarnings("unchecked") //指向下一个slot的entry,每一个slot根据自己的职责不同,有自己的实现 void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args) throws Throwable { T t = (T)o; entry(context, resourceWrapper, t, count, prioritized, args); } @Override public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { //当一个slot的exit执行完毕后,如果还有下一个未关闭slot if (next != null) { //指向下一个slot的exit next.exit(context, resourceWrapper, count, args); } } public AbstractLinkedProcessorSlot<?> getNext() { return next; } public void setNext(AbstractLinkedProcessorSlot<?> next) { this.next = next; } }

DefaultProcessorSlotChain实现了上述的chain(setNext和getNext)

public class DefaultProcessorSlotChain extends ProcessorSlotChain { //直接实现了AbstractLinkedProcessorSlot的实例并作为first,可以理解为当前slot AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() { public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable { super.fireEntry(context, resourceWrapper, t, count, prioritized, args); } public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { super.fireExit(context, resourceWrapper, count, args); } }; //默认的end(可以理解为当前的后一个slot) AbstractLinkedProcessorSlot<?> end; public DefaultProcessorSlotChain() { this.end = this.first; } public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) { protocolProcessor.setNext(this.first.getNext()); this.first.setNext(protocolProcessor); //如果当前为最后一个 if (this.end == this.first) { this.end = protocolProcessor; } } public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) { //将后一个slot放进当前slot的next this.end.setNext(protocolProcessor); //将end指向后一个slot this.end = protocolProcessor; } public void setNext(AbstractLinkedProcessorSlot<?> next) { this.addLast(next); } public AbstractLinkedProcessorSlot<?> getNext() { return this.first.getNext(); } public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable { this.first.transformEntry(context, resourceWrapper, t, count, prioritized, args); } public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { this.first.exit(context, resourceWrapper, count, args); } }

NodeSelectorSlot

@Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT) public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> { /** * 相同的资源但是Context不同,分别新建 DefaultNode,并以ContextName为key */ private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10); @Override public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable { // 根据ContextName尝试获取DefaultNode DefaultNode node = map.get(context.getName()); if (node == null) { synchronized (this) { node = map.get(context.getName()); if (node == null) { // 初始化resource对应的Node 类型为DefaultNode,每个Context对应一个 node = new DefaultNode(resourceWrapper, null); //保存Node 一个resource对应一个Node HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size()); cacheMap.putAll(map); cacheMap.put(context.getName(), node); map = cacheMap; // 构建 Node tree // 添加到Context的Node节点,这里构造了一棵节点树 ((DefaultNode) context.getLastNode()).addChild(node); } } } //设置当前Context的当前节点为node context.setCurNode(node); // 唤醒执行下一个插槽,调用下一个Slot fireEntry(context, resourceWrapper, node, count, prioritized, args); } @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { fireExit(context, resourceWrapper, count, args); } }

NodeSelectorSlot顾名思义是用来构建Node的。

 

我们可以看到NodeSelectorSlot对于不同的上下文都会生成一个DefaultNode。这里还有一个要注意的点:相同的资源({@link ResourceWrapper#equals(Object)})将全局共享相同的{@link ProcessorSlotChain},无论在哪个上下文中,因此不同的上下文可以进入到同一个对象的NodeSelectorSlot.entry方法中,那么这里要怎么区分不同的上下文所创建的资源Node呢?显然可以使用上下文名称作为映射键以区分相同的资源Node。

 

然后这里要考虑另一个问题。一个资源有可能创建多个DefaultNode(有多个上下文时),那么我们应该如何快速的获取总的统计数据呢?

 

答案就在下一个Slot(ClusterBuilderSlot)中被解决了。

ClusterBuilderSlot

上面有提到一个问题,我们要如何统计不同上下文相同资源的总量数据。ClusterBuilderSlot给了很好的解决方案:具有相同资源名称的共享一个ClusterNode。

@Spi(isSingleton = false, order = Constants.ORDER_CLUSTER_BUILDER_SLOT) public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> { // 相同的资源共享一个 ClusterNode private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>(); private static final Object lock = new Object(); private volatile ClusterNode clusterNode = null; @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { // 判断本资源是否已经初始化过clusterNode if (clusterNode == null) { synchronized (lock) { if (clusterNode == null) { // 没有初始化则初始化clusterNode clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType()); HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16)); newMap.putAll(clusterNodeMap); newMap.put(node.getId(), clusterNode); clusterNodeMap = newMap; } } } // 给相同资源的DefaultNode设置一样的ClusterNode node.setClusterNode(clusterNode); /* * 如果有来源则新建一个来源Node */ if (!"".equals(context.getOrigin())) { Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin()); context.getCurEntry().setOriginNode(originNode); } fireEntry(context, resourceWrapper, node, count, prioritized, args); } }

上面的代码其实就是做了一件事情,为资源创建CluserNode,相同的资源({@link ResourceWrapper#equals(Object)})将全局共享相同的{@link ProcessorSlotChain},无论在哪个上下文中。也就是说,能进入到同一个ClusterBuilderSlot对象的entry方法的请求都是来自同一个资源的,所以这些相同资源需要初始化一个统一的CluserNode用来做流量的汇总统计。

LogSlot

代码比较简单,逻辑就是打印异常日志,就不分析了。

StatisticSlot

StatisticSlot 是 Sentinel 的核心功能插槽之一,用于统计实时的调用数据。

@Spi(order = Constants.ORDER_STATISTIC_SLOT) public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> { @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { try { // Do some checking. //next(下一个)节点调用Entry方法 //先进行后续的check,包括规则的check,黑白名单check fireEntry(context, resourceWrapper, node, count, prioritized, args); // 如果能通过SlotChain中后面的Slot的entry方法,说明没有被限流或降级 // 统计默认qps 线程数, 当前线程数加1 node.increaseThreadNum(); //通过的请求加上count node.addPassRequest(count); // 元节点通过请求数和当前线程(LongAdder curThreadNum)计数器加1 if (context.getCurEntry().getOriginNode() != null) { // 根据来源统计qps 线程数 context.getCurEntry().getOriginNode().increaseThreadNum(); context.getCurEntry().getOriginNode().addPassRequest(count); } // 入口节点通过请求数和当前线程(LongAdder curThreadNum)计数器加1 if (resourceWrapper.getEntryType() == EntryType.IN) { // 统计入口 qps 线程数 Constants.ENTRY_NODE.increaseThreadNum(); Constants.ENTRY_NODE.addPassRequest(count); } // Handle pass event with registered entry callback handlers. // 注册的扩展点的数据统计 for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); } } catch (PriorityWaitException ex) { node.increaseThreadNum(); if (context.getCurEntry().getOriginNode() != null) { // Add count for origin node. context.getCurEntry().getOriginNode().increaseThreadNum(); } if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseThreadNum(); } // Handle pass event with registered entry callback handlers. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); } } catch (BlockException e) { // Blocked, set block exception to current entry. context.getCurEntry().setBlockError(e); // Add block count. node.increaseBlockQps(count); if (context.getCurEntry().getOriginNode() != null) { context.getCurEntry().getOriginNode().increaseBlockQps(count); } if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseBlockQps(count); } // Handle block event with registered entry callback handlers. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onBlocked(e, context, resourceWrapper, node, count, args); } throw e; } catch (Throwable e) { // Unexpected internal error, set error to current entry. context.getCurEntry().setError(e); throw e; } } }

StatisticSlot主要做了4种不同维度的流量统计

  • 1、资源在上下文维度(DefaultNode)的统计。
  • 2、clusterNode 维度的统计。
  • 3、Origin 来源维度的统计。
  • 4、入口全局流量的统计。

SystemSlot

SystemSlot比较简单,其实就是根据StatisticSlot所统计的全局入口流量进行限流。

AuthoritySlot

@Spi(order = Constants.ORDER_AUTHORITY_SLOT) public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> { @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { checkBlackWhiteAuthority(resourceWrapper, context); fireEntry(context, resourceWrapper, node, count, prioritized, args); } void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException { Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules(); if (authorityRules == null) { return; } // 根据资源获取黑白名单规则 Set<AuthorityRule> rules = authorityRules.get(resource.getName()); if (rules == null) { return; } // 对规则进行校验,只要有一条不通过 就抛异常 for (AuthorityRule rule : rules) { if (!AuthorityRuleChecker.passCheck(rule, context)) { throw new AuthorityException(context.getOrigin(), rule); } } } }

AuthoritySlot会对资源的黑白名单做检查,并且只要有一条不通过就抛异常。

FlowSlot

@Spi(order = Constants.ORDER_FLOW_SLOT) public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> { private final FlowRuleChecker checker; @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { checkFlow(resourceWrapper, context, node, count, prioritized); fireEntry(context, resourceWrapper, node, count, prioritized, args); } void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { // 检查限流规则 checker.checkFlow(ruleProvider, resource, context, node, count, prioritized); } }

这个slot主要根据预设的资源的统计信息,按照固定的次序,依次生效。如果一个资源对应两条或者多条流控规则,则会根据如下次序依次检验,直到全部通过或者有一个规则生效为止,并且同样也会根据三种不同的维度来进行限流:

  • 1、资源在上下文维度(DefaultNode)的统计。
  • 2、clusterNode 维度的统计。
  • 3、Origin 来源维度的统计。

DegradeSlot

这个slot主要针对资源的平均响应时间(RT)以及异常比率,来决定资源是否在接下来的时间被自动熔断掉。

总结

  • 1、相同的资源({@link ResourceWrapper#equals(Object)})将全局共享相同的{@link ProcessorSlotChain},无论在哪个上下文中。

  • 2、流控有多个维度,分别包括:

    • 1、不同上下文中的资源。
    • 2、相同资源。
    • 3、入口流量。
    • 4、相同的来源。

Sentinel中预设的SlotChain执行的完整流程:

参考: https://www.cnblogs.com/taromilk/p/11751000.html

https://juejin.cn/post/6870671845582962702

https://blog.csdn.net/wk52525/article/details/104439404

网友评论