一文搞懂 Netty 发送数据全流程 | 你想知道的细节全在这里 ...

一给  金牌会员 | 2022-8-23 22:51:25 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 857|帖子 857|积分 2571

欢迎关注公众号:bin的技术小屋,如果大家在看文章的时候发现图片加载不了,可以到公众号查看原文
本系列Netty源码解析文章基于 4.1.56.Final版本

《Netty如何高效接收网络数据》一文中,我们介绍了 Netty 的 SubReactor 处理网络数据读取的完整过程,当 Netty 为我们读取了网络请求数据,并且我们在自己的业务线程中完成了业务处理后,就需要将业务处理结果返回给客户端了,那么本文我们就来介绍下 SubReactor 如何处理网络数据发送的整个过程。
我们都知道 Netty 是一款高性能的异步事件驱动的网络通讯框架,既然是网络通讯框架那么它主要做的事情就是:

  • 接收客户端连接。
  • 读取连接上的网络请求数据。
  • 向连接发送网络响应数据。
前边系列文章在介绍Netty的启动以及接收连接的过程中,我们只看到 OP_ACCEPT 事件以及 OP_READ 事件的注册,并未看到 OP_WRITE 事件的注册。

  • 那么在什么情况下 Netty 才会向 SubReactor 去注册 OP_WRITE 事件呢?
  • Netty 又是怎么对写操作做到异步处理的呢?
本文笔者将会为大家一一揭晓这些谜底。我们还是以之前的 EchoServer 为例进行说明。
  1. @Sharable
  2. public class EchoServerHandler extends ChannelInboundHandlerAdapter {
  3.     @Override
  4.     public void channelRead(ChannelHandlerContext ctx, Object msg) {
  5.         //此处的msg就是Netty在read loop中从NioSocketChannel中读取到的ByteBuffer
  6.         ctx.write(msg);
  7.     }
  8. }
复制代码
我们将在《Netty如何高效接收网络数据》一文中读取到的 ByteBuffer (这里的 Object msg),直接发送回给客户端,用这个简单的例子来揭开 Netty 如何发送数据的序幕~~
在实际开发中,我们首先要通过解码器将读取到的 ByteBuffer 解码转换为我们的业务 Request 类,然后在业务线程中做业务处理,在通过编码器对业务 Response 类编码为 ByteBuffer ,最后利用 ChannelHandlerContext ctx 的引用发送响应数据。
本文我们只聚焦 Netty 写数据的过程,对于 Netty 编解码相关的内容,笔者会在后续的文章中专门介绍。

1. ChannelHandlerContext


通过前面几篇文章的介绍,我们知道 Netty 会为每个 Channel 分配一个 pipeline ,pipeline 是一个双向链表的结构。Netty 中产生的 IO 异步事件会在这个 pipeline 中传播。
Netty 中的 IO 异步事件大体上分为两类:

  • inbound事件:入站事件,比如前边介绍的 ChannelActive 事件, ChannelRead 事件,它们会从 pipeline 的头结点 HeadContext 开始一直向后传播。
  • outbound事件:出站事件,比如本文中即将要介绍到的 write事件 以及 flush 事件,出站事件会从相反的方向从后往前传播直到 HeadContext 。最终会在 HeadContext 中完成出站事件的处理。

    • 本例中用到的 channelHandlerContext.write()  会使 write 事件从当前 ChannelHandler 也就是这里的 EchoServerHandler 开始沿着 pipeline 向前传播。
    • 而 channelHandlerContext.channel().write() 则会使 write 事件从 pipeline 的尾结点 TailContext 开始向前传播直到 HeadContext 。


