当前位置 : 主页 > 编程语言 > java >

Spring Cloud Alibaba——Sentinel SlotChain链流程总览

来源:互联网 收集:自由互联 发布时间:2023-02-04
前言 Sentinel作为ali开源的一款轻量级流控框架,主要以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度来帮助用户保护服务的稳定性。相比于Hystrix,Sentinel的设计更加

前言

Sentinel作为ali开源的一款轻量级流控框架,主要以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度来帮助用户保护服务的稳定性。相比于Hystrix,Sentinel的设计更加简单,在 Sentinel中资源定义和规则配置是分离的,也就是说用户可以先通过Sentinel API给对应的业务逻辑定义资源(埋点),然后在需要的时候再配置规则,通过这种组合方式,极大的增加了Sentinel流控的灵活性。

 

引入Sentinel带来的性能损耗非常小。只有在业务单机量级超过25W QPS的时候才会有一些显著的影响(5% - 10% 左右),单机QPS不太大的时候损耗几乎可以忽略不计。

 

Sentinel提供两种埋点方式:

  • try-catch 方式(通过 SphU.entry(...)),用户在 catch 块中执行异常处理 / fallback

  • if-else 方式(通过 SphO.entry(...)),当返回 false 时执行异常处理 / fallback

写在前面

在此之前,需要先了解一下Sentinel的工作流程。

 

在 Sentinel 里面,所有的资源都对应一个资源名称(resourceName),每次资源调用都会创建一个 Entry 对象。Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用 SphU API 显式创建。Entry 创建的时候,同时也会创建一系列功能插槽(slot chain),这些插槽有不同的职责,例如默认情况下会创建以下7个插槽:

  • NodeSelectorSlot:负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级。

  • ClusterBuilderSlot:则用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据。

  • StatisticSlot:则用于记录、统计不同纬度的 runtime 指标监控信息。

  • SystemSlot:则通过系统的状态,例如 load1 等,来控制总的入口流量。

  • AuthoritySlot:则根据配置的黑白名单和调用来源信息,来做黑白名单控制。

  • FlowSlot:则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制。

  • DegradeSlot:则通过统计信息以及预设的规则,来做熔断降级。

注意:这里的插槽链都是一一对应资源名称的。

 

每个Slot执行完业务逻辑处理后,会调用fireEntry()方法,该方法将会触发下一个节点的entry方法,下一个节点又会调用他的fireEntry,以此类推直到最后一个Slot,由此就形成了Sentinel的责任链。

 

上面的所介绍的插槽(slot chain)是Sentinel非常重要的概念。同时还有一个非常重要的概念那就是Node。

Node之间的树形结构

在创建context会先创建DefaultNode 实际是它的父类EntranceNode,context可以相同context-name反复申明创建,但是DefaultNode同一context-name只会创建一次,DefaultNode包含了一个链路所有的资源,每一个资源对应一个ClusterNode,ClusterNode再根据来源细分为StatisticNode,它们之间的关系就是一个树形结构 如下:

  • EntranceNode:根据context-name来创建,就算同一个context-name多次创建context,entranceNode也只会创建一次, 用来统计该链路上所有的资源信息。

  • DefaultNode:根据context-name + resource-name创建,用来统计某链路上的资源信息。

  • ClusterNode:根据resource-name来创建,用来统计资源信息。

  • StatisticsNode:根据origin-name+resource-name来创建,针对请求来源统计该来源的资源信息,上面几个node都是它的子类,基于它的数据做汇总。

一定要搞清楚这几个node之间的关系和作用,下面重点来看StatisticsNode,它用来完成信息统计,以供后续的限流规则使用, 它只统计了两个维度数据,qps和线程数。

入门案例

  • 引入maven依赖
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.12.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId> </dependency> </dependencies> <dependencyManagement> <dependencies> <!--整合Spring Cloud Alibaba--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>2.2.6.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
  • 下面举一个最简单的案例埋点来引出流控入口
