Netty 怎样自动探测内存泄露的发生

打印 上一主题 下一主题

主题 868|帖子 868|积分 2604

本文基于 Netty 4.1.112.Final 版本举行讨论
本文是 Netty 内存管理系列的最后一篇文章,在第一篇文章 《聊一聊 Netty 数据搬运工 ByteBuf 体系的设计与实现》 中,笔者以 UnpooledByteBuf 为例,从整个内存管理的外围对 ByteBuf 的整个设计体系举行了详细的拆解分析,随后在第二篇文章 《谈一谈 Netty 的内存管理 —— 且看 Netty 怎样实现 Java 版的 Jemalloc》 中,笔者又带大家深入到 Netty 内存池的内部,对整个池化内存的管理举行了详细拆解。
不知大家有没有注意到,无论好坏池化内存 —— UnpooledByteBuf 的分配还是池化内存 —— PooledByteBuf 的分配,最后都会被 Netty 包装成一个 LeakAwareBuffer 返回。
  1. public final class UnpooledByteBufAllocator {
  2.     @Override
  3.     protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
  4.         final ByteBuf buf;
  5.         if (PlatformDependent.hasUnsafe()) {
  6.             buf = noCleaner ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
  7.                     new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
  8.         } else {
  9.             buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
  10.         }
  11.         // 是否启动内存泄露探测,如果启动则额外用 LeakAwareByteBuf 进行包装返回
  12.         return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
  13.     }
  14. }
复制代码
  1. public class PooledByteBufAllocator {
  2.     // 线程本地缓存
  3.     private final PoolThreadLocalCache threadCache;
  4.     @Override
  5.     protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
  6.         // 获取线程本地缓存,线程第一次申请内存的时候会在这里与 PoolArena 进行绑定
  7.         PoolThreadCache cache = threadCache.get();
  8.         // 获取与当前线程绑定的 PoolArena
  9.         PoolArena<ByteBuffer> directArena = cache.directArena;
  10.         final ByteBuf buf;
  11.         if (directArena != null) {
  12.             // 从固定的 PoolArena 中申请内存
  13.             buf = directArena.allocate(cache, initialCapacity, maxCapacity);
  14.         } else {
  15.             // 申请非池化内存
  16.             buf = PlatformDependent.hasUnsafe() ?
  17.                     UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
  18.                     new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
  19.         }
  20.         // 如果内存泄露探测开启,则用 LeakAwareByteBuf 包装 PooledByteBuf 返回
  21.         return toLeakAwareBuffer(buf);
  22.     }
  23. }
复制代码
笔者之前曾提到过,相比于 JDK DirectByteBuffer 需要依靠 GC 机制来释放其背后引用的 Native Memory , Netty 更倾向于手动及时释放 DirectByteBuf 。由于 JDK DirectByteBuffer 的释放需要等到 GC 发生,由于 DirectByteBuffer 的对象实例所占的 JVM 堆内存太小了,所以一时很难触发 GC , 这就导致被引用的 Native Memory 的释放有了一定的延迟,严重的情况会越积越多,导致 OOM 。而且也会导致进程中对 DirectByteBuffer 的申请操纵有非常大的延迟。
而 Netty 为了避免这些情况的出现,选择在每次利用完之后手动释放 Native Memory ,但是不依靠 JVM 的话,总会有内存泄露的情况,比如在利用完了 ByteBuf 却忘记调用 release() 方法释放。
手动释放虽然及时可控,但是却很容易出现内存泄露。Netty 为了应对内存泄露的发生,从而引入了 LeakAwareBuffer,从命名上就可以看出,LeakAwareBuffer 重要是为了识别出被其包装的 ByteBuf 是否有内存泄露情况的发生。
现在大家是不是对这个 LeakAwareBuffer 非常的好奇,它毕竟拥有怎样的魔力,居然可以或许自动探测内存泄露,但现在我们先把 LeakAwareBuffer 丢在一边,先不消管它,由于它只是 ByteBuf 一个简单的套壳,背后真正核心的是与内存泄露相关的一些探测模型设计,所以笔者决定先从最核心的设计原理开始谈起~~~

1. 内存泄露探测的设计原理

