Netty的源码分析和业务场景

打印 上一主题 下一主题

主题 907|帖子 907|积分 2721

Netty 是一个高性能、异步事件驱动的网络应用框架,它基于 Java NIO 构建,广泛应用于互联网、大数据、游戏开发、通信行业等多个领域。以下是对 Netty 的源码分析、业务场景的详细先容:
源码概述


  • Netty 的核心组件:Netty 的架构设计围绕着事件驱动的核心思想,主要包括 Channel、EventLoopGroup、ChannelHandlerContext 和 ChannelPipeline 等关键概念。
  • Channel:是网络连接的抽象表示,每个 Channel 都有一个或多个 ChannelHandler 来处理网络事件,如连接建立、数据汲取等。
  • EventLoopGroup:是一组 EventLoop 的聚集,每个 EventLoop 负责处理一组 Channel 的 I/O 事件。当 Channel 的事件触发时,相应的 EventLoop 会调用 ChannelHandler 中的方法举行处理。
  • ChannelPipeline:是 ChannelHandler 的有序聚集,用于处理进来的和出站的数据。通过在 Pipeline 中添加差别的 Handler,可以实现复杂的业务逻辑。
  • 源码中的关键流程:Netty 的源码分析需要关注的关键流程包括初始化、Channel 的注册、EventLoop 的工作流程、以及连接的建立和绑定过程。
Netty 提供了一个 Echo 示例,用于演示客户端和服务器端的基本通信流程。在这个示例中,客户端发送的消息被服务器端汲取并原样返回,展示了 Netty 处理网络通信的基本方法。
下面 V 哥来详细先容一下这几外关键核心组件。
1. Channel组件

Netty 的 Channel 组件是整个框架的核心之一,它代表了网络中的一个连接,可以是客户端的也可以是服务器端的。Channel 是一个低级别的接口,用于执行网络 I/O 操作。以下是对 Channel 组件的源码分析和解释:
Channel 接口定义

Channel 接口定义了一组操作网络连接的方法,例如绑定、连接、读取、写入和关闭。
  1. public interface Channel extends AttributeMap {
  2.     /**
  3.      * Returns the {@link ChannelId} of this {@link Channel}.
  4.      */
  5.     ChannelId id();
  6.     /**
  7.      * Returns the parent {@link Channel} of this channel. {@code null} if this is the top-level channel.
  8.      */
  9.     Channel parent();
  10.     /**
  11.      * Returns the {@link ChannelConfig} of this channel.
  12.      */
  13.     ChannelConfig config();
  14.     /**
  15.      * Returns the local address of this channel.
  16.      */
  17.    SocketAddress localAddress();
  18.     /**
  19.      * Returns the remote address of this channel. {@code null} if the channel is not connected.
  20.      */
  21.     SocketAddress remoteAddress();
  22.     /**
  23.      * Returns {@code true} if this channel is open and may be used.
  24.      */
  25.     boolean isOpen();
  26.     /**
  27.      * Returns {@code true} if this channel is active and may be used for IO.
  28.      */
  29.     boolean isActive();
  30.     /**
  31.      * Returns the {@link ChannelPipeline}.
  32.      */
  33.     ChannelPipeline pipeline();
  34.     /**
  35.      * Returns the {@link ChannelFuture} which is fired once the channel is registered with its {@link EventLoop}.
  36.      */
  37.     ChannelFuture whenRegistered();
  38.     /**
  39.      * Returns the {@link ChannelFuture} which is fired once the channel is deregistered from its {@link EventLoop}.
  40.      */
  41.     ChannelFuture whenDeregistered();
  42.     /**
  43.      * Returns the {@link ChannelFuture} which is fired once the channel is closed.
  44.      */
  45.     ChannelFuture whenClosed();
  46.     /**
  47.      * Register this channel to the given {@link EventLoop}.
  48.      */
  49.     ChannelFuture register(EventLoop loop);
  50.     /**
  51.      * Bind and listen for incoming connections.
  52.      */
  53.     ChannelFuture bind(SocketAddress localAddress);
  54.     /**
  55.      * Connect to the given remote address.
  56.      */
  57.     ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);
  58.     /**
  59.      * Disconnect if connected.
  60.      */
  61.     ChannelFuture disconnect();
  62.     /**
  63.      * Close this channel.
  64.      */
  65.     ChannelFuture close();
  66.     /**
  67.      * Deregister this channel from its {@link EventLoop}.
  68.      */
  69.     ChannelFuture deregister();
  70.     /**
  71.      * Write the specified message to this channel.
  72.      */
  73.     ChannelFuture write(Object msg);
  74.     /**
  75.      * Write the specified message to this channel and generate a {@link ChannelFuture} which is done
  76.      * when the message is written.
  77.      */
  78.     ChannelFuture writeAndFlush(Object msg);
  79.     /**
  80.      * Flushes all pending messages.
  81.      */
  82.     ChannelFuture flush();
  83.     // ... 更多方法定义
  84. }
