王國慶 发表于 2024-12-18 15:12:48

Netty网络框架详细讲解

一、Netty基本内容

1.什么是netty?

Netty是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端。
异步的:
事件驱动:基于 Java NIO(Non-blocking I/O)的 Selector 实现的。
Netty 的核心设计目标是:

[*]高性能:充实利用 Java NIO 的非阻塞特性。
[*]可扩展性:适用于从简单客户端到复杂服务器的大多数网络应用场景。
[*]易用性:提供高度抽象的 API,屏蔽 NIO 编程的复杂细节。
2. Netty 的核心特性


[*]异步和事件驱动

[*]Netty 的通信是基于事件驱动的,事件由事件循环管理。
[*]异步特性让 Netty 在处理高并发时具有自然上风,避免阻塞操作。

[*]高效的线程模型

[*]Netty 使用少量线程处理大量连接,通过 I/O 多路复用机制实现。
[*]默认采取 Reactor 模式,分为 Boss 和 Worker 两类线程。

[*]灵活的编解码

[*]提供丰富的编码器和解码器支持,包罗 Protobuf、HTTP、WebSocket 等。
[*]支持自定义协议解析。

[*]内存管理

[*]使用内置的 ByteBuf 代替 Java 的 ByteBuffer,提供动态扩展、零拷贝等高效操作。

3. 初识 netty

在讲解netty核心组件之前,我们先通过一个简单的示例,对netty有一个整体的熟悉。这个示例演示的是netty处理I/O请求的能力。
service代码中,我们配置了一个线程池 group,用来监听到客户端的创建请求(connect)和处理 I/O 操作,当客户端发送connect时服务端会调用initChannel 方法初始化客户端和服务端创建的管道 channel,之后服务端和客户端可以通过这个channel来发送和接受消息(由 group 处理)。而且这个channel中有ChannelPipeline,所有通过channel接收或发送的消息都会经过ChannelPipeline中添加的方法来处理。
这个示例中我们只添加了当触发读事件时,通过创建的channel来获取消息,并通过服务端的ChannelPipeline来解码打印输出。 (解码:二进制转成字符串)
public class Service {
    public static void main(String[] args) {
               
                NioEventLoopGroup group = new NioEventLoopGroup();
      
      // 1. 配置和启动 Netty 服务端
      // ServerBootstrap 负责设置服务端的各项配置,包括线程池、通道类型、事件处理器等。
      new ServerBootstrap()
                // 2. 将 group(线程池) 配置到 ServerBootstrap 中。
                .group(group)
                // 3. 指定了服务端使用的通道类型 NioServerSocketChannel,用于监听客户端连接
                .channel(NioServerSocketChannel.class)
                // 4. 设置连接到服务端的每个客户端的通道(NioSocketChannel)如何初始化
                .childHandler(
                        new ChannelInitializer<NioSocketChannel>() {
                            // 6. 每当有新的客户端向服务端建立连接(connect)时,调用initChannel初始化服务端的ChannelPipeline
                            @Override
                            protected void initChannel(NioSocketChannel sc) throws Exception {
                              // 当服务端收到客户端信息时经过ChannelPipeline的处理
                              sc.pipeline().addLast(new StringDecoder());
                              sc.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                    @Override   // 处理接收到的消息
                                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                        System.out.println(msg);
                                    }
                              });
                            }
                        }
                )
                // 5. 绑定端口 8888 并启动服务
                .bind(8888);
    }
}client代码中,我们配置了一个线程池group,添加了解码器,并在和服务端创建连接后通过channel发送 hello world 消息,通过channel 发送的消息会经过客户端的ChannelPipeline,ChannelPipeline对发送的消息进行了编码。(转成二进制)
public class Client {
    public static void main(String[] args) throws InterruptedException {

      // 1. 创建一个 EventLoopGroup,用于客户端的 I/O 操作
      NioEventLoopGroup group = new NioEventLoopGroup();

      // 2. 配置和启动 Netty 客户端
      new Bootstrap()
                // 3. 将 group 配置到 Bootstrap 中,负责客户端的 I/O 操作
                .group(group)
                // 4. 指定了客户端使用的通道类型(NioSocketChannel 用于基于 NIO 的 socket 通信)
                .channel(NioSocketChannel.class)
                // 5. 配置客户端的 ChannelPipeline
                .handler(new ChannelInitializer<NioSocketChannel>() {
                  // 7. 当客户端向服务端建立连接(connect)时调用initChannel初始化客户端的ChannelPipeline
                  @Override
                  protected void initChannel(NioSocketChannel sc) throws Exception {
                        // 发送消息时执行 ChannelPipeline 中的操作
                        sc.pipeline().addLast(new StringEncoder());
                  }
                })
                // 6. 连接到服务端的 8888 端口
                .connect(new InetSocketAddress("localhost", 8888))
                // 8. 阻塞等待直到连接成功
                .sync()
                // 9. 获取客户端和服务端之间的 Channel
                .channel()
                // 10. 向服务端发送消息
                .writeAndFlush("hello world");

      // 12. 关闭客户端的 EventLoopGroup,释放资源
      group.shutdownGracefully();
    }
}service、client、channel、channelPipeline的关系
https://i-blog.csdnimg.cn/direct/56424ea6596a426783e9bb8efcc4f62a.png
二、Netty 的核心组件