首先我们来看第一个核心的问题,我们毕竟该选择一个什么样的机遇来对内存泄露举行探测 ?
正在利用的内存肯定不能算是泄露,别管我已经斲丧了多么大的内存,但这些内存确实是正在利用的,你不能说我是内存泄露对吧。当我不需要这些内存了,但仍旧继续持有着不释放,这种情况,我们才华定义为内存泄露。
所以当内存不再被利用的时间,才是我们举行内存泄露探测的机遇,而正在利用的内存,压根就没有内存泄露,自然也不需要举行探测,那么接下来的问题就是,我们怎样判断某一块内存是正在被利用的 ? 还是已经不在被利用了 ?
那肯定得靠 GC 啊!对吧。当一个 DirectByteBuf 已经没有任何强引用或者软引用的时间,那就说明它已经不在被利用了,GC 就会回收它。当它还存在强引用或者软引用的时间,说明它还在被利用,那么 GC 就不会回收它。
但是内存泄露探测的功能是在 JVM 之外实现的,JVM 不会意识到我们到底想要干嘛,它只管无脑回收 DirectByteBuf,对于 DirectByteBuf 背后引用的 Native Memory 是否发生泄露,JVM 压根就不会 Care 。
看上去靠 GC 是靠不住了,但如果我们可以或许在 DirectByteBuf 被 GC 的时间得到一个 JVM 的通知,然后在这个通知中,触发内存泄露的探测,是不是就可以了 ?那我们怎样得到这个通知呢 ?
还记不记得笔者在 《以 ZGC 为例,谈一谈 JVM 是怎样实现 Reference 语义的》 一文中介绍的 WeakReference 和 PhantomReference 以及 FinalReference ? 它们都可以拿到这个通知。
比如 JDK 中的 DirectByteBuffer ,其背后引用的 Native Memory 的回收需要依靠 Cleaner 机制,而 Cleaner 就是一个 PhantomReference 对象。
  1. public class Cleaner extends PhantomReference<Object>
复制代码

Cleaner 虚引用了 DirectByteBuffer,这样一来当这个 DirectByteBuffer 没有任何强引用或者软引用的时间,也就是不会再被利用了,后面就会被 GC 回收掉,与此同时 JVM 会将它的虚引用 Cleaner 放入 JVM 内部一个叫做 _reference_pending_list 的链表中。
随后 JVM 会唤醒 JDK 中的 1 号线程 —— ReferenceHandler。
  1.         Thread handler = new ReferenceHandler(tg, "Reference Handler");
  2.         // 设置 ReferenceHandler 线程的优先级为最高优先级
  3.         handler.setPriority(Thread.MAX_PRIORITY);
  4.         handler.setDaemon(true);
复制代码
ReferenceHandler 线程会从 JVM 的 _reference_pending_list 中挨个将全部的 Cleaner 摘下,调用它的 clean() 方法,最终在 Deallocator 中释放 Native Memory 。
  1.   private static class Deallocator implements Runnable {
  2.         public void run() {   
  3.             // 底层调用 free 来释放 native memory
  4.             UNSAFE.freeMemory(address);
  5.         }
  6.   }
复制代码

再比如 Netty 内存池中的线程本地缓存 PoolThreadCache,其背后缓存的池化 Native Memory 的回收依靠的是 Finalizer 机制。
  1.     private static final class FreeOnFinalize {
  2.         // 待释放的 PoolThreadCache
  3.         private volatile PoolThreadCache cache;
  4.         private FreeOnFinalize(PoolThreadCache cache) {
  5.             this.cache = cache;
  6.         }
  7.         @Override
  8.         protected void finalize() throws Throwable {
  9.             try {
  10.                 super.finalize();
  11.             } finally {
  12.                 PoolThreadCache cache = this.cache;
  13.                 this.cache = null;
  14.                 // 当 FreeOnFinalize 实例要被回收的时候,触发 PoolThreadCache 的释放
  15.                 if (cache != null) {
  16.                     cache.free(true);
  17.                 }
  18.             }
  19.         }
  20.     }
复制代码
FreeOnFinalize 的作用重要就是为了回收 PoolThreadCache , 内部重写了 finalize() 方法,JVM 会为其创建一个 Finalizer 对象(FinalReference 范例),Finalizer 引用了 FreeOnFinalize ,但这种引用关系是一种 FinalReference 范例。
  1. final class Finalizer extends FinalReference<Object> {
  2.     private static ReferenceQueue<Object> queue = new ReferenceQueue<>();
  3.     private Finalizer(Object finalizee) {
  4.         // 这里的 finalizee 就是 FreeOnFinalize 对象,被 FinalReference 引用
  5.         super(finalizee, queue);
  6.               ......
  7.     }
  8. }
复制代码

Finalizer 中有一个全局的 ReferenceQueue,这个 ReferenceQueue 非常的重要,由于 JVM 中的 _reference_pending_list 是属于 JVM 内部的,除了 ReferenceHandler 线程,别的普通的 Java 线程是访问不了的,所以我们要想在 JVM 的外部处理这些 Reference(其引用的对象已经被回收),就需要用到一个外部队列,这个外部队列就是 Finalizer 中的 ReferenceQueue。
  1.    Reference(T referent, ReferenceQueue<? super T> queue) {
  2.         // FreeOnFinalize 对象
  3.         this.referent = referent;
  4.         //  Finalizer 中的 ReferenceQueue 实例(全局)
  5.         this.queue = (queue == null) ? ReferenceQueue.NULL : queue;
  6.     }
