ToB企服应用市场:ToB评测及商务社交产业平台

标题: 抓到 Netty 一个 Bug,顺带来透彻地聊一下 Netty 是如何高效接收网络连接的 [打印本页]

作者: 立山    时间: 2022-8-23 02:16
标题: 抓到 Netty 一个 Bug,顺带来透彻地聊一下 Netty 是如何高效接收网络连接的
本系列Netty源码解析文章基于 4.1.56.Final版本
对于一个高性能网络通讯框架来说,最最重要也是最核心的工作就是如何高效的接收客户端连接,这就好比我们开了一个饭店,那么迎接客人就是饭店最重要的工作,我们要先把客人迎接进来,不能让客人一看人多就走掉,只要客人进来了,哪怕菜做的慢一点也没关系。
本文笔者就来为大家介绍下netty这块最核心的内容,看看netty是如何高效的接收客户端连接的。
下图为笔者在一个月黑风高天空显得那么深邃遥远的夜晚,闲来无事,于是捧起Netty关于如何接收连接这部分源码细细品读的时候,意外的发现了一个影响Netty接收连接吞吐的一个Bug。

于是笔者就在Github提了一个Issue#11708,阐述了下这个Bug产生的原因以及导致的结果并和Netty的作者一起讨论了下修复措施。如上图所示。
Issue#11708:https://github.com/netty/netty/issues/11708
这里先不详细解释这个Issue,也不建议大家现在就打开这个Issue查看,笔者会在本文的介绍中随着源码深入的解读慢慢的为大家一层一层地拨开迷雾。
之所以在文章的开头把这个拎出来,笔者是想让大家带着怀疑,审视,欣赏,崇敬,敬畏的态度来一起品读世界顶级程序员编写的代码。由衷的感谢他们在这一领域做出的贡献。
好了,问题抛出来后,我们就带着这个疑问来开始本文的内容吧~~~

前文回顾

按照老规矩,再开始本文的内容之前,我们先来回顾下前边几篇文章的概要内容帮助大家梳理一个框架全貌出来。
笔者这里再次想和读者朋友们强调的是本文可以独立观看,并不依赖前边系列文章的内容,只是大家如果对相关细节部分感兴趣的话,可以在阅读完本文之后在去回看相关文章。
在前边的系列文章中,笔者为大家介绍了驱动Netty整个框架运转的核心引擎Reactor的创建,启动,运行的全流程。从现在开始Netty的整个核心框架就开始运转起来开始工作了,本文要介绍的主要内容就是Netty在启动之后要做的第一件事件:监听端口地址,高效接收客户端连接。
《聊聊Netty那些事儿之从内核角度看IO模型》一文中,我们是从整个网络框架的基石IO模型的角度整体阐述了下Netty的IO线程模型。
而Netty中的Reactor正是IO线程在Netty中的模型定义。Reactor在Netty中是以Group的形式出现的,分为:
最后我们得出Netty的整个IO模型如下:

本文我们讨论的重点就是MainReactorGroup的核心工作上图中所示的步骤1,步骤2,步骤3。
在从整体上介绍完Netty的IO模型之后,我们又在《Reactor在Netty中的实现(创建篇)》中完整的介绍了Netty框架的骨架主从Reactor组的搭建过程,阐述了Reactor是如何被创建出来的,并介绍了它的核心组件如下图所示:

在骨架搭建完毕之后,我们随后又在在《详细图解Netty Reactor启动全流程》》一文中介绍了本文的主角服务端NioServerSocketChannel的创建,初始化,绑定端口地址,向main reactor注册监听OP_ACCEPT事件的完整过程

main reactor如何处理OP_ACCEPT事件将会是本文的主要内容。
自此Netty框架的main reactor group已经启动完毕,开始准备监听OP_accept事件,当客户端连接上来之后,OP_ACCEPT事件活跃,main reactor开始处理OP_ACCEPT事件接收客户端连接了。
而netty中的IO事件分为:OP_ACCEPT事件,OP_READ事件,OP_WRITE事件和OP_CONNECT事件,netty对于IO事件的监听和处理统一封装在Reactor模型中,这四个IO事件的处理过程也是我们后续文章中要单独拿出来介绍的,本文我们聚焦OP_ACCEPT事件的处理。
而为了让大家能够对IO事件的处理有一个完整性的认识,笔者写了《一文聊透Netty核心引擎Reactor的运转架构》这篇文章,在文章中详细介绍了Reactor线程的整体运行框架。

Reactor线程会在一个死循环中996不停的运转,在循环中会不断的轮询监听Selector上的IO事件,当IO事件活跃后,Reactor从Selector上被唤醒转去执行IO就绪事件的处理,在这个过程中我们引出了上述四种IO事件的处理入口函数。
  1.     private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
  2.         //获取Channel的底层操作类Unsafe
  3.         final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
  4.         if (!k.isValid()) {
  5.             ......如果SelectionKey已经失效则关闭对应的Channel......
  6.         }
  7.         try {
  8.             //获取IO就绪事件
  9.             int readyOps = k.readyOps();
  10.             //处理Connect事件
  11.             if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
  12.                 int ops = k.interestOps();
  13.                 //移除对Connect事件的监听,否则Selector会一直通知
  14.                 ops &= ~SelectionKey.OP_CONNECT;
  15.                 k.interestOps(ops);
  16.                 //触发channelActive事件处理Connect事件
  17.                 unsafe.finishConnect();
  18.             }
  19.             //处理Write事件
  20.             if ((readyOps & SelectionKey.OP_WRITE) != 0) {
  21.                 ch.unsafe().forceFlush();
  22.             }
  23.              //处理Read事件或者Accept事件
  24.             if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
  25.                 unsafe.read();
  26.             }
  27.         } catch (CancelledKeyException ignored) {
  28.             unsafe.close(unsafe.voidPromise());
  29.         }
  30.     }
