当前位置:网站首页>Alibaba Sentinel - Slot chain解析

Alibaba Sentinel - Slot chain解析

2022-08-11 08:12:00 普通人zzz~

一、插槽链的构建

我们先回顾一下 Alibaba Sentinel - 工作流程及原理解析 中插槽链的构建。

通过 SlotChainBuilder 对象,构建一个插槽链。

public final class SlotChainProvider {
    

    private static volatile SlotChainBuilder slotChainBuilder = null;

    // 创建一个新的插槽:通过 SlotChainBuilder 对象构建
    public static ProcessorSlotChain newSlotChain() {
    
        if (slotChainBuilder != null) {
    
            return slotChainBuilder.build();
        }

        // SPI:获取SlotChainBuilder,并加载第一个实例或默认值
        slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();

        if (slotChainBuilder == null) {
    
            // Should not go through here.
            RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
            slotChainBuilder = new DefaultSlotChainBuilder();
        } else {
    
            RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",
                slotChainBuilder.getClass().getCanonicalName());
        }
        // 构建插槽链
        return slotChainBuilder.build();
    }

    private SlotChainProvider() {
    }
}

构建插槽链 slotChainBuilder.build(),代码如下:

@Spi(isDefault = true)
public class DefaultSlotChainBuilder implements SlotChainBuilder {
    

    @Override
    public ProcessorSlotChain build() {
    
    	// 先创建一个默认的 ProcessorSlotChain
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();

		// SPI 机制:获取所有 ProcessorSlot(默认的、自定义的),并排序后返回
        List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
        for (ProcessorSlot slot : sortedSlotList) {
    
        	// 去除脏数据
            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
    
                RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
                continue;
            }

            chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
        }

        return chain;
    }
}

在这里插入图片描述

二、ProcessorSlot

在 Sentinel 里面,所有的资源都对应一个资产名称以及一个 Entry。 Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用 API 显示创建;每一个 Entry 创建的时候,同时也会创建一系列功能插槽(slot chain),这些插槽有不同的职责,通过责任链方式进行构建,整体框架如下:
在这里插入图片描述

这些插槽(slot chain)具有不同职责。

  • NodeSelectorSlot:负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流、降级。
  • ClusterBuilderSlot:用于存储资产的统计信息,以及调用者信息,例如,该资源的RT(平均响应时间)、QPS、Thread Count等等,这些信息将用于作为多维度限流、降级的依据。
  • StatisticsSlot:用于记录、统计不同维度的 runtime 指标监控信息。
  • ParamFlowSlot:用于资源配置热点参数、限流规则以及前面 slot 统计的状态,来进行流量控制。
  • SystemSlot:通过系统的状态,例如 load1 等,来控制总的入口流量。
  • AuthoritySlot:根据配置的黑白名单和调用来源信息,来做黑白名单控制。
  • FlowSlot:用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制。
  • DegradeSlot:通过统计信息以及预设的规则,来做熔断降级。

其中 ParamFlowSlot 需要引入另外一个 jar 包,相当于自定义插槽实现的,这里不做过多降级,有兴趣可以参考 Sentinel 官网

所有的插槽都继承自 AbstractLinkedProcessorSlot 抽象类(顶层为 ProcessorSlot 实现)。
在这里插入图片描述

public interface ProcessorSlot<T> {
    
    // 执行
    void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized,
               Object... args) throws Throwable;

    // 链路调用entry
    void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized,
                   Object... args) throws Throwable;

    // 退出当前slot
    void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);

    // 链路调用exit
    void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
}

当然,我们也可以自定义插槽。

Sentinel 将 com.alibaba.csp.sentinel.slotchain.ProcessorSlot 作为 SPI 接口进行扩展(1.7.2 版本以前 com.alibaba.csp.sentinel.slotchain.SlotChainBuilder 作为 SPI),使得 Slot Chain 具备了扩展的能力。

您可以自行加入自定义的 slot 并编排 slot 间的顺序,从而可以给 Sentinel 添加自定义的功能。

在这里插入图片描述

默认处理器插槽的顺序

// NodeSelectorSlot
public static final int ORDER_NODE_SELECTOR_SLOT = -10000;
// ClusterBuilderSlot
public static final int ORDER_CLUSTER_BUILDER_SLOT = -9000;
// LogSlot
public static final int ORDER_LOG_SLOT = -8000;
// StatisticsSlot
public static final int ORDER_STATISTIC_SLOT = -7000;
// AuthoritySlot
public static final int ORDER_AUTHORITY_SLOT = -6000;
// SystemSlot
public static final int ORDER_SYSTEM_SLOT = -5000;
// FlowSlot
public static final int ORDER_FLOW_SLOT = -2000;
// DegradeSlot
public static final int ORDER_DEGRADE_SLOT = -1000;