复制代码
Channel 的关键方法


  • id(): 返回 Channel 的唯一标识符。
  • parent(): 返回父 Channel,如果是顶级 Channel,则返回 null。
  • config(): 获取 Channel 的配置信息。
  • localAddress() 和 remoteAddress(): 分别返回本地和长途地址。
  • isOpen() 和 isActive(): 分别检查 Channel 是否打开和激活。
  • pipeline(): 返回与 Channel 关联的 ChannelPipeline,它是处理网络事件的处理器链。
  • register(), bind(), connect(), disconnect(), close(), deregister(): 这些方法用于执行网络 I/O 操作。
Channel 的实现类

Netty 为差别类型的网络通信协议提供了多种 Channel 的实现,例如:

  • NioSocketChannel:用于 NIO 传输的 TCP 协议的 Channel 实现。
  • NioServerSocketChannel:用于 NIO 传输的 TCP 服务器端 Channel 实现。
  • OioSocketChannel 和 OioServerSocketChannel:雷同 NIO,但是用于阻塞 I/O。
Channel 的生命周期


  • 创建:Channel 通过其工厂方法创建,通常与特定的 EventLoop 关联。
  • 注册:Channel 必须注册到 EventLoop 上,以便可以处理 I/O 事件。
  • 绑定/连接:服务器端 Channel 绑定到特定地址并开始监听;客户端 Channel 连接到长途地址。
  • 读取和写入:通过 Channel 读取和写入数据。
  • 关闭:关闭 Channel,开释相关资源。
Channel 的事件处理

Channel 的事件处理是通过 ChannelPipeline 和 ChannelHandler 完成的。ChannelPipeline 是一个处理器链,负责处理所有的 I/O 事件和 I/O 操作。每个 Channel 都有一个与之关联的 ChannelPipeline,可以通过 Channel 的 pipeline() 方法访问。
异步处理

Channel 的操作(如绑定、连接、写入、关闭)都是异步的,返回一个 ChannelFuture 对象,允许开发者设置回调,当操作完成或失败时执行。
内存管理

Netty 的 Channel 实现还涉及内存管理,利用 ByteBuf 作为数据容器,它是一个可变的字节容器,提供了一系列的操作方法来读写网络数据。
小结

Channel 是 Netty 中的一个核心接口,它定义了网络通信的基本操作。Netty 提供了多种 Channel 的实现,以支持差别的 I/O 模子和协议。通过 Channel,Netty 实现了高性能、异步和事件驱动的网络通信。
2. EventLoopGroup组件

EventLoopGroup 是 Netty 中一个非常告急的组件,它负责管理一组 EventLoop,每个 EventLoop 可以处理多个 Channel 的 I/O 事件。以下是对 EventLoopGroup 组件的详细分析和解释:
EventLoopGroup 接口定义

