本系列Netty源码解析文章基于 4.1.56.Final版本,公众号:bin的技术小屋

前文回顾
在前边的系列文章中,我们从内核如何收发网络数据开始以一个C10K的问题作为主线详细从内核角度阐述了网络IO模型的演变,最终在此基础上引出了Netty的网络IO模型如下图所示:

详细内容可回看《从内核角度看IO模型的演变》
后续我们又围绕着Netty的主从Reactor网络IO线程模型,在《Reactor模型在Netty中的实现》一文中详细阐述了Netty的主从Reactor模型的创建,以及介绍了Reactor模型的关键组件。搭建了Netty的核心骨架如下图所示:

在核心骨架搭建完毕之后,我们随后又在《详细图解Reactor启动全流程》一文中阐述了Reactor启动的全流程,一个非常重要的核心组件NioServerSocketChannel开始在这里初次亮相,承担着一个网络框架最重要的任务--高效接收网络连接。我们介绍了NioServerSocketChannel的创建,初始化,向Main Reactor注册并监听OP_ACCEPT事件的整个流程。在此基础上,Netty得以整装待发,枕戈待旦开始迎接海量的客户端连接。

随后紧接着我们在《Netty如何高效接收网络连接》一文中详细介绍了Netty高效接收客户端网络连接的全流程,在这里Netty的核心重要组件NioServerSocketChannel开始正是登场,在NioServerSocketChannel中我们创建了客户端连接NioSocketChannel,并详细介绍了NioSocketChannel的初始化过程,随后通过在NioServerSocketChannel的pipeline中触发ChannelRead事件,并最终在ServerBootstrapAcceptor中将客户端连接NioSocketChannel注册到Sub Reactor中开始监听客户端连接上的OP_READ事件,准备接收客户端发送的网络数据也就是本文的主题内容。

自此Netty的核心组件全部就绪并启动完毕,开始起飞~~~

之前文章中的主角是Netty中主Reactor组中的Main Reactor以及注册在Main Reactor上边的NioServerSocketChannel,那么从本文开始,我们文章中的主角就切换为Sub Reactor以及注册在SubReactor上的NioSocketChannel了。
下面就让我们正式进入今天的主题,看一下Netty是如何处理OP_READ事件以及如何高效接收网络数据的。
1. Sub Reactor处理OP_READ事件流程总览

客户端发起系统IO调用向服务端发送数据之后,当网络数据到达服务端的网卡并经过内核协议栈的处理,最终数据到达Socket的接收缓冲区之后,Sub Reactor轮询到NioSocketChannel上的OP_READ事件就绪,随后Sub Reactor线程就会从JDK Selector上的阻塞轮询APIselector.select(timeoutMillis)调用中返回。转而去处理NioSocketChannel上的OP_READ事件。
注意这里的Reactor为负责处理客户端连接的Sub Reactor。连接的类型为NioSocketChannel,处理的事件为OP_READ事件。
在之前的文章中笔者已经多次强调过了,Reactor在处理Channel上的IO事件入口函数为NioEventLoop#processSelectedKey 。- public final class NioEventLoop extends SingleThreadEventLoop {
- private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
- final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
- ..............省略.................
- try {
- int readyOps = k.readyOps();
- if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
- ..............处理OP_CONNECT事件.................
- }
- if ((readyOps & SelectionKey.OP_WRITE) != 0) {
- ..............处理OP_WRITE事件.................
- }
- if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
- //本文重点处理OP_ACCEPT事件
- unsafe.read();
- }
- } catch (CancelledKeyException ignored) {
- unsafe.close(unsafe.voidPromise());
- }
- }
- }
复制代码 这里需要重点强调的是,当前的执行线程现在已经变成了Sub Reactor,而Sub Reactor上注册的正是netty客户端NioSocketChannel负责处理连接上的读写事件。
所以这里入口函数的参数AbstractNioChannel ch则是IO就绪的客户端连接NioSocketChannel。
开头通过ch.unsafe()获取到的NioUnsafe操作类正是NioSocketChannel中对底层JDK NIO SocketChannel的Unsafe底层操作类。实现类型为NioByteUnsafe定义在下图继承结构中的AbstractNioByteChannel父类中。