1. EventLoop 和 EventLoopGroup

1.1 基本概念

EventLoop 是一个 单线程的事件循环,用于处理 I/O 操作、普通使命和定时使命。

[*]I/O 事件处理:EventLoop 会循环监听和处理 I/O 事件(如网络连接、数据读取、数据写入等)
[*]使命调度:EventLoop 可以实验异步使命,通常是定时使命或需要在 I/O 线程上实验的使命。
EventLoopGroup 是多个 EventLoop 的容器,负责管理其生命周期,并为 Netty 中的 I/O 操作分配线程。

[*]负责创建、分配和管理多个 EventLoop, 每个 EventLoop 绑定一个独立的线程。
[*]为每个新连接(Channel)分配一个 EventLoop,包管 Channel 的事件总是由同一个 EventLoop 处理。
[*]将使命(普通使命、定时使命、I/O 事件)分配到 EventLoop 中运行。
1.2 与 Channel 关联


[*]EventLoopGroup 管理多个 EventLoop, 每个 EventLoop 与一个线程绑定。
[*]一个 EventLoop 内部维护了一个 selector 来管理服务多个 Channel。
[*]每个 Channel 绑定到唯一的 EventLoop,从而包管线程安全。
            |-------(线程1)EventLoop1 ─── Channel1
            |                   └─ Channel2
            |                   └─ Channel3
EventLoopGroup
            |-------(线程2)EventLoop2 ─── Channel4
                                 └─ Channel5
1.3 EventLoopGroup 的实现

EventLoopGroup 是一个接口,我们创建对象时要创建接口的实现,其中前三个可以处理 io操作、普通使命、定时使命,第四个不能处理I/O操作
实现形貌NioEventLoopGroup基于 Java NIO 的 Selector,实现跨平台支持。线程池,可指定线程数,默认是cpu核心数 * 2EpollEventLoopGroup基于 Linux epoll 的实现,性能更高,但只支持 Linux。KQueueEventLoopGroup基于 macOS 和 BSD 系统的 kqueue 实现,性能更高,但只支持这些平台。DefaultEventLoopGroup不处理 I/O 操作,仅用于普通使命调度或定时使命。只有一个线程1.4常用方法


