当前位置 博文首页 > 努力工作的小码农:sentinel流控规则校验之源码分析

    努力工作的小码农:sentinel流控规则校验之源码分析

    作者:努力工作的小码农 时间:2021-02-05 12:24

    前言:

      上节给大家把sentinel流控整个执行大致过了,但涉及到最核心的流控算法还没有讲,先提前说明一下 sentinel用的流控算法是令牌桶算法,参考了Guava的RateLimiter,有读过RateLimiter源码再理解sentinel限流算法会更容易,本节依然以源码为主给大家拨开sentinel流控算法的原理

    接着上节没有讲到的FlowSlot来看,先来看对应流控规则配置

     

     FlwSlot

    /***********************************************FlowSlot***********************************************/
    public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    
        private final FlowRuleChecker checker;
    
        public FlowSlot() {
            this(new FlowRuleChecker());
        }
    
        FlowSlot(FlowRuleChecker checker) {
            AssertUtil.notNull(checker, "flow checker should not be null");
            this.checker = checker;
        }
    
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                          boolean prioritized, Object... args) throws Throwable {
            checkFlow(resourceWrapper, context, node, count, prioritized);
    
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
        }
    
        void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
            throws BlockException {
            checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
        }
    }
    
    public class FlowRuleChecker {
    
        public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                              Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
            if (ruleProvider == null || resource == null) {
                return;
            }
            // 拿到当前资源对应的流控规则
            Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
            if (rules != null) {
                for (FlowRule rule : rules) {
                    //通行校验
                    if (!canPassCheck(rule, context, node, count, prioritized)) {
                        throw new FlowException(rule.getLimitApp(), rule);
                    }
                }
            }
        }
    
        public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node,
                                                        int acquireCount) {
            return canPassCheck(rule, context, node, acquireCount, false);
        }
    
        public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                                        boolean prioritized) {
            String limitApp = rule.getLimitApp();
            // 对应控制台配置的来源
            if (limitApp == null) {
                return true;
            }
            if (rule.isClusterMode()) {
                // 集群模式校验
                return passClusterCheck(rule, context, node, acquireCount, prioritized);
            }
            // 本地应用校验
            return passLocalCheck(rule, context, node, acquireCount, prioritized);
        }
    
        private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                              boolean prioritized) {
            // 针对配置的流控模式拿到对应的node                                  
            Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
            if (selectedNode == null) {
                return true;
            }
            // 针对配置的流控效果来作校验
            return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
        }
        
        static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
            String limitApp = rule.getLimitApp();
            int strategy = rule.getStrategy();
            String origin = context.getOrigin();
    
            if (limitApp.equals(origin) && filterOrigin(origin)) {
                if (strategy == RuleConstant.STRATEGY_DIRECT) {
                    // 配置的来源和当前相同 流控模式为直接
                    return context.getOriginNode();
                }
    
                return selectReferenceNode(rule, context, node);
            } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
                if (strategy == RuleConstant.STRATEGY_DIRECT) {
                    // 配置来源为default 流控模式为直接
                    return node.getClusterNode();
                }
    
                return selectReferenceNode(rule, context, node);
            } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
                && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
                if (strategy == RuleConstant.STRATEGY_DIRECT) {
                    // 配置来源为other 流控模式为直接
                    return context.getOriginNode();
                }
    
                return selectReferenceNode(rule, context, node);
            }
    
            return null;
        }
    
        static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) {
            // 关联资源
            String refResource = rule.getRefResource();
            int strategy = rule.getStrategy();
    
            if (StringUtil.isEmpty(refResource)) {
                return null;
            }
            // 流控模式为关联
            if (strategy == RuleConstant.STRATEGY_RELATE) {
                return ClusterBuilderSlot.getClusterNode(refResource);
            }
            // 流控模式为链路
            if (strategy == RuleConstant.STRATEGY_CHAIN) {
                if (!refResource.equals(context.getName())) {
                    return null;
                }
                return node;
            }
            // No node.
            return null;
        }
    }

    流控类型只针对qps或线程数限制

    流控模式分别有三种直接:originNode,关联ClusterNode,链路EntranceNode,针对这几个node的区别上节已经做过说明,不清楚的读者打开上节node树形结构一看便知

    流控效果对应四种,具体实现由TrafficShappingController的实现类完成

    1. 快速失败(DefaultController):如果是限制qps会抛出异常,线程数返回false不通过
    2. Warm Up(WarmUpController):预热,防止流量突然暴增,导致系统负载过重,有时候系统设置的最大负载是在理想状态达到的,当系统长时间处于冷却状态 需要通过一定时间的预热才能达到最大负载,比如跟数据库建立连接
    3. 排队等待(RateLimiterController):当前没有足够的令牌通过,会进行睡眠等待,直接能拿到足够的令牌数
    4. WarmUpRateLimiterController:第二种和第三种的结合

    来看TrafficShappingController具体的实现类

    DefaultController

    public class DefaultController implements TrafficShapingController {
    
        private static final int DEFAULT_AVG_USED_TOKENS = 0;
    
        //阀值
        private double count;
        
        // 1=qps,0=ThreadNum
        private int grade;
    
        public DefaultController(double count, int grade) {
            this.count = count;
            this.grade = grade;
        }
    
        @Override
        public boolean canPass(Node node, int acquireCount) {
            return canPass(node, acquireCount, false);
        }
    
        @Override
        public boolean canPass(Node node, int acquireCount, boolean prioritized) {
            // 获取当前node的ThreadNum或QPS
            int curCount = avgUsedTokens(node);
            if (curCount + acquireCount > count) { 
            // 设置当前流量为优先级和流控模式为QPS if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) { long currentTime; long waitInMs; currentTime = TimeUtil.currentTimeMillis(); // 算出拿到当前令牌数的等待时间(ms) waitInMs = node.tryOccupyNext(currentTime, acquireCount, count); // OccupyTimeoutProperty.getOccupyTimeout = 500ms // 如果流量具有优先级,会获取未来的令牌数 if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                // 添加占用未来的QPS,会调用OccupiableBucketLeapArray.addWaiting(long time, int acquireCount) node.addWaitingRequest(currentTime
    + waitInMs, acquireCount); node.addOccupiedPass(acquireCount); sleep(waitInMs); // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}. throw new PriorityWaitException(waitInMs); } } // 控制的是线程数返回false return false; } return true; } private int avgUsedTokens(Node node) { if (node == null) { return DEFAULT_AVG_USED_TOKENS; } return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps()); } private void sleep(long timeMillis) { try { Thread.sleep(timeMillis); } catch (InterruptedException e) { // Ignore. } } }

    在上节讲滑动窗口的时候,还有一个秒维度的窗口OccupiableBucketLeapArray没有讲解,它同样继承LeapArray,但它还有额外的概念 未来占用,在DefaultController中当前令牌数不够并且流量具有优先级,那么会提前获取未来的令牌,因为阀值固定每秒能获取的令牌数也固定,既然占用了未来的令牌数,那等到时间到了这个未来时间点,当前可获取的令牌数=阀值—之前占用的令牌

    OccupiableBucketLeapArray有个FutureBucketLeapArray就是来存储占用未来的窗口数据

    public class OccupiableBucketLeapArray extends LeapArray<MetricBucket> {
    
        private final FutureBucketLeapArray borrowArray;
    
        public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) {
            // This class is the original "CombinedBucketArray".
            super(sampleCount, intervalInMs);
            this.borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs);
        }
    
        @Override
        // LeapArray.currentWindow(long timeMillis)添加新窗口时会调用
        public MetricBucket newEmptyBucket(long time) {
            MetricBucket newBucket = new MetricBucket();
            // 已被之前占用,将之前占用的数据添加到当前新的窗口中
            MetricBucket borrowBucket = borrowArray.getWindowValue(time);
            if (borrowBucket != null) {
                newBucket.reset(borrowBucket);
            }
    
            return newBucket;
        }
    
        @Override
        protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long time) {
            // Update the start time and reset value.
            w.resetTo(time);
            // 重置时当前窗口数据时,也需要考虑被之前占用的情况
            MetricBucket borrowBucket = borrowArray.getWindowValue(time);
            if (borrowBucket != null) {
                w.value().reset();
                w.value().addPass((int)borrowBucket.pass());
            } else {
                w.value().reset();
            }
    
            return w;
        }
    
        @Override
        public long currentWaiting() {
            // 获取当前时间被之前占用的qps
            borrowArray.currentWindow();
            long currentWaiting = 0;
            List<MetricBucket> list = borrowArray.values();
    
            for (MetricBucket window : list) {
                currentWaiting += window.pass();
            }
            return currentWaiting;
        }
    
        @Override
        public void addWaiting(long time, int acquireCount) {
            // 添加到未来窗口
            WindowWrap<MetricBucket> window = borrowArray.currentWindow(time);
            window.value().add(MetricEvent.PASS, acquireCount);
        }
    }

    当新窗口添加或重置旧窗口数据都需要考虑之前占用的情况,然后把之前占用的窗口数据添加进去

    RateLimiterController

    跟Guava中SmoothBursty原理类似

    public class RateLimiterController implements TrafficShapingController {
    
        // 最大等待时间
        private final int maxQueueingTimeMs;
        // 阀值
        private final double count;
    
        // 最新一次拿令牌的时间
        private final AtomicLong latestPassedTime = new AtomicLong(-1);
    
        public RateLimiterController(int timeOut, double count) {
            this.maxQueueingTimeMs = timeOut;
            this.count = count;
        }
    
        @Override
        public boolean canPass(Node node, int acquireCount) {