复制代码
本文笔者将会为大家重点介绍OP_ACCEPT事件的处理入口函数unsafe.read()的整个源码实现。
当客户端连接完成三次握手之后,main reactor中的selector产生OP_ACCEPT事件活跃,main reactor随即被唤醒,来到了OP_ACCEPT事件的处理入口函数开始接收客户端连接。
1. Main Reactor处理OP_ACCEPT事件


当Main Reactor轮询到NioServerSocketChannel上的OP_ACCEPT事件就绪时,Main Reactor线程就会从JDK Selector上的阻塞轮询APIselector.select(timeoutMillis)调用中返回。转而去处理NioServerSocketChannel上的OP_ACCEPT事件。
  1. public final class NioEventLoop extends SingleThreadEventLoop {
  2.     private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
  3.         final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
  4.         ..............省略.................
  5.         try {
  6.             int readyOps = k.readyOps();
  7.             if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
  8.                ..............处理OP_CONNECT事件.................
  9.             }
  10.             if ((readyOps & SelectionKey.OP_WRITE) != 0) {
  11.               ..............处理OP_WRITE事件.................
  12.             }
  13.             if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
  14.                 //本文重点处理OP_ACCEPT事件
  15.                 unsafe.read();
  16.             }
  17.         } catch (CancelledKeyException ignored) {
  18.             unsafe.close(unsafe.voidPromise());
  19.         }
  20.     }
  21. }
复制代码
Unsafe接口是Netty对Channel底层操作行为的封装,比如NioServerSocketChannel的底层Unsafe操作类干的事情就是绑定端口地址,处理OP_ACCEPT事件。
这里我们看到,Netty将OP_ACCEPT事件处理的入口函数封装在NioServerSocketChannel里的底层操作类Unsafe的read方法中。

而NioServerSocketChannel中的Unsafe操作类实现类型为NioMessageUnsafe定义在上图继承结构中的AbstractNioMessageChannel父类中。
下面我们到NioMessageUnsafe#read方法中来看下Netty对OP_ACCPET事件的具体处理过程:
2. 接收客户端连接核心流程框架总览

我们还是按照老规矩,先从整体上把整个OP_ACCEPT事件的逻辑处理框架提取出来,让大家先总体俯视下流程全貌,然后在针对每个核心点位进行各个击破。

main reactor线程是在一个do...while{...}循环read loop中不断的调用JDK NIO serverSocketChannel.accept()方法来接收完成三次握手的客户端连接NioSocketChannel的,并将接收到的客户端连接NioSocketChannel临时保存在List readBuf集合中,后续会服务端NioServerSocketChannel的pipeline中通过ChannelRead事件来传递,最终会在ServerBootstrapAcceptor这个ChannelHandler中被处理初始化,并将其注册到Sub Reator Group中。
这里的read loop循环会被限定只能读取16次,当main reactor从NioServerSocketChannel中读取客户端连接NioSocketChannel的次数达到16次之后,无论此时是否还有客户端连接都不能在继续读取了。
因为我们在《一文聊透Netty核心引擎Reactor的运转架构》一文中提到,netty对reactor线程压榨的比较狠,要干的事情很多,除了要监听轮询IO就绪事件,处理IO就绪事件,还需要执行用户和netty框架本省提交的异步任务和定时任务。
所以这里的main reactor线程不能在read loop中无限制的执行下去,因为还需要分配时间去执行异步任务,不能因为无限制的接收客户端连接而耽误了异步任务的执行。所以这里将read loop的循环次数限定为16次。
如果main reactor线程在read loop中读取客户端连接NioSocketChannel的次数已经满了16次,即使此时还有客户端连接未接收,那么main reactor线程也不会再去接收了,而是转去执行异步任务,当异步任务执行完毕后,还会在回来执行剩余接收连接的任务。

main reactor线程退出read loop循环的条件有两个:
以上就是Netty在接收客户端连接时的整体核心逻辑,下面笔者将这部分逻辑的核心源码实现框架提取出来,方便大家根据上述核心逻辑与源码中的处理模块对应起来,还是那句话,这里只需要总体把握核心处理流程,不需要读懂每一行代码,笔者会在文章的后边分模块来各个击破它们。
  1. public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
  2.   private final class NioMessageUnsafe extends AbstractNioUnsafe {
  3.         //存放连接建立后,创建的客户端SocketChannel
  4.         private final List<Object> readBuf = new ArrayList<Object>();
  5.         @Override
  6.         public void read() {
  7.             //必须在Main Reactor线程中执行
  8.             assert eventLoop().inEventLoop();
  9.             //注意下面的config和pipeline都是服务端ServerSocketChannel中的
  10.             final ChannelConfig config = config();
  11.             final ChannelPipeline pipeline = pipeline();
  12.             //创建接收数据Buffer分配器(用于分配容量大小合适的byteBuffer用来容纳接收数据)
  13.             //在接收连接的场景中,这里的allocHandle只是用于控制read loop的循环读取创建连接的次数。
  14.             final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
  15.             allocHandle.reset(config);
  16.             boolean closed = false;
  17.             Throwable exception = null;
  18.             try {
  19.                 try {
  20.                     do {
  21.                         //底层调用NioServerSocketChannel->doReadMessages 创建客户端SocketChannel
  22.                         int localRead = doReadMessages(readBuf);
  23.                         //已无新的连接可接收则退出read loop
  24.                         if (localRead == 0) {
  25.                             break;
  26.                         }
  27.                         if (localRead < 0) {
  28.                             closed = true;
  29.                             break;
  30.                         }
  31.                         //统计在当前事件循环中已经读取到得Message数量(创建连接的个数)
  32.                         allocHandle.incMessagesRead(localRead);
  33.                     } while (allocHandle.continueReading());//判断是否已经读满16次
  34.                 } catch (Throwable t) {
  35.                     exception = t;
  36.                 }
  37.                 int size = readBuf.size();
  38.                 for (int i = 0; i < size; i ++) {
  39.                     readPending = false;
  40.                     //在NioServerSocketChannel对应的pipeline中传播ChannelRead事件
  41.                     //初始化客户端SocketChannel,并将其绑定到Sub Reactor线程组中的一个Reactor上
  42.                     pipeline.fireChannelRead(readBuf.get(i));
  43.                 }
  44.                 //清除本次accept 创建的客户端SocketChannel集合
  45.                 readBuf.clear();
  46.                 allocHandle.readComplete();
  47.                 //触发readComplete事件传播
  48.                 pipeline.fireChannelReadComplete();
  49.                 ....................省略............
  50.             } finally {
  51.                 ....................省略............
  52.             }
  53.         }
  54.     }
  55.   }
  56. }