复制代码

之所以这里采用 Set 结构就是为了实现 “引用计数是否为 0” 的这层语义,那么怎样实现呢 ?
Netty 在分配一个 DirectByteBuf 的同时也会创建一个 DefaultResourceLeak 对象来弱引用这个 DirectByteBuf,随后会将这个 DefaultResourceLeak 对象放入到 allLeaks 集合中。
当我们利用完 DirectByteBuf 并调用 release() 方法释放其 Native Memory 的时间,如果它的引用计数为 0 ,那么 Netty 就会将它的 DefaultResourceLeak 对象从 allLeaks 集合中删除。
如果我们利用完 DirectByteBuf 忘记调用 release() 方法,那么它的引用计数就会一直大于 0 ,同时也意味着它对应的 DefaultResourceLeak 对象会一直停顿在 allLeaks 集合中。
从另一个层面上来说,只要是停顿在 allLeaks 集合中的 DefaultResourceLeak 对象,那么被其弱引用的 DirectByteBuf 的引用计数一定是大于 0 的。
当这个 DirectByteBuf 给 GC 回收之后,JVM 会将其对应的  DefaultResourceLeak 插入到 _reference_pending_list 中,随后 ReferenceHandler 线程会再一次将 DefaultResourceLeak 对象从 _reference_pending_list 中转移到 ReferenceQueue 中。
当某一个普通的 Java 线程在向 Netty 申请 DirectByteBuf 的时间,这个申请内存的线程就会顺带到 ReferenceQueue 中检察一下是否有 DefaultResourceLeak 对象,如果有,那么就证明被其弱引用的 DirectByteBuf 已经被 GC 了。
紧接着,就会检察这个 DefaultResourceLeak 对象是否仍旧停顿在 allLeaks 集合中 ,如果还在,那么就说明 DirectByteBuf 背后的 Native Memory 仍旧没有被释放,这样一来 Netty 就探测到了内存泄露的发生。
好了,现在我们已经清楚了 Netty 内存泄露探测的核心设计原理,那么下面的内容就很简单了,我们把视角在切换一下,从内存泄露探测的内部在转换到外部,站在应用的角度再来从团体上完整地看一下整个内存泄露探测机制。
2. Netty 的内存泄露探测机制

从总体上来讲,触发内存泄露的探测需要同时满足以下五个条件:

  • 应用必须开启内存泄露探测功能。
  • 必须要等到 ByteBuf 被 GC 之后,内存泄露才华探测的到,如果 GC 一直没有触发,那么即使是 ByteBuf 没有任何强引用或者软引用了,内存泄露的探测也将无从谈起。
  • 当 GC 发生之后,必须是要等到下一次分配内存的时间,才会触发内存泄露的探测。如果没有内存申请的举动发生,那么内存泄露的探测也不会发生。
  • Netty 并不会探测每一个 ByteBuf 的泄露情况,而是根据一定的采样间隔,举行采样探测。所以要想触发内存泄露的探测,还需要达到一定的采样间隔。
  • 应用的日记级别必须开启 Error  级别,由于内存泄露的报告,Netty 是以 Error  级别的日记打印出来的,如果日记级别在 Error   以下,那么内存泄露的报告则无法输出。
除此之外,Netty 还为内存泄露的探测设置了四种级别:
  1.         Thread finalizer = new FinalizerThread(tg);
  2.         finalizer.setPriority(Thread.MAX_PRIORITY - 2);
  3.         finalizer.setDaemon(true);
  4.         finalizer.start();
复制代码
我们可以通过 JVM 参数 -Dio.netty.leakDetection.level 为应用设置不同的探测级别,其中 DISABLED 表现禁用内存泄露探测,由于内存泄露探测开启之后,应用对于 ByteBuf 的访问链路会变长,而且 Netty 需要记录 ByteBuf 的创建位置堆栈,以及访问链路堆栈,这样在内存泄露报告中,我们才可以清楚的知道泄露的 ByteBuf 是在哪里创建的,又是在哪里泄露的,它的访问路径有哪些。

