Dubbo loadbalance(2) - RoundRobin
RoundRobinLoadBalance
本文讨论的加权轮训算法:RoundRobinLoadBalance,介绍加权轮训算法怎么一步一步演进的。版本:2.6.5版本开始说起。
RoundRobinLoadBalance-2.6.4
RoundRobinLoadBalance-2.6.4,假设我们启动了三个服务提供者实例,权重配置分别1,2,3
@Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); int length = invokers.size(); int maxWeight = 0; int minWeight = Integer.MAX_VALUE; final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>(); int weightSum = 0; for (int i = 0; i < length; i++) { int weight = NoahLeastActiveLoadBalance.getWeight(invokers.get(i), invocation); maxWeight = Math.max(maxWeight, weight); minWeight = Math.min(minWeight, weight); if (weight > 0) { invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight)); weightSum += weight; } } AtomicPositiveInteger sequence = sequences.get(key); if (sequence == null) { sequences.putIfAbsent(key, new AtomicPositiveInteger()); sequence = sequences.get(key); } int currentSequence = sequence.getAndIncrement(); if (maxWeight > 0 && minWeight < maxWeight) { int mod = currentSequence % weightSum; for (int i = 0; i < maxWeight; i++) { for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) { final Invoker<T> k = each.getKey(); final IntegerWrapper v = each.getValue(); if (mod == 0 && v.getValue() > 0) { return k; } if (v.getValue() > 0) { v.decrement(); mod--; } } } } return invokers.get(currentSequence % length); }
|
代码结果运行如下:
mod |
20881权重 |
20882权重 |
20883权重 |
最终结果 |
循环次数 |
0 |
1 |
2 |
3 |
20881 |
1 |
1 |
0 |
2 |
3 |
20882 |
2 |
2 |
0 |
1 |
3 |
20883 |
3 |
3 |
0 |
1 |
2 |
20882 |
4 |
4 |
0 |
0 |
2 |
20883 |
5 |
5 |
0 |
0 |
1 |
20883 |
6 |
6 |
1 |
2 |
3 |
20881 |
1 |
关于此问题的Github地址:https://github.com/apache/dubbo/issues/2578
RoundRobinLoadBalance-2.6.4优化第一版
@Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); int length = invokers.size(); int maxWeight = 0; int minWeight = Integer.MAX_VALUE; final List<Invoker<T>> nonZeroWeightedInvokers = new ArrayList<>(); for (int i = 0; i < length; i++) { int weight = NoahLeastActiveLoadBalance.getOriginalWeight(invokers.get(i), invocation); maxWeight = Math.max(maxWeight, weight); minWeight = Math.min(minWeight, weight); if (weight > 0) { nonZeroWeightedInvokers.add(invokers.get(i)); } } AtomicPositiveInteger sequence = sequences.get(key); if (sequence == null) { sequences.putIfAbsent(key, new AtomicPositiveInteger()); sequence = sequences.get(key); }
if (maxWeight > 0 && minWeight < maxWeight) { AtomicPositiveInteger indexSeq = indexSeqs.get(key); if (indexSeq == null) { indexSeqs.putIfAbsent(key, new AtomicPositiveInteger(-1)); indexSeq = indexSeqs.get(key); } length = nonZeroWeightedInvokers.size(); AtomicInteger atomicCount=new AtomicInteger(1);
while (true) { int index = indexSeq.incrementAndGet() % length; int currentWeight; if (index == 0) { currentWeight = sequence.incrementAndGet() % maxWeight; log.info("index:{},需要重新计算,currentWeight:{}", index, currentWeight); } else { currentWeight = sequence.get() % maxWeight; }
log.info("index:{},currentWeight:{}", index, currentWeight); atomicCount.incrementAndGet();
if (NoahLeastActiveLoadBalance.getOriginalWeight(nonZeroWeightedInvokers.get(index), invocation) > currentWeight) { log.info("sequences:{},选中的端口:{},循环次数:{}", sequence.get(), nonZeroWeightedInvokers.get(index).getUrl().getPort(), atomicCount.get()); log.info("==========over========"); return nonZeroWeightedInvokers.get(index); } } } return invokers.get(sequence.getAndIncrement() % length); }
|
执行结果:
但是本次的优化,还是有点不足,我们可以看到,前五次的请求都请求到20881端口上了,不够平滑。我们继续看大佬们怎么优化的。
RoundRobinLoadBalance-nginx
通过issue我们可以直接知道,这个大佬是借鉴了nginx的。平滑加权轮训。
我们直接上代码:
@Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.computeIfAbsent(key, k -> new ConcurrentHashMap<>()); int totalWeight = 0; long maxCurrent = Long.MIN_VALUE; long now = System.currentTimeMillis(); Invoker<T> selectedInvoker = null; WeightedRoundRobin selectedWRR = null; for (Invoker<T> invoker : invokers) { String identifyString = invoker.getUrl().toIdentityString(); int weight = NoahLeastActiveLoadBalance.getWeight(invoker, invocation); WeightedRoundRobin weightedRoundRobin = map.computeIfAbsent(identifyString, k -> { WeightedRoundRobin wrr = new WeightedRoundRobin(); wrr.setWeight(weight); return wrr; }); if (weight != weightedRoundRobin.getWeight()) { weightedRoundRobin.setWeight(weight); } long cur = weightedRoundRobin.increaseCurrent(); weightedRoundRobin.setLastUpdate(now); if (cur > maxCurrent) { maxCurrent = cur; selectedInvoker = invoker; selectedWRR = weightedRoundRobin; } totalWeight += weight; } if (invokers.size() != map.size()) { map.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD); } if (selectedInvoker != null) { selectedWRR.sel(totalWeight); return selectedInvoker; } return invokers.get(0); }
|
大白话说下,代码执行过程。
每台服务器有2个weight,一个是配置的weight不会变化,另一个是currentWeight会动态调整的,初始值为0。当有新的请求进来时候,遍历服务器列表,让它的currentWeight加上自身权重,找到最大的currentWeight。然后减去权重总和,等到新一轮计算的currentWeight。
代码执行结果:
我们分析一下:前面2次的计算过程
第一次计算:
第二次的计算:
然后我们看下是怎么平滑的:
最后我觉得非常神奇,虽然我理解了每一步的计算过程,但是我不知道为什么最终形成了一个闭环。