发布于 

Dubbo loadbalance(1) - LeastActive

LeastActiveLoadBalance

本文源码解析,dubbo最少活跃优先 + 加权随机的负载均衡。

但是不仅仅只是介绍源码,更多的跟大家分享我读源码的方式,和要怎么读源码。

栗子搭建

前置说明下,因为我想引导大家对LeastActiveLoadBalance这个负载策略有更加深入的印象,我打算从旧版本的LeastActiveLoadBalance复制源码出来一行行带大家走读,自定义负载均衡策略。但是跟原来的dubbo版本的LeastActiveLoadBalance源码一致,只是加上了自己的注释。

在贴代码之前跟大家分享一下,关于dubbo版本的额外知识。就是关于版本意识,这点很重要!!!

划重点

如果你想研究dubbo-2.6.x(包含)之前的,只能引入com.alibaba包下的。大于dubbo-2.7.x(包含)必须要引org.apache.dubbo包下的。本文使用的依赖是:

<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
<version>2.7.8</version>
</dependency>

maven链接如下:apachealibaba

接下来我自定义了一个跟dubbo(2.6.x)源码一样的NoahLeastActiveLoadBalance(最少活跃优先 + 加权随机的负载均衡)

package com.noah.dubbo.loadbalance;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcStatus;
import org.apache.dubbo.rpc.cluster.Constants;
import org.apache.dubbo.rpc.cluster.loadbalance.AbstractLoadBalance;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Random;

import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_SERVICE_REFERENCE_PATH;
import static org.apache.dubbo.rpc.cluster.Constants.*;

/**
* LeastActiveLoadBalance,客户端的负载均衡
* dubbo2.6.0版本的实现
* <p>
* {@link LeastActiveLoadBalance}
*/
@Component
public class NoahLeastActiveLoadBalance extends AbstractLoadBalance {

public static final String NAME = "leastactive";

private final Random random = new Random();

@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, org.apache.dubbo.common.URL url, Invocation invocation) {

//服务提供者数量
int length = invokers.size(); // Number of invokers

//最小活跃次数,活跃次数一定是>=0的数,设置为-1,就一定会被if给替换了
int leastActive = -1; // The least active value of all invokers

//具有相同最小活跃次数的invoker的数量
int leastCount = 0; // The number of invokers having the same least active value (leastActive)

//具有相同最小活跃数的invoker数组下标
int[] leastIndexs = new int[length]; // The index of invokers having the same least active value (leastActive)

//最小活跃数的权重之和
int totalWeight = 0; // The sum of weights

//记录最小活跃数的权重,和判断具有相同的最小活跃数的权重是否相同
int firstWeight = 0; // Initial value, used for comparision

//具有相同最小活跃数的invoker,权重是否相同
boolean sameWeight = true; // Every invoker has the same weight value?

//遍历invoker列表,获取当前连接数和配置的权重
for (int i = 0; i < length; i++) {

Invoker<T> invoker = invokers.get(i);

//问题0:active为什么返回0
//返回的是invoker的活跃数
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Active number

//问题1:1-1:应该要返回预热后的权重。1-2:不应该重复计算,使用weight[]来存储
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // Weight


if (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value.

//初始化或者找到一个活跃数更小invoker
leastActive = active; // Record the current least active value
leastCount = 1; // Reset leastCount, count again based on current leastCount
leastIndexs[0] = i; // Reset
totalWeight = weight; // Reset
firstWeight = weight; // Record the weight the first invoker
sameWeight = true; // Reset, every invoker has the same weight value?
} else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating.

//具有相同活跃数的invoker,累加他们的权重
leastIndexs[leastCount++] = i; // Record index number of this invoker
totalWeight += weight; // Add this invoker's weight to totalWeight.

//问题2:优化判断i>0
// If every invoker has the same weight?
if (sameWeight && i > 0 && weight != firstWeight) {
sameWeight = false;
}
}
}

//最小活跃数invoker只有一个,直接返回
// assert(leastCount > 0)
if (leastCount == 1) {
// If we got exactly one invoker having the least active value, return this invoker directly.
return invokers.get(leastIndexs[0]);
}