EventLoopGroup 接口定义了一组管理 EventLoop 的方法,以下是一些关键方法:
  1. public interface EventLoopGroup extends ExecutorService {
  2.     /**
  3.      * Returns the next {@link EventLoop} this group will use to handle an event.
  4.      * This will either return an existing or a new instance depending on the implementation.
  5.      */
  6.     EventLoop next();
  7.     /**
  8.      * Shuts down all {@link EventLoop}s and releases all resources.
  9.      */
  10.     ChannelFuture shutdownGracefully();
  11.     /**
  12.      * Shuts down all {@link EventLoop}s and releases all resources.
  13.      */
  14.     ChannelFuture shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
  15.     /**
  16.      * Returns a copy of the list of all {@link EventLoop}s that are part of this group.
  17.      */
  18.     List<EventLoop> eventLoops();
  19. }
复制代码
EventLoopGroup 的关键方法


  • next(): 返回下一个 EventLoop,用于处理事件。这可以是现有的 EventLoop 或者新创建的实例,具体取决于实现。
  • shutdownGracefully(): 优雅地关闭所有 EventLoop 并开释所有资源。这个方法允许指定一个静默期和一个超时时间,以便在关闭之前等候所有任务完成。
  • eventLoops(): 返回当前 EventLoopGroup 中所有 EventLoop 的列表。
EventLoopGroup 的实现类

Netty 提供了几种 EventLoopGroup 的实现,主要包括:

  • DefaultEventLoopGroup: 默认的 EventLoopGroup 实现,利用 NioEventLoop 作为其 EventLoop 实现。
  • EpollEventLoopGroup: 特定于 Linux 的 EventLoopGroup 实现,利用 EpollEventLoop 作为其 EventLoop 实现,利用 Linux 的 epoll 机制提高性能。
  • OioEventLoopGroup: 阻塞 I/O 模式下的 EventLoopGroup 实现,利用 OioEventLoop 作为其 EventLoop 实现。
EventLoopGroup 的工作原理


  • 创建: EventLoopGroup 通过其构造函数创建,可以指定线程数。
  • 注册: Channel 需要注册到 EventLoop 上,以便 EventLoop 可以处理其 I/O 事件。
  • 事件循环: 每个 EventLoop 在其线程中运行一个事件循环,处理注册到它的 Channel 的 I/O 事件。
  • 关闭: EventLoopGroup 可以被关闭,开释所有资源。
EventLoopGroup 的线程模子


  • 单线程模子: 一个 EventLoopGroup 只包含一个 EventLoop,适用于小容量应用。
  • 多线程模子: 一个 EventLoopGroup 包含多个 EventLoop,每个 EventLoop 在单独的线程中运行,适用于高并发应用。
EventLoopGroup 的利用场景


  • 服务器端: 在服务器端,通常利用两个 EventLoopGroup。一个用于接受连接(bossGroup),一个用于处理连接(workerGroup)。bossGroup 通常利用较少的线程,而 workerGroup 可以根据需要处理更多的并发连接。
  • 客户端端: 在客户端,通常只需要一个 EventLoopGroup,用于处理所有的连接。
示例代码

以下是如何在 Netty 中利用 EventLoopGroup 的示例代码:
  1. public class NettyServer {
  2.     public static void main(String[] args) {
  3.         EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 用于接受连接
  4.         EventLoopGroup workerGroup = new NioEventLoopGroup(); // 用于处理连接
  5.         try {
  6.             ServerBootstrap b = new ServerBootstrap();
  7.             b.group(bossGroup, workerGroup)
  8.              .channel(NioServerSocketChannel.class)
  9.              .childHandler(new ChannelInitializer<SocketChannel>() {
  10.                  @Override
  11.                  public void initChannel(SocketChannel ch) throws Exception {
  12.                      ChannelPipeline p = ch.pipeline();
  13.                      p.addLast(new LoggingHandler());
  14.                      p.addLast(new MyServerHandler());
  15.                  }
  16.              });
  17.             ChannelFuture f = b.bind(8080).sync(); // 绑定端口并启动服务器
  18.             System.out.println("Server started on port 8080");
  19.             f.channel().closeFuture().sync();
  20.         } catch (InterruptedException e) {
  21.             e.printStackTrace();
  22.         } finally {
  23.             bossGroup.shutdownGracefully();
  24.             workerGroup.shutdownGracefully();
  25.         }
  26.     }
  27. }