[*]EventLoop 的方法
方法功能next()获取当前 EventLoop 的下一个 EventLoop,通常用于轮询分配。submit提交一个有返回值的使命到当前的 EventLoop 中,返回一个 Future 用于获取结果。execute(Runnable task)提交一个使命到当前 EventLoop 的使命队列中实验。schedule提交一个耽误实验的使命到当前 EventLoop 的使命队列中实验。scheduleAtFixedRate提交一个周期性使命,每隔固定时间间隔实验。scheduleWithFixedDelay提交一个使命,每次实验完成后等待固定耽误时间再实验下一次。parent()获取当前 EventLoop 所属的 EventLoopGroup。register(Channel channel)将一个 Channel 注册到当前 EventLoop,并返回异步结果 ChannelFuture。inEventLoop()判断当前线程是否属于此 EventLoop。inEventLoop(Thread thread)判断指定线程是否属于此 EventLoop。shutdownGracefully()优雅地关闭当前 EventLoop,完成已有使命并释放资源。
[*]EventLoopGrop 的方法
方法功能next()获取一个 EventLoop,通常采取轮询方式分配。execute(Runnable task)提交一个使命到某个 EventLoop 的使命队列中。schedule()提交一个耽误或周期性使命。shutdownGracefully()优雅地关闭所有 EventLoop,释放资源。优雅地是指停止接受新使命,等待之前接收到的使命实验完1.5 实验使命

EventLoopGroup group = new NioEventLoopGroup(4); // 创建一个包含4个线程的 EventLoopGroup

// 获取EventLoop
EventLoop eventLoop1 = group.next(); // 获取一个 EventLoop
EventLoop eventLoop2 = group.next(); // 轮询获取下一个 EventLoop

// 提交任务,立即执行
eventLoop1.execute(() -> {
    System.out.println("Task executed in EventLoop1: " + Thread.currentThread().getName());
});

// 提交任务,延迟执行
eventLoop2.schedule(() -> {
    System.out.println("Scheduled task executed in EventLoop2");
}, 3, TimeUnit.SECONDS); // 延迟3秒执行任务

// 关闭 EventLoopGroup
group.shutdownGracefully(); // 优雅关闭所有 EventLoop1.6 细分 EventLoopGroup

在一开始给大家的代码中只用到了一个线程池来处理连接请求(connect)和处理I/O。在实际开发中一般会单独用一个线程来处理连接请求,另一个线程池来处理I/O操作。同时,对于比力复杂的不涉及I/O操作的使命,我们也可以再创建一个线程池来专门处理。
下面代码补充了上面讲的这两个功能
public class Service {
    public static void main(String[] args) {
      // 创建两个 EventLoopGroup,分别处理 Boss Group 线程池 和 Worker Group 线程池
      EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 只需要一个线程来接受客户端的连接请求
      EventLoopGroup workerGroup = new NioEventLoopGroup(); // 默认为 CPU 核数 * 2 个线程处理 I/O 操作
      DefaultEventLoopGroup defaultGroup = new DefaultEventLoopGroup(1);

      // 1. 配置和启动 Netty 服务端
      // ServerBootstrap 负责设置服务端的各项配置,包括线程池、通道类型、事件处理器等。
      new ServerBootstrap()
                // 2. 将 Boss Group 和 Worker Group 配置到 ServerBootstrap 中。
                // bossGroup 负责处理客户端的连接请求,workerGroup 负责处理已连接客户端的 I/O 操作
                .group(bossGroup, workerGroup)
                // 3. 指定了服务端使用的通道类型 NioServerSocketChannel,用于监听客户端连接
                .channel(NioServerSocketChannel.class)
                // 4. 设置连接到服务端的每个客户端的通道(NioSocketChannel)如何初始化
                .childHandler(
                        new ChannelInitializer<NioSocketChannel>() {
                            // 6. 每当有新的客户端向服务端建立连接(connect)时,调用initChannel初始化服务端的ChannelPipeline
                            @Override
                            protected void initChannel(NioSocketChannel sc) throws Exception {
                              // 当服务端收到客户端信息时经过ChannelPipeline的处理
                              sc.pipeline().addLast(new StringDecoder());
                              sc.pipeline().addLast("handle1", new ChannelInboundHandlerAdapter() {
                                    @Override   // 处理接收到的消息
                                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                        System.out.println(msg);
                                        // 提交任务到 DefaultEventLoop 中来执行 handle2
                                        defaultGroup.submit(() -> {
                                          // 在 DefaultEventLoop 中处理消息
                                          System.out.println("Handle2: " + msg);
                                        });
                                    }
                              });
                            }
                        }
                )
                // 5. 绑定端口 8888 并启动服务
                .bind(8888);
    }
}2. Channel

