ToB企服应用市场:ToB评测及商务社交产业平台

标题: 抓到 Netty 一个隐藏很深的内存泄露 Bug | 详解 Recycler 对象池的精妙设计 [打印本页]

作者: 忿忿的泥巴坨    时间: 2022-8-23 18:48
标题: 抓到 Netty 一个隐藏很深的内存泄露 Bug | 详解 Recycler 对象池的精妙设计
欢迎关注公众号:bin的技术小屋,如果大家在看文章的时候发现图片加载不了,可以到公众号查看原文
本系列Netty源码解析文章基于 4.1.56.Final版本
最近在 Review Netty 代码的时候,不小心用我的肉眼抓到了一个隐藏很深很深的内存泄露 Bug。

于是笔者将这个故事....哦不 .....事故,详细的阐述出来分享给大家。

这将是一篇很长很长的故事,在本文中笔者会详细描述这个内存泄露 Bug 的发现,分析,修复过程。顺便将对象池在 Netty 中的一些精妙的设计方案及其源码实现一起详尽地展现给大家。
故事从何说起呢?让我们回到另一个月黑风高天空还是显得那么深邃遥远的夜晚,笔者再一次闲来无事捧起 Netty 对象池相关部分源码细细品读的时候,突然菊花一紧,虎躯一震。意外的用肉眼盯出了一个内存泄露Bug出来。
于是笔者顺手一个 Issue,反手一个修复 PR 提交了过去。
Issue11864 : https://github.com/netty/netty/issues/11864
PR : https://github.com/netty/netty/pull/11865

巧合的是 Netty 也意识到了对象池这块的问题,Netty 最近也正在重构这一块,因为 Recycler 整体设计的还是比较复杂的,这可以从我们接下来要分析的对象池源码实现中可以看的出来,Recycler 的复杂性在于它的使用场景混合了并发以及与 GC 相关的交互,这些相关的问题都比较难以定位,所以 Netty 决定将对象池这一块用一种更加容易被理解的方式重构掉。
这位说话特别好听的 chrisvest 大佬也说了 笔者发现的这个 Bug 也间接证明了 Netty 要简化对象池设计的正确性和必要性。

随口提一句,这个大牛 chrisvest 是大名鼎鼎的图数据库 Neo4j 的核心commitor,同时也是Netty Buffer相关 API 的设计者。
这里先不详细解释这个 Issue,也不建议大家现在就打开这个 Issue 查看,笔者会在本文的介绍中随着源码深入的解读慢慢的为大家一层一层地拨开迷雾。
下面就让我们一起带着怀疑,审视,欣赏,崇敬,敬畏的态度来一起品读世界顶级程序员编写的代码。由衷的感谢他们在这一领域做出的贡献。

1. 池化思想的应用

在我们日常开发工作中我们经常会遇到各种池化技术的设计思想,比如连接池,内存池,对象池,还有我们在业务开发过程中经常会缓存一些业务计算结果数据这也同样运用到了池化技术的设计思想,我们可以叫它为结果池。
池化技术的应用场景就是当一个对象的创建和销毁需要付出比较大的性能开销时,我们就需要将这些重量级对象放在一个池子里管理,当需要时直接从池子里获取避免重复创建和销毁的开销从而达到了复用的效果。
比如连接池里面保存管理的都是一些网络连接对象,这些对象创建和销毁的代价比较大。通过连接池将这些重量级的网络连接对象统一管理起来,业务线程可以直接复用,避免了重新创建,释放连接的性能开销以及等待时间。
还有我们在日常开发中遇到的一些计算逻辑复杂的业务,我们通常会先从数据库中查询数据,然后经过复杂的计算得到结果,为了避免下次在重复计算,我们会将计算结果放入缓存中,我们可以称做结果池。也是一种池化思想。
再比如我们在《Netty如何高效接收网络数据》一文中提到的内存池,为了避免不必要的数据拷贝以及JVM垃圾回收对性能的影响,Netty 选择使用堆外内存存储网络通信数据。在 Netty 申请堆外内存之前,首先会在 JVM 堆中创建一个用于引用 native memory 的引用对象 DirectByteBuffer ,随后会使用 native 方法 unsafe.allocateMemory 通过底层 malloc 系统调用申请一块堆外内存。
这里就涉及到到两个重要开销:
而在 Netty 面对的高并发网络通信场景下,申请堆外内存是一个非常频繁的操作,基于以上提到的两个重要性能开销,这种大量频繁的内存申请释放操作对程序的性能影响是巨大的,所以 Netty 就引入了内存池对内存相关的操作进行统一的管理。
2. 对象池简介

以上内容的介绍就是池化思想的应用以及它所解决的问题,本文我们的主题是介绍对象池,对象池的引入是为了在需要大量创建对象以及销毁对象的场景下,将对象进行池化以达到复用池中对象,避免大量地重复创建对象以及销毁对象的性能开销,
前边我们在提到内存池的时候说到,在 Netty 所要面对的高并发网络通信场景下,需要大量的申请堆外内存用来存储通信数据。在 Netty 中,我们通过 PooledDirectByteBuf 对象来引用堆外内存。所以 Netty 在处理网络 IO 的时候是需要大量频繁的创建 PooledDirectByteBuf 对象。
为了避免在高并发的场景下大量的创建对象所引来的性能开销,我们可以引入对象池来池化创建出来的 PooledDirectByteBuf 对象,需要用的时候直接从对象池中获取,用完之后在回收到对象池中。
另外这里提前向大家透露一点的是我们下篇文章中即将要介绍的 Netty 发送数据流程涉及到的对象池的应用。我们都知道 Netty 是一个异步事件驱动的高性能网络框架,当在业务线程中处理完业务逻辑准备响应业务结果到客户端的时候,我们会向对应 channel 写入业务结果,此时业务线程会立即返回,这是一个异步的过程。
原因是在底层实现中,Netty 会将用户的响应结果数据暂时写入到每个 Channel 特有的一个发送缓冲队列 ChannelOutboundBuffer 中,也就是说这个 ChannelOutboundBuffer 缓存着 Channel 中的待发送数据。最终会通过 flush 方法,将 ChannelOutboundBuffer 中的这些待发送数据写入到底层 Socket 中,从而发送给客户端。
而这个发送缓冲队列 ChannelOutboundBuffer 中的队列元素是一个 Entry 类型的,每次的写入操作需要创建一个 Entry 对象来包裹发送数据,并将这个 Entry 对象缓存在发送缓冲队列 ChannelOutboundBuffer 中。
这里大家只需要知道 ChannelOutboundBuffer 是个啥,它的大概作用,以及这个缓冲队列缓存的对象是 Entry 类型的就可以了,我们会在下篇文章为大家详细介绍,这里引出只是为了介绍对象池的应用场景。
所以Netty在面对海量网络 IO 的场景下,必定会大量频繁地去创建 Entry 对象,那么每一次的网络 IO 都要重新创建这些对象,并且用完又要被垃圾回收掉这样无疑会大量增加 JVM 的负担以及 GC 的时间,这对于最求极致性能的 Netty 来说肯定是不可接受的。
基于以上这几种情况,对象池被用来管理那些需要频繁创建使用的对象,在使用完后并不立即将它们释放,而是将它们在对象池中缓存起来,以供后续的应用程序重复使用,从而减少创建对象和释放对象的开销,进而改善应用程序的性能。
从另一方面来看,对象池还可以将对象限制在一定的数量内从而可以有效减少应用程序在内存上的开销。
通过前边关于对象池的简要介绍之后,我想大家现在可能比较好奇这些对象在创建和回收的过程中到底需要哪些开销呢?
接下来笔者就为大家介绍下这些开销方面的内容方便大家更加全面清晰地理解对象池。
3. 对象在JVM中创建和回收开销

3.1 对象的创建开销

在 Java 程序中我们可以通过一个 new 关键字来创建对象,而当JVM遇到一条 new 的字节码指令后,会发生什么呢?
关于如何确定对象所需内存大小,对这方面细节感兴趣的同学可以回看下笔者的《对象在JVM中的内存布局》这篇文章。
大家这里需要记住这种利用TLAB的分配方式,因为Netty中的对象池Recycler也是利用这种思想避免多线程获取对象的同步开销。
3.2 对象的回收开销

然而在高并发的网络IO处理场景下,这些单个对象的创建和回收开销会被无限放大,于是Netty引入了一个轻量级的对象池 Recycler 来负责将这些需要频繁创建的对象进行池化,统一分配,回收管理。
在为大家详细介绍对象池 Recycler 的实现之前,笔者想先从对象池的使用上先让大家可以直观地感受一下 Recycler 对外提供的功能入口。
4. 对象池Recycler的使用