//加权随机实现:具备相同最小活跃数的不同权重invoker列表获取
if (!sameWeight && totalWeight > 0) {

//问题3:使用ThreadLocalRandom.current()替换
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
int offsetWeight = random.nextInt(totalWeight);

// Return a invoker based on the random value.
for (int i = 0; i < leastCount; i++) {

int leastIndex = leastIndexs[i];
//等价问题1
offsetWeight -= getWeight(invokers.get(leastIndex), invocation);

//问题4:<=0可能没有遍历完数组就返回了,导致最后一个invoker没有机会被选到。栗子:5,2,1。offsetWeight=7
if (offsetWeight <= 0)
return invokers.get(leastIndex);
}
}

//随机返回一个invoker
// If all invokers have the same weight value or totalWeight=0, return evenly.
return invokers.get(leastIndexs[random.nextInt(leastCount)]);
}

/**
* 获取预热后invoker的权重
*
* @param invoker
* @param invocation
* @return
*/
int getWeight(Invoker<?> invoker, Invocation invocation) {
int weight;
URL url = invoker.getUrl();
// Multiple registry scenario, load balance among multiple registries.
if (REGISTRY_SERVICE_REFERENCE_PATH.equals(url.getServiceInterface())) {
weight = url.getParameter(REGISTRY_KEY + "." + WEIGHT_KEY, DEFAULT_WEIGHT);
} else {
weight = url.getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT);
if (weight > 0) {

//获取启动时间戳
long timestamp = invoker.getUrl().getParameter(TIMESTAMP_KEY, 0L);

if (timestamp > 0L) {

//获取应用启动了多久
long uptime = System.currentTimeMillis() - timestamp;
if (uptime < 0) {
return 1;
}

//获取预热时间:默认值为10min
int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP);

//启动时间<预热时间,重新计算权重
if (uptime > 0 && uptime < warmup) {
weight = calculateWarmupWeight((int) uptime, warmup, weight);
}
}
}
}
return Math.max(weight, 0);
}

/**
* 获取还没到预热时间的权重
*
* @param uptime
* @param warmup
* @param weight
* @return
*/
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
//除于一个数等于乘以一个数的倒数,所以:(uptime/warmup)*weight
int ww = (int) (uptime / ((float) warmup / weight));
return ww < 1 ? 1 : (Math.min(ww, weight));
}
}

读到这里肯定有些老板们肯定有疑惑,你为什么不直用dubbo-2.6.x的依赖呢?还要自己写一个自定义负载均衡器。最重要的一个原因是因为我懒,因为有一个现成的代码框架了。第二点呢,是把源码贴出来,方便加注释。

接下来继续我们栗子的搭建,我们需要创建一个api模块,consumer模块,provider模块。相信各位老板们那么优秀,很容易就能搭建出来。后面会把源码和github地址给出。

如果老板,你问为什么要有三个模块,别问,问就是dubbo官网栗子都是这样搭建的。贴图:

image-20220502220005378

需要说明下注意点:

  1. 自定义负载均衡,要在META-INF.dubbo目录下创建一个文件:org.apache.dubbo.rpc.cluster.LoadBalance,因为这是dubbo-spi定义的规范,会描述这些文件。

image-20220502220449971

noahleastactiveloadbalance=com.noah.dubbo.loadbalance.NoahLeastActiveLoadBalance
  1. 额外技能科普,因为我们需要启动多个provider,我们可以直接在idea如图直接配置,还能设置不同weight,后面会重点介绍。

image-20220502221311639

接下来我们需要做一件非常重要的事情,就是下载dubbo的源码并且导入到idea中,接下来会用到。

地址:dubbo-github

好了,到这里我们栗子的环境搭建准备的差不多了。如果老板们想偷懒,后面会贴出源码和github地址。进入到我们的栗子演示环境。

栗子演示

我们启动三个provider,分别配置权重为默认,200,300。

具体代码配置如下:

//最少链接负载均衡策略

//8081端口配置
@DubboService(timeout = 10 * 60 * 1000, loadbalance = "noahleastactiveloadbalance")