而 pipeline 这样一个双向链表数据结构中的类型正是 ChannelHandlerContext  ,由 ChannelHandlerContext 包裹我们自定义的 IO 处理逻辑 ChannelHandler。
ChannelHandler 并不需要感知到它所处的 pipeline 中的上下文信息,只需要专心处理好 IO 逻辑即可,关于 pipeline 的上下文信息全部封装在 ChannelHandlerContext中。
ChannelHandler 在 Netty 中的作用只是负责处理 IO 逻辑,比如编码,解码。它并不会感知到它在 pipeline 中的位置,更不会感知和它相邻的两个 ChannelHandler。事实上 ChannelHandler也并不需要去关心这些,它唯一需要关注的就是处理所关心的异步事件
而 ChannelHandlerContext 中维护了 pipeline 这个双向链表中的 pre 以及 next 指针,这样可以方便的找到与其相邻的 ChannelHandler ,并可以过滤出一些符合执行条件的 ChannelHandler。正如它的命名一样, ChannelHandlerContext  正是起到了维护 ChannelHandler 上下文的一个作用。而 Netty 中的异步事件在 pipeline 中的传播靠的就是这个 ChannelHandlerContext 。
这样设计就使得 ChannelHandlerContext 和 ChannelHandler 的职责单一,各司其职,具有高度的可扩展性。
2. write事件的传播

我们无论是在业务线程或者是在 SubReactor 线程中完成业务处理后,都需要通过 channelHandlerContext 的引用将 write事件在 pipeline 中进行传播。然后在 pipeline 中相应的 ChannelHandler 中监听 write 事件从而可以对 write事件进行自定义编排处理(比如我们常用的编码器),最终传播到 HeadContext 中执行发送数据的逻辑操作。
前边也提到 Netty 中有两个触发 write 事件传播的方法,它们的传播处理逻辑都是一样的,只不过它们在 pipeline 中的传播起点是不同的。

  • channelHandlerContext.write() 方法会从当前 ChannelHandler 开始在 pipeline 中向前传播 write 事件直到 HeadContext。
  • channelHandlerContext.channel().write() 方法则会从 pipeline 的尾结点 TailContext 开始在 pipeline 中向前传播 write 事件直到 HeadContext 。

在我们清楚了 write 事件的总体传播流程后,接下来就来看看在 write 事件传播的过程中Netty为我们作了些什么?这里我们以 channelHandlerContext.write() 方法为例说明。
3. write方法发送数据

  1. abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
  2.     @Override
  3.     public ChannelFuture write(Object msg) {
  4.         return write(msg, newPromise());
  5.     }
  6.     @Override
  7.     public ChannelFuture write(final Object msg, final ChannelPromise promise) {
  8.         write(msg, false, promise);
  9.         return promise;
  10.     }
  11. }
复制代码
这里我们看到 Netty 的写操作是一个异步操作,当我们在业务线程中调用 channelHandlerContext.write() 后,Netty 会给我们返回一个 ChannelFuture,我们可以在这个 ChannelFutrue 中添加 ChannelFutureListener ,这样当 Netty 将我们要发送的数据发送到底层 Socket 中时,Netty 会通过 ChannelFutureListener 通知我们写入结果。
  1.     @Override
  2.     public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
  3.         //此处的msg就是Netty在read loop中从NioSocketChannel中读取到的ByteBuffer
  4.         ChannelFuture future = ctx.write(msg);
  5.         future.addListener(new ChannelFutureListener() {
  6.             @Override
  7.             public void operationComplete(ChannelFuture future) throws Exception {
  8.                 Throwable cause = future.cause();
  9.                 if (cause != null) {
  10.                      处理异常情况
  11.                 } else {                    
  12.                      写入Socket成功后,Netty会通知到这里
  13.                 }
  14.             }
  15.         });
  16. }
复制代码
当异步事件在 pipeline 传播的过程中发生异常时,异步事件就会停止在 pipeline 中传播。所以我们在日常开发中,需要对写操作异常情况进行处理。

  • 其中 inbound 类异步事件发生异常时,会触发exceptionCaught事件传播
    exceptionCaught 事件本身也是一种 inbound 事件,传播方向会从当前发生异常的 ChannelHandler 开始一直向后传播直到 TailContext。
  • 而 outbound 类异步事件发生异常时,则不会触发exceptionCaught事件传播。一般只是通知相关 ChannelFuture。但如果是 flush 事件在传播过程中发生异常,则会触发当前发生异常的 ChannelHandler 中 exceptionCaught 事件回调。
