Fix Queue OOM - MemorySafeLinkedBlockingQueue
How to fix queue OOM
今天跟大家分享一个在dubbo-Github看到一个很棒的pr:add MemorySafeLinkedBlockingQueue
看到MemorySafe queue我顿时就充满了兴趣了。接下来听我细细道来。
上面是阿里巴巴的P3C规范。都不建议使用Executors
来创建线程池,因为可能导致OOM。老八股文了。
接下来我跟大家介绍什么是MemoryLimitedLinkedBlockingQueue
和MemorySafeLinkedBlockingQueue
,我们统一下语言:MemoryLimitedLinkedBlockingQueue=MemoryLimitedLBQ,MemorySafeLinkedBlockingQueue=MemorySafeLBQ。
What is MemorySafeLinkedBlockingQueue
MemorySafeLinkedBlockingQueue PR link
通过dragon-zhang
大佬提交的pr描述,我们可以知道:
可以完美解决
LinkedBlockingQueue
的OOM问题,并且不依赖Instrumentation
。并且比MemoryLimitedLinkedBlockingQueue
更好用。
通过上面的描述,我们还需要了解2个知识点。
- 什么是
MemoryLimitedLinkedBlockingQueue
- 什么是
Instrumentation
通过本次的PR链接,我们可以看到主要新增了2个文件,并且还帮我们写好了单元测试(直接拿来用就可以了)
public class MemorySafeLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> { |
我们通过上面的源码,在我们往MemorySafeLBQ队列新增元素之前,需要先调用hasRemainedMemory
。需要满足条件才能往队列添加元素。
通过上图我们需要到MemoryLimitCalculator#maxAvailable(获取当前JVM中剩余可用内存)
> 特定值maxFreeMemory
。
我们也可以看到此配置默认值是256MB,换句话说:当前JVM中剩余可用内存大于256MB,才可以往队列增加元素。
获取当前JVM中剩余可用内存:我们通过上面的图可以很容易得知【第一点】,直接调用该方法:Runtime.getRuntime().freeMemory()
。但是在Github上面是经过了激烈讨论才得出的结论。
然后启动一个定时刷新线程,刷新【第一点】的值。最后我们需要实现优雅关闭线程池,需要注册一个shutdownHook。严谨~
到此我们就实现了能够解决OOM的queue。是不是很简单,接下来我们写个单元测试。
第一步,我们声明要拥有所有的内存才允许插入,第一次offer的时候返回的是false。
当我们设置足够多的内存的时候,第二次返回true。
好了~到此我们了解完了MemorySafeLBQ,但是一开始提出的2个问题我们还是没回答,我们接下来进行:
对MemoryLimitedLBQ的了解。
What is MemoryLimitedLinkedBlockingQueue
MemoryLimitedLinkedBlockingQueue PR link
MemoryLimitedLBQ也是大佬提交的解决queue的OOM问题的第一个版本。
是比MemorySafeLBQ稍早的一个产物,是另外一个思路解决OOM。
MemoryLimitedLBQ从构造函数,我们可以得知,当我们初始化该队列的时候,需要传入
- inst:用于初始化
memoryLimiter
的参数,本质是计算对象暂用内存大小的。 - memoryLimit:限制该队列能够容纳最大的内存值。
那接下来我们先重点看下MemoryLimiter
的实现:
从该类的字段属性,我们可以看出:他长得”很像”LinkedBlockingQueue
。思想:通过两把锁控制一个变量的增加或者减少,并且支持阻塞和非阻塞的增加/减少变量。
我们看个put元素的例子,看LinkedBlockingQueue
和MemoryLimiter
是怎么长得”很像”的?
这里想展开说一个bug。在红色框第164行,获取队列memory已经使用的内存的时候,导致的死循环问题。
我们假设,我们限制的内存是100,已经使用的内存是80,此时我们假设线程A进入accquireInterruptibly
,此时线程A的对象大小是30,因为80+30>100,所以线程A挂起等待。
过了一段时间,线程B释放了40内存。并且唤醒了线程A
- 此时线程A如果是memory.sum(),获取的是:80-40=40
- 此时线程A如果是memory,获取的是:80
如果是第二种实现,此时还是80+30>100,的这种就有出现死循环了!!!
接下来,我们来讲讲释放内存的操作。
- 获取释放锁,格式化指定等待超时时间
- 如果当前内存使用量<=0,等待指定超时,还是不满足跳出
- 获取当前对象e的内存占用量,memory扣减e内存暂用量
- 如果memory的内存占用量>0,则通知非空的等待线程
- 最后释放锁
- 如果释放后内存占用量<限制值,则通知等待不要限制的线程
接下来,我们把Instrumentation#getObjectSize攻克完,就能完全理解MemoryLimitedLBQ了。
Instrumentation#getObjectSize的注释,获取对象在JVM中占用内存的近似值。
public class MemoryLimitedLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> { |
接下来我们看下MemoryLimitedLBQ,是怎么增加/移除元素的。
实现很容易,直接重载put/take方法,调用memoryLimiter
相关的方法。