下面我会根据整个插槽链的处理顺序进行源码分析。

1. NodeSelectorSlot

负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流、降级。

例如:

ContextUtil.enter("entrance1", "appA");
Entry nodeA = SphU.entry("nodeA");
if (nodeA != null) {
    
    nodeA.exit();
}
ContextUtil.exit();
ContextUtil.enter("entrance2", "appA");
nodeA = SphU.entry("nodeA");
if (nodeA != null) {
    
    nodeA.exit();
}
ContextUtil.exit();

// 上面的代码会在内存中生成如下调用结构:
/** * * machine-root * / \ * / \ * EntranceNode1 EntranceNode2 * / \ * / \ * DefaultNode(nodeA) DefaultNode(nodeA) * | | * +- - - - - - - - - - +- - - - - - -> ClusterNode(nodeA); */

注意:可以通过调用 http:localhost:8719tree?type=root 来检查这个结构

核心代码:

@Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT)
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
    

    // 存储当前资源所有调用Node
    // key -> 上下文name value -> 节点node
    private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
    
        // 根据 context name 获取Node
        DefaultNode node = map.get(context.getName());
        if (node == null) {
    
            synchronized (this) {
    
                node = map.get(context.getName());
                // 不存在,创建一个Node
                if (node == null) {
    
                    node = new DefaultNode(resourceWrapper, null);
                    HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                    cacheMap.putAll(map);
                    cacheMap.put(context.getName(), node);
                    map = cacheMap;
                    
                    // Build invocation tree
                    // 构建调用树
                    ((DefaultNode) context.getLastNode()).addChild(node);
                }

            }
        }
        
		// 设置当前节点
        context.setCurNode(node);
        
        // 调用下一个链路
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
    
    	// 退出
        fireExit(context, resourceWrapper, count, args);
    }
}

2. ClusterBuilderSlot

用于存储资源的统计信息,以及调用者信息,例如,该资源的RT、QPS、Thread count等,这些信息将用于作为多维度限流、降级的依据。

@Spi(isSingleton = false, order = Constants.ORDER_CLUSTER_BUILDER_SLOT)
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    

    // 集群节点信息
    // key -> 资源 value -> 集群节点
    private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();

    private static final Object lock = new Object();

    private volatile ClusterNode clusterNode = null;

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args)
        throws Throwable {
    
        if (clusterNode == null) {
    
            synchronized (lock) {
    
                if (clusterNode == null) {
    
                    // Create the cluster node.
                    // 创建集群节点
                    clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
                    HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                    newMap.putAll(clusterNodeMap);
                    newMap.put(node.getId(), clusterNode);

                    clusterNodeMap = newMap;
                }
            }
        }
        // 给当前Node 设置 ClusterNode 信息
        node.setClusterNode(clusterNode);

        // 如果设置了上下文来源origin,我们应该获取或创建 OriginNode
        if (!"".equals(context.getOrigin())) {
    
            Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
            context.getCurEntry().setOriginNode(originNode);
        }

        // 调用下一个链路
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
    
    	// 退出
        fireExit(context, resourceWrapper, count, args);
    }

    // 根据资源id、节点类型获取 ClusterNode
    public static ClusterNode getClusterNode(String id, EntryType type) {
    
        return clusterNodeMap.get(new StringResourceWrapper(id, type));
    }

    // 根据资源id获取 ClusterNode
    public static ClusterNode getClusterNode(String id) {
    
        if (id == null) {
    
            return null;
        }
        ClusterNode clusterNode = null;

        for (EntryType nodeType : EntryType.values()) {
    
            clusterNode = clusterNodeMap.get(new StringResourceWrapper(id, nodeType));
            if (clusterNode != null) {
    
                break;
            }
        }

        return clusterNode;
    }
    
    public static Map<ResourceWrapper, ClusterNode> getClusterNodeMap() {
    
        return clusterNodeMap;
    }
	
	// 重置
    public static void resetClusterNodes() {
    
        for (ClusterNode node : clusterNodeMap.values()) {
    
            node.reset();
        }
    }
}

3. LogSlot

用于日志监控。

4. StatisticsSlot

用于记录、统计不同维度的 runtime 指标监控信息。