我们继续回归到写操作的主线上来~~~
  1.     private void write(Object msg, boolean flush, ChannelPromise promise) {
  2.         ObjectUtil.checkNotNull(msg, "msg");
  3.         ................省略检查promise的有效性...............
  4.         //flush = true 表示channelHandler中调用的是writeAndFlush方法,这里需要找到pipeline中覆盖write或者flush方法的channelHandler
  5.         //flush = false 表示调用的是write方法,只需要找到pipeline中覆盖write方法的channelHandler
  6.         final AbstractChannelHandlerContext next = findContextOutbound(flush ?
  7.                 (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
  8.         //用于检查内存泄露
  9.         final Object m = pipeline.touch(msg, next);
  10.         //获取pipeline中下一个要被执行的channelHandler的executor
  11.         EventExecutor executor = next.executor();
  12.         //确保OutBound事件由ChannelHandler指定的executor执行
  13.         if (executor.inEventLoop()) {
  14.             //如果当前线程正是channelHandler指定的executor则直接执行
  15.             if (flush) {
  16.                 next.invokeWriteAndFlush(m, promise);
  17.             } else {
  18.                 next.invokeWrite(m, promise);
  19.             }
  20.         } else {
  21.             //如果当前线程不是ChannelHandler指定的executor,则封装成异步任务提交给指定executor执行,注意这里的executor不一定是reactor线程。
  22.             final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
  23.             if (!safeExecute(executor, task, promise, m, !flush)) {
  24.                 task.cancel();
  25.             }
  26.         }
  27.     }
复制代码
write 事件要向前在 pipeline 中传播,就需要在 pipeline 上找到下一个具有执行资格的 ChannelHandler,因为位于当前 ChannelHandler 前边的可能是 ChannelInboundHandler 类型的也可能是 ChannelOutboundHandler 类型的 ChannelHandler ,或者有可能压根就不关心 write 事件的 ChannelHandler(没有实现write回调方法)。

这里我们就需要通过 findContextOutbound 方法在当前 ChannelHandler 的前边找到 ChannelOutboundHandler 类型并且覆盖实现 write 回调方法的 ChannelHandler 作为下一个要执行的对象。
3.1 findContextOutbound
  1.   private AbstractChannelHandlerContext findContextOutbound(int mask) {
  2.         AbstractChannelHandlerContext ctx = this;
  3.         //获取当前ChannelHandler的executor
  4.         EventExecutor currentExecutor = executor();
  5.         do {
  6.             //获取前一个ChannelHandler
  7.             ctx = ctx.prev;
  8.         } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
  9.         return ctx;
  10.     }
  11.     //判断前一个ChannelHandler是否具有响应Write事件的资格
  12.     private static boolean skipContext(
  13.             AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
  14.         return (ctx.executionMask & (onlyMask | mask)) == 0 ||
  15.                 (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
  16.     }
复制代码
findContextOutbound 方法接收的参数是一个掩码,这个掩码表示要向前查找具有什么样执行资格的 ChannelHandler。因为我们这里调用的是 ChannelHandlerContext 的 write 方法所以 flush = false,传递进来的掩码为 MASK_WRITE,表示我们要向前查找覆盖实现了 write 回调方法的 ChannelOutboundHandler。
3.1.1 掩码的巧妙应用

Netty 中将 ChannelHandler 覆盖实现的一些异步事件回调方法用 int 型的掩码来表示,这样我们就可以通过这个掩码来判断当前 ChannelHandler 具有什么样的执行资格。
  1. final class ChannelHandlerMask {
  2.     ....................省略......................
  3.     static final int MASK_CHANNEL_ACTIVE = 1 << 3;
  4.     static final int MASK_CHANNEL_READ = 1 << 5;
  5.     static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
  6.     static final int MASK_WRITE = 1 << 15;
  7.     static final int MASK_FLUSH = 1 << 16;
  8.    //outbound事件掩码集合
  9.    static final int MASK_ONLY_OUTBOUND =  MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
  10.             MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;
  11.     ....................省略......................
  12. }
复制代码
最终会在 doWriteFileRegion 方法中通过 FileChannel#transferTo 方法底层用到的系统调用为 sendFile 实现零拷贝网络文件的传输。
  1. abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
  2.     //ChannelHandler执行资格掩码
  3.     private final int executionMask;
  4.     ....................省略......................
  5. }
复制代码
关于 Netty 中涉及到的零拷贝,笔者会有一篇专门的文章为大家讲解,本文的主题我们还是先聚焦于把发送流程的主线打通。
我们继续回到发送数据流程主线上来~~
  1.     @Override
  2.     protected void doBeginRead() throws Exception {
  3.         final SelectionKey selectionKey = this.selectionKey;
  4.         if (!selectionKey.isValid()) {
  5.             return;
  6.         }
  7.         readPending = true;
  8.         final int interestOps = selectionKey.interestOps();
  9.         /**
  10.          * 1:ServerSocketChannel 初始化时 readInterestOp设置的是OP_ACCEPT事件
  11.          * 2:SocketChannel 初始化时 readInterestOp设置的是OP_READ事件
  12.          * */
  13.         if ((interestOps & readInterestOp) == 0) {
  14.             //注册监听OP_ACCEPT或者OP_READ事件
  15.             selectionKey.interestOps(interestOps | readInterestOp);
  16.         }
  17.     }
复制代码
<ul>region.transferred() >= region.count() :表示当前 FileRegion 中的文件数据已经传输完毕。那么在这种情况下本次 write loop 没有写入任何数据到 Socket ,所以返回 0 ,writeSpinCount - 0 意思就是本次 write loop 不算,继续循环。
localFlushedAmount > 0 :表示本 write loop 中写入了一些数据到 Socket 中,会有返回 1,writeSpinCount - 1 减少一次 write loop 次数。

localFlushedAmount >> 1);        }    }[/code]
由于操作系统会动态调整 SO_SNDBUF 的大小,所以这里 netty 也需要根据操作系统的动态调整做出相应的调整,目的是尽量多的去写入数据。
attempted == written 表示本次 write loop 尝试写入的数据能全部写入到 Socket 的写缓冲区中,那么下次 write loop 就应该尝试去写入更多的数据。
那么这里的更多具体是多少呢?
Netty 会将本次写入的数据量 written 扩大两倍,如果扩大两倍后的写入量大于本次 write loop 的最大限制写入量 maxBytesPerGatheringWrite,说明用户的写入需求很猛烈,Netty当然要满足这样的猛烈需求,那么就将当前 NioSocketChannelConfig 中的 maxBytesPerGatheringWrite 更新为本次 write loop 两倍的写入量大小。
在下次 write loop 写入数据的时候,就会尝试从 ChannelOutboundBuffer 中加载最多 written * 2 大小的字节数。
如果扩大两倍后的写入量依然小于等于本次 write loop 的最大限制写入量 maxBytesPerGatheringWrite,说明用户的写入需求还不是很猛烈,Netty 继续维持本次 maxBytesPerGatheringWrite 数值不变。
如果本次写入的数据还不及尝试写入数据的 1 / 2 :written < attempted >>> 1。说明当前 Socket 写缓冲区的可写容量不是很多了,下一次 write loop 就不要写这么多了尝试减少下次写入的量将下次 write loop 要写入的数据减小为 attempted 的1 / 2。当然也不能无限制的减小,最小值不能低于 2048。
这里可以结合笔者前边的文章《一文聊透ByteBuffer动态自适应扩缩容机制》中介绍到的 read loop 场景中的扩缩容一起对比着看。
read loop 中的扩缩容触发时机是在一个完整的 read loop 结束时候触发。而 write loop 中扩缩容的触发时机是在每次 write loop 发送完数据后,立即触发扩缩容判断。

  • 当本次 write loop 批量发送完 ChannelOutboundBuffer 中的数据之后,最后调用in.removeBytes(localWrittenBytes) 从 ChannelOutboundBuffer 中移除全部写完的 Entry ,如果只发送了 Entry 的部分数据则更新 Entry 对象中封装的 DirectByteBuffer 的 readerIndex,等待下一次 write loop 写入。