public String getOrderInfo(String orderNo) { ContextUtil.enter("getOrderInfo", "application-a"); Entry entry = null; try { // name:资源名 EntryType 流量类型为入口还是出口,系统规则只针对入口流量, batchCount:当前请求流量, args:参数 entry = SphU.entry("getOrderInfo", EntryType.IN, 1, orderNo); getUserInfo(); } catch (BlockException e) { e.printStackTrace(); } finally { entry.exit(); } return "orderInfo = " + orderNo; } public String getUserInfo() { Entry entry = null; try { entry = SphU.entry("getUserInfo", EntryType.OUT, 1); // 通过http或feign对用户服务完成该接口调用 } catch (BlockException e) { e.printStackTrace(); } finally { entry.exit(); } return "userInfo"; } public static Entry entry(String name, EntryType trafficType, int batchCount, Object... args) throws BlockException

也可以通过注解的方式引入,执行方法时SentinelResourceAspect会做拦截进行流控处理,当然什么都不配也是可以的,因为引入spring-cloud-starter-alibaba-sentinel包spring mvc和spring webflux做了适配,自动会对每一个请求做埋点

@GetMapping("getOrderInfo") @SentinelResource(value = "/getOrderInfo", entryType = EntryType.IN) public String getOrderInfo(@RequestParam("orderNo") String orderNo) { return "orderInfo = " + orderNo; }

ContextUtil.enter("getOrderInfo", "application-a") 来表示调用链的入口,可以暂时理解为上下文,一般不做声明后面会默认创建。

 

第一个参数为context-name,区分不同的调用链入口,默认常量值sentinel_default_context。

 

第二参数为调用来源,这个参数可以细分从不同应用来源发出的请求,授权规则白名单和黑名单会根据该参数做限制,然后通过SphU.entry()埋点进入,下面说下这个方法几个参数的含义:

  • name:当前资源名。
  • trafficType:流量类型 分别为入口流量和出口流量。入口流量和出口流量执行过程都是差不多,只是入口流量会多了一个系统规则拦截,像是上面案例从订单服务调用getUserInfo,getUserInfo是去调用用户服务,它的流量方式是出去的,压力都在用户服务那边,不用考虑当前服务器的压力,所以标为出口流量。
  • batchCount:当前流量数量,一般默认为1。
  • args:参数,后面做热点参数规则时用到。

BlockException:当某一规则不通过时会抛出对应异常。

SphU.entry(xxx) 需要与 entry.exit() 方法成对出现,匹配调用,如有嵌套像上面,需先退出getUserInfo的entry在退出getOrderInfo的entry  

打开打控制台,此时应该是空白的,sentinel控制台是懒加载模式,需要调用一下相关资源接口就可以看到

可以看到sentinel规则配置主要有流控规则,降级规则,热点规则,系统规则,授权规则,先简单介绍下规则作用,其它配置也很简单 一目了然,后面通过结合源码来深入分析:

  • 流控规则:针对资源流量控制。
  • 热点规则:针对资源的热点参数做流量控制。
  • 降级规则:针对资源的调度情况来做降级处理。
  • 系统规则:针对当前服务做全局流量控制。
  • 授权规则:对访问资源的特定应用做授权处理。

源码分析

从上面的Sentinel使用的示例代码,我们就从这里切入开始分析

ContextUtil.enter()

public class ContextUtil { /** * Store the context in ThreadLocal for easy access. */ private static ThreadLocal<Context> contextHolder = new ThreadLocal<>(); /** * Holds all {@link EntranceNode}. Each {@link EntranceNode} is associated with a distinct context name. */ private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>(); public static Context enter(String name, String origin) { // 判断上下文名称是否为默认的名称(sentinel_default_context) 是的话直接抛出异常 if (Constants.CONTEXT_DEFAULT_NAME.equals(name)) { throw new ContextNameDefineException( "The " + Constants.CONTEXT_DEFAULT_NAME + " can't be permit to defined!"); } return trueEnter(name, origin); } protected static Context trueEnter(String name, String origin) { // 先从ThreadLocal中尝试获取,获取到则直接返回 Context context = contextHolder.get(); if (context == null) { Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap; // 尝试从缓存中获取该上下文名称对应的 入口节点 DefaultNode node = localCacheNameMap.get(name); if (node == null) { // 判断缓存中入口节点数量是否大于2000 if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) { setNullContext(); return NULL_CONTEXT; } else { try { // 加锁 LOCK.lock(); node = contextNameNodeMap.get(name); // 双重检查锁 if (node == null) { // 判断缓存中入口节点数量是否大于2000 if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) { setNullContext(); return NULL_CONTEXT; } else { // 根据上下文名称生成入口节点(entranceNode) node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null); // Add entrance node. // 加入至全局根节点下 Constants.ROOT.addChild(node); // 加入缓存中 Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1); newMap.putAll(contextNameNodeMap); newMap.put(name, node); contextNameNodeMap = newMap; } } } finally { LOCK.unlock(); } } } // 初始化上下文对象 context = new Context(node, name); context.setOrigin(origin); // 设置到当前线程中 contextHolder.set(context); } return context; } }

主要做了2件事情:

  • 1、根据ContextName生成entranceNode,并加入缓存,每个ContextName对应一个入口节点entranceNode。

  • 2、根据ContextName和entranceNode初始化上下文对象,并将上下文对象设置到当前线程中。

这里有几点需要注意:

  • 1、入口节点数量不能大于2000,大于会直接抛异常。
  • 2、每个ContextName对应一个入口节点entranceNode。
  • 3、每个entranceNode都有共同的父节点。也就是根节点。

SphU.entry(xxx)执行过程分析

public class SphU { private static final Object[] OBJECTS0 = new Object[0]; public static Entry entry(String name, EntryType trafficType, int batchCount, Object... args) throws BlockException { // 默认为 出口流量类型,单位统计数为1 return Env.sph.entry(name, trafficType, batchCount, args); } } public class CtSph implements Sph { @Override public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException { // 生成资源对象 StringResourceWrapper resource = new StringResourceWrapper(name, type); return entry(resource, count, args); } public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException { return entryWithPriority(resourceWrapper, count, false, args); } }

上面的代码比较简单,不指定EntryType的话,则默认为出口流量类型,最终会调用entryWithPriority方法,主要业务逻辑也都在这个方法中

entryWithPriority方法

public class CtSph implements Sph { private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException { // 获取当前线程上下文对象 Context context = ContextUtil.getContext(); // 上下文名称对应的入口节点是否已经超过阈值2000,超过则会返回空 CtEntry if (context instanceof NullContext) { return new CtEntry(resourceWrapper, null, context); } if (context == null) { // 如果没有指定上下文名称,则使用默认名称,也就是默认入口节点 context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME); } // 全局开关 if (!Constants.ON) { return new CtEntry(resourceWrapper, null, context); } // 生成插槽链 ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); /* * 表示资源(插槽链)超过6000,因此不会进行规则检查。 */ if (chain == null) { return new CtEntry(resourceWrapper, null, context); } // 生成 Entry 对象 // 创建一个流量入口,将context curEntry进行指定 Entry e = new CtEntry(resourceWrapper, chain, context); try { // 开始执行插槽链 调用逻辑 chain.entry(context, resourceWrapper, null, count, prioritized, args); } catch (BlockException e1) { // 发生流控异常进行退出 清除上下文 e.exit(count, args); // 将异常向上抛 throw e1; } catch (Throwable e1) { // 除非Sentinel内部存在错误,否则不应发生这种情况。 RecordLog.info("Sentinel unexpected exception", e1); } return e; } }

这个方法可以说是涵盖了整个Sentinel的核心逻辑

  • 1、获取上下文对象,如果上下文对象还未初始化,则使用默认名称初始化。初始化逻辑在上文已经分析过。
  • 2、判断全局开关。
  • 3、根据给定的资源生成插槽链,插槽链是跟资源相关的,Sentinel最关键的逻辑也都在各个插槽中。初始化的逻辑在lookProcessChain(resourceWrapper);中,下面会分析。
  • 4、依顺序执行每个插槽逻辑。

lookProcessChain(resourceWrapper)方法

lookProcessChain方法为指定资源生成插槽链,下面我们来看下它的初始化逻辑

public class CtSph implements Sph { private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(); ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) { // 根据资源尝试从全局缓存中获取 ProcessorSlotChain chain = chainMap.get(resourceWrapper); if (chain == null) { // 非常常见的双重检查锁 synchronized (LOCK) { chain = chainMap.get(resourceWrapper); if (chain == null) { // 判断资源数是否大于6000 if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) { return null; } // 初始化插槽链 chain = SlotChainProvider.newSlotChain(); Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>( chainMap.size() + 1); newMap.putAll(chainMap); newMap.put(resourceWrapper, chain); chainMap = newMap; } } } return chain; } }
  • 1、根据资源尝试从全局缓存中获取插槽链。每个资源对应一个插槽链(资源最多只能定义6000个)

  • 2、初始化插槽链上的插槽(SlotChainProvider.newSlotChain()方法中)

下面我们看下初始化插槽链上的插槽的逻辑

SlotChainProvider.newSlotChain()

public final class SlotChainProvider { private static volatile SlotChainBuilder slotChainBuilder = null; public static ProcessorSlotChain newSlotChain() { // 判断是否已经初始化过 if (slotChainBuilder != null) { return slotChainBuilder.build(); } // Resolve the slot chain builder SPI. // SPI获取构造器 slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault(); // 加载失败则使用默认 插槽链 if (slotChainBuilder == null) { // Should not go through here. RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default"); slotChainBuilder = new DefaultSlotChainBuilder(); } else { RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}", slotChainBuilder.getClass().getCanonicalName()); } // 构建完成 return slotChainBuilder.build(); } } public final class SpiLoader<S> { private static final String SPI_FILE_PREFIX = "META-INF/services/"; //初始化spiLoader类加载器 public static <T> SpiLoader<T> of(Class<T> service) { AssertUtil.notNull(service, "SPI class cannot be null"); AssertUtil.isTrue(service.isInterface() || Modifier.isAbstract(service.getModifiers()), "SPI class[" + service.getName() + "] must be interface or abstract class"); String className = service.getName(); SpiLoader<T> spiLoader = SPI_LOADER_MAP.get(className); if (spiLoader == null) { synchronized (SpiLoader.class) { spiLoader = SPI_LOADER_MAP.get(className); if (spiLoader == null) { SPI_LOADER_MAP.putIfAbsent(className, new SpiLoader<>(service)); spiLoader = SPI_LOADER_MAP.get(className); } } } return spiLoader; } public S loadFirstInstanceOrDefault() { //通过spiLoader去 META-INF/services/目录下去加载文件 load(); for (Class<? extends S> clazz : classList) { if (defaultClass == null || clazz != defaultClass) { return createInstance(clazz); } } return loadDefaultInstance(); } }

SPI获取构造器——spiLoader类加载器,通过spiLoader去 META-INF/services/目录下去加载文件DefaultSlotChainBuilder。

下面是DefaultSlotChainBuilder的build方法:

@Spi(isDefault = true) public class DefaultSlotChainBuilder implements SlotChainBuilder { @Override public ProcessorSlotChain build() { // 先创建一个default作为header // chain是一个单向链表 first -> next -> end ProcessorSlotChain chain = new DefaultProcessorSlotChain(); // SPI机制——即SpiLoader在META-INF/services/目录下去加载文件 找到所有slot的实现,并根据@SpiOrder注解排序 List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted(); for (ProcessorSlot slot : sortedSlotList) { // 剔除掉Abstract类 if (!(slot instanceof AbstractLinkedProcessorSlot)) { RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain"); continue; } //addLast方法往链表的最后追加Slot对象。 chain.addLast((AbstractLinkedProcessorSlot<?>) slot); } return chain; } }

首先初始化一个DefaultProcessorSlotChain,DefaultProcessorSlotChain里面维护了一个Slot的单向链表。

 

然后利用Java SPI机制——即SpiLoader在META-INF/services/目录下去加载文件,找到所有slot的实现,并根据@SpiOrder注解排序, 最后调用addLast方法往链表的最后追加Slot对象。

public class DefaultProcessorSlotChain extends ProcessorSlotChain { AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() { @Override public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable { super.fireEntry(context, resourceWrapper, t, count, prioritized, args); } @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { super.fireExit(context, resourceWrapper, count, args); } }; AbstractLinkedProcessorSlot<?> end = first; @Override public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) { end.setNext(protocolProcessor); end = protocolProcessor; } } public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> { public void setNext(AbstractLinkedProcessorSlot<?> next) { this.next = next; } }

至此SlotChain构造完成。

执行SlotChain链

SlotChain链执行的入口是 chain.entry(context, resourceWrapper, null, count, prioritized, args);这行

com.alibaba.csp.sentinel.slotchain.DefaultProcessorSlotChain#entry

public class DefaultProcessorSlotChain extends ProcessorSlotChain { @Override public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable { first.transformEntry(context, resourceWrapper, t, count, prioritized, args); } }

就是执行first的transformEntry

public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> { void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args) throws Throwable { T t = (T)o; entry(context, resourceWrapper, t, count, prioritized, args); } }

first的entry方法

public class DefaultProcessorSlotChain extends ProcessorSlotChain { AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() { public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable { super.fireEntry(context, resourceWrapper, t, count, prioritized, args); } public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { super.fireExit(context, resourceWrapper, count, args); } }; }

父类的fireEntry

public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> { private AbstractLinkedProcessorSlot<?> next = null; @Override public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable { if (next != null) { next.transformEntry(context, resourceWrapper, obj, count, prioritized, args); } } }

可以看到,实际上就是执行了next的transformEntry,而next的transformEntry又会调用它的entry方法,entry方法由每个Slot自己实现,只要在里面调用fireEntry,即触发next的next的entry调用。

 

这样继续下去,直到next为空

简单画个图描述一下这个过程:

exit()方法和entry方法类似,这里就不分析了。

至此SlotChain调用链分析完了,总结一下

  • 1、Chain中维护了一个单向链表
  • 2、通过fileEntry触发next的entry调用。
  • 3、责任链模式

Sentinel中预设的SlotChain执行的完整流程:

参考: https://www.cnblogs.com/taromilk/p/11750962.html

https://www.cnblogs.com/zzz-blogs/p/14342608.html

https://blog.csdn.net/qq_19414183/article/details/111035989

https://blog.csdn.net/wk52525/article/details/104439404

上一篇:Spring Cloud Alibaba——Sentinel核心概念
下一篇:没有了
网友评论