//8082端口配置
//@DubboService(timeout = 10 * 60 * 1000, loadbalance = "noahleastactiveloadbalance", weight = 200)

//8083端口配置
//@DubboService(timeout = 10 * 60 * 1000, loadbalance = "noahleastactiveloadbalance", weight = 300)
@Slf4j
public class HelloServiceImpl implements HelloService {

/**
* 服务提供者:sleep 10min,保持链接
*
* @param name
* @return
*/
@Override
public String greeting(String name) {
log.info("log info for greeting" + name);
try {
TimeUnit.MINUTES.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello " + name;
}
}

服务消费者,先直接异步20个请求到三个provider。我们用第21个请求来debug我们源码,来解决我们一个个的问题。

@Test
public void contextLoads() {
for (int i = 0; i < 20; i++) {
int fi = i;
new Thread(() -> {
helloService.greeting("noahTest-" + fi);
}).start();
}

helloService.greeting("noahTest-" + 100);
}

实验结果我们可以看到:默认权重正在执行了3个请求,200权重正在执行9个请求,300权重正在执行8个请求。

image-20220503113005663

image-20220503113020984

image-20220503113033737

接着我们放行第21个请求,进入我们debug的断点,我们可以看到。

image-20220503113850409

在1号标记处,为什么链接数是0呢。在标记处为2的地方,告诉了我们默认权重为100(因为20880端口是启动在我们8081端口上的),因此我们顺利解决问题0。

image-20220503113750663

但是最终第21个请求没有到最少连接数的provier-8081端口节点上。而是直接到8083端口的provide上。

为什么负载均衡失效了呢?接下来开始解决我们问题0:active为什么返回0。其实这时候我们上网随便一搜都能得到答案:最少活跃数负载均衡算法必须配合ActiveLimitFilter使用。其实这里我们跟着源码看下调用链路也很容易知道。需要在客户端增加@DubboReference(filter = "activelimit")

image-20220503115212150

到此为止,我们的栗子搭建和演示都完成了。进入我们喜闻乐见的代码分析阶段。

源码分析

  1. 问题0:active总是返回0,因为我们没有在消费端配置filter为:activelimit
  2. 问题1:应该要返回预热的权重,因为加权随机也要用到该wegiht。我们认真讨论下加权随机的代码:

image-20220503164320097

image-20220503163524086

image-20220503164426672

区间(权重)越大的服务器,就越可能会承担这个请求

在AbstractLoadBalance类中提到了一个预热的概念。官网中是这样的介绍该功能的:

权重的计算过程主要用于保证当服务运行时长小于服务预热时间时,对服务进行降权,避免让服务在启动之初就处于高负载状态。服务预热是一个优化手段,与此类似的还有 JVM 预热。主要目的是让服务启动后“低功率”运行一段时间,使其效率慢慢提升至最佳状态。

  1. 问题3:通过我们前面准备的dubbo源码的idea,可以看到该类不断演进和优化的过程。可以学习到的东西可太多了。三步走,我们可以看到对应提交的pr。

image-20220503171244389

  1. 问题4:make loadbalance robust,同问题3,可以看到不断优化的过程。

撒花

好了,到这里我们把源码都过了一遍了。我们再来总结下最小活跃数负载均衡算法的实现:

  1. 遍历 invokers 列表,寻找活跃数最小的 Invoker
  2. 如果有多个 Invoker 具有相同的最小活跃数,此时记录下这些 Invoker 在 invokers 集合中的下标,并累加它们的权重,比较它们的权重值是否相等
  3. 如果只有一个 Invoker 具有最小的活跃数,此时直接返回该 Invoker 即可
  4. 如果有多个 Invoker 具有最小活跃数,且它们的权重不相等,此时处理方式和 RandomLoadBalance 一致
  5. 如果有多个 Invoker 具有最小活跃数,但它们的权重相等,此时随机返回一个即可

最小活跃数

最小活跃数负载均衡全称:有最小活跃数用最小活跃数,没有最小活跃数根据权重选择,权重一样则随机返回的负载均衡算法。