而报告中的每一个堆栈在内存中占用 2K 大小,所以内存斲丧还好坏常可观的,所以笔者一般发起在生产环境中,要将 Netty 的内存泄露探测关闭掉。而在测试环境中,则仍旧开启内存泄露探测。
当内存泄露探测开启之后,Netty 为我们提供了三种不同的探测级别,级别越高,斲丧越大,信息也越详细。第一种探测级别是 SIMPLE  , 这也是 Netty 默认的探测级别。
SIMPLE  级别下,Netty 并不会探测每一个 ByteBuf 的泄露情况,而是选择举行采样探测,默认的采样间隔是 128 。
  1. final class DefaultResourceLeak<T> extends WeakReference<Object>
复制代码
我们可以通过 JVM 参数 -Dio.netty.leakDetection.samplingInterval 来设置内存泄露探测的采样间隔。那么 Netty 怎样根据这个采样间隔来决定到底为哪一个具体的 ByteBuf 探测内存泄露呢 ?
事实上,这个探测频率的实现也很简单,在每一次内存申请之后,Netty 都会天生 [ 0 , samplingInterval )  之间的一个随机数,如果这个随机数是 0 ,Netty 将会为本次申请到的 ByteBuf 举行内存泄露探测,如果这个随机数不为 0 ,Netty 将放弃探测。
  1. public class ResourceLeakDetector<T> {
  2.     private final ReferenceQueue<Object> refQueue = new ReferenceQueue<Object>();
  3. }
复制代码
从结果上来看,就是每申请 samplingInterval 个 ByteBuf , Netty 就会触发一次内存泄露的探测。
除了受到这个采用频率的限制之外,SIMPLE 级别下的内存泄露报告信息是最少的,只会包罗 ByteBuf 的创建位置,后面针对 ByteBuf 的访问堆栈信息 Netty 就不会跟踪了,也就是日记中的 Recent access records:  信息,在 SIMPLE 级别下是没有的。

ADVANCED 级别和 SIMPLE 级别一样,在这两种探测级别下,Netty 都会选择举行采样探测,而不是为每一个 ByteBuf 举行探测,同样都会受到采样频率的限制。
那么 ADVANCED 毕竟比 SIMPLE 高级在哪里呢 ?SIMPLE 级别只会报告泄露的 ByteBuf 是在哪里创建的, ADVANCED 级别则除了泄露 ByteBuf 的创建位置之外,还会跟踪 ByteBuf 的每一次访问堆栈,也就是下面内存泄露报告日记中的 Recent access records 相关信息。

前面笔者也提过,追踪 ByteBuf 的访问堆栈是需要斲丧非常可观的内存的,对于 ByteBuf 的每一次访问堆栈,如果要记录的话,每个堆栈占用 2K 的内存,堆栈信息 Netty 会记录在一个 TraceRecord 结构中。
如果一个 ByteBuf 被访问了多次,那么就会对应多个 TraceRecord 结构,ByteBuf 的这些 TraceRecord , 被 Netty 构造在对应 DefaultResourceLeak 里的一个栈结构中,位于栈底的 TraceRecord 记录的是 ByteBuf 的创建堆栈,位于栈顶的 TraceRecord 记录的是 ByteBuf 最近一次被访问的堆栈。
  1. public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {
  2.    // 引用计数
  3.    private volatile int refCnt;
  4. }
复制代码
由于每个 TraceRecord 中记录的访问堆栈信息占用 2K 的内存,因此无论在什么探测级别下,Netty 都不可能为 ByteBuf 的每一次访问都记录下堆栈信息,所以要对 DefaultResourceLeak 栈中 TraceRecord 的个数举行限制。默认栈中的 TraceRecord 最大个数为 4 , 我们可以通过 -Dio.netty.leakDetection.targetRecords 参数举行调节。
  1. public interface ReferenceCounted {
  2.      int refCnt();
  3.      ReferenceCounted retain();
  4.      boolean release();
  5. }
复制代码
但更加精确的说,targetRecords 只是对栈中的 TraceRecord 个数举行限制,避免无穷的增长,但不会限制死。事实上, 栈中 TraceRecord 个数有一定的概率会超过  targetRecords 的限制。
比如,默认情况下 targetRecords 的值为 4 ,如果我们将栈中 TraceRecord 个数限制成 4 个的话,当一个 ByteBuf 的访问链路很长的话,那么栈中就只能记录前三个最远的 TraceRecord 和一个最近的 TraceRecord。中间的访问堆栈就丢失了。这样不利于我们排查 ByteBuf 的完整泄露路径。
事实上 targetRecords 的真正语义是,当 ByteBuf 的访问堆栈记录 TraceRecord 个数达到 targetRecords 的限定时,Netty 会根据一定的概率来丢弃当前栈顶 TraceRecord,并将新的 TraceRecord 作为栈顶。这个丢弃的概率好坏常高的,从而避免了 TraceRecord 个数疯狂地增长。
但如果恰恰命中了不丢弃的概率(非常低),那么原来栈顶的 TraceRecord 将不会被丢弃而是继续保留在栈中,新的 TraceRecord 作为栈顶加入到栈中,这样一来栈中 TraceRecord 个数就超过了 targetRecords 的限制。但是可以尽可能多的保留 ByteBuf 中间的访问堆栈记录。使得 ByteBuf 的泄露路径更加完整一些。
PARANOID  是 Netty 内存泄露探测的最高级别,信息最全,斲丧也最大,它在 ADVANCED 的基础之上,绕开了采样频率的限制,会对每一个 ByteBuf 举行详细地泄露探测。一般用于需要在测试环境定位告急的内存泄露问题才会开启。
3. 内存泄露探测相关的设计模型