复制代码
在这个示例中,bossGroup 用于接受连接,workerGroup 用于处理连接。通过 ServerBootstrap 类配置服务器,并利用 ChannelInitializer 来设置 Channel 的处理器链。
总结

EventLoopGroup 是 Netty 中管理事件循环的核心组件,它通过 EventLoop 处理 I/O 事件,支持高并发和异步操作。通过公道配置 EventLoopGroup,可以显著提高网络应用的性能和可扩展性。
3. ChannelPipeline组件

ChannelPipeline 是 Netty 中的一个核心组件,它负责管理一组 ChannelHandler,并且定义了 I/O 事件和操作如何在这些处理器之间流动。以下是对 ChannelPipeline 组件的详细分析和解释:
ChannelPipeline 接口定义

ChannelPipeline 是一个接口,定义了操作 ChannelHandler 的方法:
  1. public interface ChannelPipeline extends Iterable<ChannelHandler> {
  2.     /**
  3.      * Add the specified handler to the context of the current channel.
  4.      */
  5.     void addLast(EventExecutorGroup executor, String name, ChannelHandler handler);
  6.     /**
  7.      * Add the specified handlers to the context of the current channel.
  8.      */
  9.     void addLast(EventExecutorGroup executor, ChannelHandler... handlers);
  10.     // ... 省略其他 addFirst, addBefore, addAfter, remove, replace 方法
  11.     /**
  12.      * Get the {@link ChannelHandler} by its name.
  13.      */
  14.     ChannelHandler get(String name);
  15.     /**
  16.      * Find the first {@link ChannelHandler} in the {@link ChannelPipeline} that matches the specified class.
  17.      */
  18.     ChannelHandler first();
  19.     /**
  20.      * Find the last {@link ChannelHandler} in the {@link ChannelPipeline} that matches the specified class.
  21.      */
  22.     ChannelHandler last();
  23.     /**
  24.      * Returns the context object of the specified handler.
  25.      */
  26.     ChannelHandlerContext context(ChannelHandler handler);
  27.     // ... 省略 contextFor, remove, replace, fireChannelRegistered, fireChannelUnregistered 等方法
  28. }
复制代码
ChannelPipeline 的关键方法


  • addLast(String name, ChannelHandler handler): 在管道的末尾添加一个新的处理器,并为其指定一个名称。
  • addFirst(String name, ChannelHandler handler): 在管道的开头添加一个新的处理器。
  • addBefore(String baseName, String name, ChannelHandler handler): 在指定处理器前添加一个新的处理器。
  • addAfter(String baseName, String name, ChannelHandler handler): 在指定处理器后添加一个新的处理器。
  • get(String name): 根据名称获取 ChannelHandler。
  • first() 和 last(): 分别获取管道中的第一个和末了一个处理器。
  • context(ChannelHandler handler): 获取指定处理器的上下文。
ChannelHandlerContext