Channel 是 Netty 中用于数据传输的核心组件,代表了 I/O 操作的端点。它用于处理连接、接收和发送数据,并且与具体的 I/O 模型(如 NIO、Epoll 或 KQueue)紧密集成。

[*]Channel 重要用于处理 I/O 操作,支持异步 I/O 事件的处理和数据传输。
[*]在 Netty 中,Channel 是与客户端和服务端之间的连接一一对应的。
2.1 Channel 的类型

Netty 提供了多种类型的 Channel,每种类型根据差别的协议和传输方式进行优化。常见的 Channel 类型如下:
类型形貌NioServerSocketChanne基于 Java NIO 的 TCP 服务器通道,专门用于监听和接受传入的 TCP 连接请求。NioSocketChannel基于 Java NIO 的 TCP 客户端与服务端通道,通常用于实现基于 TCP 协议的连接。EpollServerSocketChannel基于 Linux epoll 的 TCP 服务器通道,仅支持 Linux 系统,用于监听 TCP 连接请求。EpollSocketChannel基于 Linux epoll 的通道,性能较高,只支持 Linux 系统,适用于需要高效 I/O 操作的场景。NioDatagramChannel基于 Java NIO 的 UDP 通道,适用于 UDP 协议的网络通信。2.2 Channel 的基本功能

Netty 的 Channel 提供了一些基础的操作,以下是最常用的一些功能:

[*]连接管理: 用于创建、绑定、连接和关闭连接。
[*]数据读写: 支持从 Channel 中读取数据和向 Channel 写入数据。
[*]事件触发: 可以处理来自客户端或服务器的 I/O 事件(如接收数据、写入数据等)。
[*]流量控制: 支持背压机制,控制数据的读写速率。
2.3 常用方法

方法功能connect(SocketAddress remoteAddress)连接到指定的远程所在(如服务端)。close()关闭当前的 Channel,释放资源。write(Object msg)向 Channel 写入数据。通常是发送到对方的缓冲区,数据并未立刻发送,需要调用 flush() 方法来发送。flush()革新 Channel 缓冲区,将之前写入的数据发送出去。read()从 Channel 中读取数据。bind(SocketAddress localAddress)将 Channel 绑定到本地所在,监听来自客户端的连接请求。isOpen()检查 Channel 是否已打开。返回 true 表示可以继承使用该 Channel,false 则表示已经关闭。isActive()检查 Channel 是否是活动的,即是否已经连接上远程所在并处于有效状态。eventLoop()获取当前 Channel 所绑定的 EventLoop。config()获取 Channel 配置信息,返回一个 ChannelConfig 对象,包罗各种 I/O 配置选项。2.4 ChannelPipeline

ChannelPipeline 是一个链式布局,负责管理 I/O 操作的处理逻辑。在一个 Channel 中,每个数据的读取、写入操作都会经过一系列处理器(ChannelHandler)的加工。这些处理器可以实验多种使命,如编码、解码、协议解析等。

[*]ChannelHandler:用于处理 I/O 操作的具体业务逻辑,如编解码、业务处理、非常处理等。
[*]ChannelPipeline:由多个 ChannelHandler 组成,形成一个处理链。每当发生 I/O 事件时,事件会在 ChannelPipeline 中按照次序流动,逐个交给对应的处理器处理。
2.5 获取和关闭channel

由于connect和close方法是异步实验的,也就是在另外的线程中实验,主线程只负责调用,以是获取 channel 分为两种情况,一种是在主线程中获取channel并实验业务代码,另一种情况是在实验connect的线程实验业务代码。关闭channel同样,要么是主线程,要么是close的线程。