现在我们已经清楚了内存泄露探测的设计原理以及相关应用,那么在本末节中就该正式介绍实现细节了,Netty 一共设计了 4 种探测模型,不同的模型封装不同的探测职责。
3.1 ResourceLeakDetector

首先第一个模型是 ResourceLeakDetector 。顾名思义,它重要负责内存泄露的探测,第一末节中介绍的原理实现,就是在这个模型中完成的。
  1. public class Cleaner extends PhantomReference<Object>
  2. {
  3.     private static Cleaner first = null;
  4.     private Cleaner next = null, prev = null;
  5. }
复制代码
ResourceLeakDetector 中封装了内存泄露探测所需要的全部信息,其中最重要的就是 allLeaks 和 refQueue 这两个集合,allLeaks 重要用于保存全部未被释放的 ByteBuf 对应的弱引用 DefaultResourceLeak,在 ByteBuf 被创建之后,Netty 就会为其创建一个 DefaultResourceLeak 实例来弱引用 ByteBuf,同时这个 DefaultResourceLeak 会被添加到这里的 allLeaks 中。
如果应用程序及时的释放了 ByteBuf , 那么对应的 DefaultResourceLeak 也会从 allLeaks 中删除,如果 ByteBuf 被 GC 之后,其对应的 DefaultResourceLeak 仍旧停顿在 allLeaks 中,那么就说明该 ByteBuf 发生泄露了。

refQueue 重要用于收集被 GC 的 ByteBuf 对应的弱引用 DefaultResourceLeak,当一个 ByteBuf 被 GC 之后,那么其对应的 DefaultResourceLeak 就会被 JVM 放入到一个内部的 _reference_pending_list 中,随后 ReferenceHandler 线程被唤醒,将 DefaultResourceLeak 从 _reference_pending_list 中转移到这里的 refQueue。

后续 ResourceLeakDetector 就会从 refQueue 中将 DefaultResourceLeak 摘下,然后检查这个 DefaultResourceLeak 是否仍旧停顿在 allLeaks 集合中。如果存在,就说明对应的 ByteBuf 发生了泄露,最后将泄露路径以 ERROR 级别的日记打印出来。
除此之外,Netty 还提供了一个内存泄露监听器,让我们可以在内存泄露发生之后实现自主的处理逻辑。
  1. final class Finalizer extends FinalReference<Object> {
  2.     // 双向链表,保存 JVM 堆中所有的 Finalizer 对象,防止 Finalizer 被 GC 掉
  3.     private static Finalizer unfinalized = null;
  4.     private Finalizer next, prev;
  5. }
复制代码
我们可以通过 ByteBufUtil.setLeakListener 方法来向 ResourceLeakDetector 注册 LeakListener。
  1. public class ResourceLeakDetector<T> {
  2.     private final Set<DefaultResourceLeak<?>> allLeaks =
  3.             Collections.newSetFromMap(new ConcurrentHashMap<DefaultResourceLeak<?>, Boolean>());
  4. }
复制代码
一旦 ResourceLeakDetector 探测到内存泄露的发生,Netty 就会回调我们注册的 LeakListener。
Netty 在全局范围内只会有一个 ResourceLeakDetector 实例,被 AbstractByteBuf 的静态字段 leakDetector 所引用。
  1.     public enum Level {
  2.         DISABLED,
  3.         SIMPLE,
  4.         ADVANCED,
  5.         PARANOID;
  6.     }