ChannelHandlerContext 是 ChannelHandler 和 ChannelPipeline 之间的桥梁,提供了访问和管理 Channel、ChannelPipeline 和 ChannelFuture 的能力:
  1. public interface ChannelHandlerContext extends AttributeMap, ResourceLeakHint {
  2.     /**
  3.      * Return the current channel to which this context is bound.
  4.      */
  5.     Channel channel();
  6.     /**
  7.      * Return the current pipeline to which this context is bound.
  8.      */
  9.     ChannelPipeline pipeline();
  10.     /**
  11.      * Return the name of the {@link ChannelHandler} which is represented by this context.
  12.      */
  13.     String name();
  14.     /**
  15.      * Return the {@link ChannelHandler} which is represented by this context.
  16.      */
  17.     ChannelHandler handler();
  18.     // ... 省略其他方法
  19. }
复制代码
ChannelPipeline 的工作原理

ChannelPipeline 维护了一个双向链表的 ChannelHandler 聚集。每个 Channel 实例都有一个与之关联的 ChannelPipeline。当 I/O 事件发生时,如数据被读取到 Channel,该事件会被传递到 ChannelPipeline,然后按照 ChannelHandler 在管道中的顺序举行处理。
处理器的执行顺序


  • 入站事件:当数据被读取到 Channel 时,事件会从管道的尾部向头部传递,直到某个 ChannelHandler 处理该事件。
  • 出站事件:当需要发送数据时,事件会从管道的头部向尾部传递,直到数据被写出。
源码分析

ChannelPipeline 的实现类 DefaultChannelPipeline 内部利用了一个 ChannelHandler 的双向链表来维护处理器的顺序:
  1. private final AbstractChannelHandlerContext head;
  2. private final AbstractChannelHandlerContext tail;
  3. private final List<ChannelHandler> handlers = new ArrayList<ChannelHandler>();
复制代码

  • head 和 tail 是链表的头尾节点。
  • handlers 是存储所有处理器的列表。
添加处理器时,DefaultChannelPipeline 会更新链表和列表:
  1. public void addLast(EventExecutorGroup executor, String name, ChannelHandler handler) {
  2.     if (handler == null) {
  3.         throw new NullPointerException("handler");
  4.     }
  5.     if (name == null) {
  6.         throw new NullPointerException("name");
  7.     }
  8.     AbstractChannelHandlerContext newCtx = new TailContext(this, executor, name, handler);
  9.     synchronized (this) {
  10.         if (tail == null) {
  11.             head = tail = newCtx;
  12.         } else {
  13.             tail.next = newCtx;
  14.             newCtx.prev = tail;
  15.             tail = newCtx;
  16.         }
  17.         handlers.add(newCtx);
  18.     }
  19. }
复制代码
小结

ChannelPipeline 是 Netty 中处理网络事件和请求的管道,它通过维护一个 ChannelHandler 的链表来管理事件的流动。通过 ChannelHandlerContext,ChannelHandler 可以大概访问和修改 Channel 和 ChannelPipeline 的状态。这种设计使得事件处理流程高度可定制和灵活,是 Netty 高性能和易于利用的关键因素之一。
4. 源码中的关键流程

在 Netty 的 ChannelPipeline 的源码中,关键流程涉及处理器的添加、事件的触发、以及事件在处理器之间的流动。以下是一些关键流程的分析:
1. 处理器的添加

当创建 ChannelPipeline 并准备添加 ChannelHandler 时,需要确定处理器的顺序和位置。Netty 允许开发者在管道的开始、结束或指定位置插入处理器。
  1. ChannelPipeline pipeline = channel.pipeline();
  2. pipeline.addLast("myHandler", new MyChannelHandler());
复制代码
在 DefaultChannelPipeline 类中,处理器被添加到一个双向链表中,每个处理器节点(AbstractChannelHandlerContext)保存了指向前一个和后一个处理器的引用。
2. 事件循环和触发

每个 Channel 都与一个 EventLoop 关联,EventLoop 负责处理所有注册到它上面的 Channel 的事件。当 EventLoop 运行时,它会不断地循环,等候并处理 I/O 事件。
  1. // EventLoop 的事件循环
  2. public void run() {
  3.     for (;;) {
  4.         // ...
  5.         processSelectedKeys();
  6.         // ...
  7.     }
  8. }