下面我们到NioByteUnsafe#read方法中来看下Netty对OP_READ事件的具体处理过程:
2. Netty接收网络数据流程总览
我们直接按照老规矩,先从整体上把整个OP_READ事件的逻辑处理框架提取出来,让大家先总体俯视下流程全貌,然后在针对每个核心点位进行各个击破。

流程中相关置灰的步骤为Netty处理连接关闭时的逻辑,和本文主旨无关,我们这里暂时忽略,等后续笔者介绍连接关闭时,会单独开一篇文章详细为大家介绍。
从上面这张Netty接收网络数据总体流程图可以看出NioSocketChannel在接收网络数据的整个流程和我们在上篇文章《Netty如何高效接收网络连接》中介绍的NioServerSocketChannel在接收客户端连接时的流程在总体框架上是一样的。
NioSocketChannel在接收网络数据的过程处理中,也是通过在一个do{....}while(...)循环read loop中不断的循环读取连接NioSocketChannel上的数据。
同样在NioSocketChannel读取连接数据的read loop中也是受最大读取次数的限制。默认配置最多只能读取16次,超过16次无论此时NioSocketChannel中是否还有数据可读都不能在进行读取了。
这里read loop循环最大读取次数可在启动配置类ServerBootstrap中通过ChannelOption.MAX_MESSAGES_PER_READ选项设置,默认为16。- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .option(ChannelOption.MAX_MESSAGES_PER_READ, 自定义次数)
复制代码 Netty这里为什么非得限制read loop的最大读取次数呢?为什么不在read loop中一次性把数据读取完呢?
这时候就是考验我们大局观的时候了,在前边的文章介绍中我们提到Netty的IO模型为主从Reactor线程组模型,在Sub Reactor Group中包含了多个Sub Reactor专门用于监听处理客户端连接上的IO事件。
为了能够高效有序的处理全量客户端连接上的读写事件,Netty将服务端承载的全量客户端连接分摊到多个Sub Reactor中处理,同时也能保证Channel上IO处理的线程安全性。
其中一个Channel只能分配给一个固定的Reactor。一个Reactor负责处理多个Channel上的IO就绪事件,Reactor与Channel之间的对应关系如下图所示:

而一个Sub Reactor上注册了多个NioSocketChannel,Netty不可能在一个NioSocketChannel上无限制的处理下去,要将读取数据的机会均匀分摊给其他NioSocketChannel,所以需要限定每个NioSocketChannel上的最大读取次数。
此外,Sub Reactor除了需要监听处理所有注册在它上边的NioSocketChannel中的IO就绪事件之外,还需要腾出事件来处理有用户线程提交过来的异步任务。从这一点看,Netty也不会一直停留在NioSocketChannel的IO处理上。所以限制read loop的最大读取次数是非常必要的。
关于Reactor的整体运转架构,对细节部分感兴趣的同学可以回看下笔者的《一文聊透Netty核心引擎Reactor的运转架构》这篇文章。
所以基于这个原因,我们需要在read loop循环中,每当通过doReadBytes方法从NioSocketChannel中读取到数据时(方法返回值会大于0,并记录在allocHandle.lastBytesRead中),都需要通过allocHandle.incMessagesRead(1)方法统计已经读取的次数。当达到16次时不管NioSocketChannel是否还有数据可读,都需要在read loop末尾退出循环。转去执行Sub Reactor上的异步任务。以及其他NioSocketChannel上的IO就绪事件。平均分配,雨露均沾!!- public abstract class MaxMessageHandle implements ExtendedHandle {
- //read loop总共读取了多少次
- private int totalMessages;
- @Override
- public final void incMessagesRead(int amt) {
- totalMessages += amt;
- }
- }
复制代码 本次read loop读取到的数据大小会记录在allocHandle.lastBytesRead中- public abstract class MaxMessageHandle implements ExtendedHandle {
- //本次read loop读取到的字节数
- private int lastBytesRead;
- //整个read loop循环总共读取的字节数
- private int totalBytesRead;
- @Override
- public void lastBytesRead(int bytes) {
- lastBytesRead = bytes;
- if (bytes > 0) {
- totalBytesRead += bytes;
- }
- }
- }
复制代码
- lastBytesRead < 0:表示客户端主动发起了连接关闭流程,Netty开始连接关闭处理流程。这个和本文的主旨无关,我们先不用管。后面笔者会专门用一篇文章来详解关闭流程。
- lastBytesRead = 0:表示当前NioSocketChannel上的数据已经全部读取完毕,没有数据可读了。本次OP_READ事件圆满处理完毕,可以开开心心的退出read loop。
- 当lastBytesRead > 0:表示在本次read loop中从NioSocketChannel中读取到了数据,会在NioSocketChannel的pipeline中触发ChannelRead事件。进而在pipeline中负责IO处理的ChannelHandelr中响应,处理网络请求。
 - public class EchoServerHandler extends ChannelInboundHandlerAdapter {
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- .......处理网络请求,比如解码,反序列化等操作.......
- }
- }
复制代码 最后会在read loop循环的末尾调用allocHandle.continueReading()判断是否结束本次read loop循环。这里的结束循环条件的判断会比我们在介绍NioServerSocketChannel接收连接时的判断条件复杂很多,笔者会将这个判断条件的详细解析放在文章后面细节部分为大家解读,这里大家只需要把握总体核心流程,不需要关注太多细节。
总体上在NioSocketChannel中读取网络数据的read loop循环结束条件需要满足以下几点:
- 当前NioSocketChannel中的数据已经全部读取完毕,则退出循环。
- 本轮read loop如果没有读到任何数据,则退出循环。
- read loop的读取次数达到16次,退出循环。
当满足这里的read loop退出条件之后,Sub Reactor线程就会退出循环,随后会调用allocHandle.readComplete()方法根据本轮read loop总共读取到的字节数totalBytesRead来决定是否对用于接收下一轮OP_READ事件数据的ByteBuffer进行扩容或者缩容。
最后在NioSocketChannel的pipeline中触发ChannelReadComplete事件,通知ChannelHandler本次OP_READ事件已经处理完毕。
 - public class EchoServerHandler extends ChannelInboundHandlerAdapter {
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- .......处理网络请求,比如解码,反序列化等操作.......
- }
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) {
- ......本次OP_READ事件处理完毕.......
- ......决定是否向客户端响应处理结果......
- }
- }
复制代码 2.1 ChannelRead与ChannelReadComplete事件的区别
有些小伙伴可能对Netty中的一些传播事件触发的时机,或者事件之间的区别理解的不是很清楚,概念容易混淆。在后面的文章中笔者也会从源码的角度出发给大家说清楚Netty中定义的所有异步事件,以及这些事件之间的区别和联系和触发时机,传播机制。
这里我们主要探讨本文主题中涉及到的两个事件:ChannelRead事件与ChannelReadComplete事件。
从上述介绍的Netty接收网络数据流程总览中我们可以看出ChannelRead事件和ChannelReadComplete事件是不一样的,但是对于刚接触Netty的小伙伴来说从命名上乍一看感觉又差不多。
下面我们来看这两个事件之间的差别:
Netty服务端对于一次OP_READ事件的处理,会在一个do{}while()循环read loop中分多次从客户端NioSocketChannel中读取网络数据。每次读取我们分配的ByteBuffer容量大小,初始容量为2048。
- ChanneRead事件:一次循环读取一次数据,就触发一次ChannelRead事件。本次最多读取在read loop循环开始分配的DirectByteBuffer容量大小。这个容量会动态调整,文章后续笔者会详细介绍。
- ChannelReadComplete事件:当读取不到数据或者不满足continueReading 的任意一个条件就会退出read loop,这时就会触发ChannelReadComplete事件。表示本次OP_READ事件处理完毕。
这里需要特别注意下触发ChannelReadComplete事件并不代表NioSocketChannel中的数据已经读取完了,只能说明本次OP_READ事件处理完毕。因为有可能是客户端发送的数据太多,Netty读了16次还没读完,那就只能等到下次OP_READ事件到来的时候在进行读取了。
以上内容就是Netty在接收客户端发送网络数据的全部核心逻辑。目前为止我们还未涉及到这部分的主干核心源码,笔者想的是先给大家把核心逻辑讲解清楚之后,这样理解起来核心主干源码会更加清晰透彻。
经过前边对网络数据接收的核心逻辑介绍,笔者在把这张流程图放出来,大家可以结合这张图在来回想下主干核心逻辑。