复制代码
这里首先要通过断言 assert eventLoop().inEventLoop()确保处理接收客户端连接的线程必须为Main Reactor 线程。
而main reactor中主要注册的是服务端NioServerSocketChannel,主要负责处理OP_ACCEPT事件,所以当前main reactor线程是在NioServerSocketChannel中执行接收连接的工作。
所以这里我们通过config()获取到的是NioServerSocketChannel的属性配置类NioServerSocketChannelConfig,它是在Reactor的启动阶段被创建出来的。
  1.     public NioServerSocketChannel(ServerSocketChannel channel) {
  2.         //父类AbstractNioChannel中保存JDK NIO原生ServerSocketChannel以及要监听的事件OP_ACCEPT
  3.         super(null, channel, SelectionKey.OP_ACCEPT);
  4.         //DefaultChannelConfig中设置用于Channel接收数据用的buffer->AdaptiveRecvByteBufAllocator
  5.         config = new NioServerSocketChannelConfig(this, javaChannel().socket());
  6.     }
复制代码
同理这里通过pipeline()获取到的也是NioServerSocketChannel中的pipeline。它会在NioServerSocketChannel向main reactor注册成功之后被初始化。

前边提到main reactor线程会被限定只能在read loop中向NioServerSocketChannel读取16次客户端连接,所以在开始read loop之前,我们需要创建一个能够保存记录读取次数的对象,在每次read loop循环之后,可以根据这个对象来判断是否结束read loop。
这个对象就是这里的 RecvByteBufAllocator.Handle allocHandle 专门用于统计read loop中接收客户端连接的次数,以及判断是否该结束read loop转去执行异步任务。
当这一切准备就绪之后,main reactor线程就开始在do{....}while(...)循环中接收客户端连接了。
在 read loop中通过调用doReadMessages函数接收完成三次握手的客户端连接,底层会调用到JDK NIO ServerSocketChannel的accept方法,从内核全连接队列中取出客户端连接。

返回值localRead 表示接收到了多少客户端连接,客户端连接通过accept方法只会一个一个的接收,所以这里的localRead 正常情况下都会返回1,当localRead  0) {                totalBytesRead += bytes;            }        }[/code]MaxMessageHandler中还有一个非常重要的方法就是在每次read loop末尾会调用allocHandle.continueReading()方法来判断读取连接次数是否已满16次,来决定main reactor线程是否退出循环。
  1.     public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
  2.         try {
  3.             return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
  4.                 @Override
  5.                 public SocketChannel run() throws IOException {
  6.                     return serverSocketChannel.accept();
  7.                 }
  8.             });
  9.         } catch (PrivilegedActionException e) {
  10.             throw (IOException) e.getCause();
  11.         }
  12.     }
复制代码

红框中圈出来的两个判断条件和本文主题无关,我们这里不需要关注,笔者会在后面的文章详细介绍。
以上内容就是RecvByteBufAllocator.Handle在接收客户端连接场景下的作用,大家这里仔细看下这个allocHandle.continueReading()方法退出循环的判断条件,再结合整个do{....}while(...)接收连接循环体,感受下是否哪里有些不对劲?Bug即将出现~~~

4. 啊哈!!Bug ! !


netty不论是在本文中处理接收客户端连接的场景还是在处理接收客户端连接上的网络数据场景都会在一个do{....}while(...)循环read loop中不断的处理。
同时也都会利用在上一小节中介绍的RecvByteBufAllocator.Handle来记录每次read loop接收到的连接个数和从连接上读取到的网络数据大小。
从而在read loop的末尾都会通过allocHandle.continueReading()方法判断是否应该退出read loop循环结束连接的接收流程或者是结束连接上数据的读取流程。
无论是用于接收客户端连接的main reactor也好还是用于接收客户端连接上的网络数据的sub reactor也好,它们的运行框架都是一样的,只不过是具体分工不同。
所以netty这里想用统一的RecvByteBufAllocator.Handle来处理以上两种场景。
而RecvByteBufAllocator.Handle中的totalBytesRead字段主要记录sub reactor线程在处理客户端NioSocketChannel中OP_READ事件活跃时,总共在read loop中读取到的网络数据,而这里是main reactor线程在接收客户端连接所以这个字段并不会被设置。totalBytesRead字段的值在本文中永远会是0。
所以无论同时有多少个客户端并发连接到服务端上,在接收连接的这个read loop中永远只会接受一个连接就会退出循环,因为allocHandle.continueReading()方法中的判断条件totalBytesRead > 0永远会返回false。
  1.     public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
  2.         try {
  3.             return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
  4.                 @Override
  5.                 public SocketChannel run() throws IOException {
  6.                     return serverSocketChannel.accept();
  7.                 }
  8.             });
  9.         } catch (PrivilegedActionException e) {
  10.             throw (IOException) e.getCause();
  11.         }
  12.     }
