欢迎关注公众号:bin的技术小屋,本文图片加载不出来的话可查看公众号原文
本系列Netty源码解析文章基于 4.1.56.Final版本
1. 前文回顾
在前边的系列文章中,笔者为大家详细剖析了 Reactor 模型在 netty 中的创建,启动,运行,接收连接,接收数据,发送数据的完整流程,在详细剖析整个 Reactor 模型如何在 netty 中实现的过程里,我们或多或少的见到了 pipeline 的身影。

比如在 Reactor 启动的过程中首先需要创建 NioServerSocketChannel ,在创建的过程中会为 NioServerSocketChannel 创建分配一个 pipeline ,用于对 OP_ACCEPT 事件的编排。
当 NioServerSocketChannel 向 main reactor 注册成功后,会在 pipeline 中触发 ChannelRegistered 事件的传播。
当 NioServerSocketChannel 绑定端口成功后,会在 pipeline 中触发 ChannelActive 事件的传播。

又比如在 Reactor 接收连接的过程中,当客户端发起一个连接并完成三次握手之后,连接对应的 Socket 会存放在内核中的全连接队列中,随后 JDK Selector 会通知 main reactor 此时 NioServerSocketChannel 上有 OP_ACCEPT 事件活跃,最后 main reactor 开始执行 NioServerSocketChannel 的底层操作类 NioMessageUnsafe#read 方法在 NioServerSocketChannel 中的 pipeline 中传播 ChannelRead 事件。

最终会在 NioServerSocketChannel 的 pipeline 中的 ServerBootstrapAcceptor 中响应 ChannelRead 事件并创建初始化 NioSocketChannel ,随后会为每一个新创建的 NioSocetChannel 创建分配一个独立的 pipeline ,用于各自 NioSocketChannel 上的 IO 事件的编排。并向 sub reactor 注册 NioSocketChannel ,随后在 NioSocketChannel 的 pipeline 中传播 ChannelRegistered 事件,最后传播 ChannelActive 事件。

还有在《Netty如何高效接收网络数据》一文中,我们也提过当 sub reactor 读取 NioSocketChannel 中来自客户端的请求数据时,会在 NioSocketChannel 的 pipeline 中传播 ChannelRead 事件,在一个完整的 read loop 读取完毕后会传播 ChannelReadComplete 事件。
在《一文搞懂Netty发送数据全流程》一文中,我们讲到了在用户经过业务处理后,通过 write 方法和 flush 方法分别在 NioSocketChannel 的 pipeline 中传播 write 事件和 flush 事件的过程。
笔者带大家又回顾了一下在前边系列文章中关于 pipeline 的使用场景,但是在这些系列文章中并未对 pipeline 相关的细节进行完整全面地描述,那么本文笔者将为大家详细的剖析下 pipeline 在 IO 事件的编排和传播场景下的完整实现原理。

2. pipeline的创建

Netty 会为每一个 Channel 分配一个独立的 pipeline ,pipeline 伴随着 channel 的创建而创建。
前边介绍到 NioServerSocketChannel 是在 netty 服务端启动的过程中创建的。而 NioSocketChannel 的创建是在当 NioServerSocketChannel 上的 OP_ACCEPT 事件活跃时,由 main reactor 线程在 NioServerSocketChannel 中创建,并在 NioServerSocketChannel 的 pipeline 中对 OP_ACCEPT 事件进行编排时(图中的 ServerBootstrapAcceptor 中)初始化的。
无论是创建 NioServerSocketChannel 里的 pipeline 还是创建 NioSocketChannel 里的 pipeline , 最终都会委托给它们的父类 AbstractChannel 。
 - public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