复制代码
3. 事件的捕获和传递

当 EventLoop 检测到一个 I/O 事件(如数据到达)时,它会触发相应的操作。对于 ChannelPipeline 来说,这意味着需要调用得当的 ChannelHandler 方法。
  1. // 伪代码,展示了事件如何被传递到 ChannelHandler
  2. if (channelRead) {
  3.     pipeline.fireChannelRead(msg);
  4. }
复制代码
4. 入站和出站事件的处理


  • 入站事件(如数据被读取)通常从 ChannelPipeline 的尾部开始传递,沿着管道向前,直到某个处理器处理了该事件。
  • 出站事件(如写数据)则从 ChannelPipeline 的头部开始传递,沿着管道向后,直到数据被写出。
  1. // 入站事件处理
  2. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  3.     // 处理消息或传递给下一个处理器
  4.     ctx.fireChannelRead(msg);
  5. }
  6. // 出站事件处理
  7. public void write(ChannelHandlerContext ctx, Object msg) {
  8.     // 写消息或传递给下一个处理器
  9.     ctx.write(msg);
  10. }
复制代码
5. 处理器链的遍历

ChannelPipeline 需要可以大概遍历处理器链,以便按顺序触发事件。这通常通过从 ChannelHandlerContext 获取下一个或前一个处理器来实现。
  1. // 伪代码,展示了如何获取下一个处理器并调用它
  2. ChannelHandlerContext nextCtx = ctx.next();
  3. if (nextCtx != null) {
  4.     nextCtx.invokeChannelRead(msg);
  5. }
复制代码
6. 动态修改处理器链

在事件处理过程中,大概需要动态地修改处理器链,如添加新的处理器或移除当前处理器。
  1. pipeline.addLast("newHandler", new AnotherChannelHandler());
  2. pipeline.remove(ctx.handler());
复制代码
7. 资源管理和清理

当 Channel 关闭时,ChannelPipeline 需要确保所有的 ChannelHandler 都可以大概执行它们的清理逻辑,开释资源。
  1. public void channelInactive(ChannelHandlerContext ctx) {
  2.     // 清理逻辑
  3. }
复制代码
8. 非常处理

在事件处理过程中,如果抛出非常,ChannelPipeline 需要可以大概捕获并适本地处理这些非常,避免影响整个管道的运行。
  1. try {
  2.     // 可能抛出异常的操作
  3. } catch (Exception e) {
  4.     ctx.fireExceptionCaught(e);
  5. }
复制代码
小结

ChannelPipeline 的源码中包含了多个关键流程,确保了事件可以大概按顺序在处理器之间传递,同时提供了动态修改处理器链和非常处理的能力。这些流程共同构成了 Netty 中事件驱动的网络编程模子的基础。
业务场景


  • 微服务架构:Netty 可以作为 RPC 框架的基础,实现服务间的高效通信。
  • 游戏服务器:由于游戏行业对延迟和并发要求极高,Netty 的异步非阻塞特性非常得当构建高并发的游戏服务器。
  • 实时通信体系:Netty 可用于构建如即时消息、视频会议等需要低延迟数据传输的实时通信体系。
  • 物联网平台:Netty 可以作为设备与云平台之间的通信桥梁,处理大规模的设备连接和数据流。
  • 互联网行业:在分布式体系中,Netty 常作为基础通信组件被 RPC 框架利用,例如阿里的分布式服务框架 Dubbo 利用 Netty 作为其通信组件。
  • 大数据领域:Netty 也被用于大数据技能的网络通信部分,例如 Hadoop 的高性能通信组件 Avro 的 RPC 框架就采用了 Netty。
末了

通过深入分析 Netty 的源码和理解其在差别业务场景下的应用,开发者可以更好地利用这一强大的网络编程框架,构建高效、稳固且可扩展的网络应用。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

用户云卷云舒

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表