下面笔者会结合这张流程图,给大家把这部分的核心主干源码框架展现出来,大家可以将我们介绍过的核心逻辑与主干源码做个一一对应,还是那句老话,我们要从主干框架层面把握整体处理流程,不需要读懂每一行代码,文章后续笔者会将这个过程中涉及到的核心点位给大家拆开来各个击破!!

3. 源码核心框架总览
- @Override
- public final void read() {
- final ChannelConfig config = config();
- ...............处理半关闭相关代码省略...............
- //获取NioSocketChannel的pipeline
- final ChannelPipeline pipeline = pipeline();
- //PooledByteBufAllocator 具体用于实际分配ByteBuf的分配器
- final ByteBufAllocator allocator = config.getAllocator();
- //自适应ByteBuf分配器 AdaptiveRecvByteBufAllocator ,用于动态调节ByteBuf容量
- //需要与具体的ByteBuf分配器配合使用 比如这里的PooledByteBufAllocator
- final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
- //allocHandler用于统计每次读取数据的大小,方便下次分配合适大小的ByteBuf
- //重置清除上次的统计指标
- allocHandle.reset(config);
- ByteBuf byteBuf = null;
- boolean close = false;
- try {
- do {
- //利用PooledByteBufAllocator分配合适大小的byteBuf 初始大小为2048
- @Override
- public void reset(ChannelConfig config) {
- this.config = config;
- //默认每次最多读取16次
- maxMessagePerRead = maxMessagesPerRead();
- totalMessages = totalBytesRead = 0;
- }
- //记录本次读取了多少字节数
- allocHandle.lastBytesRead(doReadBytes(byteBuf));
- //如果本次没有读取到任何字节,则退出循环 进行下一轮事件轮询
- if (allocHandle.lastBytesRead() <= 0) {
- // nothing was read. release the buffer.
- byteBuf.release();
- byteBuf = null;
- close = allocHandle.lastBytesRead() < 0;
- if (close) {
- ......表示客户端发起连接关闭.....
- }
- break;
- }
- //read loop读取数据次数+1
- allocHandle.incMessagesRead(1);
- //客户端NioSocketChannel的pipeline中触发ChannelRead事件
- pipeline.fireChannelRead(byteBuf);
- //解除本次读取数据分配的ByteBuffer引用,方便下一轮read loop分配
- byteBuf = null;
- } while (allocHandle.continueReading());//判断是否应该继续read loop
- //根据本次read loop总共读取的字节数,决定下次是否扩容或者缩容
- allocHandle.readComplete();
- //在NioSocketChannel的pipeline中触发ChannelReadComplete事件,表示一次read事件处理完毕
- //但这并不表示 客户端发送来的数据已经全部读完,因为如果数据太多的话,这里只会读取16次,剩下的会等到下次read事件到来后在处理
- pipeline.fireChannelReadComplete();
- .........省略连接关闭流程处理.........
- } catch (Throwable t) {
- ...............省略...............
- } finally {
- ...............省略...............
- }
- }
- }
复制代码 每次循环从NioSocketChannel中读取数据之后,都会调用allocHandle.incMessagesRead(1)。统计当前已经读取了多少次。如果超过了最大读取限制此时16次,就需要退出read loop。去处理其他NioSocketChannel上的IO事件。- final ByteBufAllocator allocator = config.getAllocator();
- final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
复制代码 在每次read loop循环的末尾都需要通过调用allocHandle.continueReading()来判断是否继续read loop循环读取NioSocketChannel中的数据。- public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
- @Override
- public Handle newHandle() {
- return new HandleImpl(minIndex, maxIndex, initial);
- }
-
- private final class HandleImpl extends MaxMessageHandle {
- .................省略................
- }
- }
复制代码
- attemptedBytesRead :表示当前ByteBuffer预计尝试要写入的字节数。
- lastBytesRead :表示本次read loop真实读取到了多少个字节。
defaultMaybeMoreSupplier 用于判断经过本次read loop读取数据后,ByteBuffer是否满载而归。如果是满载而归的话(attemptedBytesRead == lastBytesRead),表明可能NioSocketChannel里还有数据。如果不是满载而归,表明NioSocketChannel里没有数据了已经。
是否继续进行read loop需要同时满足以下几个条件:
- totalMessages < maxMessagePerRead 当前读取次数是否已经超过16次,如果超过,就退出do(...)while()循环。进行下一轮OP_READ事件的轮询。因为每个Sub Reactor管理了多个NioSocketChannel,不能在一个NioSocketChannel上占用太多时间,要将机会均匀地分配给Sub Reactor所管理的所有NioSocketChannel。
- totalBytesRead > 0 本次OP_READ事件处理是否读取到了数据,如果已经没有数据可读了,那么就直接退出read loop。
- !respectMaybeMoreData || maybeMoreDataSupplier.get() 这个条件比较复杂,它其实就是通过respectMaybeMoreData字段来控制NioSocketChannel中可能还有数据可读的情况下该如何处理。
- maybeMoreDataSupplier.get():true表示本次读取从NioSocketChannel中读取数据,ByteBuffer满载而归。说明可能NioSocketChannel中还有数据没读完。fasle表示ByteBuffer还没有装满,说明NioSocketChannel中已经没有数据可读了。
- respectMaybeMoreData = true表示要对可能还有更多数据进行处理的这种情况要respect认真对待,如果本次循环读取到的数据已经装满ByteBuffer,表示后面可能还有数据,那么就要进行读取。如果ByteBuffer还没装满表示已经没有数据可读了那么就退出循环。

