slot概述
在 Sentinel 里面,所有的资源都对应一个资源名称(resourceName),每次资源调用都会创建一个 Entry 对象。Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用 SphU API 显式创建。Entry 创建的时候,同时也会创建一系列功能插槽(slot chain),这些插槽有不同的职责,例如:
-
NodeSelectorSlot:负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级。
-
ClusterBuilderSlot:则用于存储资源的统计信息以及调用者信息,例如该资源的 RT、QPS、thread count 等等,这些信息将用作为多维度限流,降级的依据。
-
LogSlot:则用于记录用于记录块异常,为故障排除提供具体的日志。
-
StatisticSlot:则用于记录、统计不同纬度的 runtime 指标监控信息。
-
AuthoritySlot:则根据配置的黑白名单和调用来源信息,来做黑白名单控制。
-
SystemSlot:则通过系统的状态,例如 load1 等,来控制总的入口流量。
-
FlowSlot:则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制。
-
DegradeSlot:则通过统计信息以及预设的规则,来做熔断降级。
下面是关系结构图:
solt的基本逻辑及代码演示
每个Slot执行完业务逻辑处理后,会调用fireEntry()方法,该方法将会触发下一个节点的entry方法,下一个节点又会调用他的fireEntry,以此类推直到最后一个Slot,由此就形成了sentinel的责任链。
工作流概述:
根据slot 的基本实现processorSlot的实现类讲一下slot 的基本结构
先看看顶层接口ProcessorSlot
public interface ProcessorSlot<T> { //开始入口 void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized, Object... args) throws Throwable; //finish意味着结束 void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable; //退出插槽 void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args); //退出插槽结束 void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args); }这个接口有4个方法,entry,fireEntry,exit,fireExit
ProcessorSlot 的抽象实现 AbstractLinkedProcessorSlot
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> { private AbstractLinkedProcessorSlot<?> next = null; @Override public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable { //当业务执行完毕后,如果还有下一个slot if (next != null) { next.transformEntry(context, resourceWrapper, obj, count, prioritized, args); } } @SuppressWarnings("unchecked") //指向下一个slot的entry,每一个slot根据自己的职责不同,有自己的实现 void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args) throws Throwable { T t = (T)o; entry(context, resourceWrapper, t, count, prioritized, args); } @Override public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { //当一个slot的exit执行完毕后,如果还有下一个未关闭slot if (next != null) { //指向下一个slot的exit next.exit(context, resourceWrapper, count, args); } } public AbstractLinkedProcessorSlot<?> getNext() { return next; } public void setNext(AbstractLinkedProcessorSlot<?> next) { this.next = next; } }DefaultProcessorSlotChain实现了上述的chain(setNext和getNext)
public class DefaultProcessorSlotChain extends ProcessorSlotChain { //直接实现了AbstractLinkedProcessorSlot的实例并作为first,可以理解为当前slot AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() { public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable { super.fireEntry(context, resourceWrapper, t, count, prioritized, args); } public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { super.fireExit(context, resourceWrapper, count, args); } }; //默认的end(可以理解为当前的后一个slot) AbstractLinkedProcessorSlot<?> end; public DefaultProcessorSlotChain() { this.end = this.first; } public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) { protocolProcessor.setNext(this.first.getNext()); this.first.setNext(protocolProcessor); //如果当前为最后一个 if (this.end == this.first) { this.end = protocolProcessor; } } public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) { //将后一个slot放进当前slot的next this.end.setNext(protocolProcessor); //将end指向后一个slot this.end = protocolProcessor; } public void setNext(AbstractLinkedProcessorSlot<?> next) { this.addLast(next); } public AbstractLinkedProcessorSlot<?> getNext() { return this.first.getNext(); } public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable { this.first.transformEntry(context, resourceWrapper, t, count, prioritized, args); } public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { this.first.exit(context, resourceWrapper, count, args); } }NodeSelectorSlot
@Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT) public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> { /** * 相同的资源但是Context不同,分别新建 DefaultNode,并以ContextName为key */ private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10); @Override public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable { // 根据ContextName尝试获取DefaultNode DefaultNode node = map.get(context.getName()); if (node == null) { synchronized (this) { node = map.get(context.getName()); if (node == null) { // 初始化resource对应的Node 类型为DefaultNode,每个Context对应一个 node = new DefaultNode(resourceWrapper, null); //保存Node 一个resource对应一个Node HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size()); cacheMap.putAll(map); cacheMap.put(context.getName(), node); map = cacheMap; // 构建 Node tree // 添加到Context的Node节点,这里构造了一棵节点树 ((DefaultNode) context.getLastNode()).addChild(node); } } } //设置当前Context的当前节点为node context.setCurNode(node); // 唤醒执行下一个插槽,调用下一个Slot fireEntry(context, resourceWrapper, node, count, prioritized, args); } @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { fireExit(context, resourceWrapper, count, args); } }NodeSelectorSlot顾名思义是用来构建Node的。
我们可以看到NodeSelectorSlot对于不同的上下文都会生成一个DefaultNode。这里还有一个要注意的点:相同的资源({@link ResourceWrapper#equals(Object)})将全局共享相同的{@link ProcessorSlotChain},无论在哪个上下文中,因此不同的上下文可以进入到同一个对象的NodeSelectorSlot.entry方法中,那么这里要怎么区分不同的上下文所创建的资源Node呢?显然可以使用上下文名称作为映射键以区分相同的资源Node。
然后这里要考虑另一个问题。一个资源有可能创建多个DefaultNode(有多个上下文时),那么我们应该如何快速的获取总的统计数据呢?
答案就在下一个Slot(ClusterBuilderSlot)中被解决了。
ClusterBuilderSlot
上面有提到一个问题,我们要如何统计不同上下文相同资源的总量数据。ClusterBuilderSlot给了很好的解决方案:具有相同资源名称的共享一个ClusterNode。
@Spi(isSingleton = false, order = Constants.ORDER_CLUSTER_BUILDER_SLOT) public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> { // 相同的资源共享一个 ClusterNode private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>(); private static final Object lock = new Object(); private volatile ClusterNode clusterNode = null; @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { // 判断本资源是否已经初始化过clusterNode if (clusterNode == null) { synchronized (lock) { if (clusterNode == null) { // 没有初始化则初始化clusterNode clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType()); HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16)); newMap.putAll(clusterNodeMap); newMap.put(node.getId(), clusterNode); clusterNodeMap = newMap; } } } // 给相同资源的DefaultNode设置一样的ClusterNode node.setClusterNode(clusterNode); /* * 如果有来源则新建一个来源Node */ if (!"".equals(context.getOrigin())) { Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin()); context.getCurEntry().setOriginNode(originNode); } fireEntry(context, resourceWrapper, node, count, prioritized, args); } }上面的代码其实就是做了一件事情,为资源创建CluserNode,相同的资源({@link ResourceWrapper#equals(Object)})将全局共享相同的{@link ProcessorSlotChain},无论在哪个上下文中。也就是说,能进入到同一个ClusterBuilderSlot对象的entry方法的请求都是来自同一个资源的,所以这些相同资源需要初始化一个统一的CluserNode用来做流量的汇总统计。
LogSlot
代码比较简单,逻辑就是打印异常日志,就不分析了。
StatisticSlot
StatisticSlot 是 Sentinel 的核心功能插槽之一,用于统计实时的调用数据。
@Spi(order = Constants.ORDER_STATISTIC_SLOT) public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> { @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { try { // Do some checking. //next(下一个)节点调用Entry方法 //先进行后续的check,包括规则的check,黑白名单check fireEntry(context, resourceWrapper, node, count, prioritized, args); // 如果能通过SlotChain中后面的Slot的entry方法,说明没有被限流或降级 // 统计默认qps 线程数, 当前线程数加1 node.increaseThreadNum(); //通过的请求加上count node.addPassRequest(count); // 元节点通过请求数和当前线程(LongAdder curThreadNum)计数器加1 if (context.getCurEntry().getOriginNode() != null) { // 根据来源统计qps 线程数 context.getCurEntry().getOriginNode().increaseThreadNum(); context.getCurEntry().getOriginNode().addPassRequest(count); } // 入口节点通过请求数和当前线程(LongAdder curThreadNum)计数器加1 if (resourceWrapper.getEntryType() == EntryType.IN) { // 统计入口 qps 线程数 Constants.ENTRY_NODE.increaseThreadNum(); Constants.ENTRY_NODE.addPassRequest(count); } // Handle pass event with registered entry callback handlers. // 注册的扩展点的数据统计 for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); } } catch (PriorityWaitException ex) { node.increaseThreadNum(); if (context.getCurEntry().getOriginNode() != null) { // Add count for origin node. context.getCurEntry().getOriginNode().increaseThreadNum(); } if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseThreadNum(); } // Handle pass event with registered entry callback handlers. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); } } catch (BlockException e) { // Blocked, set block exception to current entry. context.getCurEntry().setBlockError(e); // Add block count. node.increaseBlockQps(count); if (context.getCurEntry().getOriginNode() != null) { context.getCurEntry().getOriginNode().increaseBlockQps(count); } if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseBlockQps(count); } // Handle block event with registered entry callback handlers. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onBlocked(e, context, resourceWrapper, node, count, args); } throw e; } catch (Throwable e) { // Unexpected internal error, set error to current entry. context.getCurEntry().setError(e); throw e; } } }StatisticSlot主要做了4种不同维度的流量统计
- 1、资源在上下文维度(DefaultNode)的统计。
- 2、clusterNode 维度的统计。
- 3、Origin 来源维度的统计。
- 4、入口全局流量的统计。
SystemSlot
SystemSlot比较简单,其实就是根据StatisticSlot所统计的全局入口流量进行限流。
AuthoritySlot
@Spi(order = Constants.ORDER_AUTHORITY_SLOT) public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> { @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { checkBlackWhiteAuthority(resourceWrapper, context); fireEntry(context, resourceWrapper, node, count, prioritized, args); } void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException { Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules(); if (authorityRules == null) { return; } // 根据资源获取黑白名单规则 Set<AuthorityRule> rules = authorityRules.get(resource.getName()); if (rules == null) { return; } // 对规则进行校验,只要有一条不通过 就抛异常 for (AuthorityRule rule : rules) { if (!AuthorityRuleChecker.passCheck(rule, context)) { throw new AuthorityException(context.getOrigin(), rule); } } } }AuthoritySlot会对资源的黑白名单做检查,并且只要有一条不通过就抛异常。
FlowSlot
@Spi(order = Constants.ORDER_FLOW_SLOT) public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> { private final FlowRuleChecker checker; @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { checkFlow(resourceWrapper, context, node, count, prioritized); fireEntry(context, resourceWrapper, node, count, prioritized, args); } void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { // 检查限流规则 checker.checkFlow(ruleProvider, resource, context, node, count, prioritized); } }这个slot主要根据预设的资源的统计信息,按照固定的次序,依次生效。如果一个资源对应两条或者多条流控规则,则会根据如下次序依次检验,直到全部通过或者有一个规则生效为止,并且同样也会根据三种不同的维度来进行限流:
- 1、资源在上下文维度(DefaultNode)的统计。
- 2、clusterNode 维度的统计。
- 3、Origin 来源维度的统计。
DegradeSlot
这个slot主要针对资源的平均响应时间(RT)以及异常比率,来决定资源是否在接下来的时间被自动熔断掉。
总结
-
1、相同的资源({@link ResourceWrapper#equals(Object)})将全局共享相同的{@link ProcessorSlotChain},无论在哪个上下文中。
-
2、流控有多个维度,分别包括:
- 1、不同上下文中的资源。
- 2、相同资源。
- 3、入口流量。
- 4、相同的来源。
Sentinel中预设的SlotChain执行的完整流程:
参考: https://www.cnblogs.com/taromilk/p/11751000.html
https://juejin.cn/post/6870671845582962702
https://blog.csdn.net/wk52525/article/details/104439404