发布于 

Fix Queue OOM - MemorySafeLinkedBlockingQueue

How to fix queue OOM

今天跟大家分享一个在dubbo-Github看到一个很棒的pr:add MemorySafeLinkedBlockingQueue

看到MemorySafe queue我顿时就充满了兴趣了。接下来听我细细道来。

image-20220731222350825

上面是阿里巴巴的P3C规范。都不建议使用Executors来创建线程池,因为可能导致OOM。老八股文了。

接下来我跟大家介绍什么是MemoryLimitedLinkedBlockingQueueMemorySafeLinkedBlockingQueue,我们统一下语言:MemoryLimitedLinkedBlockingQueue=MemoryLimitedLBQ,MemorySafeLinkedBlockingQueue=MemorySafeLBQ。

What is MemorySafeLinkedBlockingQueue

MemorySafeLinkedBlockingQueue PR link

image-20220809112544065

通过dragon-zhang大佬提交的pr描述,我们可以知道:

可以完美解决LinkedBlockingQueue的OOM问题,并且不依赖Instrumentation。并且比MemoryLimitedLinkedBlockingQueue更好用。

通过上面的描述,我们还需要了解2个知识点。

  1. 什么是MemoryLimitedLinkedBlockingQueue
  2. 什么是Instrumentation

image-20220809113316211

通过本次的PR链接,我们可以看到主要新增了2个文件,并且还帮我们写好了单元测试(直接拿来用就可以了)

public class MemorySafeLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {

public static final long serialVersionUID = 42L;

/**
* 默认值,只有JVM可用内存>该值才可以往队列put/offer
*/
public static int THE_256_MB = 256 * 1024 * 1024;

/**
* 满足剩余总的可用内存大于该(特定值)
*/
private int maxFreeMemory;

public MemorySafeLinkedBlockingQueue() {
this(THE_256_MB);
}

public MemorySafeLinkedBlockingQueue(int maxFreeMemory) {
super(Integer.MAX_VALUE);
this.maxFreeMemory = maxFreeMemory;
}

public MemorySafeLinkedBlockingQueue(int capacity, int maxFreeMemory) {
super(capacity);
this.maxFreeMemory = maxFreeMemory;
}

public MemorySafeLinkedBlockingQueue(Collection<? extends E> c, int maxFreeMemory) {
super(c);
this.maxFreeMemory = maxFreeMemory;
}

public void setMaxFreeMemory(int maxFreeMemory) {
this.maxFreeMemory = maxFreeMemory;
}

public boolean hasRemainedMemory() {
return MemoryLimitCalculator.maxAvailable() > maxFreeMemory;
}

@Override
public void put(E e) throws InterruptedException {
if (hasRemainedMemory()) {
super.put(e);
}
}

@Override
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {

return hasRemainedMemory() && super.offer(e, timeout, unit);
}

@Override
public boolean offer(E e) {
return hasRemainedMemory() && super.offer(e);
}
}

我们通过上面的源码,在我们往MemorySafeLBQ队列新增元素之前,需要先调用hasRemainedMemory。需要满足条件才能往队列添加元素。

image-20220809114910423

通过上图我们需要到MemoryLimitCalculator#maxAvailable(获取当前JVM中剩余可用内存) > 特定值maxFreeMemory

我们也可以看到此配置默认值是256MB,换句话说:当前JVM中剩余可用内存大于256MB,才可以往队列增加元素。

image-20220809115738994

获取当前JVM中剩余可用内存:我们通过上面的图可以很容易得知【第一点】,直接调用该方法:Runtime.getRuntime().freeMemory()。但是在Github上面是经过了激烈讨论才得出的结论。

然后启动一个定时刷新线程,刷新【第一点】的值。最后我们需要实现优雅关闭线程池,需要注册一个shutdownHook。严谨~

到此我们就实现了能够解决OOM的queue。是不是很简单,接下来我们写个单元测试。

image-20220809161817101

第一步,我们声明要拥有所有的内存才允许插入,第一次offer的时候返回的是false。

当我们设置足够多的内存的时候,第二次返回true。

好了~到此我们了解完了MemorySafeLBQ,但是一开始提出的2个问题我们还是没回答,我们接下来进行:

对MemoryLimitedLBQ的了解。

What is MemoryLimitedLinkedBlockingQueue

MemoryLimitedLinkedBlockingQueue PR link

image-20220809163843777

MemoryLimitedLBQ也是大佬提交的解决queue的OOM问题的第一个版本。

是比MemorySafeLBQ稍早的一个产物,是另外一个思路解决OOM。

image-20220809164602784

MemoryLimitedLBQ从构造函数,我们可以得知,当我们初始化该队列的时候,需要传入

  1. inst:用于初始化memoryLimiter的参数,本质是计算对象暂用内存大小的。
  2. memoryLimit:限制该队列能够容纳最大的内存值。

那接下来我们先重点看下MemoryLimiter的实现:

image-20220809165803494

从该类的字段属性,我们可以看出:他长得”很像”LinkedBlockingQueue。思想:通过两把锁控制一个变量的增加或者减少,并且支持阻塞和非阻塞的增加/减少变量。

我们看个put元素的例子,看LinkedBlockingQueueMemoryLimiter是怎么长得”很像”的?

image-20220809171005133

这里想展开说一个bug。在红色框第164行,获取队列memory已经使用的内存的时候,导致的死循环问题。

Snipaste_2022-08-09_17-18-43

我们假设,我们限制的内存是100,已经使用的内存是80,此时我们假设线程A进入accquireInterruptibly,此时线程A的对象大小是30,因为80+30>100,所以线程A挂起等待。

过了一段时间,线程B释放了40内存。并且唤醒了线程A

  1. 此时线程A如果是memory.sum(),获取的是:80-40=40
  2. 此时线程A如果是memory,获取的是:80

如果是第二种实现,此时还是80+30>100,的这种就有出现死循环了!!!

关于这个死循环我们可以点击下这个issues

image-20220809173853091

接下来,我们来讲讲释放内存的操作。

  1. 获取释放锁,格式化指定等待超时时间
  2. 如果当前内存使用量<=0,等待指定超时,还是不满足跳出
  3. 获取当前对象e的内存占用量,memory扣减e内存暂用量
  4. 如果memory的内存占用量>0,则通知非空的等待线程
  5. 最后释放锁
  6. 如果释放后内存占用量<限制值,则通知等待不要限制的线程

image-20220809175759263

接下来,我们把Instrumentation#getObjectSize攻克完,就能完全理解MemoryLimitedLBQ了。

Instrumentation#getObjectSize的注释,获取对象在JVM中占用内存的近似值。

public class MemoryLimitedLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
/**
* 阻塞加入队列
*
* @param e
* @throws InterruptedException
*/
@Override
public void put(E e) throws InterruptedException {
memoryLimiter.acquireInterruptibly(e);
super.put(e);
}

/**
* 阻塞移除元素
*
* @return
* @throws InterruptedException
*/
@Override
public E take() throws InterruptedException {

final E take = super.take();
memoryLimiter.releaseInterruptibly(take);
return take;
}
}

接下来我们看下MemoryLimitedLBQ,是怎么增加/移除元素的。

实现很容易,直接重载put/take方法,调用memoryLimiter相关的方法。