@Spi(order = Constants.ORDER_STATISTIC_SLOT)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
    
        try {
    
            // Do some checking.
            // 执行后续链路的entry方法,顺序是 AuthoritySlot -> SystemSlot -> FlowSlot -> DegradeSlot,做一些白名单、系统状态、规则限流、熔断检查
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
			
			// 请求通过,添加线程数
            // Request passed, add thread count and pass count.
            node.increaseThreadNum();
			// 请求通过,添加请求通过数
            node.addPassRequest(count);

            if (context.getCurEntry().getOriginNode() != null) {
    
            	// 添加 OrginNode 统计计数
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
    
            	// 如果请求类型为IN,添加全局统计计数
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest(count);
            }

			// 执行onPass CallBack
            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
    
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (PriorityWaitException ex) {
    
        	// PriorityWaitException 异常处理,添加线程数
            node.increaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) {
    
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
    
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
            }
            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
    
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (BlockException e) {
    
        	// BlockException 异常处理,添加阻塞Qps
            // Blocked, set block exception to current entry.
            context.getCurEntry().setBlockError(e);

            // Add block count.
            node.increaseBlockQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
    
                context.getCurEntry().getOriginNode().increaseBlockQps(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
    
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseBlockQps(count);
            }

			// 执行onBlocked CallBack
            // Handle block event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
    
                handler.onBlocked(e, context, resourceWrapper, node, count, args);
            }

            throw e;
        } catch (Throwable e) {
    
        	// 其他异常
            // Unexpected internal error, set error to current entry.
            context.getCurEntry().setError(e);

            throw e;
        }
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
    
    	// 退出
        Node node = context.getCurNode();

        if (context.getCurEntry().getBlockError() == null) {
    
            // Calculate response time (use completeStatTime as the time of completion).
            long completeStatTime = TimeUtil.currentTimeMillis();
            context.getCurEntry().setCompleteTimestamp(completeStatTime);
            long rt = completeStatTime - context.getCurEntry().getCreateTimestamp();

            Throwable error = context.getCurEntry().getError();

            // Record response time and success count.
            recordCompleteFor(node, count, rt, error);
            recordCompleteFor(context.getCurEntry().getOriginNode(), count, rt, error);
            if (resourceWrapper.getEntryType() == EntryType.IN) {
    
                recordCompleteFor(Constants.ENTRY_NODE, count, rt, error);
            }
        }

        // Handle exit event with registered exit callback handlers.
        Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();
        for (ProcessorSlotExitCallback handler : exitCallbacks) {
    
            handler.onExit(context, resourceWrapper, count, args);
        }

        // fix bug https://github.com/alibaba/Sentinel/issues/2374
        fireExit(context, resourceWrapper, count, args);
    }

    private void recordCompleteFor(Node node, int batchCount, long rt, Throwable error) {
    
        if (node == null) {
    
            return;
        }
        node.addRtAndSuccess(rt, batchCount);
        node.decreaseThreadNum();

        if (error != null && !(error instanceof BlockException)) {
    
            node.increaseExceptionQps(batchCount);
        }
    }
}

5. AuthoritySlot

根据配置的黑白名单和调用来源信息,来做黑白名单控制。

@Spi(order = Constants.ORDER_AUTHORITY_SLOT)
public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
        throws Throwable {
    
        // 黑白名单检查
        checkBlackWhiteAuthority(resourceWrapper, context);
        // 调用下一个链路
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
    
    	// 退出
        fireExit(context, resourceWrapper, count, args);
    }

	// 黑白名单检查
    void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
    
    	// 获取所有 AuthorityRule
        Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();

        if (authorityRules == null) {
    
            return;
        }

		// 获取当前 Resource 的所有AuthorityRule
        Set<AuthorityRule> rules = authorityRules.get(resource.getName());
        if (rules == null) {
    
            return;
        }

		// 黑白名单检查
        for (AuthorityRule rule : rules) {
    
            if (!AuthorityRuleChecker.passCheck(rule, context)) {
    
                throw new AuthorityException(context.getOrigin(), rule);
            }
        }
    }
}

6. SystemSlot

通过系统的状态,例如load1 等,来控制总的入口流量。

@Spi(order = Constants.ORDER_SYSTEM_SLOT)
public class SystemSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
    
        // 系统检查
        SystemRuleManager.checkSystem(resourceWrapper, count);
        // 调用下一个链路
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
    
    	// 退出
        fireExit(context, resourceWrapper, count, args);
    }
}

SystemRuleManager.checkSystem(resourceWrapper, count); 方法如下:

public final class SystemRuleManager {
    