复制代码
而netty的本意是在这个read loop循环中尽可能多的去接收客户端的并发连接,同时又不影响main reactor线程执行异步任务。但是由于这个Bug,main reactor在这个循环中只执行一次就结束了。这也一定程度上就影响了netty的吞吐
让我们想象下这样的一个场景,当有16个客户端同时并发连接到了服务端,这时NioServerSocketChannel上的OP_ACCEPT事件活跃,main reactor从Selector上被唤醒,随后执行OP_ACCEPT事件的处理。
  1.                 int size = readBuf.size();
  2.                 for (int i = 0; i < size; i ++) {
  3.                     readPending = false;
  4.                     //NioServerSocketChannel对应的pipeline中传播read事件
  5.                     //io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor.channelRead
  6.                     //初始化客户端SocketChannel,并将其绑定到Sub Reactor线程组中的一个Reactor上
  7.                     pipeline.fireChannelRead(readBuf.get(i));
  8.                 }
复制代码
但是由于这个Bug的存在,main reactor在接收客户端连接的这个read loop中只接收了一个客户端连接就匆匆返回了。
  1.     public NioServerSocketChannel(ServerSocketChannel channel) {
  2.         super(null, channel, SelectionKey.OP_ACCEPT);
  3.         config = new NioServerSocketChannelConfig(this, javaChannel().socket());
  4.     }
复制代码
然后根据下图中这个Reactor的运行结构去执行异步任务,随后绕一大圈又会回到NioEventLoop#run方法中重新发起一轮OP_ACCEPT事件轮询。

由于现在还有15个客户端并发连接没有被接收,所以此时Main Reactor线程并不会在selector.select()上阻塞,最终绕一圈又会回到NioMessageUnsafe#read方法的do{.....}while()循环。在接收一个连接之后又退出循环。
本来我们可以在一次read loop中把这16个并发的客户端连接全部接收完毕的,因为这个Bug,main reactor需要不断的发起OP_ACCEPT事件的轮询,绕了很大一个圈子。同时也增加了许多不必要的selector.select()系统调用开销

这时大家在看这个Issue#11708中的讨论是不是就清晰很多了~~
Issue#11708:https://github.com/netty/netty/issues/11708
4.1 Bug的修复

笔者在写这篇文章的时候,Netty最新版本是4.1.68.final,这个Bug在4.1.69.final中被修复。

由于该Bug产生的原因正是因为服务端NioServerSocketChannel(用于监听端口地址和接收客户端连接)和 客户端NioSocketChannel(用于通信)中的Config配置类混用了同一个ByteBuffer分配器AdaptiveRecvByteBufAllocator而导致的。
所以在新版本修复中专门为服务端ServerSocketChannel中的Config配置类引入了一个新的ByteBuffer分配器ServerChannelRecvByteBufAllocator,专门用于服务端ServerSocketChannel接收客户端连接的场景。


在ServerChannelRecvByteBufAllocator的父类DefaultMaxMessagesRecvByteBufAllocator中引入了一个新的字段ignoreBytesRead,用于表示是否忽略网络字节的读取,在创建服务端Channel配置类NioServerSocketChannelConfig的时候,这个字段会被赋值为true。

当main reactor线程在read loop循环中接收客户端连接的时候。
  1.     public NioServerSocketChannel(ServerSocketChannel channel) {
  2.         super(null, channel, SelectionKey.OP_ACCEPT);
  3.         config = new NioServerSocketChannelConfig(this, javaChannel().socket());
  4.     }
复制代码
在read loop循环的末尾就会采用从ServerChannelRecvByteBufAllocator 中创建的MaxMessageHandle#continueReading方法来判断读取连接次数是否超过了16次。由于这里的ignoreBytesRead == true这回我们就会忽略totalBytesRead == 0的情况,从而使得接收连接的read loop得以继续地执行下去。在一个read loop中一次性把16个连接全部接收完毕。

以上就是对这个Bug产生的原因,以及发现的过程,最后修复的方案一个全面的介绍,因此笔者也出现在了netty 4.1.69.final版本发布公告里的thank-list中。哈哈,真是令人开心的一件事情~~~

通过以上对netty接收客户端连接的全流程分析和对这个Bug来龙去脉以及修复方案的介绍,大家现在一定已经理解了整个接收连接的流程框架。
接下来笔者就把这个流程中涉及到的一些核心模块在单独拎出来从细节入手,为大家各个击破~~~
5. doReadMessages接收客户端连接
  1. final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
复制代码
  1. protected abstract class AbstractUnsafe implements Unsafe {
  2.         @Override
  3.         public RecvByteBufAllocator.Handle recvBufAllocHandle() {
  4.             if (recvHandle == null) {
  5.                 recvHandle = config().getRecvByteBufAllocator().newHandle();
  6.             }
  7.             return recvHandle;
  8.         }
  9. }
复制代码
  1. public class DefaultChannelConfig implements ChannelConfig {
  2.     //用于Channel接收数据用的buffer分配器  类型为AdaptiveRecvByteBufAllocator
  3.     private volatile RecvByteBufAllocator rcvBufAllocator;
  4. }
复制代码
这一步就是我们在《聊聊Netty那些事儿之从内核角度看IO模型》介绍到的调用监听Socket的accept方法,内核会基于监听Socket创建出来一个新的Socket专门用于与客户端之间的网络通信这个我们称之为客户端连接Socket。这里的ServerSocketChannel就类似于监听Socket。SocketChannel就类似于客户端连接Socket。
由于我们在创建NioServerSocketChannel的时候,会将JDK NIO 原生的ServerSocketChannel设置为非阻塞,所以这里当ServerSocketChannel上有客户端连接时就会直接创建SocketChannel,如果此时并没有客户端连接时accept调用就会立刻返回null并不会阻塞。
  1. public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
  2.     @Override
  3.     public Handle newHandle() {
  4.         return new HandleImpl(minIndex, maxIndex, initial);
  5.     }
  6.    
  7.     private final class HandleImpl extends MaxMessageHandle {
  8.                   .................省略................
  9.     }
  10. }