- protected AbstractChannel(Channel parent) {
- this.parent = parent;
- //channel全局唯一ID machineId+processId+sequence+timestamp+random
- id = newId();
- //unsafe用于底层socket的相关操作
- unsafe = newUnsafe();
- //为channel分配独立的pipeline用于IO事件编排
- pipeline = newChannelPipeline();
- }
- protected DefaultChannelPipeline newChannelPipeline() {
- return new DefaultChannelPipeline(this);
- }
- }
复制代码- public class DefaultChannelPipeline implements ChannelPipeline {
- ....................
- //pipeline中的头结点
- final AbstractChannelHandlerContext head;
- //pipeline中的尾结点
- final AbstractChannelHandlerContext tail;
- //pipeline中持有对应channel的引用
- private final Channel channel;
- ....................
- protected DefaultChannelPipeline(Channel channel) {
- //pipeline中持有对应channel的引用
- this.channel = ObjectUtil.checkNotNull(channel, "channel");
-
- ............省略.......
- tail = new TailContext(this);
- head = new HeadContext(this);
- head.next = tail;
- tail.prev = head;
- }
- ....................
- }
复制代码 在前边的系列文章中笔者多次提到过,pipeline 的结构是由 ChannelHandlerContext 类型的节点构成的双向链表。其中头结点为 HeadContext ,尾结点为 TailContext 。其初始结构如下:

2.1 HeadContext
- private static final String HEAD_NAME = generateName0(HeadContext.class);
- final class HeadContext extends AbstractChannelHandlerContext
- implements ChannelOutboundHandler, ChannelInboundHandler {
- //headContext中持有对channel unsafe操作类的引用 用于执行channel底层操作
- private final Unsafe unsafe;
- HeadContext(DefaultChannelPipeline pipeline) {
- super(pipeline, null, HEAD_NAME, HeadContext.class);
- //持有channel unsafe操作类的引用,后续用于执行channel底层操作
- unsafe = pipeline.channel().unsafe();
- //设置channelHandler的状态为ADD_COMPLETE
- setAddComplete();
- }
- @Override
- public ChannelHandler handler() {
- return this;
- }
- .......................
- }
复制代码 我们知道双向链表结构的 pipeline 中的节点元素为 ChannelHandlerContext ,既然 HeadContext 作为 pipeline 的头结点,那么它一定是 ChannelHandlerContext 类型的,所以它需要继承实现 AbstractChannelHandlerContext ,相当于一个哨兵的作用,因为用户可以以任意顺序向 pipeline 中添加 ChannelHandler ,需要用 HeadContext 来固定指向第一个 ChannelHandlerContext 。
在《一文搞懂Netty发送数据全流程》 一文中的《1. ChannelHandlerContext》小节中,笔者曾为大家详细介绍过 ChannelHandlerContext 在 pipeline 中的作用,忘记的同学可以在回看下。
于此同时 HeadContext 又实现了 ChannelInboundHandler 和 ChannelOutboundHandler 接口,说明 HeadContext 即是一个 ChannelHandlerContext 又是一个 ChannelHandler ,它可以同时处理 Inbound 事件和 Outbound 事件。
我们也注意到 HeadContext 中持有了对应 channel 的底层操作类 unsafe ,这也说明 IO 事件在 pipeline 中的传播最终会落在 HeadContext 中进行最后的 IO 处理。它是 Inbound 事件的处理起点,也是 Outbound 事件的处理终点。这里也可以看出 HeadContext 除了起到哨兵的作用,它还承担了对 channel 底层相关的操作。
比如我们在《Reactor在Netty中的实现(启动篇)》中介绍的 NioServerSocketChannel 在向 main reactor 注册完成后会触发 ChannelRegistered 事件从 HeadContext 开始依次在 pipeline 中向后传播。- @Override
- public void channelRegistered(ChannelHandlerContext ctx) {
- //此时firstRegistration已经变为false,在pipeline.invokeHandlerAddedIfNeeded中已被调用过
- invokeHandlerAddedIfNeeded();
- ctx.fireChannelRegistered();
- }
复制代码 以及 NioServerSocketChannel 在与端口绑定成功后会触发 ChannelActive 事件从 HeadContext 开始依次在 pipeline 中向后传播,并在 HeadContext 中通过 unsafe.beginRead() 注册 OP_ACCEPT 事件到 main reactor 中。- @Override
- public void read(ChannelHandlerContext ctx) {
- //触发注册OP_ACCEPT或者OP_READ事件
- unsafe.beginRead();
- }
复制代码 同理在 NioSocketChannel 在向 sub reactor 注册成功后。会先后触发 ChannelRegistered 事件和 ChannelActive 事件从 HeadContext 开始在 pipeline 中向后传播。并在 HeadContext 中通过 unsafe.beginRead() 注册 OP_READ 事件到 sub reactor 中。- @Override
- public void channelActive(ChannelHandlerContext ctx) {
- //pipeline中继续向后传播channelActive事件
- ctx.fireChannelActive();
- //如果是autoRead 则自动触发read事件传播
- //在read回调函数中 触发OP_ACCEPT或者OP_READ事件注册
- readIfIsAutoRead();
- }
复制代码 在《一文搞懂Netty发送数据全流程》中介绍的 write 事件和 flush 事件最终会在 pipeline 中从后向前一直传播到 HeadContext ,并在 HeadContext 中相应事件回调函数中调用 unsafe 类操作底层 channel 发送数据。- @Override
- public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
- //到headContext这里 msg的类型必须是ByteBuffer,也就是说必须经过编码器将业务层写入的实体编码为ByteBuffer
- unsafe.write(msg, promise);
- }
- @Override
- public void flush(ChannelHandlerContext ctx) {
- unsafe.flush();
- }
复制代码从本小节的内容介绍中,我们可以看出在 Netty 中对于 Channel 的相关底层操作调用均是在 HeadContext 中触发的。
2.2 TailContext
- private static final String TAIL_NAME = generateName0(TailContext.class);
- final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
- TailContext(DefaultChannelPipeline pipeline) {
- super(pipeline, null, TAIL_NAME, TailContext.class);
- //设置channelHandler的状态为ADD_COMPLETE
- setAddComplete();
- }
- @Override
- public ChannelHandler handler() {
- return this;
- }
-
- ......................
- }
复制代码 同样 TailContext 作为双向链表结构的 pipeline 中的尾结点,也需要继承实现 AbstractChannelHandlerContext 。但它同时又实现了 ChannelInboundHandler 。
这说明 TailContext 除了是一个 ChannelHandlerContext 同时也是一个 ChannelInboundHandler 。
2.2.1 TailContext 作为一个 ChannelHandlerContext 的作用
TailContext 作为一个 ChannelHandlerContext 的作用是负责将 outbound 事件从 pipeline 的末尾一直向前传播直到 HeadContext 。当然前提是用户需要调用 channel 的相关 outbound 方法。- public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
- ChannelFuture write(Object msg);
- ChannelFuture write(Object msg, ChannelPromise promise);
- ChannelOutboundInvoker flush();
- ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
- ChannelFuture writeAndFlush(Object msg);
- }
复制代码- public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
- @Override
- public ChannelFuture write(Object msg) {
- return pipeline.write(msg);
- }
- @Override
- public Channel flush() {
- pipeline.flush();
- return this;
- }
- @Override
- public ChannelFuture writeAndFlush(Object msg) {
- return pipeline.writeAndFlush(msg);
- }
- }
复制代码- public class DefaultChannelPipeline implements ChannelPipeline {
- @Override
- public final ChannelFuture write(Object msg) {
- return tail.write(msg);
- }
- @Override
- public final ChannelPipeline flush() {
- tail.flush();
- return this;
- }
- @Override
- public final ChannelFuture writeAndFlush(Object msg) {
- return tail.writeAndFlush(msg);
- }
- }
复制代码 这里我们可以看到,当我们在自定义 ChannelHandler 中调用 ctx.channel().write(msg) 时,会在 AbstractChannel 中触发 pipeline.write(msg) ,最终在 DefaultChannelPipeline 中调用 tail.write(msg) 。使得 write 事件可以从 pipeline 的末尾开始向前传播,其他 outbound 事件的传播也是一样的道理。- public class EchoServerHandler extends ChannelInboundHandlerAdapter {
- @Override
- public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
- ctx.channel().write(msg);
- }
- }
复制代码 而我们自定义的 ChannelHandler 会被封装在一个 ChannelHandlerContext 中从而加入到 pipeline 中,而这个用于装载自定义 ChannelHandler 的 ChannelHandlerContext 与 TailContext 一样本质也都是 ChannelHandlerContext ,只不过在 pipeline 中的位置不同罢了。
 - public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
