当前位置 博文首页 > 努力工作的小码农:sentinel流控规则校验之源码分析
前言:
上节给大家把sentinel流控整个执行大致过了,但涉及到最核心的流控算法还没有讲,先提前说明一下 sentinel用的流控算法是令牌桶算法,参考了Guava的RateLimiter,有读过RateLimiter源码再理解sentinel限流算法会更容易,本节依然以源码为主给大家拨开sentinel流控算法的原理
接着上节没有讲到的FlowSlot来看,先来看对应流控规则配置
/***********************************************FlowSlot***********************************************/ public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> { private final FlowRuleChecker checker; public FlowSlot() { this(new FlowRuleChecker()); } FlowSlot(FlowRuleChecker checker) { AssertUtil.notNull(checker, "flow checker should not be null"); this.checker = checker; } @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { checkFlow(resourceWrapper, context, node, count, prioritized); fireEntry(context, resourceWrapper, node, count, prioritized, args); } void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { checker.checkFlow(ruleProvider, resource, context, node, count, prioritized); } } public class FlowRuleChecker { public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { if (ruleProvider == null || resource == null) { return; } // 拿到当前资源对应的流控规则 Collection<FlowRule> rules = ruleProvider.apply(resource.getName()); if (rules != null) { for (FlowRule rule : rules) { //通行校验 if (!canPassCheck(rule, context, node, count, prioritized)) { throw new FlowException(rule.getLimitApp(), rule); } } } } public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount) { return canPassCheck(rule, context, node, acquireCount, false); } public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) { String limitApp = rule.getLimitApp(); // 对应控制台配置的来源 if (limitApp == null) { return true; } if (rule.isClusterMode()) { // 集群模式校验 return passClusterCheck(rule, context, node, acquireCount, prioritized); } // 本地应用校验 return passLocalCheck(rule, context, node, acquireCount, prioritized); } private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) { // 针对配置的流控模式拿到对应的node Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node); if (selectedNode == null) { return true; } // 针对配置的流控效果来作校验 return rule.getRater().canPass(selectedNode, acquireCount, prioritized); } static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) { String limitApp = rule.getLimitApp(); int strategy = rule.getStrategy(); String origin = context.getOrigin(); if (limitApp.equals(origin) && filterOrigin(origin)) { if (strategy == RuleConstant.STRATEGY_DIRECT) { // 配置的来源和当前相同 流控模式为直接 return context.getOriginNode(); } return selectReferenceNode(rule, context, node); } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) { if (strategy == RuleConstant.STRATEGY_DIRECT) { // 配置来源为default 流控模式为直接 return node.getClusterNode(); } return selectReferenceNode(rule, context, node); } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp) && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) { if (strategy == RuleConstant.STRATEGY_DIRECT) { // 配置来源为other 流控模式为直接 return context.getOriginNode(); } return selectReferenceNode(rule, context, node); } return null; } static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) { // 关联资源 String refResource = rule.getRefResource(); int strategy = rule.getStrategy(); if (StringUtil.isEmpty(refResource)) { return null; } // 流控模式为关联 if (strategy == RuleConstant.STRATEGY_RELATE) { return ClusterBuilderSlot.getClusterNode(refResource); } // 流控模式为链路 if (strategy == RuleConstant.STRATEGY_CHAIN) { if (!refResource.equals(context.getName())) { return null; } return node; } // No node. return null; } }
流控类型只针对qps或线程数限制
流控模式分别有三种直接:originNode,关联ClusterNode,链路EntranceNode,针对这几个node的区别上节已经做过说明,不清楚的读者打开上节node树形结构一看便知
流控效果对应四种,具体实现由TrafficShappingController的实现类完成
来看TrafficShappingController具体的实现类
public class DefaultController implements TrafficShapingController { private static final int DEFAULT_AVG_USED_TOKENS = 0; //阀值 private double count; // 1=qps,0=ThreadNum private int grade; public DefaultController(double count, int grade) { this.count = count; this.grade = grade; } @Override public boolean canPass(Node node, int acquireCount) { return canPass(node, acquireCount, false); } @Override public boolean canPass(Node node, int acquireCount, boolean prioritized) { // 获取当前node的ThreadNum或QPS int curCount = avgUsedTokens(node); if (curCount + acquireCount > count) {
// 设置当前流量为优先级和流控模式为QPS if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) { long currentTime; long waitInMs; currentTime = TimeUtil.currentTimeMillis(); // 算出拿到当前令牌数的等待时间(ms) waitInMs = node.tryOccupyNext(currentTime, acquireCount, count); // OccupyTimeoutProperty.getOccupyTimeout = 500ms // 如果流量具有优先级,会获取未来的令牌数 if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
// 添加占用未来的QPS,会调用OccupiableBucketLeapArray.addWaiting(long time, int acquireCount) node.addWaitingRequest(currentTime + waitInMs, acquireCount); node.addOccupiedPass(acquireCount); sleep(waitInMs); // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}. throw new PriorityWaitException(waitInMs); } } // 控制的是线程数返回false return false; } return true; } private int avgUsedTokens(Node node) { if (node == null) { return DEFAULT_AVG_USED_TOKENS; } return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps()); } private void sleep(long timeMillis) { try { Thread.sleep(timeMillis); } catch (InterruptedException e) { // Ignore. } } }
在上节讲滑动窗口的时候,还有一个秒维度的窗口OccupiableBucketLeapArray没有讲解,它同样继承LeapArray,但它还有额外的概念 未来占用,在DefaultController中当前令牌数不够并且流量具有优先级,那么会提前获取未来的令牌,因为阀值固定每秒能获取的令牌数也固定,既然占用了未来的令牌数,那等到时间到了这个未来时间点,当前可获取的令牌数=阀值—之前占用的令牌
OccupiableBucketLeapArray有个FutureBucketLeapArray就是来存储占用未来的窗口数据
public class OccupiableBucketLeapArray extends LeapArray<MetricBucket> { private final FutureBucketLeapArray borrowArray; public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) { // This class is the original "CombinedBucketArray". super(sampleCount, intervalInMs); this.borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs); } @Override // LeapArray.currentWindow(long timeMillis)添加新窗口时会调用 public MetricBucket newEmptyBucket(long time) { MetricBucket newBucket = new MetricBucket(); // 已被之前占用,将之前占用的数据添加到当前新的窗口中 MetricBucket borrowBucket = borrowArray.getWindowValue(time); if (borrowBucket != null) { newBucket.reset(borrowBucket); } return newBucket; } @Override protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long time) { // Update the start time and reset value. w.resetTo(time); // 重置时当前窗口数据时,也需要考虑被之前占用的情况 MetricBucket borrowBucket = borrowArray.getWindowValue(time); if (borrowBucket != null) { w.value().reset(); w.value().addPass((int)borrowBucket.pass()); } else { w.value().reset(); } return w; } @Override public long currentWaiting() { // 获取当前时间被之前占用的qps borrowArray.currentWindow(); long currentWaiting = 0; List<MetricBucket> list = borrowArray.values(); for (MetricBucket window : list) { currentWaiting += window.pass(); } return currentWaiting; } @Override public void addWaiting(long time, int acquireCount) { // 添加到未来窗口 WindowWrap<MetricBucket> window = borrowArray.currentWindow(time); window.value().add(MetricEvent.PASS, acquireCount); } }
当新窗口添加或重置旧窗口数据都需要考虑之前占用的情况,然后把之前占用的窗口数据添加进去
跟Guava中SmoothBursty原理类似
public class RateLimiterController implements TrafficShapingController { // 最大等待时间 private final int maxQueueingTimeMs; // 阀值 private final double count; // 最新一次拿令牌的时间 private final AtomicLong latestPassedTime = new AtomicLong(-1); public RateLimiterController(int timeOut, double count) { this.maxQueueingTimeMs = timeOut; this.count = count; } @Override public boolean canPass(Node node, int acquireCount) {