复制代码
5.1 创建客户端NioSocketChannel
  1.     public abstract class MaxMessageHandle implements ExtendedHandle {
  2.         private ChannelConfig config;
  3.         //每次事件轮询时,最多读取16次
  4.         private int maxMessagePerRead;
  5.         //本次事件轮询总共读取的message数,这里指的是接收连接的数量
  6.         private int totalMessages;
  7.         //本次事件轮询总共读取的字节数
  8.         private int totalBytesRead;
  9.        @Override
  10.         public void reset(ChannelConfig config) {
  11.             this.config = config;
  12.             //默认每次最多读取16次
  13.             maxMessagePerRead = maxMessagesPerRead();
  14.             totalMessages = totalBytesRead = 0;
  15.         }
  16.     }
复制代码
这里会根据ServerSocketChannel的accept方法获取到JDK NIO 原生的SocketChannel(用于底层真正与客户端通信的Channel),来创建Netty中的NioSocketChannel。
  1. ServerBootstrap b = new ServerBootstrap();
  2. b.group(bossGroup, workerGroup)
  3.   .channel(NioServerSocketChannel.class)
  4.   .option(ChannelOption.MAX_MESSAGES_PER_READ, 自定义次数)
复制代码
创建客户端NioSocketChannel的过程其实和之前讲的创建服务端NioServerSocketChannel大体流程是一样的,我们这里只对客户端NioSocketChannel和服务端NioServerSocketChannel在创建过程中的不同之处做一个对比。
具体细节部分大家可以在回看下《详细图解Netty Reactor启动全流程》一文中关于NioServerSocketChannel的创建的详细细节。
5.3 对比NioSocketChannel与NioServerSocketChannel的不同

1:Channel的层次不同

在我们介绍Reactor的创建文章中,我们提到Netty中的Channel是具有层次的。由于客户端NioSocketChannel是在main reactor接收连接时在服务端NioServerSocketChannel中被创建的,所以在创建客户端NioSocketChannel的时候会通过构造函数指定了parent属性为NioServerSocketChanel。并将JDK NIO 原生的SocketChannel封装进Netty的客户端NioSocketChannel中。
而在Reactor启动过程中创建NioServerSocketChannel的时候parent属性指定是null。因为它就是顶层的Channel,负责创建客户端NioSocketChannel。
  1.         @Override
  2.         public final void incMessagesRead(int amt) {
  3.             totalMessages += amt;
  4.         }
复制代码
2:向Reactor注册的IO事件不同

客户端NioSocketChannel向Sub Reactor注册的是SelectionKey.OP_READ事件,而服务端NioServerSocketChannel向Main Reactor注册的是SelectionKey.OP_ACCEPT事件。
  1.         @Override
  2.         public void lastBytesRead(int bytes) {
  3.             lastBytesRead = bytes;
  4.             if (bytes > 0) {
  5.                 totalBytesRead += bytes;
  6.             }
  7.         }
复制代码
3: 功能属性不同造成继承结构的不同



客户端NioSocketChannel继承的是AbstractNioByteChannel,而服务端NioServerSocketChannel继承的是AbstractNioMessageChannel。
它们继承的这两个抽象类一个前缀是Byte,一个前缀是Message有什么区别吗??
客户端NioSocketChannel主要处理的是服务端与客户端的通信,这里涉及到接收客户端发送来的数据,而Sub Reactor线程从NioSocketChannel中读取的正是网络通信数据单位为Byte。
服务端NioServerSocketChannel主要负责处理OP_ACCEPT事件,创建用于通信的客户端NioSocketChannel。这时候客户端与服务端还没开始通信,所以Main Reactor线程从NioServerSocketChannel的读取对象为Message。这里的Message指的就是底层的SocketChannel客户端连接。
以上就是NioSocketChannel与NioServerSocketChannel创建过程中的不同之处,后面的过程就一样了。
  1.                   do {
  2.                         //底层调用NioServerSocketChannel->doReadMessages 创建客户端SocketChannel
  3.                         int localRead = doReadMessages(readBuf);
  4.                         if (localRead == 0) {
  5.                             break;
  6.                         }
  7.                         if (localRead < 0) {
  8.                             closed = true;
  9.                             break;
  10.                         }
  11.                         //统计在当前事件循环中已经读取到得Message数量(创建连接的个数)
  12.                         allocHandle.incMessagesRead(localRead);
  13.                     } while (allocHandle.continueReading());
复制代码
  1.                   do {
  2.                         //底层调用NioServerSocketChannel->doReadMessages 创建客户端SocketChannel
  3.                         int localRead = doReadMessages(readBuf);
  4.                         if (localRead == 0) {
  5.                             break;
  6.                         }
  7.                         if (localRead < 0) {
  8.                             closed = true;
  9.                             break;
  10.                         }
  11.                         //统计在当前事件循环中已经读取到得Message数量(创建连接的个数)
  12.                         allocHandle.incMessagesRead(localRead);
  13.                     } while (allocHandle.continueReading());