- respectMaybeMoreData = false表示对可能还有更多数据的这种情况不认真对待 not respect。不管本次循环读取数据ByteBuffer是否满载而归,都要继续进行读取,直到读取不到数据在退出循环,属于无脑读取。
同时满足以上三个条件,那么read loop继续进行。继续从NioSocketChannel中读取数据,直到读取不到或者不满足三个条件中的任意一个为止。
3.2 从NioSocketChannel中读取数据
- public abstract class MaxMessageHandle implements ExtendedHandle {
- private ChannelConfig config;
- //用于控制每次read loop里最大可以循环读取的次数,默认为16次
- //可在启动配置类ServerBootstrap中通过ChannelOption.MAX_MESSAGES_PER_READ选项设置。
- private int maxMessagePerRead;
- //用于统计read loop中总共接收的连接个数,NioSocketChannel中表示读取数据的次数
- //每次read loop循环后会调用allocHandle.incMessagesRead增加记录接收到的连接个数
- private int totalMessages;
- //用于统计在read loop中总共接收到客户端连接上的数据大小
- private int totalBytesRead;
- //表示本次read loop 尝试读取多少字节,byteBuffer剩余可写的字节数
- private int attemptedBytesRead;
- //本次read loop读取到的字节数
- private int lastBytesRead;
-
- //预计下一次分配buffer的容量,初始:2048
- private int nextReceiveBufferSize;
- ...........省略.............
- }
复制代码 这里会直接调用底层JDK NIO的SocketChannel#read方法将数据读取到DirectByteBuffer中。读取数据大小为本次分配的DirectByteBuffer容量,初始为2048。
4. ByteBuffer动态自适应扩缩容机制
由于我们一开始并不知道客户端会发送多大的网络数据,所以这里先利用PooledByteBufAllocator分配一个初始容量为2048的DirectByteBuffer用于接收数据。- @Override
- public void reset(ChannelConfig config) {
- this.config = config;
- //默认每次最多读取16次
- maxMessagePerRead = maxMessagesPerRead();
- totalMessages = totalBytesRead = 0;
- }
复制代码 这就好比我们需要拿着一个桶去排队装水,但是第一次去装的时候,我们并不知道管理员会给我们分配多少水,桶拿大了也不合适拿小了也不合适,于是我们就先预估一个差不多容量大小的桶,如果分配的多了,我们下次就拿更大一点的桶,如果分配少了,下次我们就拿一个小点的桶。
在这种场景下,我们需要ByteBuffer可以自动根据每次网络数据的大小来动态自适应调整自己的容量。
而ByteBuffer动态自适应扩缩容机制依赖于AdaptiveRecvByteBufAllocator类的实现。让我们先回到AdaptiveRecvByteBufAllocator类的创建起点开始说起~~
4.1 AdaptiveRecvByteBufAllocator的创建
在前文《Netty是如何高效接收网络连接》中我们提到,当Main Reactor监听到OP_ACCPET事件活跃后,会在NioServerSocketChannel中accept完成三次握手的客户端连接。并创建NioSocketChannel,伴随着NioSocketChannel的创建其对应的配置类NioSocketChannelConfig类也会随之创建。- public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
- //扩容步长
- private static final int INDEX_INCREMENT = 4;
- //缩容步长
- private static final int INDEX_DECREMENT = 1;
- //RecvBuf分配容量表(扩缩容索引表)按照表中记录的容量大小进行扩缩容
- private static final int[] SIZE_TABLE;
- static {
- //初始化RecvBuf容量分配表
- List<Integer> sizeTable = new ArrayList<Integer>();
- //当分配容量小于512时,扩容单位为16递增
- for (int i = 16; i < 512; i += 16) {
- sizeTable.add(i);
- }
- //当分配容量大于512时,扩容单位为一倍
- for (int i = 512; i > 0; i <<= 1) {
- sizeTable.add(i);
- }
- //初始化RecbBuf扩缩容索引表
- SIZE_TABLE = new int[sizeTable.size()];
- for (int i = 0; i < SIZE_TABLE.length; i ++) {
- SIZE_TABLE[i] = sizeTable.get(i);
- }
- }
- }
复制代码 最终会在NioSocketChannelConfig的父类DefaultChannelConfig的构造器中创建AdaptiveRecvByteBufAllocator 。并保存在RecvByteBufAllocator rcvBufAllocator字段中。- @Override
- public ByteBuf allocate(ByteBufAllocator alloc) {
- return alloc.ioBuffer(guess());
- }
- @Override
- public int guess() {
- //预计下一次分配buffer的容量,一开始为2048
- return nextReceiveBufferSize;
- }
复制代码 在new AdaptiveRecvByteBufAllocator()创建AdaptiveRecvByteBufAllocator类实例的时候会先触发AdaptiveRecvByteBufAllocator类的初始化。
我们先来看下AdaptiveRecvByteBufAllocator类的初始化都做了些什么事情:
4.2 AdaptiveRecvByteBufAllocator类的初始化
- @Override
- public void lastBytesRead(int bytes) {
- lastBytesRead = bytes;
- if (bytes > 0) {
- totalBytesRead += bytes;
- }
- }
复制代码经常刷LeetCode的小伙伴肯定一眼就看出这个是二分查找的模板了。
它的目的就是根据给定容量,在扩缩容索引表SIZE_TABLE中,通过二分查找找到最贴近给定size的容量的索引下标(第一个大于等于 size的容量)
4.6 RecvByteBufAllocator.Handle
前边我们提到最终动态调整ByteBuffer容量的是由AdaptiveRecvByteBufAllocator中的Handler负责的,我们来看下这个allocHandle的创建过程。- @Override
- public final void incMessagesRead(int amt) {
- totalMessages += amt;
- }
复制代码 从allocHandle的获取过程我们看到最allocHandle的创建是由AdaptiveRecvByteBufAllocator#newHandle方法执行的。- @Override
- public boolean continueReading() {
- return continueReading(defaultMaybeMoreSupplier);
- }
- private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
- @Override
- public boolean get() {
- //判断本次读取byteBuffer是否满载而归
- return attemptedBytesRead == lastBytesRead;
- }
- };
- @Override
- public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
- return config.isAutoRead() &&
- (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
- totalMessages < maxMessagePerRead &&
- totalBytesRead > 0;
- }
复制代码 这里我们看到Netty中用于动态调整ByteBuffer容量的allocHandle 的实际类型为MaxMessageHandle 。
下面我们来介绍下HandleImpl 中的核心字段,它们都和ByteBuffer的容量有关:
- minIndex :最小容量在扩缩容索引表SIZE_TABE中的index。默认是3。
- maxIndex :最大容量在扩缩容索引表SIZE_TABE中的index。默认是38。
- index :当前容量在扩缩容索引表SIZE_TABE中的index。初始是33。
- nextReceiveBufferSize :预计下一次分配buffer的容量,初始为2048。在每次申请内存分配ByteBuffer的时候,采用nextReceiveBufferSize的值指定容量。
- decreaseNow : 是否需要进行缩容。
5. 使用堆外内存为ByteBuffer分配内存
AdaptiveRecvByteBufAllocator类只是负责动态调整ByteBuffer的容量,而具体为ByteBuffer申请内存空间的是由PooledByteBufAllocator负责。
5.1 类名前缀Pooled的来历
在我们使用Java进行日常开发过程中,在为对象分配内存空间的时候我们都会选择在JVM堆中为对象分配内存,这样做对我们Java开发者特别的友好,我们只管使用就好而不必过多关心这块申请的内存如何回收,因为JVM堆完全受Java虚拟机控制管理,Java虚拟机会帮助我们回收不再使用的内存。
但是JVM在进行垃圾回收时候的stop the world会对我们应用程序的性能造成一定的影响。
除此之外我们在《聊聊Netty那些事儿之从内核角度看IO模型》一文中介绍IO模型的时候提到,当数据达到网卡时,网卡会通过DMA的方式将数据拷贝到内核空间中,这是第一次拷贝。当用户线程在用户空间发起系统IO调用时,CPU会将内核空间的数据再次拷贝到用户空间。这是第二次拷贝。
于此不同的是当我们在JVM中发起IO调用时,比如我们使用JVM堆内存读取Socket接收缓冲区中的数据时,会多一次内存拷贝,CPU在第二次拷贝中将数据从内核空间拷贝到用户空间时,此时的用户空间站在JVM角度是堆外内存,所以还需要将堆外内存中的数据拷贝到堆内内存中。这就是第三次内存拷贝。
同理当我们在JVM中发起IO调用向Socket发送缓冲区写入数据时,JVM会将IO数据先拷贝到堆外内存,然后才能发起系统IO调用。
那为什么操作系统不直接使用JVM的堆内内存进行IO操作呢?
因为JVM的内存布局和操作系统分配的内存是不一样的,操作系统不可能按照JVM规范来读写数据,所以就需要第三次拷贝中间做个转换将堆外内存中的数据拷贝到JVM堆中。
所以基于上述内容,在使用JVM堆内内存时会产生以下两点性能影响:
- JVM在垃圾回收堆内内存时,会发生stop the world导致应用程序卡顿。
- 在进行IO操作的时候,会多产生一次由堆外内存到堆内内存的拷贝。
基于以上两点使用JVM堆内内存对性能造成的影响,于是对性能有卓越追求的Netty采用堆外内存也就是DirectBuffer来为ByteBuffer分配内存空间。
采用堆外内存为ByteBuffer分配内存的好处就是:
- 堆外内存直接受操作系统的管理,不会受JVM的管理,所以JVM垃圾回收对应用程序的性能影响就没有了。
- 网络数据到达之后直接在堆外内存上接收,进程读取网络数据时直接在堆外内存中读取,所以就避免了第三次内存拷贝。
所以Netty在进行 I/O 操作时都是使用的堆外内存,可以避免数据从 JVM 堆内存到堆外内存的拷贝。但是由于堆外内存不受JVM的管理,所以就需要额外关注对内存的使用和释放,稍有不慎就会造成内存泄露,于是Netty就引入了内存池对堆外内存进行统一管理。
PooledByteBufAllocator类的这个前缀Pooled就是内存池的意思,这个类会使用Netty的内存池为ByteBuffer分配堆外内存。
5.2 PooledByteBufAllocator的创建
创建时机
在服务端NioServerSocketChannel的配置类NioServerSocketChannelConfig以及客户端NioSocketChannel的配置类NioSocketChannelConfig实例化的时候会触发PooledByteBufAllocator的创建。- public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
- @Override
- protected int doReadBytes(ByteBuf byteBuf) throws Exception {
- final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
- allocHandle.attemptedBytesRead(byteBuf.writableBytes());
- return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
- }
- }
复制代码 创建出来的PooledByteBufAllocator实例保存在DefaultChannelConfig类中的ByteBufAllocator allocator字段中。
创建过程
- public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
- //扩容步长
- private static final int INDEX_INCREMENT = 4;
- //缩容步长
- private static final int INDEX_DECREMENT = 1;
- //RecvBuf分配容量表(扩缩容索引表)按照表中记录的容量大小进行扩缩容
- private static final int[] SIZE_TABLE;
- static {
- //初始化RecvBuf容量分配表
- List<Integer> sizeTable = new ArrayList<Integer>();
- //当分配容量小于512时,扩容单位为16递增
- for (int i = 16; i < 512; i += 16) {
- sizeTable.add(i);
- }
- //当分配容量大于512时,扩容单位为一倍
- for (int i = 512; i > 0; i <<= 1) {
- sizeTable.add(i);
- }
- //初始化RecbBuf扩缩容索引表
- SIZE_TABLE = new int[sizeTable.size()];
- for (int i = 0; i < SIZE_TABLE.length; i ++) {
- SIZE_TABLE[i] = sizeTable.get(i);
- }
- }
- }
复制代码- public NioSocketChannel(Channel parent, SocketChannel socket) {
- super(parent, socket);
- config = new NioSocketChannelConfig(this, socket.socket());
- }
复制代码 从ByteBufUtil类的初始化过程我们可以看出,在为ByteBuffer分配内存的时候是否使用内存池在Netty中是可以配置的。
- 通过系统变量-D io.netty.allocator.type 可以配置是否使用内存池为ByteBuffer分配内存。默认情况下是需要使用内存池的。但是在安卓系统中默认是不使用内存池的。
- 通过PooledByteBufAllocator.DEFAULT获取内存池ByteBuffer分配器。
- public class DefaultChannelConfig implements ChannelConfig {
- //用于Channel接收数据用的buffer分配器 AdaptiveRecvByteBufAllocator
- private volatile RecvByteBufAllocator rcvBufAllocator;
- public DefaultChannelConfig(Channel channel) {
- this(channel, new AdaptiveRecvByteBufAllocator());
- }
- }
复制代码由于本文的主线是介绍Sub Reactor处理OP_READ事件的完整过程,所以这里只介绍主线相关的内容,这里只是简单介绍下在接收数据的时候为什么会用PooledByteBufAllocator来为ByteBuffer分配内存。而内存池的架构设计比较复杂,所以笔者后面会单独写一篇关于Netty内存管理的文章。
总结
本文介绍了Sub Reactor线程在处理OP_READ事件的整个过程。并深入剖析了AdaptiveRecvByteBufAllocator类动态调整ByteBuffer容量的原理。
同时也介绍了Netty为什么会使用堆外内存来为ByteBuffer分配内存,并由此引出了Netty的内存池分配器PooledByteBufAllocator 。
在介绍AdaptiveRecvByteBufAllocator类和PooledByteBufAllocator一起组合实现动态地为ByteBuffer分配容量的时候,笔者不禁想起了多年前看过的《Effective Java》中第16条 复合优先于继承。
Netty在这里也遵循了这条军规,首先两个类设计的都是单一的功能。
- AdaptiveRecvByteBufAllocator类只负责动态的调整ByteBuffer容量,并不管具体的内存分配。
- PooledByteBufAllocator类负责具体的内存分配,用内存池的方式。
这样设计的就比较灵活,具体内存分配的工作交给具体的ByteBufAllocator,可以使用内存池的分配方式PooledByteBufAllocator,也可以不使用内存池的分配方式UnpooledByteBufAllocator。具体的内存可以采用JVM堆内内存(HeapBuffer),也可以使用堆外内存(DirectBuffer)。
而AdaptiveRecvByteBufAllocator只需要关注调整它们的容量工作就可以了,而并不需要关注它们具体的内存分配方式。
最后通过io.netty.channel.RecvByteBufAllocator.Handle#allocate方法灵活组合不同的内存分配方式。这也是装饰模式的一种应用。- public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
- //扩容步长
- private static final int INDEX_INCREMENT = 4;
- //缩容步长
- private static final int INDEX_DECREMENT = 1;
- //RecvBuf分配容量表(扩缩容索引表)按照表中记录的容量大小进行扩缩容
- private static final int[] SIZE_TABLE;
- static {
- //初始化RecvBuf容量分配表
- List<Integer> sizeTable = new ArrayList<Integer>();
- //当分配容量小于512时,扩容单位为16递增
- for (int i = 16; i < 512; i += 16) {
- sizeTable.add(i);
- }
- //当分配容量大于512时,扩容单位为一倍
- for (int i = 512; i > 0; i <<= 1) {
- sizeTable.add(i);
- }
- //初始化RecbBuf扩缩容索引表
- SIZE_TABLE = new int[sizeTable.size()];
- for (int i = 0; i < SIZE_TABLE.length; i ++) {
- SIZE_TABLE[i] = sizeTable.get(i);
- }
- }
- }
复制代码 好了,今天的内容就到这里,我们下篇文章见~~~~
阅读原文
欢迎关注公众号:bin的技术小屋
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |