发布于 

Dubbo loadbalance(2) - RoundRobin

RoundRobinLoadBalance

本文讨论的加权轮训算法:RoundRobinLoadBalance,介绍加权轮训算法怎么一步一步演进的。版本:2.6.5版本开始说起。

RoundRobinLoadBalance-2.6.4

RoundRobinLoadBalance-2.6.4,假设我们启动了三个服务提供者实例,权重配置分别1,2,3

@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
int length = invokers.size(); // 总个数
int maxWeight = 0; // 最大权重
int minWeight = Integer.MAX_VALUE; // 最小权重
final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
int weightSum = 0;
for (int i = 0; i < length; i++) {
int weight = NoahLeastActiveLoadBalance.getWeight(invokers.get(i), invocation);
maxWeight = Math.max(maxWeight, weight); // 累计最大权重
minWeight = Math.min(minWeight, weight); // 累计最小权重
if (weight > 0) {
invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
weightSum += weight;
}
}
AtomicPositiveInteger sequence = sequences.get(key);
if (sequence == null) {
sequences.putIfAbsent(key, new AtomicPositiveInteger());
sequence = sequences.get(key);
}
int currentSequence = sequence.getAndIncrement();
if (maxWeight > 0 && minWeight < maxWeight) { // 权重不一样
int mod = currentSequence % weightSum;
for (int i = 0; i < maxWeight; i++) {
for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
final Invoker<T> k = each.getKey();
final IntegerWrapper v = each.getValue();
if (mod == 0 && v.getValue() > 0) {
return k;
}
if (v.getValue() > 0) {
v.decrement();
mod--;
}
}
}
}
// 取模轮循
return invokers.get(currentSequence % length);
}

代码结果运行如下:

mod 20881权重 20882权重 20883权重 最终结果 循环次数
0 1 2 3 20881 1
1 0 2 3 20882 2
2 0 1 3 20883 3
3 0 1 2 20882 4
4 0 0 2 20883 5
5 0 0 1 20883 6
6 1 2 3 20881 1

加权轮训第一版的问题

关于此问题的Github地址:https://github.com/apache/dubbo/issues/2578

RoundRobinLoadBalance-2.6.4优化第一版

@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
int length = invokers.size(); // Number of invokers
int maxWeight = 0; // The maximum weight
int minWeight = Integer.MAX_VALUE; // The minimum weight
final List<Invoker<T>> nonZeroWeightedInvokers = new ArrayList<>();
for (int i = 0; i < length; i++) {
int weight = NoahLeastActiveLoadBalance.getOriginalWeight(invokers.get(i), invocation);
maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight
minWeight = Math.min(minWeight, weight); // Choose the minimum weight
if (weight > 0) {
nonZeroWeightedInvokers.add(invokers.get(i));
}
}
AtomicPositiveInteger sequence = sequences.get(key);
if (sequence == null) {
sequences.putIfAbsent(key, new AtomicPositiveInteger());
sequence = sequences.get(key);
}

if (maxWeight > 0 && minWeight < maxWeight) {
AtomicPositiveInteger indexSeq = indexSeqs.get(key);
if (indexSeq == null) {
indexSeqs.putIfAbsent(key, new AtomicPositiveInteger(-1));
indexSeq = indexSeqs.get(key);
}
length = nonZeroWeightedInvokers.size();
AtomicInteger atomicCount=new AtomicInteger(1);

//每个最大循环次数:invoker.size()
while (true) {
int index = indexSeq.incrementAndGet() % length;
int currentWeight;
if (index == 0) {
currentWeight = sequence.incrementAndGet() % maxWeight;
log.info("index:{},需要重新计算,currentWeight:{}", index, currentWeight);
} else {
currentWeight = sequence.get() % maxWeight;
}

log.info("index:{},currentWeight:{}", index, currentWeight);
atomicCount.incrementAndGet();

if (NoahLeastActiveLoadBalance.getOriginalWeight(nonZeroWeightedInvokers.get(index), invocation) > currentWeight) {
log.info("sequences:{},选中的端口:{},循环次数:{}", sequence.get(), nonZeroWeightedInvokers.get(index).getUrl().getPort(), atomicCount.get());
log.info("==========over========");
return nonZeroWeightedInvokers.get(index);
}
}
}
// Round robin,所有的invoker权重相同,则进行普通轮训即可
return invokers.get(sequence.getAndIncrement() % length);
}

执行结果:

加权轮训优化-1

但是本次的优化,还是有点不足,我们可以看到,前五次的请求都请求到20881端口上了,不够平滑。我们继续看大佬们怎么优化的。

RoundRobinLoadBalance-nginx

通过issue我们可以直接知道,这个大佬是借鉴了nginx的。平滑加权轮训。

image-20220517091002921

我们直接上代码:

@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// 获取url到weightRoundRobin映射表,如果为空,则创建一个新的
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.computeIfAbsent(key, k -> new ConcurrentHashMap<>());
int totalWeight = 0;
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
Invoker<T> selectedInvoker = null;
WeightedRoundRobin selectedWRR = null;
for (Invoker<T> invoker : invokers) {
String identifyString = invoker.getUrl().toIdentityString();
int weight = NoahLeastActiveLoadBalance.getWeight(invoker, invocation);
//检测当前Invoker是否有相应的WeightedRoundRobin,没有则创建
WeightedRoundRobin weightedRoundRobin = map.computeIfAbsent(identifyString, k -> {
WeightedRoundRobin wrr = new WeightedRoundRobin();
// 设置Invoker 权重
wrr.setWeight(weight);
return wrr;
});
// Invoker 权重不等于weightedRoundRobin中保存的权重,说明权重变化,进行更新
if (weight != weightedRoundRobin.getWeight()) {
//weight changed
weightedRoundRobin.setWeight(weight);
}
// CAS操作 cur = current+weight
long cur = weightedRoundRobin.increaseCurrent();
// 设置 lastUpdate,表示近期更新过
weightedRoundRobin.setLastUpdate(now);
if (cur > maxCurrent) {
maxCurrent = cur;
// 将具有最大current 权重的Invoker赋值给selectedInvoker
selectedInvoker = invoker;
// 将Invoker对应的weightedRoundRobin赋值给selectedWRR,后面用到
selectedWRR = weightedRoundRobin;
}
//计算权重之和
totalWeight += weight;
}
// 如果Invoker个数与ConcurrentMap<String, WeightedRoundRobin>的key个数不一致
if (invokers.size() != map.size()) {
//去除长时间未被更新的节点(60s)
map.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
}
if (selectedInvoker != null) {
// CAS操作,将最大权重的Invoker减去权重总和
selectedWRR.sel(totalWeight);
// 返回具有最大current的Invoker
return selectedInvoker;
}
// should not happen here
return invokers.get(0);
}

大白话说下,代码执行过程。

每台服务器有2个weight,一个是配置的weight不会变化,另一个是currentWeight会动态调整的,初始值为0。当有新的请求进来时候,遍历服务器列表,让它的currentWeight加上自身权重,找到最大的currentWeight。然后减去权重总和,等到新一轮计算的currentWeight。

代码执行结果:

nginx-加权轮训

我们分析一下:前面2次的计算过程

第一次计算:

nginx-加权轮训-1

第二次的计算:

nginx-加权轮训-2

然后我们看下是怎么平滑的:

平滑-nginx加权轮训

最后我觉得非常神奇,虽然我理解了每一步的计算过程,但是我不知道为什么最终形成了一个闭环。