这里我们直接看下Netty源码中是如何使用Recycler对象池的,首先我们来看下对象池在 PooledDirectByteBuf 类中是如何使用的。
大家这里先不用去管这个PooledDirectByteBuf类是干吗的,只需要明白这个类是会被频繁创建的,我们这里主要是演示对象池的使用。

4.1 对象池在PooledDirectByteBuf类中的使用
  1. final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> {
  2.     //创建对象池
  3.     private static final ObjectPool<PooledDirectByteBuf> RECYCLER = ObjectPool.newPool(
  4.             new ObjectCreator<PooledDirectByteBuf>() {
  5.         @Override
  6.         public PooledDirectByteBuf newObject(Handle<PooledDirectByteBuf> handle) {
  7.             return new PooledDirectByteBuf(handle, 0);
  8.         }
  9.     });
  10.     //对象在对象池中的回收句柄
  11.     private final Handle<PooledByteBuf<T>> recyclerHandle;
  12.     static PooledDirectByteBuf newInstance(int maxCapacity) {
  13.         //从对象池中获取对象
  14.         PooledDirectByteBuf buf = RECYCLER.get();
  15.         buf.reuse(maxCapacity);
  16.         return buf;
  17.     }
  18.    
  19.     private void recycle() {
  20.          //回收对象
  21.         recyclerHandle.recycle(this);
  22.     }
  23.     ................省略和对象池无关的代码..................
  24. }
复制代码
前边我们提到在Netty中需要大量频繁的创建PooledDirectByteBuf对象,为了避免在高并发场景下频繁创建对象的开销从而引入了对象池来统一管理PooledDirectByteBuf对象。
Netty中每个被池化的对象中都会引用对象池的实例ObjectPool  RECYCLER ,这个对象池的实例就是专门用来分配和管理被池化对象的。
这里我们创建出来的对象池是专门用来管理PooledDirectByteBuf对象的(通过泛型指定对象池需要管理的具体对象)。泛型类ObjectPool是Netty为对象池设计的一个顶层抽象。对象池的行为功能均定义在这个泛型抽象类中。我们可以通过 ObjectPool#newPool 方法创建指定的对象池。其参数 ObjectCreator 接口用来定义创建池化对象的行为。当对象池中需要创建新对象时,就会调用该接口方法 ObjectCreator#newObject 来创建对象。
其中每个池化对象中都会包含一个recyclerHandle,这个recyclerHandle是池化对象在对象池中的句柄。里边封装了和对象池相关的一些行为和信息,recyclerHandle是由对象池在创建对象后传递进来的。
当我们需要PooledDirectByteBuf对象时,我们直接通过RECYCLER.get()从PooledDirectByteBuf对象池中获取对象即可。
当我们使用完毕后,直接调用PooledDirectByteBuf对象在对象池中的句柄recyclerHandle.recycle(this) 把对象回收到对象池中。
4.2 对象池在Channel写入缓冲队列中的使用

前边提到,每个Channel都会有一个独立的写入缓冲队列ChannelOutboundBuffer,用来暂时存储用户的待发送数据。这样用户可以在调用channel的write方法之后立马返回,实现异步发送流程。
在发送数据时,Channel首先会将用户要发送的数据缓存在自己的写缓存队列ChannelOutboundBuffer中。而ChannelOutboundBuffer中的元素类型为Entry。在Netty中会大量频繁的创建Entry对象。所以Entry对象同样也需要被对象池管理起来。
在上小节介绍PooledDirectByteBuf对象池的过程中,我想大家已经对对象池的使用套路已经有了大概的了解。这里我们借助Entry对象池将使用步骤总结如下:
创建对象池
  1.    static final class Entry {
  2.         private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() {
  3.             @Override
  4.             public Entry newObject(Handle<Entry> handle) {
  5.                 return new Entry(handle);
  6.             }
  7.         });
  8.         //recyclerHandle用于回收对象
  9.         private  Handle<Entry> handle;
  10.         
  11.         private Entry(Handle<Entry> handle) {
  12.             this.handle = handle;
  13.         }
  14.    }
复制代码
前边我们介绍到每一个要被池化的对象都需要一个静态变量来引用其对应的对象池。
  1. static final ObjectPool<Entry> RECYCLER
复制代码
匿名实现 ObjectCreator 接口来定义对象创建的行为方法。
  1.     public interface ObjectCreator<T> {
  2.         T newObject(Handle<T> handle);
  3.     }
复制代码
通过ObjectPool#newPool 创建用于管理Entry对象的对象池。
在对象池创建对象时,会为池化对象创建其在对象池中的句柄Handler,随后将Handler传入创建好的池化对象中。当对象使用完毕后,我们可以通过Handler来将对象回收至对象池中等待下次继续使用。
从对象池中获取对象

由于Entry对象在设计上是被对象池管理的,所以不能对外提供public构造函数,无法在外面直接创建Entry对象。
所以池化对象都会提供一个获取对象实例的 static 方法 newInstance。在该方法中通过RECYCLER.get()从对象池中获取对象实例。
  1.       static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
  2.             Entry entry = RECYCLER.get();
  3.             
  4.             .........省略无关代码..............
  5.             return entry;
  6.         }
复制代码
使用完毕回收对象

池化对象都会提供一个 recycle 方法,当对象使用完毕后,调用该方法将对象回收进对象池中。
  1.         void recycle() {
  2.             next = null;
  3.             bufs = null;
  4.             buf = null;
  5.             msg = null;
  6.             promise = null;
  7.             progress = 0;
  8.             total = 0;
  9.             pendingSize = 0;
  10.             count = -1;
  11.             cancelled = false;
  12.             handle.recycle(this);
  13.         }
复制代码
从上边所列举的Netty中使用对象池的例子,我们可以直观的感受到对象池的使用非常简单。无非就是从对象池获取对象,将对象回收至对象池这两个核心步骤。
同时我们也注意到池化对象的设计和普通对象是有所不同的,不过,我们只需要遵循本小节中所列举的几个步骤进行设计即可。
5. Recycler总体设计

Recycler对象池的设计还是比较复杂的但是却很精妙,所以笔者这里继续采用总 - 分 - 总的结构来为大家介绍对象池的设计与实现。
一开始我们先不要去追求太过细节的内容,先要从总体上摸清楚对象池的设计架构,以及各个功能模块之间的关联。
当我们从整体上理解了对象池的设计架构后,笔者后面会分模块来各个击破它的实现细节。
在理清楚各个模块的实现细节之后,笔者将在从细节着手再次将对象池的整体设计架构为大家串联起来。
我们按照这个思路先来看一下Recycler对象池的总体架构设计图,从整体直观上来感受下它的设计,以及包含的一些重要模块。

5.1 多线程获取对象无锁化设计

首先我们从外部整体来看,对象池对于我们来说它就是一个存储对象的池子,当我们需要对象时会从这个池子里直接获取,用完对象时在把对象归还回池子中方便下一次重复使用。
但我们俯瞰整个对象池的设计架构时,我们发现整个设计还是比较复杂其中蕴含了不少精妙的细节。
对象池中最重要的两个结构分别是 Stack 和 WeakOrderQueue。
Stack 中包含一个用数组实现的栈结构(图中绿色部分),这个栈结构正是对象池中真正用于存储池化对象的地方,我们每次从对象池中获取对象都会从这个栈结构中弹出栈顶元素。同样我们每次将使用完的对象归还到对象池中也是将对象压入这个栈结构中。
这里有一个精妙的设计,我们从图中可以看到每个线程都会拥有一个属于自己的Stack。在我们介绍《对象创建的开销》这一小节内容时,提到为了避免多线程并发申请内存时的同步锁定开销,JVM为每个线程预先申请了一块内存(TLAB),这样当线程创建对象时都是从自己的TLAB中为对象分配内存。从而避免了多线程之间的同步竞争。
同样当多线程并发从对象池中获取对象时, 如果整个对象池只有一个Stack结构的话,为了保证多线程获取对象的线程安全性,我们只能同步地来访问这个Stack,这样就为对象池的设计引入了多线程同步竞争的开销。
为了避免这种不必要的同步竞争,Netty也采用了类似TLAB分配内存的方式,每个线程拥有一个独立Stack,这样当多个线程并发从对象池中获取对象时,都是从自己线程中的Stack中获取,全程无锁化运行。大大提高了多线程从对象池中获取对象的效率
这种多线程并发无锁化的设计思想,在Netty中比比皆是
5.2 Stack的设计

