ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Sentinel 源码分析(一)

2022-05-08 20:00:08  阅读:186  来源: 互联网

标签:分析 node prioritized count resourceWrapper 源码 context Sentinel entry


版本:

  <groupId>com.alibaba.cloud</groupId>
  <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
  <version>2.2.5.RELEASE</version>

在spring.factories中:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.cloud.sentinel.SentinelWebAutoConfiguration,\     #controller层也可以被资源保护到得原因
com.alibaba.cloud.sentinel.SentinelWebFluxAutoConfiguration,\
com.alibaba.cloud.sentinel.endpoint.SentinelEndpointAutoConfiguration,\
com.alibaba.cloud.sentinel.custom.SentinelAutoConfiguration,\   #核心配置类
com.alibaba.cloud.sentinel.feign.SentinelFeignAutoConfiguration  #Feign整合Sentinel

org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
com.alibaba.cloud.sentinel.custom.SentinelCircuitBreakerConfiguration

主要看:SentinelAutoConfiguration,不用配置任何参数,默认生效得。

 里面有一个重要得bean,SentinelResourceAspect ,这个切面就是拦截@SentinelResource 注解标识的资源得。

 

 

 源码不长,把代码贴出来:

@Aspect
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {
    
// 拦截 @SentinelResource 注解标识的资源 @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)") public void sentinelResourceAnnotationPointcut() { } @Around("sentinelResourceAnnotationPointcut()") public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable { Method originMethod = resolveMethod(pjp); SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class); if (annotation == null) { // Should not go through here. throw new IllegalStateException("Wrong state for SentinelResource annotation"); } String resourceName = getResourceName(annotation.value(), originMethod); EntryType entryType = annotation.entryType(); int resourceType = annotation.resourceType(); Entry entry = null; try {
// 开启资源保护,走流控,降级等配置的规则 entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
// 业务逻辑 return pjp.proceed(); } catch (BlockException ex) {
// 处理Block异常,所有的Sentinel 规则的异常都走这里 return handleBlockException(pjp, annotation, ex); } catch (Throwable ex) {
// 业务异常都走这里 Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore(); // The ignore list will be checked first. if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) { throw ex; } if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) { traceException(ex); return handleFallback(pjp, annotation, ex); } // No fallback function can handle the exception, so throw it out. throw ex; } finally { if (entry != null) {
// 执行exit逻辑 entry.exit(1, pjp.getArgs()); } } } }

 

跟着:SphU.entry(resourceName, resourceType, entryType, pjp.getArgs())   的逻辑看下

在CtSph#entryWithPriority 下有个 ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);  来构建一个Slotchain。

Sentinel会把页面配置的 流控规则,降级规则,系统规则,热点规则,授权规则每个规则都抽象出来一个Slot,比如:FlowSlot 处理流控规则,AuthoritySlot 处理授权规则,StatisticSlot 统计请求的QPS,成功的请求数,失败的请求数,阻塞的请求数,

最大响应时间等。上面说的构造的chain,就是通过责任链模式把这些slot都串起来,然后去执行。

来看下:

 ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
        ProcessorSlotChain chain = chainMap.get(resourceWrapper);
        if (chain == null) {
            synchronized (LOCK) {
// 每个资源都会维护一个slotChain chain = chainMap.get(resourceWrapper); if (chain == null) { // Entry size limit. if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) { return null; } // 构建责任链 chain = SlotChainProvider.newSlotChain(); Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>( chainMap.size() + 1); newMap.putAll(chainMap); newMap.put(resourceWrapper, chain); chainMap = newMap; } } } return chain; }

 

跟到:DefaultSlotChainBuilder#build()

 public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        //  Sentinel 维护了一套spi机制来加载ProcessorSlot接口的实现类,就是那些规则slot,会通过排序的,根据slot上的自定义注解@Spi中的order排序
        List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
        for (ProcessorSlot slot : sortedSlotList) {
            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
                continue;
            }
              // 加载到的slot放入责任链中
            chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
        }

        return chain;
    }

 

在sentinel-core核心包下面的有spi加载的对应的文件:

 

 最后构建出来的 ProcessorSlotChain 是下面的链条:

每个slot都实现 ProcessorSlot接口,对于具体的内容都在entry方法里面。

 NodeSelectorSlot:

    构建资源的链路,比如在控制台的簇点链路下面的树形结构。

 ClusterBuilderSlot:

           构建集群中的资源节点,会在集群限流中用到。

 LogSlot:

      记录些报错日志而已。

     

 

 StatisticSlot:

       这个slot就比较重要了,从名字就可以看出,这是做统计的,对于那些qps,成功,失败,阻塞的次数都会记录下来。@Override

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            // Do some checking. 执行下一个slot之前 并没有什么操作
            fireEntry(context, resourceWrapper, node, count, prioritized, args);

            // Request passed, add thread count and pass count.
// 执行完责任链后面的slot之后,执行我们的业务逻辑之前,就会进行qps 加1的操作
// 请求的线程数加1
node.increaseThreadNum();
// qps 请求数加1 这里是向滑动时间窗口中的bucket中增加数量 这个后面详细说明 node.addPassRequest(count); if (context.getCurEntry().getOriginNode() != null) { // Add count for origin node. context.getCurEntry().getOriginNode().increaseThreadNum(); context.getCurEntry().getOriginNode().addPassRequest(count); } if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseThreadNum(); Constants.ENTRY_NODE.addPassRequest(count); } // Handle pass event with registered entry callback handlers.
// 这里可以自己扩展 执行callBack
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); } } catch (PriorityWaitException ex) { node.increaseThreadNum(); if (context.getCurEntry().getOriginNode() != null) { // Add count for origin node. context.getCurEntry().getOriginNode().increaseThreadNum(); } if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseThreadNum(); } // Handle pass event with registered entry callback handlers. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); } } catch (BlockException e) {
// sentinel 规则异常的处理 // Blocked, set block exception to current entry. context.getCurEntry().setBlockError(e); // Add block count.
// 增加block的qps node.increaseBlockQps(count); if (context.getCurEntry().getOriginNode() != null) { context.getCurEntry().getOriginNode().increaseBlockQps(count); } if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseBlockQps(count); } // Handle block event with registered entry callback handlers. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onBlocked(e, context, resourceWrapper, node, count, args); } throw e; } catch (Throwable e) {
// Unexpected internal error, set error to current entry. context.getCurEntry().setError(e); throw e; } }

 

AuthoritySlot

  授权规则的校验

@Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
        throws Throwable {
// 就是校验授权规则的黑白名单规则 checkBlackWhiteAuthority(resourceWrapper, context); fireEntry(context, resourceWrapper, node, count, prioritized, args); }

 

SystemSlot

     执行系统规则。

FlowSlot  

     执行限流规则。

 @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        checkFlow(resourceWrapper, context, node, count, prioritized);
         // 触发下一个slot
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
        throws BlockException {
        checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
    }

checker 是 FlowRuleChecker 实例。

public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                          Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        if (ruleProvider == null || resource == null) {
            return;
        }
// 获取所有的限流规则 Collection<FlowRule> rules = ruleProvider.apply(resource.getName()); if (rules != null) { for (FlowRule rule : rules) {
// 校验流控是否通过 if (!canPassCheck(rule, context, node, count, prioritized)) { throw new FlowException(rule.getLimitApp(), rule); } } } }

canPassCheck 会根据流控规则时候选择的快速失败,Warm Up, 排队等待。分别使用DefaultController,WarmUpRateLimiterController,RateLimiterController类来控制,用到的限流算法分别是滑动时间窗口,

令牌桶,漏桶算法。

最后会走到 DefaultController#canPass   这里是快速失败规则会走到的地方。用到的是滑动时间窗口。

 public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// 从滑动时间窗口中得到已经通过的qps 就是比较最近1s内的流量是否达到阈值 int curCount = avgUsedTokens(node);
// acquireCount 一般就是 1 ,判断是否达到限流的要求 if (curCount + acquireCount > count) {
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
// 这里一般不走到 long currentTime; long waitInMs; currentTime = TimeUtil.currentTimeMillis(); waitInMs = node.tryOccupyNext(currentTime, acquireCount, count); if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) { node.addWaitingRequest(currentTime + waitInMs, acquireCount); node.addOccupiedPass(acquireCount); sleep(waitInMs); // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}. throw new PriorityWaitException(waitInMs); } } return false; } return true; }

DegradeSlot

     熔断规则使用的。

@Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        performChecking(context, resourceWrapper);

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    void performChecking(Context context, ResourceWrapper r) throws BlockException {
// 根据在控制台配置的熔断规则,得到断路器,这里一共两个类型的断路器 :ExceptionCircuitBreaker 当熔断策略是:异常比例,异常数的时候用。
// ResponseTimeCircuitBreaker: 熔断策略是:慢调用比例的时候使用 List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName()); if (circuitBreakers == null || circuitBreakers.isEmpty()) { return; } for (CircuitBreaker cb : circuitBreakers) { if (!cb.tryPass(context)) { throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule()); } } }

上面重要的限流逻辑和和降级逻辑后面单独分析,这里先把整体流程记录下。

责任链上的entry方法执行完之后,如果没有BlockException异常,也就是配置的规则都正常通过了,那就会执行业务逻辑了,业务逻辑执行完之后,在上面的 SentinelResourceAspect

切面中的finally方法中会调用责任链上的exit方法。

 

 在exit方法中,主要做一些数据的统计。涉及到上面说的流控和降级等。这个和分析流控算法和降级的时候再记录。

 Sentinel的核心流程就是这个责任链的执行。各种规则的校验都在这个链上执行的。

 

标签:分析,node,prioritized,count,resourceWrapper,源码,context,Sentinel,entry
来源: https://www.cnblogs.com/krock/p/16240466.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有