	public static void checkSystem(ResourceWrapper resourceWrapper, int count) throws BlockException {
    
        if (resourceWrapper == null) {
    
            return;
        }
        
        // 确保检查开关已打开。
        if (!checkSystemStatus.get()) {
    
            return;
        }

        // 只检查入口流量
        if (resourceWrapper.getEntryType() != EntryType.IN) {
    
            return;
        }

        // 统计 qps
        double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.passQps();
        if (currentQps + count > qps) {
    
            throw new SystemBlockException(resourceWrapper.getName(), "qps");
        }

        // 统计 线程数
        int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();
        if (currentThread > maxThread) {
    
            throw new SystemBlockException(resourceWrapper.getName(), "thread");
        }

		// 统计 平均响应时间
        double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();
        if (rt > maxRt) {
    
            throw new SystemBlockException(resourceWrapper.getName(), "rt");
        }

        // load. BBR algorithm.
        if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
    
            if (!checkBbr(currentThread)) {
    
                throw new SystemBlockException(resourceWrapper.getName(), "load");
            }
        }

        // cpu usage
        if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
    
            throw new SystemBlockException(resourceWrapper.getName(), "cpu");
        }
    }
}

7. FlowSlot

用于根据预设的限流规则已经前面的 slot 统计的状态,来进行流量控制。

@Spi(order = Constants.ORDER_FLOW_SLOT)
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    

    private final FlowRuleChecker checker;

    public FlowSlot() {
    
        this(new FlowRuleChecker());
    }

    FlowSlot(FlowRuleChecker checker) {
    
        AssertUtil.notNull(checker, "flow checker should not be null");
        this.checker = checker;
    }

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
    
        // 规则检查
        checkFlow(resourceWrapper, context, node, count, prioritized);

		// 调用下一个链路
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    // 规则检查
    void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
        throws BlockException {
    
        checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
    
    	// 退出
        fireExit(context, resourceWrapper, count, args);
    }

	// 获取当前 Resource 所有 FlowRule
    private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
    
        @Override
        public Collection<FlowRule> apply(String resource) {
    
            // Flow rule map should not be null.
            Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
            return flowRules.get(resource);
        }
    };
}

8. DegradeSlot

通过统计信息以及预设的规则,来做熔断降级。

@Spi(order = Constants.ORDER_DEGRADE_SLOT)
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
    
        // 检查
        performChecking(context, resourceWrapper);

		// 调用下一个链路
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    // 检查
    void performChecking(Context context, ResourceWrapper r) throws BlockException {
    
    	// 获取当前 Resource 所有 CircuitBreaker
        List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
        if (circuitBreakers == null || circuitBreakers.isEmpty()) {
    
            return;
        }
        for (CircuitBreaker cb : circuitBreakers) {
    
        	// 尝试通过
            if (!cb.tryPass(context)) {
    
            	// 抛出降级异常DegradeException
                throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
            }
        }
    }

    @Override
    public void exit(Context context, ResourceWrapper r, int count, Object... args) {
    
        Entry curEntry = context.getCurEntry();
        if (curEntry.getBlockError() != null) {
    
            fireExit(context, r, count, args);
            return;
        }
        List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
        if (circuitBreakers == null || circuitBreakers.isEmpty()) {
    
            fireExit(context, r, count, args);
            return;
        }

        if (curEntry.getBlockError() == null) {
    
            // passed request
            for (CircuitBreaker circuitBreaker : circuitBreakers) {
    
                circuitBreaker.onRequestComplete(context);
            }
        }

        fireExit(context, r, count, args);
    }
}

DegradeException 异常类 类结构图如下,会在 StatisticsSlot 中被捕获处理,代码如下:
在这里插入图片描述

@Spi(order = Constants.ORDER_STATISTIC_SLOT)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
    
        try {
    
            ......
        } catch (PriorityWaitException ex) {
    
        	// PriorityWaitException 异常处理,添加线程数
            ......
        } catch (BlockException e) {
    
        	// BlockException 异常处理,添加阻塞Qps
            // Blocked, set block exception to current entry.
            context.getCurEntry().setBlockError(e);

            // Add block count.
            node.increaseBlockQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
    
                context.getCurEntry().getOriginNode().increaseBlockQps(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
    
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseBlockQps(count);
            }

			// 执行onBlocked CallBack:降级
            // Handle block event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
    
                handler.onBlocked(e, context, resourceWrapper, node, count, args);
            }

            throw e;
        } catch (Throwable e) {
    
        	// 其他异常
            // Unexpected internal error, set error to current entry.
            context.getCurEntry().setError(e);

            throw e;
        }
    }
}

经过上面的分析,Sentinel通过各个 Slot 完成统计、限流、降级等逻辑,但Sentinel 是如何进行流量统计(线程数、RT、QPS等)的核心代码还未分析到,实际上,Sentinel 是通过滑动窗口完成上面各个维度的统计的。

三、Sentinel 滑动窗口

Alibaba Sentinel - 滑动窗口

原网站

版权声明
本文为[普通人zzz~]所创,转载请带上原文链接,感谢
https://blog.csdn.net/qq_33375499/article/details/126044845