复制代码
  1. public final class NioEventLoop extends SingleThreadEventLoop {
  2.     @Override
  3.     protected void run() {
  4.         int selectCnt = 0;
  5.         for (;;) {
  6.             try {
  7.                 int strategy;
  8.                 try {
  9.                     strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
  10.                     switch (strategy) {
  11.                     case SelectStrategy.CONTINUE:                  
  12.                           ............省略.........
  13.                     case SelectStrategy.BUSY_WAIT:
  14.                           ............省略.........
  15.                     case SelectStrategy.SELECT:
  16.                             ............监听轮询IO事件.........
  17.                     default:
  18.                     }
  19.                 } catch (IOException e) {
  20.                     ............省略.........
  21.                 }
  22.                 ............处理IO就绪事件.........
  23.                 ............执行异步任务.........
  24.     }
  25. }
复制代码
在Bug修复后的版本中服务端NioServerSocketChannel的RecvByteBufAllocator类型设置为ServerChannelRecvByteBufAllocator
最终我们得到的客户端NioSocketChannel结构如下:

6. ChannelRead事件的响应


在前边介绍接收连接的整体核心流程框架的时候,我们提到main reactor线程是在一个do{.....}while(...)循环read loop中不断的调用ServerSocketChannel#accept方法来接收客户端的连接。
当满足退出read loop循环的条件有两个:
main reactor就会退出read loop循环,此时接收到的客户端连接NioSocketChannel暂存与List readBuf集合中。
  1.       private final class NioMessageUnsafe extends AbstractNioUnsafe {
  2.                     do {
  3.                         int localRead = doReadMessages(readBuf);
  4.                         .........省略...........
  5.                     } while (allocHandle.continueReading());
  6.      }
复制代码
随后main reactor线程会遍历List readBuf集合中的NioSocketChannel,并在NioServerSocketChannel的pipeline中传播ChannelRead事件。

最终ChannelRead事件会传播到ServerBootstrapAcceptor 中,这里正是Netty处理客户端连接的核心逻辑所在。
ServerBootstrapAcceptor 主要的作用就是初始化客户端NioSocketChannel,并将客户端NioSocketChannel注册到Sub Reactor Group中,并监听OP_READ事件。
在ServerBootstrapAcceptor 中会初始化客户端NioSocketChannel的这些属性。
比如:从Reactor组EventLoopGroup childGroup,用于初始化NioSocketChannel中的pipeline用到的ChannelHandler childHandler,以及NioSocketChannel中的一些childOptions 和childAttrs 。
  1.       private final class NioMessageUnsafe extends AbstractNioUnsafe {
  2.                     do {
  3.                         int localRead = doReadMessages(readBuf);
  4.                         .........省略...........
  5.                     } while (allocHandle.continueReading());
  6.      }
复制代码
正是在这里,netty会将我们在《详细图解Netty Reactor启动全流程》的启动示例程序中在ServerBootstrap中配置的客户端NioSocketChannel的所有属性(child前缀配置)初始化到NioSocketChannel中。
  1. public class NioServerSocketChannel extends AbstractNioMessageChannel
  2.                              implements io.netty.channel.socket.ServerSocketChannel {
  3.     @Override
  4.     protected int doReadMessages(List<Object> buf) throws Exception {
  5.         SocketChannel ch = SocketUtils.accept(javaChannel());
  6.         try {
  7.             if (ch != null) {
  8.                 buf.add(new NioSocketChannel(this, ch));
  9.                 return 1;
  10.             }
  11.         } catch (Throwable t) {
  12.             logger.warn("Failed to create a new channel from an accepted socket.", t);
  13.             try {
  14.                 ch.close();
  15.             } catch (Throwable t2) {
  16.                 logger.warn("Failed to close a socket.", t2);
  17.             }
  18.         }
  19.         return 0;
  20.     }
  21. }
复制代码
以上示例代码中通过ServerBootstrap配置的NioSocketChannel相关属性,会在Netty启动并开始初始化NioServerSocketChannel的时候将ServerBootstrapAcceptor 的创建初始化工作封装成异步任务,然后在NioServerSocketChannel注册到Main Reactor中成功后执行。
  1.     @Override
  2.     protected ServerSocketChannel javaChannel() {
  3.         return (ServerSocketChannel) super.javaChannel();
  4.     }
复制代码
在经过ServerBootstrapAccptor#chanelRead回调的处理之后,此时客户端NioSocketChannel中pipeline的结构为:

随后会将初始化好的客户端NioSocketChannel向Sub Reactor Group中注册,并监听OP_READ事件。
如下图中的步骤3所示:

7. 向SubReactorGroup中注册NioSocketChannel
  1.     public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
  2.         try {
  3.             return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
  4.                 @Override
  5.                 public SocketChannel run() throws IOException {
  6.                     return serverSocketChannel.accept();
  7.                 }
  8.             });
  9.         } catch (PrivilegedActionException e) {
  10.             throw (IOException) e.getCause();
  11.         }
  12.     }
复制代码
客户端NioSocketChannel向Sub Reactor Group注册的流程完全和服务端NioServerSocketChannel向Main Reactor Group注册流程一样。
关于服务端NioServerSocketChannel的注册流程,笔者已经在《详细图解Netty Reactor启动全流程》一文中做出了详细的介绍,对相关细节感兴趣的同学可以在回看下。
这里笔者在带大家简要回顾下整个注册过程并着重区别对比客户端NioSocetChannel与服务端NioServerSocketChannel注册过程中不同的地方。
7.1 从Sub Reactor Group中选取一个Sub Reactor进行绑定
  1.     protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
  2.         super(parent);
  3.         this.ch = ch;
  4.         this.readInterestOp = readInterestOp;
  5.         try {
  6.             //设置Channel为非阻塞 配合IO多路复用模型
  7.             ch.configureBlocking(false);
  8.         } catch (IOException e) {
  9.           ..........省略.............
  10.         }
  11.     }
复制代码
7.2 向绑定的Sub Reactor上注册NioSocketChannel
  1. public class NioServerSocketChannel extends AbstractNioMessageChannel
  2.                              implements io.netty.channel.socket.ServerSocketChannel {
  3.     @Override
  4.     protected int doReadMessages(List<Object> buf) throws Exception {
  5.         SocketChannel ch = SocketUtils.accept(javaChannel());
  6.         try {
  7.             if (ch != null) {
  8.                 buf.add(new NioSocketChannel(this, ch));
  9.                 return 1;
  10.             }
  11.         } catch (Throwable t) {
  12.           .........省略.......
  13.         }
  14.         return 0;
  15.     }
  16. }