- ChannelFuture write(Object msg);
- ChannelFuture write(Object msg, ChannelPromise promise);
- ChannelOutboundInvoker flush();
- ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
- ChannelFuture writeAndFlush(Object msg);
- }
复制代码 我们看到 ChannelHandlerContext 接口本身也会继承 ChannelInboundInvoker
和 ChannelOutboundInvoker 接口,所以说 ContextHandlerContext 也可以触发 inbound 事件和 outbound 事件,只不过表达的语义是在 pipeline 中从当前 ChannelHandler 开始向前或者向后传播 outbound 事件或者 inbound 事件。- public class EchoServerHandler extends ChannelInboundHandlerAdapter {
- @Override
- public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
- ctx.write(msg);
- }
- }
复制代码 这里表示 write 事件从当前 EchoServerHandler 开始在 pipeline 中向前传播直到 HeadContext 。

2.2.2 TailContext 作为一个 ChannelInboundHandler 的作用
最后 TailContext 作为一个 ChannelInboundHandler 的作用就是为 inbound 事件在 pipeline 中的传播做一个兜底的处理。
这里提到的兜底处理是什么意思呢?
比如我们前边介绍到的,在 NioSocketChannel 向 sub reactor 注册成功后之后触发的 ChannelRegistered 事件和 ChannelActive 事件。或者在 reactor 线程读取 NioSocketChannel 中的请求数据时所触发的 channelRead 事件和 ChannelReadComplete 事件。
这些 inbound 事件都会首先从 HeadContext 开始在 pipeline 中一个一个的向后传递。
极端的情况是如果 pipeline 中所有 ChannelInboundHandler 中相应的 inbound 事件回调方法均不对事件作出处理,并继续向后传播。如下示例代码所示:- public class EchoServerHandler extends ChannelInboundHandlerAdapter {
- @Override
- public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
- ctx.fireChannelRead(msg);
- }
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) {
- ctx.fireChannelReadComplete();
- }
- @Override
- public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
- ctx.fireChannelRegistered();
- }
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- ctx.fireChannelActive();
- }
- }
复制代码 最终这些 inbound 事件在 pipeline 中得不到处理,最后会传播到 TailContext 中。- final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- onUnhandledInboundMessage(ctx, msg);
- }
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) {
- onUnhandledInboundChannelReadComplete();
- }
- @Override
- public void channelActive(ChannelHandlerContext ctx) {
- onUnhandledInboundChannelActive();
- }
- }
复制代码 而在 TailContext 中需要对这些得不到任何处理的 inbound 事件做出最终处理。比如丢弃该 msg,并释放所占用的 directByteBuffer,以免发生内存泄露。- protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
- onUnhandledInboundMessage(msg);
- if (logger.isDebugEnabled()) {
- logger.debug("Discarded message pipeline : {}. Channel : {}.",
- ctx.pipeline().names(), ctx.channel());
- }
- }
- protected void onUnhandledInboundMessage(Object msg) {
- try {
- logger.debug(
- "Discarded inbound message {} that reached at the tail of the pipeline. " +
- "Please check your pipeline configuration.", msg);
- } finally {
- ReferenceCountUtil.release(msg);
- }
- }
复制代码 3. pipeline中的事件分类
在前边的系列文章中,笔者多次介绍过,Netty 中的 IO 事件一共分为两大类: inbound 类事件和 outbound 类事件。其实如果严格来分的话应该分为三类。第三种事件类型为 exceptionCaught 异常事件类型。
而 exceptionCaught 事件在事件传播角度上来说和 inbound 类事件一样,都是从 pipeline 的 HeadContext 开始一直向后传递或者从当前 ChannelHandler 开始一直向后传递直到 TailContext 。所以一般也会将 exceptionCaught 事件统一归为 inbound 类事件。
而根据事件类型的分类,相应负责处理事件回调的 ChannelHandler 也会被分为两类:
- ChannelInboundHandler :主要负责响应处理 inbound 类事件回调和 exceptionCaught 事件回调。
- ChannelOutboundHandler :主要负责响应处理 outbound 类事件回调。
那么我们常说的 inbound 类事件和 outbound 类事件具体都包含哪些事件呢?
3.1 inbound类事件
[code]final class ChannelHandlerMask { // inbound事件集合 static final int MASK_ONLY_INBOUND = MASK_CHANNEL_REGISTERED | MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ | MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED; private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_INBOUND; // inbound 类事件相关掩码 static final int MASK_EXCEPTION_CAUGHT = 1; static final int MASK_CHANNEL_REGISTERED = 1 |