到这里,write loop 中的发送数据的逻辑就介绍完了,接下来 Netty 会在 write loop 中循环地发送数据直到写满 16 次或者数据发送完毕。
还有一种退出 write loop 的情况就是当 Socket 中的写缓冲区满了,无法在写入时。Netty 会退出 write loop 并向 reactor 注册 OP_WRITE 事件。
但这其中还隐藏着一种情况就是如果 write loop 已经写满 16 次但还没写完数据并且此时 Socket 写缓冲区还没有满,还可以继续在写。那 Netty 会如何处理这种情况呢?
6. 处理Socket可写但已经写满16次还没写完的情况
  1.     @Override    protected void doWrite(ChannelOutboundBuffer in) throws Exception {              SocketChannel ch = javaChannel();        int writeSpinCount = config().getWriteSpinCount();        do {              .........将待发送数据转换到JDK NIO ByteBuffer中.........            int nioBufferCnt = in.nioBufferCount();            switch (nioBufferCnt) {    @Override
  2.     protected void doBeginRead() throws Exception {
  3.         final SelectionKey selectionKey = this.selectionKey;
  4.         if (!selectionKey.isValid()) {
  5.             return;
  6.         }
  7.         readPending = true;
  8.         final int interestOps = selectionKey.interestOps();
  9.         /**
  10.          * 1:ServerSocketChannel 初始化时 readInterestOp设置的是OP_ACCEPT事件
  11.          * 2:SocketChannel 初始化时 readInterestOp设置的是OP_READ事件
  12.          * */
  13.         if ((interestOps & readInterestOp) == 0) {
  14.             //注册监听OP_ACCEPT或者OP_READ事件
  15.             selectionKey.interestOps(interestOps | readInterestOp);
  16.         }
  17.     }                case 1: {                      .....发送单个nioBuffer....                }                default: {                      .....批量发送多个nioBuffers......                }                        }        } while (writeSpinCount > 0);                //处理write loop结束 但数据还没写完的情况        incompleteWrite(writeSpinCount < 0);    }
复制代码
当 write loop 结束后,这时 writeSpinCount 的值会有两种情况:

  • writeSpinCount < 0:这种情况有点不好理解,我们在介绍 Netty 通过零拷贝的方式传输网络文件也就是这里的 case 0 分支逻辑时,详细介绍了 doWrite0 方法的几种返回值,当 Netty 在传输文件的过程中发现 Socket 缓冲区已满无法在继续写入数据时,会返回 WRITE_STATUS_SNDBUF_FULL = Integer.MAX_VALUE,这就使得 writeSpinCount的值 <  0。随后 break 掉 write loop 来到 incompleteWrite(writeSpinCount < 0) 方法中,最后会在 incompleteWrite 方法中向 reactor 注册 OP_WRITE 事件。当 Socket 缓冲区变得可写时,epoll 会通知 reactor 线程继续发送文件。
  1. ctx = ctx.prev;
复制代码

  • writeSpinCount == 0: 这种情况很好理解,就是已经写满了 16 次,但是还没写完,同时 Socket 的写缓冲区未满,还可以继续写入。这种情况下即使 Socket 还可以继续写入,Netty 也不会再去写了,因为执行 flush 操作的是 reactor 线程,而 reactor 线程负责执行注册在它上边的所有 channel 的 IO 操作,Netty 不会允许 reactor 线程一直在一个 channel 上执行 IO 操作,reactor 线程的执行时间需要均匀的分配到每个 channel 上。所以这里 Netty 会停下,转而去处理其他 channel 上的 IO 事件。
那么还没写完的数据,Netty会如何处理呢
  1. public class EchoChannelHandler extends ChannelOutboundHandlerAdapter {
  2.     @Override
  3.     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
  4.         super.write(ctx, msg, promise);
  5.     }
  6. }
复制代码
这个方法的 if 分支逻辑,我们在介绍do {.....}while()循环体 write loop 中发送逻辑时已经提过,在 write loop 循环发送数据的过程中,如果发现 Socket 缓冲区已满,无法写入数据时( localWrittenBytes

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

一给

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表