复制代码
  1. public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
  2.     public NioSocketChannel(Channel parent, SocketChannel socket) {
  3.         super(parent, socket);
  4.         config = new NioSocketChannelConfig(this, socket.socket());
  5.     }
  6. }
复制代码
注意此时传递进来的EventLoop eventLoop为Sub Reactor
但此时的执行线程为Main Reactor线程,并不是Sub Reactor线程(此时还未启动)
所以这里的eventLoop.inEventLoop()返回的是false。

在else分支中向绑定的Sub Reactor提交注册NioSocketChannel的任务。
当注册任务提交后,此时绑定的Sub Reactor线程启动。
7.3 register0

我们又来到了Channel注册的老地方register0方法。在《详细图解Netty Reactor启动全流程》中我们花了大量的篇幅介绍了这个方法。这里我们只对比NioSocketChannel与NioServerSocketChannel不同的地方。
  1.     public NioServerSocketChannel(ServerSocketChannel channel) {
  2.         super(null, channel, SelectionKey.OP_ACCEPT);
  3.         config = new NioServerSocketChannelConfig(this, javaChannel().socket());
  4.     }
复制代码
这里 doRegister()方法将NioSocketChannel注册到Sub Reactor中的Selector上。
  1. public abstract class AbstractNioByteChannel extends AbstractNioChannel {
  2.     protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
  3.         super(parent, ch, SelectionKey.OP_READ);
  4.     }
  5. }
  6. public class NioServerSocketChannel extends AbstractNioMessageChannel
  7.                              implements io.netty.channel.socket.ServerSocketChannel {
  8.    public NioServerSocketChannel(ServerSocketChannel channel) {
  9.         //父类AbstractNioChannel中保存JDK NIO原生ServerSocketChannel以及要监听的事件OP_ACCEPT
  10.         super(null, channel, SelectionKey.OP_ACCEPT);
  11.         //DefaultChannelConfig中设置用于Channel接收数据用的buffer->AdaptiveRecvByteBufAllocator
  12.         config = new NioServerSocketChannelConfig(this, javaChannel().socket());
  13.     }
  14. }
复制代码
这里是Netty客户端NioSocketChannel与JDK NIO 原生 SocketChannel关联的地方。此时注册的IO事件依然是0。目的也是只是为了获取NioSocketChannel在Selector中的SelectionKey。
同时通过SelectableChannel#register方法将Netty自定义的NioSocketChannel(这里的this指针)附着在SelectionKey的attechment属性上,完成Netty自定义Channel与JDK NIO Channel的关系绑定。这样在每次对Selector进行IO就绪事件轮询时,Netty 都可以从 JDK NIO Selector返回的SelectionKey中获取到自定义的Channel对象(这里指的就是NioSocketChannel)。

随后调用pipeline.invokeHandlerAddedIfNeeded()回调客户端NioSocketChannel上pipeline中的所有ChannelHandler的handlerAdded方法,此时pipeline的结构中只有一个ChannelInitializer。最终会在ChannelInitializer#handlerAdded回调方法中初始化客户端NioSocketChannel的pipeline。
  1.     protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
  2.         super(parent);
  3.         this.ch = ch;
  4.         this.readInterestOp = readInterestOp;
  5.         try {
  6.             //设置Channel为非阻塞 配合IO多路复用模型
  7.             ch.configureBlocking(false);
  8.         } catch (IOException e) {
  9.         }
  10.     }
复制代码
关于对Channel中pipeline的详细初始化过程,对细节部分感兴趣的同学可以回看下《详细图解Netty Reactor启动全流程》
此时客户端NioSocketChannel中的pipeline中的结构就变为了我们自定义的样子,在示例代码中我们自定义的ChannelHandler为EchoServerHandler。
  1.     protected AbstractChannel(Channel parent) {
  2.         this.parent = parent;
  3.         //channel全局唯一ID machineId+processId+sequence+timestamp+random
  4.         id = newId();
  5.         //unsafe用于底层socket的读写操作
  6.         unsafe = newUnsafe();
  7.         //为channel分配独立的pipeline用于IO事件编排
  8.         pipeline = newChannelPipeline();
  9.     }
复制代码
当客户端NioSocketChannel中的pipeline初始化完毕后,netty就开始调用safeSetSuccess(promise)方法回调regFuture中注册的ChannelFutureListener,通知客户端NioSocketChannel已经成功注册到Sub Reactor上了。
  1.     public DefaultChannelConfig(Channel channel) {
  2.             this(channel, new AdaptiveRecvByteBufAllocator());
  3.     }
复制代码
在服务端NioServerSocketChannel注册的时候我们会在listener中向Main Reactor提交bind绑定端口地址任务。但是在NioSocketChannel注册的时候,只会在listener中处理一下注册失败的情况。
当Sub Reactor线程通知ChannelFutureListener注册成功之后,随后就会调用pipeline.fireChannelRegistered()在客户端NioSocketChannel的pipeline中传播ChannelRegistered事件。

这里笔者重点要强调下,在之前介绍NioServerSocketChannel注册的时候,我们提到因为此时NioServerSocketChannel并未绑定端口地址,所以这时的NioServerSocketChannel并未激活,这里的isActive()返回false。register0方法直接返回。
服务端NioServerSocketChannel判断是否激活的标准为端口是否绑定成功。
  1.     private final class NioMessageUnsafe extends AbstractNioUnsafe {
  2.         private final List<Object> readBuf = new ArrayList<Object>();
  3.         @Override
  4.         public void read() {
  5.             try {
  6.                 try {
  7.                     do {
  8.                         ........省略.........
  9.                         //底层调用NioServerSocketChannel->doReadMessages 创建客户端SocketChannel
  10.                         int localRead = doReadMessages(readBuf);
  11.                         ........省略.........
  12.                         allocHandle.incMessagesRead(localRead);
  13.                     } while (allocHandle.continueReading());
  14.                 } catch (Throwable t) {
  15.                     exception = t;
  16.                 }
  17.                 int size = readBuf.size();
  18.                 for (int i = 0; i < size; i ++) {
  19.                     readPending = false;
  20.                     pipeline.fireChannelRead(readBuf.get(i));
  21.                 }
  22.                
  23.                   ........省略.........
  24.             } finally {
  25.                   ........省略.........
  26.             }
  27.         }
  28.     }