从Recycler对象池的整体设计架构图中我们可以看到,Stack的设计主要分为两个重要的部分:
这里我们先不需要管WeakOrderQueue的具体结构
那么Stack结构在设计上为什么要引入这个WeakOrderQueue链表呢
让我们考虑一种多线程回收对象的场景,我们还是以Recycler对象池的整体设计架构图为例。thread1 为当前线程,剩下的thread2 , thread3 , thread4为其他线程。让我们把视角先聚焦在当前线程上。
我们先假设Stack结构中只有一个数组栈,并没有WeakOrderQueue链表。看看这样会产生什么后果?

当前线程 thread1 在处理业务逻辑时,创建了一个对象(注意:这个对象是由thread1创建的)如果这是一个单线程处理业务的场景,那么对象会在thread1处理完业务逻辑后被回收至thread1对应的stack1中的数组栈中。当`hread1再次需要创建对象时,会直接从其对应的stack1中的数组栈(图中绿色部分)中直接获取上次回收的对象。
由这一点可以看出Stack中的数组栈(绿色部分)存放的是真正被回收的对象,是可以直接被再次获取使用的。
但如果这是一个多线程处理业务场景的话,很可能由thread1创建出来的对象,会被交给thread2或者thread3去处理剩下的业务逻辑,那么当thread2或者thread3这些其他线程处理完业务逻辑时,此时对象的释放并不是在thread1中,而是在其他线程中。
其他线程现在面对的任务就是要将由thread1创建出来的对象,释放回收至thread1对应的stack1中的数组栈中。如果此时多个其他线程并发的向stack1释放回收对象,势必会导致多线程之前的同步竞争,Netty将不得不把Stack结构中的数组栈的访问设计成一个同步过程
那么如果此时更不巧的是当前线程thread1又要同时向自己的Stack1获取对象,thread1就只能同步等待,因为此时其他线程正在向Stack1释放对象。
本来我们引入对象池的目的就是为了抵消创建对象的开销加快获取对象的速度,减少GC的压力。结果由于Stack的同步访问设计又引入了同步开销。这个同步的开销甚至会比创建对象的开销还要大,那么对象池的引入就变得得不偿失了。
那么Netty该如何化解这种情况呢?答案还是之前反复强调的无锁化设计思想。
既然多线程的回收对象场景,会引入多线程之间的同步锁定开销,那么我们就继续采用无锁化的设计思想,为每个线程(注意:这里指的是非创建对象的线程也就是图中的thead2 , thread3 ....)单独分配一个WeakOrderQueue节点,每个线程在为创建线程回收对象时,会将这些对象暂时存放到自己对应的WeakOrderQueue节点中。
注意:存放进WeakOrderQueue中的对象我们称为待回收对象,这些待回收对象并不在Stack结构中的数组栈中,因此并不能被直接获取使用。
为了方便后续描述,我们把创建对象的线程称作创建线程(示例中的thread1),将为创建线程回收对象的其他线程称作回收线程(示例中的thread2 , thread3 , thead4 .....)。
我们在将视角拉回到创建线程thread1对应的stack1中,每个回收线程将待回收对象放入与自己对应的WeakOrderQueue节点中,这样就避免了在多线程回收场景中的同步竞争。当所有回收线程都在为stack1回收对象时,这样在stack1中就形成了一个WeakOrderQueue链表。每个回收线程只操作与自己对应的节点。在Stack结构中通过head,prev,cursor将这些WeakOrderQueue节点组成了一个链表。
每一个WeakOrderQueue节点对应一个回收线程。
而当创建线程thread1再次从自己对应的Stack1中获取对象时,只会从Stack结构的数组栈中获取,因为是单线程操作数组栈,自然是不会存在同步竞争的。
当Stack结构中的数组栈没有任何对象时,那么创建线程就会根据 cursor 指针遍历Stack结构中的WeakOrderQueue链表,将当前WeakOrderQueue节点存放的待回收对象转移至数组栈中。如果WeakOrderQueue链表中也没有任何待回收对象可以转移。那么创建线程在对象池中就直接创建一个对象出来返回。
对象池回收对象的一个原则就是对象由谁创建的,最终就要被回收到创建线程对应的Stack结构中的数组栈中。数组栈中存放的才是真正被回收的池化对象,可以直接被取出复用。回收线程只能将待回收对象暂时存放至创建线程对应的Stack结构中的WeakOrderQueue链表中。当数组栈中没有对象时,由创建线程将WeakOrderQueue链表中的待回收对象转移至数组栈中。
正是由于对象池的这种无锁化设计,对象池在多线程获取对象和多线程回收对象的场景下,均是不需要同步的
大家在体会下这张图中蕴含的这种无锁化设计思想

5.3 WeakOrderQueue的设计

在我们介绍完对象池在多线程回收对象场景下的设计时,我们再来看下用于回收线程存储待回收对象的WeakOrderQueue是如何设计的?
注意:这里的回收线程,待回收对象这些概念是我们站在创建线程的视角提出的相对概念。

大家一开始可能从WeakOrderQueue字面意思上以为它的结构是一个队列,但实际上从图中我们可以看出WeakOrderQueue的结构其实是一个链表结构。
其中包含了链表的头结点 Head,以及链表尾结点指针 Tail。链表中的元素类型为 Link 类型。
Link 类型中包含了一个 elements 数组,该数组用来存放回收线程收集的待回收对象。
除此之外Link类型中还包含了readIndex用来指示当前elements数组中的读取位置。writeIndex用来指示elements数组的写入位置。elements数组中的容量默认为16,也就是说一个Link节点最多可以存放16个待回收对象。当回收线程收集的待回收对象超过16个时,就会新创建一个Link节点插入到Link链表的尾部。
当需要将WeakoOrderQueue节点中所存放的待回收对象回收转移至其对应的Stack结构中的数组栈中时,创建线程会遍历当前WeakOrderQueue节点中的Link链表,然后从链表的Head节点开始,将Head节点中包裹的Link链表头结点中存放的待回收对象回收至创建线程对应的Stack中。一次最多转移一个Link大小的待回收对象(16个)。
当Link节点中的待回收对象全部转移至创建线程对应的Stack中时,会立马将这个Link节点从当前WeakOrderQueue节点中的Link链表里删除,随后Head节点向后移动指向下一个Link节点。
head指针始终指向第一个未被转移完毕的Link节点,创建线程从head节点处读取转移待回收对象,回收线程从Tail节点处插入待回收对象。这样转移操作和插入操作互不影响、没有同步的开销
注意这里会存在线程可见性的问题,也就是说回收线程刚插入的待回收对象,在创建线程转移这些待回收对象时,创建线程可能会看不到由回收线程刚刚插入的待回收对象。
Netty这里为了不引入多线程同步的开销,只会保证待回收对象的最终可见性。 因为如果要保证待回收对象的实时可见性,就要插入一些内存屏障指令,执行这些内存屏障指令也是需要开销的。
事实上这里也并不需要保证实时可见性,创建线程暂时看不到WeakOrderQueue节点中的待回收对象也是没关系的,大不了就新创建一个对象。这里还是遵循无锁化的设计思想
维护线程之间操作的原子性,可见性都是需要开销的,我们在日常多线程程序设计中一定要根据业务场景来综合考虑,权衡取舍。尽量遵循我们这里多次强调的多线程无锁化设计思想。提高多线程的运行效率。避免引入不必要的同步开销。
综合以上 Netty Recycler 对象池的设计原理,我们看到多线程从对象池中获取对象,以及多线程回收对象至对象池中,还有创建线程从WeakOrderQueue链表中转移待回收对象到对象池中。这些步骤均是无锁化进行的,没有同步竞争。
在理解了对象池的基本设计原理后,下面就该介绍对象池在Netty中的源码实现环节了。
6. Recycler对象池的实现

在小节《4. 对象池Recycler的使用》中我们介绍了Recycler对象池的两个使用案例:
从这两个案例中,我们看到在设计池化对象时,都需要在池化对象内部持有一个对象池的静态引用从而可以与对象池进行交互,引用类型为 ObjectPool ,ObjectPool 是Netty对象池的顶层设计,其中定义了对象池的行为,以及各种顶层接口。
在介绍对象池的整体实现之前,我们先来看下对象池的这个顶层接口设计。
6.1 对象池的顶层设计ObjectPool
  1. public abstract class ObjectPool {    ObjectPool() { }    public abstract T get();    public interface Handle {        void recycle(T self);    }    public interface ObjectCreator<T> {
  2.         T newObject(Handle<T> handle);
  3.     }    ......................省略............}
复制代码
我们首先看到 ObjecPool 被设计成为一个泛型的抽象类,之所以使用泛型,是因为我们在创建对象池的时候需要指定对象池中被池化对象的类型。
比如《4. 对象池Recycler的使用》小节中的这两个案例:
  1. static final class Entry {
  2.     private static final ObjectPool<Entry> RECYCLER
  3. }
复制代码
  1. final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> {
  2.     private static final ObjectPool<PooledDirectByteBuf> RECYCLER
  3. }
复制代码
ObjecPool 定义了从对象池中获取对象的行为:
  1. public abstract T get();
复制代码
将池化对象回收至对象池中的行为被定义在 Handler 内部接口中:
  1.      public interface Handle<T> {
  2.         void recycle(T self);
  3.     }
复制代码
Handler是池化对象在对象池中的一个模型,Handler里面包裹了池化对象,并包含了池化对象的一些回收信息,以及池化对象的回收状态。它的默认实现是DefaultHandle,后面我们会详细介绍。
我们前边介绍到的Stack结构中的数组栈里边存放的就是DefaultHandle,以及WeakOrderQueue结构里的Link节点中的elements数组里存放的也是DefaultHandle。
那么为什么要将池化对象的回收行为recycle定义在Handler中,而不是ObejctPool中呢
让我们站在业务线程的角度来看,其实业务线程处理的都是对象级别这个维度,并不需要感知到对象池的存在,使用完对象,直接调用对象的回收方法recycle将池化对象回收掉即可。
在《4. 对象池Recycler的使用》小节我们介绍过池化对象的设计方法,其中我们提到池化对象中需要引用其在对象池中的Handler,这个Handler会在对象池创建对象的时候传入。池化对象类型中需要定义recycle方法,recycle方法清空池化对象的所有属性,并调用Handler的recycle方法将池化对象回收至对象池中。
  1. static final class Entry {        void recycle() {
  2.             next = null;
  3.             bufs = null;
  4.             buf = null;
  5.             msg = null;
  6.             promise = null;
  7.             progress = 0;
  8.             total = 0;
  9.             pendingSize = 0;
  10.             count = -1;
  11.             cancelled = false;
  12.             handle.recycle(this);
  13.         }}
复制代码
ObjectPool 还定义了对象池创建对象的行为接口:
  1.     public interface ObjectCreator<T> {
  2.         T newObject(Handle<T> handle);
  3.     }
复制代码
用户在创建对象池的时候,需要通过ObjectCreator#newObject方法指定对象池创建对象的行为。Handler对象正是通过这个接口传入池化对象中的。
  1.   static final class Entry {      private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator() {            @Override            public Entry newObject(Handle handle) {                return new Entry(handle);            }        });      //Entry对象只能通过对象池获取,不可外部自行创建      private Entry(Handle handle) {            this.handle = handle;        }  }
复制代码
6.1.1 创建ObjectPool

  1. public abstract class ObjectPool<T> {
  2.     public static <T> ObjectPool<T> newPool(final ObjectCreator<T> creator) {
  3.         return new RecyclerObjectPool<T>(ObjectUtil.checkNotNull(creator, "creator"));
  4.     }
  5.     private static final class RecyclerObjectPool<T> extends ObjectPool<T> {
  6.         //recycler对象池实例
  7.         private final Recycler<T> recycler;
  8.         RecyclerObjectPool(final ObjectCreator<T> creator) {
  9.              recycler = new Recycler<T>() {
  10.                 @Override
  11.                 protected T newObject(Handle<T> handle) {
  12.                     return creator.newObject(handle);
  13.                 }
  14.             };
  15.         }
  16.         @Override
  17.         public T get() {
  18.             return recycler.get();
  19.         }
  20.     }
  21. }
复制代码
  1. public abstract class Recycler<T> {
  2.     protected abstract T newObject(Handle<T> handle);
  3.   
  4.     ........................省略.............
  5. }
复制代码
调用 ObjectPool#newPool 创建对象池时,返回的是 RecyclerObjectPool 实例。而真正的对象池 Recycler 被包裹在 RecyclerObjectPool 中。
对象池Recycler创建对象的行为定义在用户在创建对象池时指定的ObjectCreator 中。
7. Recycler对象池属性详解

在介绍完对象池的顶层设计之后,接下来我们介绍下Recycler对象池相关的一些重要属性。相信大家在看过前边关于对象池设计原理的介绍之后,现在应该能够比较容易的理解即将介绍的这些属性概念,这里涉及到的属性比较多,笔者把这些属性的介绍放到源码实现之前的目的也是先让大家混个眼熟,先有一个感性的认识,等到介绍源码实现时,笔者还会将涉及到的属性再次拿出来介绍。

7.1 创建线程,回收线程的Id标识
  1. public abstract class Recycler<T> {
  2.     //用于产生池化对象中的回收Id,主要用来标识池化对象被哪个线程回收
  3.     private static final AtomicInteger ID_GENERATOR = new AtomicInteger(Integer.MIN_VALUE);
  4.     //用于标识创建池化对象的线程Id 注意这里是static final字段 也就意味着所有的创建线程OWN_THREAD_ID都是相同的
  5.     //这里主要用来区分创建线程与非创建线程。多个非创建线程拥有各自不同的Id
  6.     //这里的视角只是针对池化对象来说的:区分创建它的线程,与其他回收线程
  7.     private static final int OWN_THREAD_ID = ID_GENERATOR.getAndIncrement();
  8. }
复制代码
这里有的同学可能会有疑问了,在多线程从对象池中获取对象的场景中,创建线程会有很多个(比如下图中的thread1, thread2, thread3.....),既然所有的Recycler 对象池实例中的 OWN_THREAD_ID 都是一样的,那么如何区分不同的创建线程呢?

事实上在对象池中我们并不需要区分创建线程与创建线程之间的Id,因为Netty在设计对象池的时候采用了无锁化设计,创建线程与创建线程之间并不需要交互,每个线程只需要关注自己线程内的对象管理工作即可,所以从一个线程的内部视角来看,只会有一个创建线程就是它自己本身,剩下的线程均是回收线程。所以我们对象池的设计中只需要区分创建线程与回收线程就可以了,当然每个回收线程的Id是不一样的。
回收线程的Id是由其对应的 WeakOrderQueue 节点来分配的,一个 WeakOrderQueue 实例对应一个回收线程Id。
  1. private static final class WeakOrderQueue extends WeakReference<Thread> {
  2.     //回收线程回收Id,每个weakOrderQueue分配一个,同一个stack下的一个回收线程对应一个weakOrderQueue节点
  3.    private final int id = ID_GENERATOR.getAndIncrement();
  4. }
复制代码
7.2 对象池中的容量控制

  1.     //对象池中每个线程对应的Stack中可以存储池化对象的默认初始最大个数 默认为4096个对象
  2.     private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024; // Use 4k instances as default.
  3.     // 对象池中线程对应的Stack可以存储池化对象默认最大个数 4096
  4.     private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
  5.     // 初始容量 min(DEFAULT_MAX_CAPACITY_PER_THREAD, 256) 初始容量不超过256个
  6.     private static final int INITIAL_CAPACITY;
复制代码
Recycler 对象池中定义了以上三个属性用于控制对象池中可以池化的对象容量。这些属性对应的初始化逻辑如下:
  1.     static {
  2.         int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread",
  3.                 SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD));
  4.         if (maxCapacityPerThread < 0) {
  5.             maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD;
  6.         }
  7.         DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread;
  8.         INITIAL_CAPACITY = min(DEFAULT_MAX_CAPACITY_PER_THREAD, 256);
  9.     }
复制代码
7.3 回收线程可回收对象的容量控制
  1.    //用于计算回收线程可帮助回收的最大容量因子  默认为2  
  2.     private static final int MAX_SHARED_CAPACITY_FACTOR;
  3.     //每个回收线程最多可以帮助多少个创建线程回收对象 默认:cpu核数 * 2
  4.     private static final int MAX_DELAYED_QUEUES_PER_THREAD;
  5.     //回收线程对应的WeakOrderQueue节点中的Link链表中的节点存储待回收对象的容量 默认为16
  6.     private static final int LINK_CAPACITY;
复制代码
Recycler 对象池除了对创建线程中的 Stack 容量进行限制外,还需要对回收线程可回收对象的容量进行限制。相关回收容量限制属性初始化逻辑如下:
  1.     static {
  2.         MAX_SHARED_CAPACITY_FACTOR = max(2,
  3.                 SystemPropertyUtil.getInt("io.netty.recycler.maxSharedCapacityFactor",
  4.                         2));
  5.         MAX_DELAYED_QUEUES_PER_THREAD = max(0,
  6.                 SystemPropertyUtil.getInt("io.netty.recycler.maxDelayedQueuesPerThread",
  7.                         // We use the same value as default EventLoop number
  8.                         NettyRuntime.availableProcessors() * 2));
  9.         LINK_CAPACITY = safeFindNextPositivePowerOfTwo(
  10.                 max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16));
  11.     }
复制代码
为了方便大家理解这些容量控制的相关参数,笔者又在对象池架构设计图的基础上补充了容量控制相关的信息。大家可以对照上边介绍到的这些参数的含义形象体会下:

7.4 对象回收频率控制

对象池不能不考虑容量的限制而无脑的进行对象的回收,而是要对回收对象的频率进行限制。在我们日常架构设计和程序设计时,我们也一定要有托底的方案,比如限流,降级,熔断等托底方案。这样程序就不至于被突发的异常流量击垮。
在对象池的设计中,Netty用以下两个参数来控制对象回收的频率从而避免对象池迅速膨胀不可控制。
  1.     //创建线程回收对象时的回收比例,默认是8,表示只回收1/8的对象。也就是产生8个对象回收一个对象到对象池中
  2.     private static final int RATIO;
  3.     //回收线程回收对象时的回收比例,默认也是8,同样也是为了避免回收线程回收队列疯狂增长 回收比例也是1/8
  4.     private static final int DELAYED_QUEUE_RATIO;
复制代码
对象回收频率控制参数的初始化逻辑如下:
  1.     static {
  2.         RATIO = max(0, SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));
  3.         DELAYED_QUEUE_RATIO = max(0, SystemPropertyUtil.getInt("io.netty.recycler.delayedQueue.ratio", RATIO));
  4.     }
复制代码
通过前边对 Recycler 对象池的设计原理介绍,我们知道,在池化对象被回收的时候分别由两类线程来执行。
8. Recycler对象池的创建
  1.     private static final class RecyclerObjectPool<T> extends ObjectPool<T> {
  2.         //recycler对象池实例
  3.         private final Recycler<T> recycler;
  4.         RecyclerObjectPool(final ObjectCreator<T> creator) {
  5.              recycler = new Recycler<T>() {
  6.                 @Override
  7.                 protected T newObject(Handle<T> handle) {
  8.                     return creator.newObject(handle);
  9.                 }
  10.             };
  11.         }
  12.       
  13.         ..................省略............
  14.       }
复制代码
Netty 中的 Recycler 对象池是一个抽象类,里面封装了对象池的核心结构以及核心方法。在创建对象池的时候,我们往往会使用Recycler的匿名类来实现抽象方法 newObject 从而来定义对象池创建对象的行为。
  1. public abstract class Recycler<T> {
  2.    protected abstract T newObject(Handle<T> handle);
  3.    protected Recycler() {
  4.         this(DEFAULT_MAX_CAPACITY_PER_THREAD);
  5.     }
  6.     protected Recycler(int maxCapacityPerThread) {
  7.         this(maxCapacityPerThread, MAX_SHARED_CAPACITY_FACTOR);
  8.     }
  9.     protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor) {
  10.         this(maxCapacityPerThread, maxSharedCapacityFactor, RATIO, MAX_DELAYED_QUEUES_PER_THREAD);
  11.     }
  12.     protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
  13.                        int ratio, int maxDelayedQueuesPerThread) {
  14.         this(maxCapacityPerThread, maxSharedCapacityFactor, ratio, maxDelayedQueuesPerThread,
  15.                 DELAYED_QUEUE_RATIO);
  16.     }
  17.     //创建线程持有对象池的最大容量
  18.     private final int maxCapacityPerThread;
  19.     //所有回收线程可回收对象的总量(计算因子)
  20.     private final int maxSharedCapacityFactor;
  21.     //创建线程的回收比例
  22.     private final int interval;
  23.     //一个回收线程可帮助多少个创建线程回收对象
  24.     private final int maxDelayedQueuesPerThread;
  25.     //回收线程回收比例
  26.     private final int delayedQueueInterval;
  27.     protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
  28.                        int ratio, int maxDelayedQueuesPerThread, int delayedQueueRatio) {
  29.         interval = max(0, ratio);
  30.         delayedQueueInterval = max(0, delayedQueueRatio);
  31.         if (maxCapacityPerThread <= 0) {
  32.             this.maxCapacityPerThread = 0;
  33.             this.maxSharedCapacityFactor = 1;
  34.             this.maxDelayedQueuesPerThread = 0;
  35.         } else {
  36.             this.maxCapacityPerThread = maxCapacityPerThread;
  37.             this.maxSharedCapacityFactor = max(1, maxSharedCapacityFactor);
  38.             this.maxDelayedQueuesPerThread = max(0, maxDelayedQueuesPerThread);
  39.         }
  40.     }
  41. }
复制代码

当创建线程从Stack结构中的WeakOrderQueue链表中转移待回收对象到数组栈中后,availableSharedCapacity 的值也会相应增加。说白了这个值就是用来指示回收线程还能继续回收多少对象。已达到控制回收线程回收对象的总体容量。

介绍完Stack结构中的这些重要属性,创建的过程就很简单了。就是利用前边介绍过的已经初始化好的Recycler属性对Stack结构中的这些属性进行赋值。
  1.     //threadlocal保存每个线程对应的 stack结构
  2.     private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
  3.         @Override
  4.         protected Stack<T> initialValue() {
  5.             return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
  6.                     interval, maxDelayedQueuesPerThread, delayedQueueInterval);
  7.         }
  8.         
  9.         ..............省略..........
  10.     };
复制代码
  1. private static final class Stack<T> {
  2.         // 创建线程保存池化对象的stack结构所属对象池recycler实例
  3.         final Recycler<T> parent;
  4.         //用弱引用来关联当前stack对应的创建线程 因为用户可能在某个地方引用了defaultHandler -> stack -> thread,可能存在这个引用链
  5.         //当创建线程死掉之后 可能因为这个引用链的存在而导致thread无法被回收掉
  6.         final WeakReference<Thread> threadRef;
  7.         //所有回收线程能够帮助当前创建线程回收对象的总容量
  8.         final AtomicInteger availableSharedCapacity;
  9.         //当前Stack对应的创建线程作为其他创建线程的回收线程时可以帮助多少个线程回收其池化对象
  10.         private final int maxDelayedQueues;
  11.         //当前创建线程对应的stack结构中的最大容量。 默认4096个对象
  12.         private final int maxCapacity;
  13.         //当前创建线程回收对象时的回收比例
  14.         private final int interval;
  15.         //当前创建线程作为其他线程的回收线程时回收其他线程的池化对象比例
  16.         private final int delayedQueueInterval;
  17.         // 当前Stack中的数组栈 默认初始容量256,最大容量为4096
  18.         DefaultHandle<?>[] elements;
  19.         //数组栈 栈顶指针
  20.         int size;
  21.         //回收对象计数 与 interval配合 实现只回收一定比例的池化对象
  22.         private int handleRecycleCount;
  23.         //多线程回收的设计,核心还是无锁化,避免多线程回收相互竞争
  24.         //Stack结构中的WeakOrderQueue链表
  25.         private WeakOrderQueue cursor, prev;
  26.         private volatile WeakOrderQueue head;
  27. }
复制代码
9.2 从对象池中获取对象

  1.     private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
  2.         @Override
  3.         protected Stack<T> initialValue() {
  4.             return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
  5.                     interval, maxDelayedQueuesPerThread, delayedQueueInterval);
  6.         }
  7.       ..............省略............
  8.     }
复制代码
Recycler对外表现为一个整体的对象池,但是对象池内部是按照线程的维度来池化对象的,每个线程所池化的对象保存在对应的Stack结构中。
  1. static final class Entry {     private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator() {            @Override            public Entry newObject(Handle handle) {                return new Entry(handle);            }        });     private Entry(Handle handle) {            this.handle = handle;     }}
复制代码
9.3 DefaultHandler

前边我们在介绍对象池的设计原理时提到,池化对象在对象池中的存储模型为 Handler。
  1. public abstract class Recycler<T> {
  2.       //一个空的Handler,表示该对象不会被池化
  3.      private static final Handle NOOP_HANDLE = new Handle() {
  4.         @Override
  5.         public void recycle(Object object) {
  6.             // NOOP
  7.         }
  8.     };
  9.     public final T get() {
  10.         //如果对象池容量为0,则立马新创建一个对象返回,但是该对象不会回收进对象池
  11.         if (maxCapacityPerThread == 0) {
  12.             return newObject((Handle<T>) NOOP_HANDLE);
  13.         }
  14.         //获取当前线程 保存池化对象的stack
  15.         Stack<T> stack = threadLocal.get();
  16.         //从stack中pop出对象,handler是池化对象在对象池中的模型,包装了一些池化对象的回收信息和回收状态
  17.         DefaultHandle<T> handle = stack.pop();
  18.         //如果当前线程的stack中没有池化对象 则直接创建对象
  19.         if (handle == null) {
  20.             //初始化的handler对象recycleId和lastRecyclerId均为0
  21.             handle = stack.newHandle();
  22.             //newObject为对象池recycler的抽象方法,由使用者初始化内存池的时候 匿名提供
  23.             handle.value = newObject(handle);
  24.         }
  25.         return (T) handle.value;
  26.     }
  27. }
复制代码
在Recycler对象池中的默认实现是 DefaultHandler ,DefaultHandler 里面包裹了池化对象以及池化对象在对象池中的一些相关信息,(比如:池化对象的相关回收信息和回收状态)。
从结构设计角度上来说,池化对象是隶属于其创建线程对应的Stack结构的,由于这层结构关系的存在,池化对象的DefaultHandler应该由Stack来进行创建。
  1. static final class Entry {
  2.      private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() {
  3.             @Override
  4.             public Entry newObject(Handle<Entry> handle) {
  5.                 return new Entry(handle);
  6.             }
  7.         });
  8.      private Entry(Handle<Entry> handle) {
  9.             this.handle = handle;
  10.      }
  11. }
复制代码
我们来看下 DefaultHandler 的具体结构:
  1. public abstract class ObjectPool<T> {
  2.     public interface Handle<T> {
  3.         void recycle(T self);
  4.     }
  5. }
复制代码
DefaultHandler属性的第一部分信息,首先就是池化对象在对象池中的回收信息。
这里可能大家有疑问了,为什么池化对象的回收还要分最近回收和最终回收呢
因为对象池中的池化对象回收可以分为两种情况:
这两个字段 lastRecycledId ,recycleId 主要是用来标记池化对象所处的回收阶段,以及在这些回收阶段具体被哪个线程进行回收。
最后两个属性就比较容易理解了,一个是 Object value 用来包裹真正的池化对象。另一个是 Stack stack 用来强引用关联池化对象的Handler所属的Stack结构。
记不记得我们在介绍Stack结构的时候提到,Stack中持有其对应创建线程的弱引用。笔者在解释为什么持有创建线程的弱引用时,提到过这样一个引用链关系:池化对象 -> DefaultHandler -> Stack -> threadRef。这里大家明白了吗?
  1. private static final class Stack<T> {
  2.         DefaultHandle<T> newHandle() {
  3.             return new DefaultHandle<T>(this);
  4.         }
  5. }
复制代码
9.4 从Stack中获取池化对象

  1.    private static final class DefaultHandle<T> implements Handle<T> {
  2.         //用于标识最近被哪个线程回收,被回收之前均是0
  3.         int lastRecycledId;
  4.         //用于标识最终被哪个线程回收,在没被回收前是0
  5.         int recycleId;
  6.         //是否已经被回收
  7.         boolean hasBeenRecycled;
  8.         //强引用关联创建handler的stack
  9.         Stack<?> stack;
  10.         //池化对象
  11.         Object value;
  12.         DefaultHandle(Stack<?> stack) {
  13.             this.stack = stack;
  14.         }
  15.         @Override
  16.         public void recycle(Object object) {
  17.           ...................省略.............
  18.         }
  19.     }
复制代码
从代码结构上我们可以看出,Head结构的设计不只是作为头结点指针那么简单,其中还封装了很多链表操作以及回收的逻辑。
剩下Head结构中封装的相关逻辑处理方法,等到介绍到具体应用场景的时候,笔者在拿出来为大家介绍,这里先混个眼熟就行。先看懂个大概,脑海里朦朦胧胧有个粗浅的认识即可。
10.3 WeakOrderQueue中的重要属性
  1. static final class Entry {
  2.     //池化对象Entry强引用它的DefaultHandler
  3.     private  Handle<Entry> handle;
  4.   
  5. }
  6. private static final class DefaultHandle<T> implements Handle<T> {
  7.     // DefaultHandler强引用其所属的Stack
  8.     Stack<?> stack;
  9. }
  10. private static final class Stack<T> {
  11.     // Stack弱引用其对应的创建线程
  12.     final WeakReference<Thread> threadRef;
  13. }
复制代码
10.4 WeakOrderQueue结构的创建
  1.         DefaultHandle<T> pop() {
  2.             //普通出栈操作,从栈顶弹出一个回收对象
  3.             int size = this.size;
  4.             if (size == 0) {
  5.                 //如果当前线程所属stack已经没有对象可用,则遍历stack中的weakOrderQueue链表(其他线程帮助回收的对象存放在这里)将这些待回收对象回收进stack
  6.                 if (!scavenge()) {
  7.                     return null;
  8.                 }
  9.                 size = this.size;
  10.                 if (size <= 0) {
  11.                     // 如果WeakOrderQueue链表中也没有待回收对象可转移
  12.                     // 直接返回null 新创建一个对象
  13.                     return null;
  14.                 }
  15.             }
  16.             size --;
  17.             DefaultHandle ret = elements[size];
  18.             elements[size] = null;
  19.             this.size = size;
  20.             if (ret.lastRecycledId != ret.recycleId) {
  21.                 // 这种情况表示对象至少被一个线程回收了,要么是创建线程,要么是回收线程
  22.                 throw new IllegalStateException("recycled multiple times");
  23.             }
  24.             //对象初次创建以及回收对象再次使用时  它的 recycleId = lastRecycleId = 0
  25.             ret.recycleId = 0;
  26.             ret.lastRecycledId = 0;
  27.             return ret;
  28.         }
复制代码
在创建WeakOrderQueue结构的时候,首先会调用父类 WeakReference 的构造方法持有当前回收线程的弱应用。
然后创建第一个Link节点,head指针和tail指针同时指向这第一个节点。
用创建线程对应的Stack中的属性初始化WeakOrderQueue结构中的相关属性。
大家这里可能会问了,既然这里用Stack中的属性去初始化WeakOrderQueue结构中的相关属性,那为什么WeakOrderQueue不直接持有Stack的引用呢
之前我们提到,一个回收线程对应一个WeakOrderQueue节点,当回收线程挂掉的时候,需要清理WeakOrderQueue节点并将其从Stack结构中的WeakOrderQueue链表(头结点除外)中删除。使得WeakOrderQueue节点可以被GC回收掉。
如果Stack结构对应的创建线程挂掉,而此时WeakOrderQueue又持有了Stack的引用,这样就使得Stack结构无法被GC掉。
所以这里只会用Stack结构的相关属性去初始化WeakOrderQueue结构,在WeakOrderQueue中并不会持有Stack的引用。
在复杂程序结构的设计中,我们要时刻对对象之间的引用关系保持清晰的认识。防止内存泄露。
10.5 从WeakOrderQueue中转移回收对象

WeakOrderQueue的transfer方法用于将当前WeakOrderQueue节点中的待回收对象转移至创建线程对应的Stack中。
开始转移回收对象时会从WeakOrderQueue节点中的Link链表的头结点开始遍历,如果头结点中还有未被转移的对象,则将头结点剩余的未转移对象转移至Stack中。所以创建线程每次最多转移一个LINK_CAPACITY大小的对象至Stack中。只要成功转移了哪怕一个对象,transfer方法就会返回true。
如果头结点中存储的对象已经全部转移完毕,则更新head指针指向下一个Link节点,开始转移下一个Link节点。创建线程每次只会转移一个Link节点。如果Link链表是空的,没有转移成功一个对象,则transfer方法返回false。
由于transfer方法体比较大,笔者将其按照上述逻辑步骤拆分开来为大家讲解:

10.5.1 判断头结点中的待回收对象是否转移完毕
  1.        private boolean scavenge() {
  2.             //从其他线程回收的weakOrderQueue里 转移 待回收对像 到当前线程的stack中
  3.             if (scavengeSome()) {
  4.                 return true;
  5.             }
  6.             // 如果weakOrderQueue中没有待回收对象可转移,那么就重置stack中的cursor.prev
  7.             // 因为在扫描weakOrderQueue链表的过程中,cursor已经发生变化了
  8.             prev = null;
  9.             cursor = head;
  10.             return false;
  11.         }
复制代码
首先从Link链表的头结点开始转移,head == null 说明当前Link链表是空的并没有对象可被转移,直接返回false。
head.readIndex == LINK_CAPACITY 判断当前头结点中的对象是否已经被转移完毕,如果当前头结点中的对象已经被全部转移完毕,则将head指针更新 relink 为下一个节点,开始从下一个节点开始转移对象。如果此时Link链表已经为空了,直接返回false。
  1.         private boolean scavengeSome() {
  2.             WeakOrderQueue prev;
  3.             //获取当前线程stack 的weakOrderQueue链表指针(本次扫描起始节点)
  4.             WeakOrderQueue cursor = this.cursor;
  5.             //在stack初始化完成后,cursor,prev,head等指针全部是null,这里如果cursor == null 意味着当前stack第一次开始扫描weakOrderQueue链表
  6.             if (cursor == null) {
  7.                 prev = null;
  8.                 cursor = head;
  9.                 if (cursor == null) {
  10.                     //说明目前weakOrderQueue链表里还没有节点,并没有其他线程帮助回收的池化对象
  11.                     return false;
  12.                 }
  13.             } else {
  14.                 //获取prev指针,用于操作链表(删除当前cursor节点)
  15.                 prev = this.prev;
  16.             }
  17.             boolean success = false;
  18.             //循环遍历weakOrderQueue链表 转移待回收对象
  19.             do {
  20.                 //将weakOrderQueue链表中当前节点中包含的待回收对象,转移到当前stack中,一次转移一个link
  21.                 if (cursor.transfer(this)) {
  22.                     success = true;
  23.                     break;
  24.                 }
  25.                 //如果当前cursor节点没有待回收对象可转移,那么就继续遍历链表获取下一个weakOrderQueue节点
  26.                 WeakOrderQueue next = cursor.getNext();
  27.                 //如果当前weakOrderQueue对应的回收线程已经挂掉了,则
  28.                 if (cursor.get() == null) {
  29.                     // 判断当前weakOrderQueue节点是否还有可回收对象
  30.                     if (cursor.hasFinalData()) {
  31.                         //回收weakOrderQueue中最后一点可回收对象,因为对应的回收线程已经死掉了,这个weakOrderQueue不会再有任何对象了
  32.                         for (;;) {
  33.                             if (cursor.transfer(this)) {
  34.                                 success = true;
  35.                             } else {
  36.                                 break;
  37.                             }
  38.                         }
  39.                     }
  40.                     //回收线程以死,对应的weaoOrderQueue节点中的最后一点待回收对象也已经回收完毕,就需要将当前节点从链表中删除。unlink当前cursor节点
  41.                     //这里需要注意的是,netty永远不会删除第一个节点,因为更新头结点是一个同步方法,避免更新头结点而导致的竞争开销
  42.                     // prev == null 说明当前cursor节点是头结点。不用unlink,如果不是头结点 就将其从链表中删除,因为这个节点不会再有线程来收集池化对象了
  43.                     if (prev != null) {
  44.                         //确保当前weakOrderQueue节点在被GC之前,我们已经回收掉它所有的占用空间
  45.                         cursor.reclaimAllSpaceAndUnlink();
  46.                         //利用prev指针删除cursor节点
  47.                         prev.setNext(next);
  48.                     }
  49.                 } else {
  50.                     prev = cursor;
  51.                 }
  52.                 //向后移动prev,cursor指针继续遍历weakOrderQueue链表
  53.                 cursor = next;
  54.             } while (cursor != null && !success);
  55.             this.prev = prev;
  56.             this.cursor = cursor;
  57.             return success;
  58.         }
复制代码
10.5.2 根据本次转移对象容量评估是否应该对Stack进行扩容

此时Head节点已经校验完毕,可以执行正常的转移逻辑了。但在转移逻辑正式开始之前,还需要对本次转移对象的容量进行计算,并评估Stack的当前容量是否可以容纳的下,如果Stack的当前容量不够,则需要对Stack进行扩容。
  1.         if (cursor.transfer(this)) {
  2.             success = true;
  3.             break;
  4.         }
  5.         WeakOrderQueue next = cursor.getNext();
复制代码
获取Link链表头结点的readIndex和writeIndex,通过  writeIndex - readIndex 计算出当前头结点有多少可被转移的对象。
Stack的最终容量为: expectedCapacity = stack当前容量 + 转移对象的容量。
如果计算得出转移后Stack的最终容量 expectedCapacity 超过了Stack的当前容量则需要对Stack进行扩容。根据扩容后的容量最终决定本次转移多少对象: min(srcStart + actualCapacity - dstSize, srcEnd) ,确保不能超过Stack可容纳的空间。
  1. private static final class WeakOrderQueue extends WeakReference<Thread> {
  2.     ............WeakOrderQueue本身就是一个弱引用,引用对应的回收线程.........
  3. }
复制代码
DefaultHandler中的 recycle 方法逻辑比较简单,唯一不好理解的地方在于判断对象是否已经被回收的 if 条件语句。
忘记的同学可以在回看下《9.3 从Stack中获取池化对象》小节,那里详细介绍了 recycleId 和 lastRecycledId 之间各种关系的变化及其含义

11.1 回收对象至Stack中——啊哈!Bug!
  1.         if (prev != null) {
  2.               cursor.reclaimAllSpaceAndUnlink();
  3.               //利用prev指针删除cursor节点
  4.               prev.setNext(next);
  5.         }
复制代码
这里会进入到池化对象DefaultHandler中持有的Stack中,在Stack中进行对象的回收。
大家这里先不要看笔者下面的解释,试着自己着重分析下这个 if...else...逻辑判断,有没有发现什么问题??Bug就在这里!!
这里首先会判断当前回收线程是否为池化对象的创建线程:threadRef.get() == currentThread)。如果是,则由创建线程直接回收 pushNow(item) 。
如果 threadRef.get() != currentThread) 这里有两种情况:
Bug产生的场景如下如所示:

在第二种情况下,Netty还有一个重要的场景没有考虑到,会导致内存泄露!!
什么场景呢?大家再来回顾下池化对象与对象池之间的引用关系图:

这里我们看到池化对象会引用DefaultHandler,而DefaultHandler又强引用了Stack。于是就形成了这样一条引用链:

而池化对象是对外暴露的,用户可能在某个地方一直引用着这个池化对象,如果创建线程挂掉,并被GC回收之后,那么其在对象池中对应的Stack也应该被回收,因为Stack里保存的回收对象将再也不会被用到了。但是因为这条引用链的存在,导致Stack无法被GC回收从而造成内存泄露!
11.2 笔者反手一个PR,修复这个Bug!

现在Bug产生的原因和造成的影响,笔者为大家已经分析清楚了,那么接下来的解决方案就变得很简单了。
笔者先向Netty社区提了一个 Issue11864 来说明这个问题。
Issue11864 : https://github.com/netty/netty/issues/11864
然后直接提了 PR11865 来修复这个Bug。
PR : https://github.com/netty/netty/pull/11865
PR中主要的修改点分为以下两点:
以下代码为笔者提交的PR中的修复方案,主要增加了对 threadRef.get()  == null 情况的处理,并添加了详细注释。
  1.         //整个recycler对象池唯一的一个同步方法,而且同步块非常小,逻辑简单,执行迅速
  2.         synchronized void setHead(WeakOrderQueue queue) {
  3.             //始终在weakOrderQueue链表头结点插入新的节点
  4.             queue.setNext(head);
  5.             head = queue;
  6.         }
复制代码
11.3 PR的后续

当笔者提交了 PR11865之后,得到了相关作者如下回复。

巧合的是Netty也意识到了对象池这块的问题,Netty最近也正在重构 Recycler 这一块,因为Recycler整体设计的还是比较复杂的,这从我们这篇源码解析的文章中也可以看的出来,Recycler的复杂性在于它的使用场景混合了并发以及与GC相关的交互,这些相关的问题都比较难以定位,所以Netty决定将对象池这一块用一种更加容易被理解的方式重构掉。
相关的重构内容大家可以看作者的这个commit。
重构commit:https://github.com/netty/netty/commit/28b9834612638ffec4948c0c650d04f766f20690
重构后的Recycler对象池在4.1.71.Final版本已经发布。笔者后续也会为大家安排一篇重构后的Recycler对象池源码解析,但是本文还是聚焦于4.1.71.Final之前版本的对象池介绍,虽然被重构了,但是这里也有很多的设计思想和多线程程序设计细节非常值得我们学习!
4.1.71.Final版本发布之后,笔者想的是后面抽空看下重构后的对象池实现,哈哈,只要谓语动词出现—— ”想的是.....“ 类似这样的句式,估计就没有以后了,哈哈。笔者还是大意了,这个 Issue11864 : https://github.com/netty/netty/issues/11864 在过了几个月之后在社区里又被讨论了起来。有人发现在4.1.71.Final对象池重构后的版本中笔者提到的这些问题还是存在的。
于是作者 chrisvest 又 提了一个 PR11996 最终在 4.1.74.Final版本中修复了笔者提的这个 Issue11864。
PR11996 :https://github.com/netty/netty/pull/11996
随口提一句,这个大牛 chrisvest 是大名鼎鼎的图数据库 Neo4j 的核心commitor,同时也是Netty Buffer相关API的设计者。
这里笔者将这个Bug在 4.1.74.Final 版本中的最终修复方案和大家说明一下,收个尾。
在重构后的版本中引入了 LocalPool 来代替我们前边介绍的Stack。LocalPool中的pooledHandles大家可以简单认为类似Stack中数组栈的功能。
  1.     private static final class WeakOrderQueue extends WeakReference<Thread> {
  2.         // link结构是用于真正存储待回收对象的结构,继承AtomicInteger 本身可以用来当做writeindex使用
  3.         static final class Link extends AtomicInteger {
  4.             //数组用来存储待回收对象,容量为16
  5.             final DefaultHandle<?>[] elements = new DefaultHandle[LINK_CAPACITY];
  6.             int readIndex;
  7.             //weakOrderQueue中的存储结构时由link结构节点元素组成的链表结构
  8.             Link next;
  9.         }
  10. }
复制代码
  1.         // weakOrderQueue内部link链表的头结点
  2.         private static final class Head {
  3.             //所有回收线程能够帮助创建线程回收对象的总容量 reserveSpaceForLink方法中会多线程操作该字段
  4.             //用于指示当前回收线程是否继续为创建线程回收对象,所有回收线程都可以看到,这个值是所有回收线程共享的。以便可以保证所有回收线程回收的对象总量不能超过availableSharedCapacity
  5.             private final AtomicInteger availableSharedCapacity;
  6.             //link链表的头结点
  7.             Link link;
  8.             Head(AtomicInteger availableSharedCapacity) {
  9.                 this.availableSharedCapacity = availableSharedCapacity;
  10.             }
  11.             void reclaimAllSpaceAndUnlink() {
  12.                     ....回收head节点的所有空间,并从链表中删除head节点,head指针指向下一节点....
  13.             }
  14.             private void reclaimSpace(int space) {
  15.                 //所有回收线程都可以看到,这个值是所有回收线程共享的。以便可以保证所有回收线程回收的对象总量不能超过availableSharedCapacity
  16.                 availableSharedCapacity.addAndGet(space);
  17.             }
  18.             //参数link为新的head节点,当前head指针指向的节点已经被回收完毕
  19.             void relink(Link link) {
  20.                   ...回收当前头结点的容量,更新head节点为指定的Link节点...
  21.             }
  22.             Link newLink() {
  23.                   ....创建新的Link节点...
  24.             }
  25.             //此处目的是为接下来要创建的link预留空间容量
  26.             static boolean reserveSpaceForLink(AtomicInteger availableSharedCapacity) {               
  27.                   ...在创建新的Link节点之前需要调用该方法预订容量空间...
  28.             }
  29.         }
复制代码
通过以上两个措施 就保证了 当创建线程被GC掉之后,它对应的 在对象池中的回收缓存LocalPool(类比Stack)不会出现内存泄露,同时保证了多线程不在将回收对象至已经被清理的LocalPool中。
好了,这一块的Bug修改我们介绍完了,我们继续多线程回收对象主流程的介绍:
11.4 创建线程直接回收对象
  1. private static final class WeakOrderQueue extends WeakReference<Thread> {
  2.         //link链表的头结点,head指针始终指向第一个未被转移完毕的LinK节点
  3.         private final Head head;
  4.         //尾结点
  5.         private Link tail;
  6.         //站在stack的视角中,stack中包含一个weakOrderQueue的链表,每个回收线程为当前stack回收的对象存放在回收线程对应的weakOrderQueue中
  7.         //这样通过stack中的这个weakOrderQueue链表,就可以找到其他线程为该创建线程回收的对象
  8.         private WeakOrderQueue next;
  9.         //回收线程回收Id,每个weakOrderQueue分配一个,同一个stack下的一个回收线程对应一个weakOrderQueue节点
  10.         private final int id = ID_GENERATOR.getAndIncrement();
  11.         //回收线程回收比例 默认是8
  12.         private final int interval;
  13.         //回收线程回收计数 回收1/8的对象
  14.         private int handleRecycleCount;
  15. }
复制代码
而这个WeakHashMap 的size即表示当前回收线程已经在为多少个创建线程回收对象了,size的值不能超过 maxDelayedQueuesPerThread 。
这里为什么要用WeakHashMap呢?
其实我们前边多少也提到过了,考虑到一种极端的情况就是当创建线程挂掉并且被GC回收之后,其实这个创建线程对应的Stack结构已经没有用了,存储在Stack结构中的池化对象永远不会再被使用到,此时回收线程完全就没有必要在为挂掉的创建线程回收对象了。而这个Stack结构如果没有任何引用链存在的话,随后也会被GC回收。那么这个Stack结构在WeakHashMap中对应的Entry也会被自动删除。如果这里不采用WeakHashMap,那么回收线程为该Stack回收的对象就会一直停留在回收线程中。
介绍完这些背景知识,下面我们就来正式介绍下回收线程到底是如何帮助创建线程回收对象的:
  1. private static final class WeakOrderQueue extends WeakReference<Thread> {
  2.         //为了使stack能够被GC,这里不会持有其所属stack的引用
  3.         private WeakOrderQueue(Stack<?> stack, Thread thread) {
  4.             //weakOrderQueue持有对应回收线程的弱引用
  5.             super(thread);
  6.             //创建尾结点
  7.             tail = new Link();
  8.             // 创建头结点  availableSharedCapacity = maxCapacity / maxSharedCapacityFactor
  9.             head = new Head(stack.availableSharedCapacity);
  10.             head.link = tail;
  11.             interval = stack.delayedQueueInterval;
  12.             handleRecycleCount = interval;
  13.         }
  14. }
复制代码
而这里在回收线程向WeakOrderQueue节点添加回收对象时先将 handle.stack设置为 null,而在转移回收对象时又将 handle.stack 设置回来,这不是多此一举吗?
其实并不是多此一举,这样设计是非常有必要的,我们假设一种极端的情况,当创建线程挂掉并被GC回收之后,其实stack中存储的回收对象已经不可能在被使用到了,stack应该也被回收掉。但是如果这里回收线程在回收的时候不将对象持有的stack设置为null的话,直接添加到了WeakOrderQueue节点中,当创建被GC掉的时候,由于这条引用链的存在导致对应stack永远不会被GC掉,造成内存泄露。
所以笔者在本文中多次强调,当我们在设计比较复杂的程序结构时,对于对象之间的引用关系,一定要时刻保持清晰的认识,防止内存泄露。
第二:为什么最后使用lazySet来更新尾结点的writeIndex
当我们向Link链表的尾结点添加完回收对象之后,在更新尾结点的writeIndex时,使用到了延时更新,而延时更新并不会保证多线程的可见性,如果此时创建线程正在转移对象,那么将不会看到新添加进来的回收对象了。
而事实上,我们这里并不需要保证线程之间的实时可见性,只需要保证最终可见性即可。
确实在当创建线程转移对象的时候可能并不会看到刚刚被回收线程新添加进来的回收对象,看不到没关系,创建线程大不了在本次转移中不回收它不就完了么。因为只要创建线程Stack结构中的数组栈为空,创建线程就会从WeakOrderQueue链表中转移对象,以后会有很多次机会来WeakOrderQueu链表中转移对象,什么时候看见了,什么时候转移它。并不需要实时性。退一万步讲,即使全部看不到,大不了创建线程直接创建一个对象返回就行了。
而如果这里要保证线程之间的实时可见性,在更新尾结点的writeIndex的时候就不得不插入 LOCK 前缀内存屏障指令保证多线程之间的实时可见性,而执行内存屏障指令是需要开销的,所以为了保证WeakOrderQueue的写入性能,Netty这里选择了只保证最终可见性而不保证实时可见性。
总结

到这里关于Recycler对象池的整个设计与源码实现,笔者就为大家详细的剖析完毕了,在剖析的过程中,我们提炼出了很多多线程并发程序的设计要点和注意事项。大家可以在日常开发工作中多多体会并实践。
虽然本文介绍的Recycler对象池整体设计将会在4.1.71.Final版本被重构,但是在当前版本Recycler对象池的设计和实现中,我们还是可以学习到很多东西的。
笔者真心十分佩服能够耐心看到这里的大家,不知不觉已经唠叨了三万多字了,谢谢大家的观看~~,大家记得晚餐时给自己加餐个鸡腿奖励一下自己,哈哈!!
阅读原文
欢迎关注公众号:bin的技术小屋

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4