复制代码
内存泄露探测器的默认实现是 ResourceLeakDetector,但我们也可以自定义实现内存泄露探测器,只需要继承  ResourceLeakDetector 类,并覆盖实现相关的核心探测方法,最后通过 JVM 参数 -Dio.netty.customResourceLeakDetector={className} 指定即可。
ResourceLeakDetector 最核心的方法莫过于 track(T obj) 和 reportLeak() 这两个方法。
  1. public class ResourceLeakDetector {    public final ResourceLeakTracker track(T obj) {        return track0(obj, false);    }    // 采样频率,默认 128    private final int samplingInterval;    // 对 obj 举行资源泄露的探测    // force 表现是否强制探测    private DefaultResourceLeak track0(T obj, boolean force) {        Level level = ResourceLeakDetector.level;        if (force ||                level == Level.PARANOID ||                (level != Level.DISABLED && public class ResourceLeakDetector<T> {
  2.     private final ReferenceQueue<Object> refQueue = new ReferenceQueue<Object>();
  3. })) {            // 触发内存泄露探测,如果发生内存泄露,则在日记中 report            reportLeak();            // 创建 ByteBuf (obj) 对应的弱引用 DefaultResourceLeak            // ResourceLeakDetector 中的全局 refQueue , allLeaks 会在这里注册进去            return new DefaultResourceLeak(obj, refQueue, allLeaks, getInitialHint(resourceType));        }        return null;    }}
复制代码
其中 track 方法用于触发内存泄露的探测,这里是对第二末节中的内容实现,如果我们设置的内存泄露探测级别为 PARANOID  ,  那么 Netty 就会对系统中全部的 ByteBuf 举行全量探测,内存泄露发生之后的报告日记也会包罗详细的泄露堆栈路径。
如果内存泄露探测级别为 SIMPLE  或者 ADVANCED  , 那么 Netty 就会对系统中的 ByteBuf 举行采样探测,采样间隔 SAMPLING_INTERVAL  = 128 , 我们可以通过 JVM 参数 -Dio.netty.leakDetection.samplingInterval 举行设置。
具体的采样逻辑是,Netty 会天生 [ 0 , samplingInterval ) 之间的一个随机数,如果这个随机数是 0 ,那么就举行内存泄露探测,如果这个随机数不为 0 ,则放弃探测。从结果上来看,就是每申请 samplingInterval 个 ByteBuf , Netty 就会触发一次内存泄露的探测。
  1. public class ResourceLeakDetector<T> {
  2.     private final ReferenceQueue<Object> refQueue = new ReferenceQueue<Object>();
  3. }
复制代码
当符合内存泄露的探测条件之后,Netty 将会在 reportLeak() 方法中举行内存泄露的探测,如果有内存泄露的发生,那么就将泄露的 ByteBuf 相关访问路径以 ERROR 的日记级别打印出来。
既然内存泄露的日记级别是 ERROR , 那么在举行内存泄露探测之前,我们首先必须检查一下用户是否开启了  ERROR 日记级别。
  1. private static final class DefaultResourceLeak<T> {
  2.     // 栈顶指针
  3.     private volatile TraceRecord head; // 栈结构,存放对应 ByteBuf 的访问堆栈
  4. }
  5. private static class TraceRecord extends Throwable {
  6.   // 栈底
  7.   private static final TraceRecord BOTTOM = new TraceRecord()
  8. }
复制代码
如果用户选择的日记级别比较低,那么即使发生了内存泄露,相关的 ERROR 日记也不会打印,这种情况下内存泄露的探测也就没必要举行了。Netty 会调用 clearRefQueue() 方法,将 refQueue 中收集到的全部 DefaultResourceLeak 实例清空,而且将 DefaultResourceLeak 从 allLeaks 集合中删除。
  1. public class ResourceLeakDetector<T> {
  2.     // ByteBuf 访问堆栈记录个数限制,默认为 4
  3.     private static final int TARGET_RECORDS;
  4.     private static final String PROP_TARGET_RECORDS = "io.netty.leakDetection.targetRecords";
  5.     private static final int DEFAULT_TARGET_RECORDS = 4;
  6.     TARGET_RECORDS = SystemPropertyUtil.getInt(PROP_TARGET_RECORDS, DEFAULT_TARGET_RECORDS);
  7. }
复制代码
如果用户的日记级别选择的是 ERROR  ,  Netty 就会继续后面的内存泄露探测流程,首先一个 ByteBuf 如果被 GC 回收的话,那么与其弱引用关联的 DefaultResourceLeak 就会被 ReferenceHandler 线程转移到 refQueue 中。
也就是说当前 refQueue 中保留的全部 DefaultResourceLeak 其对应的 ByteBuf 已经被 GC 回收了,而内存泄露探测针对地就是这些被回收的 ByteBuf。
Netty 会从 refQueue 中将这些收集到的 DefaultResourceLeak 挨个摘下。
  1. public class ResourceLeakDetector<T> {
  2.     // 探测级别
  3.     private static Level level;
  4.     // 未被释放的 ByteBuf 对应的弱引用 DefaultResourceLeak 集合
  5.     private final Set<DefaultResourceLeak<?>> allLeaks =
  6.             Collections.newSetFromMap(new ConcurrentHashMap<DefaultResourceLeak<?>, Boolean>());
  7.     // 用于接收 ByteBuf 被回收的通知
  8.     private final ReferenceQueue<Object> refQueue = new ReferenceQueue<Object>();
  9.     // 探测的资源类型,这里是 ByteBuf
  10.     private final String resourceType;
  11.     // 采样间隔
  12.     private final int samplingInterval;
  13.     // 内存泄露监听器,一旦探测到内存泄露,Netty 就会回调 LeakListener
  14.     private volatile LeakListener leakListener;
  15. }
复制代码
然后调用 dispose()  方法检查 DefaultResourceLeak 实例是否仍旧停顿在 allLeaks 集合中。
  1.     public interface LeakListener {
  2.         /**
  3.          * Will be called once a leak is detected.
  4.          */
  5.         void onLeak(String resourceType, String records);
  6.     }
复制代码
如果仍旧停顿在 allLeaks 中,那么就说明该 DefaultResourceLeak 实例对应的 ByteBuf 出现内存泄露了。在探测到内存泄露发生之后,调用 getReportAndClearRecords() 方法获取 ByteBuf 相关的访问堆栈路径,然后通过 reportTracedLeak 方法将 ByteBuf 的泄露路径以 ERROR 级别的日记打印出来,最后回调内存泄露监听器 LeakListener。
  1. public final class ByteBufUtil {
  2.     public static void setLeakListener(ResourceLeakDetector.LeakListener leakListener) {
  3.         AbstractByteBuf.leakDetector.setLeakListener(leakListener);
  4.     }
  5. }
复制代码
reportLeak() 方法的实现逻辑正是笔者在第一末节中介绍的全部内容:
  1.     private void reportLeak() {        // 日记级别必须是 Error 级别        if (!needReport()) {            clearRefQueue();            return;        }        // Detect and report previous leaks.        for (;;) {            // 对应的 ByteBuf 必须已经被 GC 回收,才会触发内存泄露的探测            public class ResourceLeakDetector<T> {
  2.     // 探测级别
  3.     private static Level level;
  4.     // 未被释放的 ByteBuf 对应的弱引用 DefaultResourceLeak 集合
  5.     private final Set<DefaultResourceLeak<?>> allLeaks =
  6.             Collections.newSetFromMap(new ConcurrentHashMap<DefaultResourceLeak<?>, Boolean>());
  7.     // 用于接收 ByteBuf 被回收的通知
  8.     private final ReferenceQueue<Object> refQueue = new ReferenceQueue<Object>();
  9.     // 探测的资源类型,这里是 ByteBuf
  10.     private final String resourceType;
  11.     // 采样间隔
  12.     private final int samplingInterval;
  13.     // 内存泄露监听器,一旦探测到内存泄露,Netty 就会回调 LeakListener
  14.     private volatile LeakListener leakListener;
  15. }            if (ref == null) {                break;            }            // 检查 ByteBuf 对应的 DefaultResourceLeak 是否仍旧停顿在 allLeaks 集合中            if (!ref.dispose()) {                // 如果不存在,则说明 ByteBuf 已经被及时的释放了,不存在内存泄露                continue;            }            // 当探测到 ByteBuf 发生内存泄露之后,这里会获取 ByteBuf 相关的访问堆栈             String records = ref.getReportAndClearRecords();            if (reportedLeaks.add(records)) { // 去重泄露日记                // 打印泄露的堆栈路径                if (records.isEmpty()) {                    reportUntracedLeak(resourceType);                } else {                    reportTracedLeak(resourceType, records);                }                // 回调 LeakListener                LeakListener listener = leakListener;                if (listener != null) {                    listener.onLeak(resourceType, records);                }            }        }    }
复制代码
3.2 ResourceLeakTracker

上一末节介绍的 ResourceLeakDetector 只是负责内存泄露的探测,但如果探测到了内存泄露,相关的泄露路径信息从哪里来的呢 ?Netty 是怎样收集的 ?这就引入了第二个探测模型 —— ResourceLeakTracker。
Netty 对 ResourceLeakTracker 的默认实现是 DefaultResourceLeak,它是一个 WeakReference ,被 Netty 用来弱引用关联 ByteBuf , 目标是接收 ByteBuf 被 GC 回收的通知,从而可以判断是否有内存泄露的情况发生。

除此之外,ResourceLeakTracker 承担的另一个重要职责就是负责收集 ByteBuf 的访问链路堆栈,一旦 ByteBuf 发生泄露,ResourceLeakDetector 就会从 ResourceLeakTracker 中获取相关的泄露堆栈 —— getReportAndClearRecords() 方法,并在日记中打印出来。
每一条 ByteBuf 相关的访问链路堆栈信息,Netty 用一个 TraceRecord 结构来封装,而一个 ByteBuf 会有多条访问链路,那么在它的 ResourceLeakTracker 结构中就对应多个 TraceRecords,这些 TraceRecords 被 Netty 构造在一个栈的结构中。
  1. public class ResourceLeakDetector<T> {
  2.     public final ResourceLeakTracker<T> track(T obj) {
  3.         return track0(obj, false);
  4.     }
  5.     // 采样频率,默认 128
  6.     private final int samplingInterval;
  7.     // 对 obj 进行资源泄露的探测
  8.     // force 表示是否强制探测
  9.     private DefaultResourceLeak track0(T obj, boolean force) {
  10.         Level level = ResourceLeakDetector.level;
  11.         if (force ||
  12.                 level == Level.PARANOID ||
  13.                 (level != Level.DISABLED && PlatformDependent.threadLocalRandom().nextInt(samplingInterval) == 0)) {
  14.             // 触发内存泄露探测,如果发生内存泄露,则在日志中 report
  15.             reportLeak();
  16.             // 创建 ByteBuf (obj) 对应的弱引用 DefaultResourceLeak
  17.             // ResourceLeakDetector 中的全局 refQueue , allLeaks 会在这里注册进去
  18.             return new DefaultResourceLeak(obj, refQueue, allLeaks, getInitialHint(resourceType));
  19.         }
  20.         return null;
  21.     }
  22. }
复制代码
当 Netty 新分配一个 ByteBuf 之后,如果符合 ResourceLeakDetector.track  中的探测条件,那么就会创建一个 DefaultResourceLeak 来弱引用这个 ByteBuf。同时将这个 DefaultResourceLeak 加入到 allLeaks 集合中,这里正是判断一个 ByteBuf 是否发生内存泄露的关键依据。
无论什么样的探测级别,DefaultResourceLeak 都会至少保留一个 TraceRecord , 这个 TraceRecord 用于保存 ByteBuf 的创建位置堆栈,在构建  DefaultResourceLeak 的时间会被加入到栈底。
  1. PlatformDependent.threadLocalRandom().nextInt(samplingInterval) == 0
复制代码
另外我们可以通过 record 相关方法,来向 DefaultResourceLeak 添加 ByteBuf 的当前访问堆栈。
  1.     protected boolean needReport() {
  2.         return logger.isErrorEnabled();
  3.     }
复制代码
通过 record(Object hint) 添加的堆栈,会在泄露日记中出现我们自定义的提示信息。

而通过 record() 添加的堆栈,在泄露日记中就没有这个提示信息。

向 DefaultResourceLeak 添加新 TraceRecord 的逻辑也很简单,就是将 ByteBuf 当前最新的访问堆栈信息 —— TraceRecord 入栈即可。但也不能无穷制的向栈中添加 TraceRecord。
第二末节笔者介绍过,每个 TraceRecord 中记录的访问堆栈信息占用 2K 的内存,Netty 不可能为 ByteBuf 的每一次访问都记录下堆栈信息,所以 DefaultResourceLeak 栈中的个数会受到 TARGET_RECORDS 的限制,默认为 4 , 我们可以通过 -Dio.netty.leakDetection.targetRecords 参数举行调节。
当 DefaultResourceLeak 栈中记录的 TraceRecord 个数达到 TARGET_RECORDS 的限定时,Netty 会根据一定的概率(比较高)来丢弃当前栈顶 TraceRecord,并将新的 TraceRecord 作为栈顶。从而避免了 TraceRecord 个数疯狂地增长。
但如果恰恰命中了不丢弃的概率(非常低),那么原来栈顶的 TraceRecord 将不会丢弃而是继续保留在栈中,新的 TraceRecord 作为栈顶加入到栈中,这样一来栈中 TraceRecord 个数就超过了 TARGET_RECORDS 的限制。但是可以尽可能多的保留 ByteBuf 中间的访问堆栈记录。使得 ByteBuf 的泄露路径更加完整一些。

丢弃概率的盘算逻辑也很简单,Netty 仍旧是通过盘算一个 [ 0 , 1 = TARGET_RECORDS) {                        final int backOffFactor = Math.min(numElements - TARGET_RECORDS, 30);                        // numElements 超出 TARGET_RECORDS 的限制越多,当前栈顶就越容易被 drop                        if (dropped = PlatformDependent.threadLocalRandom().nextInt(1

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

飞不高

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表