复制代码
客户端NioSocketChannel判断是否激活的标准为是否处于Connected状态。那么显然这里肯定是处于connected状态的。
  1. private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
  2.         private final EventLoopGroup childGroup;
  3.         private final ChannelHandler childHandler;
  4.         private final Entry<ChannelOption<?>, Object>[] childOptions;
  5.         private final Entry<AttributeKey<?>, Object>[] childAttrs;
  6.         @Override
  7.         @SuppressWarnings("unchecked")
  8.         public void channelRead(ChannelHandlerContext ctx, Object msg) {
  9.             final Channel child = (Channel) msg;
  10.             //向客户端NioSocketChannel的pipeline中
  11.             //添加在启动配置类ServerBootstrap中配置的ChannelHandler
  12.             child.pipeline().addLast(childHandler);
  13.             //利用配置的属性初始化客户端NioSocketChannel
  14.             setChannelOptions(child, childOptions, logger);
  15.             setAttributes(child, childAttrs);
  16.             try {
  17.                 /**
  18.                  * 1:在Sub Reactor线程组中选择一个Reactor绑定
  19.                  * 2:将客户端SocketChannel注册到绑定的Reactor上
  20.                  * 3:SocketChannel注册到sub reactor中的selector上,并监听OP_READ事件
  21.                  * */
  22.                 childGroup.register(child).addListener(new ChannelFutureListener() {
  23.                     @Override
  24.                     public void operationComplete(ChannelFuture future) throws Exception {
  25.                         if (!future.isSuccess()) {
  26.                             forceClose(child, future.cause());
  27.                         }
  28.                     }
  29.                 });
  30.             } catch (Throwable t) {
  31.                 forceClose(child, t);
  32.             }
  33.         }
  34. }
复制代码
NioSocketChannel已经处于connected状态,这里并不需要绑定端口,所以这里的isActive()返回true。
  1. public final class EchoServer {
  2.     static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
  3.     public static void main(String[] args) throws Exception {
  4.         // Configure the server.
  5.         //创建主从Reactor线程组
  6.         EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  7.         EventLoopGroup workerGroup = new NioEventLoopGroup();
  8.         final EchoServerHandler serverHandler = new EchoServerHandler();
  9.         try {
  10.             ServerBootstrap b = new ServerBootstrap();
  11.             b.group(bossGroup, workerGroup)//配置主从Reactor
  12.              .channel(NioServerSocketChannel.class)//配置主Reactor中的channel类型
  13.              .option(ChannelOption.SO_BACKLOG, 100)//设置主Reactor中channel的option选项
  14.              .handler(new LoggingHandler(LogLevel.INFO))//设置主Reactor中Channel->pipline->handler
  15.              .childHandler(new ChannelInitializer<SocketChannel>() {//设置从Reactor中注册channel的pipeline
  16.                  @Override
  17.                  public void initChannel(SocketChannel ch) throws Exception {
  18.                      ChannelPipeline p = ch.pipeline();
  19.                      //p.addLast(new LoggingHandler(LogLevel.INFO));
  20.                      p.addLast(serverHandler);
  21.                  }
  22.              });
  23.             // Start the server. 绑定端口启动服务,开始监听accept事件
  24.             ChannelFuture f = b.bind(PORT).sync();
  25.             // Wait until the server socket is closed.
  26.             f.channel().closeFuture().sync();
  27.         } finally {
  28.             // Shut down all event loops to terminate all threads.
  29.             bossGroup.shutdownGracefully();
  30.             workerGroup.shutdownGracefully();
  31.         }
  32.     }
  33. }
复制代码
最后调用pipeline.fireChannelActive()在NioSocketChannel中的pipeline传播ChannelActive事件,最终在pipeline的头结点HeadContext中响应并注册OP_READ事件到Sub Reactor中的Selector上。
  1. public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
  2.     @Override
  3.     void init(Channel channel) {
  4.         ................省略................
  5.         p.addLast(new ChannelInitializer<Channel>() {
  6.             @Override
  7.             public void initChannel(final Channel ch) {
  8.                 final ChannelPipeline pipeline = ch.pipeline();
  9.                 ................省略................
  10.                 ch.eventLoop().execute(new Runnable() {
  11.                     @Override
  12.                     public void run() {
  13.                         pipeline.addLast(new ServerBootstrapAcceptor(
  14.                                 ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
  15.                     }
  16.                 });
  17.             }
  18.         });
  19.     }
  20. }
复制代码
注意这里的readInterestOp为客户端NioSocketChannel在初始化时设置的OP_READ事件。
到这里,Netty中的Main Reactor接收连接的整个流程,我们就介绍完了,此时Netty中主从Reactor组的结构就变为:

总结

本文我们介绍了NioServerSocketChannel处理客户端连接事件的整个过程。
其中我们也对比了NioServerSocketChannel与NioSocketChannel在创建初始化以及后面向Reactor注册过程中的差异之处。
当客户端NioSocketChannel接收完毕并向Sub Reactor注册成功后,那么接下来Sub Reactor就开始监听注册其上的所有客户端NioSocketChannel的OP_READ事件,并等待客户端向服务端发送网络数据。
后面Reactor的主角就该变为Sub Reactor以及注册在其上的客户端NioSocketChannel了。
下篇文章,我们将会讨论Netty是如何接收网络数据的~~~~ 我们下篇文章见~~
阅读原文
欢迎关注公众号:bin的技术小屋

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4