[*]客户端通过同步的方式获取和关闭 channel
public class Client {
    private static final Logger log = LoggerFactory.getLogger(Main.class);
    public static void main(String[] args) throws InterruptedException {
      ChannelFuture connectFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                  @Override
                  protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder());
                  }
                })
                .connect("localhost", 8888);

      // 1. 在主线程中获取channel,并输出
      connectFuture.sync();        // 阻塞,直到connect线程执行成功
      Channel channel = connectFuture.channel();
      log.info("{}", channel);

      new Thread(() -> {
            Scanner sc = new Scanner(System.in);
            while (true) {
                String line = sc.nextLine();
                if ("q".equals(line)) {
                  channel.close();
                  // close是异步的,所以如果在close后面直接写善后工作的代码,无法保证其在close之后执行
                  break;
                }
                channel.writeAndFlush(line);
            }
      }).start();

      // 2. 关闭channel, 并执行关闭后的善后工作
      ChannelFuture closeFuture = channel.closeFuture();// 获取关闭的结果
      closeFuture.sync(); // 阻塞, 直到调用close
      log.info("{}", closeFuture);
    }
}
// 输出
20:29:15.955 -
20:29:15.960 - 0
20:29:15.960 - 1
hello
q
20:29:42.378 - AbstractChannel$CloseFuture@70e9c95d(success)
[*]客户端通过异步的方式获取和关闭channel
public class Client {
    private static final Logger log = LoggerFactory.getLogger(Main.class);
    public static void main(String[] args) throws InterruptedException {
      ChannelFuture connectFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                  @Override
                  protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder());
                  }
                })
                .connect("localhost", 8888);

      // 1. 在connect线程中获取channel,并输出
      connectFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Channel channel = future.channel();
                log.info("{}", channel);

                Scanner sc = new Scanner(System.in);
                while (true) {
                  String line = sc.nextLine();
                  if ("q".equals(line)) {
                        channel.close();
                        break;
                  }
                  channel.writeAndFlush(line);
                }
            }
      });

      // 2. 关闭channel, 并执行关闭后的善后工作
      log.info("0");
      ChannelFuture closeFuture = connectFuture.sync().channel().closeFuture();// 关闭并返回结果
      closeFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                log.info("{}", future);
            }
      });
    }
}观察可以看到,对于异步实验业务代码都是通过 ChannelFuture 的 addListener 方法给这个future添加一个回调函数。
3. Future 和 Promise

在 Netty 中,Future 是一个异步操作的结果容器,用于表示当前使命的实验状态以及操作的结果或失败原因。相比于 Java 原生的 Future 接口,Netty 提供了功能更加强大的 ChannelFuture 和 Promise,支持异步回调和链式操作,极大地方便了异步编程。
3.1 Future 异步回调


[*]同步操作
"hello world" 由主线程输出。这点跟 Java juc 中的 future 雷同,调用get时阻塞等待future返回结果,然后继承业务的处理
public class Main {
    private static final Logger log = LoggerFactory.getLogger(Main.class);
    public static void main(String[] args) throws ExecutionException, InterruptedException {
      EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
      EventLoop next = eventLoopGroup.next();
      Future<String> resFuture = next.submit(() -> {
            sleep(1000);
            return "hello world";
      });
      log.info("main run");
      log.info("{}", resFuture.get());
      eventLoopGroup.shutdownGracefully();
    }
}
// 输出
21:49:34.367 - main run
21:49:35.379 - hello world
[*]异步回调
观察输出可以看到 "hello world" 不再是由 main 线程输出
public class Main {    private static final Logger log = LoggerFactory.getLogger(Main.class);    public static void main(String[] args) throws ExecutionException, InterruptedException {      EventLoopGroup eventLoopGroup = new NioEventLoopGroup();      EventLoop next = eventLoopGroup.next();      Future resFuture = next.submit(() -> {            sleep(1000);            return "hello world";      });      log.info("main run");      resFuture.addListener(new GenericFutureListener
页: [1]
查看完整版本: Netty网络框架详细讲解