Netty 学习笔记

打印 上一主题 下一主题

主题 580|帖子 580|积分 1740

Java 网络编程

早期的 Java API 只支持由本地系统套接字库提供的所谓的阻塞函数,下面的代码展示了一个使用传统 Java API 的服务器代码的普通示例
  1. // 创建一个 ServerSocket 用以监听指定端口上的连接请求
  2. ServerSocket serverSocket = new ServerSocket(5000);
  3. // 对 accept 方法的调用将被阻塞,直到一个连接建立
  4. Socket clientSocket = serverSocket.accept();
  5. // 这些流对象都派生于该套接字的流对象
  6. BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
  7. PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
  8. String request, response;
  9. // 客户端发送了 "Done" 则退出循环
  10. while ((request = in.readLine()) != null) {
  11.     if ("Done".equals(request)) {
  12.         break;
  13.     }
  14.     // 请求被传递给服务器的处理方法
  15.     response = processRequest(request);
  16.     // 服务器的响应被发送给客户端
  17.     out.println(response);
  18. }
复制代码
这段代码只能同时处理一个毗连,要管理多个客户端,就要为每个新的客户端 Socket 创建一个新的 Thread,让我们来考虑一下这种方案的影响:

  • 在任何时间都会有大量线程处于休眠状态,造成资源浪费
  • 需要为每个线程的调用栈都分配内存
  • 线程的上下文切换会带来开销
这种并发方案对于中小数目的客户端还算理想,但不能很好地支持更大的并发

Java NIO

NIO(Non-blocking I/O,也称 New I/O),是一种同步非阻塞的 I/O 模子,也是 I/O 多路复用的基础。传统的 IO 流是阻塞的,这意味着,当一个线程调用读或写操作时,线程将被阻塞,直至数据被完全读取或写入。NIO 的非阻塞模式,使一个线程进行读或写操作时,如果目前无数据可用时,就不做操作,而不是保持线程阻塞,所以直至数据就绪以前,线程可以继续做其他事情
class java.nio.channels.Selector 是 Java 非阻塞 IO 实现的关键。它使用事件通知 API 以确定在一组非阻塞套接字中有哪些已经就绪并能进行 IO 相关操作。因为可以在任何时间点任意检查读操作或写操作的完成情况,所以单一线程可以处理多个并发的毗连

与阻塞 IO 模子相比,这种模子提供了更好的资源管理:

  • 使用较少的线程便可以处理许多毗连,减少内存管理和上下文切换所带来的开销
  • 当没有 IO 操作需要处理时,线程也可以用户其他任务

Reactor 线程模子

Reactor 是一种并发处理客户端哀求与相应的事件驱动模子。服务端在接收到客户哀求后采用多路复用策略,通过一个非阻塞的线程来异步接收所有的客户端哀求,并将这些哀求转发到相关的工作线程组进行处理。
Reactor 模子常常基于异步线程实现,常用的 Reactor 线程模子有三种:Reactor 单程模子、Reactor 多线程模子和 Reactor 主备多线程模子
1. Reactor 单线程模子

Reactor 单线程模子指所有的客户端 IO 哀求都在同一个线程(Thread)上完成。Reactor 单线程模子的各模块组成及职责如图所示

  • Client:NIO 客户端,向服务端发起 TCP 毗连,并发送数据
  • Acceptor:NIO 服务端,通过 Acceptor 接收客户端的 TCP 毗连
  • Dispatcher:接收客户端的数据并将数据以 ByteBuffer 的形式发送到对应的编解码器
  • DecoderHandler:解码器,读取客户端的数据并进行数据解码及处理和消息应答
  • EncoderHandler:编码器,将向客户端发送的数据(消息哀求或消息应答)进行统一的编码处理,并写入通道

由于 Reactor 模式使用的是异步非阻塞 IO,因此一个线程可以独立处理多个 IO 相关的操作。Reactor 单线程模子将所有 IO 操作都集中在一个线程中处理,其处理流程如下:

  • Acceptor 接收客户端的 TCP 毗连哀求消息
  • 在链路创建成功后通过 Dispatcher 将接收到的消息写入 ByteBuffer,并派发到对应的 DecoderHandler 进行消息解码和处理
  • 在消息处理完成后调用对应的 EncoderHandler 将该哀求对应的相应消息进行编码和下发
2. Reactor 多线程模子

Reactor 多线程模子与单线程模子最大的区别在于,它使用线程池(ThreadPoll)处理客户端的 IO 哀求。Reactor 多线程模子如图所示

3. Reactor 主备多线程模子

在 Reactor 主备多线程模子中,服务端用于接收客户端毗连的不再是一个 NIO 线程而是一个独立的 NIO 线程池。主线程 Acceptor 在接收到客户端的 TCP 毗连哀求并创建完成毗连后(大概要经过鉴权、登录等过程),将新创建的 SocketChannel 注册到子 I/O 线程池(Sub Reactor Pool)的某个 I/O 线程上,由它负责具体的 SocketChannel 读写、编解码、业务处理工作。这样就将客户端毗连的创建和息的相应都以异步线程的方式来实现,大大提高了系统的吞吐量。Reactor 主备多线程模子如图所示


Netty 概述

Netty 是一个高性能、异步事件驱动的 NIO 框架,基于 Java NIO 提供的 API 实现提供了对 TCP、UDP 和文件传输的支持。Netty的所有 IO 操作都是异步非阻塞的,通过 Future-Listener 机制,用户可以主动或者通过通知机制获取 IO 操作结果
Netty 架构计划的主要特性如下:

  • IO 多路复用模子:Netty 通过在 NioEventLoop(事件轮询机制)内封装 Selector 来实现 IO 的多路复用
  • 数据零拷贝:Netty 的数据接收和发送均采用直接内存进行 Socket 读写,大大提高了系统的性能
  • 内存重用机制:直接内存的分配和接纳是一种耗时的操作,为了只管重用缓冲区,Netty 提供了基于内存池的缓冲区重用机制
  • 无锁化机制:Netty 内部采用串行无锁化计划思想对 IO 进行操作。在具体使用过程中可以调整 NIO 线程池的线程参数,同时启动多个串行化的线程并行运行,这种局部无锁化的串行多线程计划比一个队列联合多个工作线程模子的性能更佳
  • 高性能序列化框架:Netty 默认基于 ProtoBuf 实现数据的序列化,通过扩展 Netty 的编解码接口,用户可以实现自定义的序列化框架

Netty 核心组件


  • Bootstrap/ServerBootstrap:Bootstrap 用于客户端服务的启动引导,ServerBootstrap 用于服务端服务的启动引导
  • NioEventLoop:基于线程队列的方式执行事件操作,具体要执行的事件操作包括毗连注册、端口绑定和 IO 数据读写等。每个 NioEventLoop 线程都负责多个 Channel 的事件处理
  • NioEventLoopGroup:NioEventLoop 生命周期的管理
  • Future/ChannelFuture:Future 和 ChannelFuture 用于异步通信的实现,基于异步通信方式可以在 IO 操作触发后注册一个监听事件,在 IO 操作完成后主动触发监听事件并完成后续操作
  • Channel:Channel 是 Netty 中的网络通信组件,用于执行具体的 IO 操作。Nettty 中所有的数据通信都基于 Channel 读取或者将数据写入对应的 Channel。Channel 的主要功能包括网络毗连的创建、毗连状态的管理(网络毗连的打开和关闭)、网络毗连参数的设置(每次接收数据的巨细)、基于异步 NIO 的网络数据操作(数据读取、数据写出)等
  • Selector:Selector 用于多路复用中 Channel 的管理。在 Netty中,一个 Selector 可以管理多个 Channel,在 Channel 毗连创建后将毗连注册到 Selector,Selector 在内部监听每个 Channel 上 IO 事件的变革,当 Channel 有网络 IO 事件发生时通知 ChannelHandler 执行具体的 IO 操作
  • ChannelHandlerContext:Channel 上下文信息的管理。每个 ChannelHandler 都对应一个 ChannelHandlerContext
  • ChannelHandler:IO 事件的拦截和处理。其中,ChannelInboundHandler 用于处理数据接收的 IO 操作,ChannelOutboundHandler 用于处理数据发送的 IO 操作
  • ChannelPipeline:基于拦截器计划模式实现的事件拦截处理和转发。Netty 中的每 Channel 都对应一个 ChannelPipeline,在 ChannelPipeline 中维护了一个由 ChannelHandlerContext 组成的双向链表,每个 ChannelHandlerContext 都对应一个 ChannelHandler,以完成对具体 Channel 事件的拦截和处理。其中,数据入站由 Head 向 Tail 依次通报和处理,数据出站由 Tail 向 Head 依次通报和处理

Netty 原理

1. Netty Server 的初始化过程


  • 初始化 BossGroup 和 WorkerGroup
  • 基于 ServerBootstrap 设置 EventLoopGroup,包括毗连参数设置、Channel 类型设置、编解码 Handler 设置等
  • 绑定端口和服务启动
  1. public static void main(String[] args) {
  2.   // 1:创建 BossGroup 和 WorkerGroup
  3.   NioEventLoopGroup bossGroup = new NioEventLoopGroup();
  4.   NioEventLoopGroup workerGroup = new NioEventLoopGroup();
  5.   final ServerBootstrap serverBootstrap = new ServerBootstrap();
  6.   // 2:配置NioEventLoopGroup
  7.   serverBootstrap
  8.     .group(bossGroup, workerGroup)
  9.     .channel(NioServerSocketchannel.class) // 设置 channel 的类型为 NIO
  10.     .option(ChannelOption.SO_BACKLOG, 1024)  // 设置 BACKLOG 的大小为 1024
  11.     .childOption(ChannelOption.SO_KEEPALIVE,true)  // 启用心跳检测机制
  12.     .childOption(ChannelOption.TCP_NODELAY,true)  // 设置数据包无延迟
  13.     // 设置 Channel 的类型为 NioSocketChannel
  14.     .childHandler(new ChannelInitializer<NioSocketChannel>() {
  15.       @Override
  16.       protected void initChannel(NioSocketChannel ch) {
  17.         // 配置解码器为 MessageDecoder 类
  18.         ch.pipeline().addLast("decoder", new MessageDecoder());
  19.         // 配置编码器为 MessageEncoder 类
  20.         ch.pipeline().addlast("encoder", new MessageEncoder());
  21.       }
  22.     });
  23.   // 3:绑定端口和服务启动
  24.   int port = 9000;
  25.   serverBootstrap.bind(port).addlistener(future -> {
  26.     if(future.isSuccess()) {
  27.       System.out.printin("server start up on port:" + port);
  28.     } else {
  29.       System.err.printin("server start up failed");
  30.     }
  31.   });
  32. }
复制代码
2. Netty 工作流程



  • Netty 抽象出两组线程池 BossGroup 和 WorkerGroup。BossGroup 专门负责接收客户端的毗连,WorkerGroup 专门负责网络的读写
  • BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup
  • NioEventLoopGroup 相称于一个事件循环线程组,这个组中含有多个事件循环线程,每一个事件循环线程是 NioEventLoop
  • 每个 NioEventLoop 都有一个 selector,用于监听注册在其上的 socketChannel 的网络通讯
  • 每个 BossNioEventLoop 线程内部循环执行的步骤:


  • 处理 accept事件,与 client 创建毗连 , 生成 NioSocketChannel
  • 将 NioSocketChannel 注册到某个 worker NIOEventLoop 上的 selector
  • 处理任务队列的任务,即 runAllTasks

  • 每个 worker NIOEventLoop 线程循环执行的步骤


  • 轮询注册到自己 selector 上的所有 NioSocketChannel 的 read/write 事件
  • 处理 I/O 事件,即 read/write 事件,在对应 NioSocketChannel 处理业务
  • runAllTasks 处理任务队列 TaskQueue 的任务 ,一些耗时的业务处理可以放入 TaskQueue 慢慢处理,这样不影响数据在 pipeline 的流动处理

  • 每个 worker NIOEventLoop 处理 NioSocketChannel 业务时,会使用 pipeline,管道中维护了很多 handler 处理器用来处理 channel 中的数据

Netty 实战

Netty 的使用分为客户端和服务端两部分。客户端用于毗连服务端上报数据,并接服务端下发的哀求指令等。服务端主要用于接收客户端的数据,并根据协议的规定对客户端的消息进行相应
定义通用消息格式 BaseMessage
  1. public class BaseMessage {
  2.   //消息创建的时间
  3.   private Date createTime;
  4.   //消息接收的时间
  5.   private Date receiveTime;
  6.   //消息内容
  7.   private String messageContent;
  8.   //消息id
  9.   private int messageId;
  10.   //省略get、set、构造方法
  11. }
复制代码
定义消息处理工具类 MessageUtils
  1. public class MessageUtils {
  2.   //将 BaseMessage 消息写入 ByteBuf
  3.   public static ByteBuf getByteBuf(BaseMessage baseMessage) throws UnsupportedEncodingException {
  4.     byte[] req = JSON.toJSONString(baseMessage).getBytes("UTF-8");
  5.     ByteBuf byteBuf = Unpooled.buffer();
  6.     byteBuf.writeBytes(reg);
  7.     return byteBuf;
  8.   }
  9.   //从ByteBuf中获取信息,使用UTF-8编码后解析为BaseMessage的系统消息格式
  10.   public static BaseMessage getBaseMessage(ByteBuf buf) {
  11.     byte[] con = new byte[buf.readableBytes()];
  12.     buf.readBytes(con);
  13.     try {
  14.       String message = new String(con, "UTF8");
  15.       BaseMessage baseMessage = JSON.parseObject(message, BaseMessage.class);
  16.       baseMessage.setReceiveTime(new Date());
  17.       return baseMessage;
  18.     } catch(UnsupportedEncodingException e) {
  19.       e.printStackTrace();
  20.       return null;
  21.     }
  22.   }
  23. }
复制代码
定义 NettyServer
  1. public class NettyServer {
  2.   private final static Log logger = LogFactory.getLog(NettyServer.class);
  3.   private int port;
  4.   public NettyServer(int port) {
  5.     this.port = port;
  6.     bind ();
  7.   }
  8.   private void bind() {
  9.     //1:创建BossGroup和WorkerGroup
  10.     EventLoopGroup boss = new NioEventLoopGroup();
  11.     EventLoopGroup worker = new NioEventLoopGroup();
  12.     try {
  13.       //2:创建ServerBootstrap
  14.       ServerBootstrap bootstrap = new ServerBootstrap();
  15.       bootstrap.group(boss, worker);
  16.       //3:设置Channel和 Option
  17.       bootstrap.channel(NioServerSocketChannel.class);
  18.       bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
  19.       bootstrap.option(ChannelOption.TCP_NODELAY, true);
  20.       bootstrap.childoption(ChannelOption.SO_KEEPALIVE, true);
  21.       bootstrap.childHandier(new channelInitializer<SocketChannel>() {
  22.         @Override
  23.         protected void initChannel(SocketChannel socketChannel) throws Exception {
  24.           ChannelPipeline p = socketChannel.pipeline();
  25.           //定义MessageDecoder,用于解码Server接收的消息并处理
  26.           p.addLast("decoder", new MessageDecoder());
  27.         }
  28.       });
  29.       //4:设置绑定端口号并启动
  30.       ChannelFuture channelFuture = bootstrap.bind(port).sync();
  31.       if (channelFuture.isSuccess()) {
  32.         logger.info("NettyServer start success, port: " + this.port);
  33.       }
  34.       //5:设置异步关闭连接
  35.       channelFuture.channel().closeFuture().sync();
  36.     } catch(Exception e) {
  37.       logger.error("NettyServer start fail, exception:" + e.getMessage());
  38.       e.printStackTrace():
  39.     } finally {
  40.       //6:优雅退出函数设置
  41.       boss.shutdownGracefully();
  42.       worker.shutdownGracefully();
  43.     }
  44.   }
  45.   public static void main(String[] args) throws InterruptedException {
  46.     new NettyServer(9000);
  47.   }
  48. }
复制代码
定义 MessageDecoder 解码器
  1. public class MessageDecoder extends ChannelHandlerAdapter {
  2.   private final static Log logger = LogFactory.getLog(MessageDecoder.class);
  3.   // 覆写channelRead方法并接收客户端发送的消息
  4.   @Override
  5.   public void channelRead(ChannelHandlerContext ctx, Object msg) {
  6.     //1:接收到客户端发送的消息并解码
  7.     ByteBuf buf = (ByteBuf) msg;
  8.     BaseMessage message = MessageUtils.getBaseMessage(buf);
  9.     try {
  10.       //2:定义回复消息体
  11.       BaseMessage responseMessage = new BaseMessage(message.getMessageID() + 1, "response from server", new Date());
  12.       logger.info("send response message for client:" + JSON.toJSONString(responseMessage));
  13.       //3:消息编码
  14.       ByteBuf byteBuf = MessageUtils.getByteBuf(responseMessage);
  15.       //4:消息发送,将消息通过ChannelHandlerContext写入Channel
  16.       ctx.writeAndFlush(byteBuf);
  17.     } catch(UnsupportedEncodingException e) {
  18.       e.printStackTrace();
  19.     }
  20.   }
  21.   @Override//连接断开触发事件
  22.   public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
  23.     logger.error("channel removed");
  24.     super.handlerRemoved(ctx);
  25.   }
  26.   @Override//连接异常触发事件
  27.   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  28.     logger.error("channel exception");
  29.     super.exceptionCaught(ctx, cause);
  30.   }
  31.   @Override//连接注册触发事件
  32.   public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
  33.     logger.error("channel registered");
  34.     super.channelRegistered(ctx);
  35.   }
  36. }
复制代码
定义 NettyClient
  1. public class NettyClient {
  2.   private final static Log logger = LogFactory.getlog(NettyClient.class);
  3.   //服务端的端口号
  4.   private int port = 9000;//服务端的 IP地址
  5.   private String host = "localhost";
  6.   public NettyClient(String host, int port) throws InterruptedException {
  7.     this.port = port;
  8.     this.host = host;
  9.     start();
  10.   }
  11.   private void start() throws InterruptedException {
  12.     EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
  13.     try {
  14.       Bootstrap bootstrap = new Bootstrap();
  15.       bootstrap.channel(NioSocketChannel.class);
  16.       bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
  17.       bootstrap.group(eventLoopGroup);
  18.       bootstrap.removeAddress(host, port);
  19.       bootstrap.handler(new ChannelInitializer<SocketChannel>(){
  20.         @Override
  21.         protected void initChannel(SocketChannel socketChannel) throws Exception {
  22.           socketChannel.pipeline().addLast(new NettyClientHandler());
  23.         }
  24.       });
  25.     }
  26.   }
  27. }
复制代码
定义 NettyClientHandler 消息处理器
  1. public class NettyClientHandler extends ChannelHandlerAdapter {
  2.   private final static Log logger = LogFactory.getLog(NettyClientHandler.class);
  3.   @Override//连接创建后,Netty会自动调用channelActive方法
  4.   public void channelActive(ChannelHandlerContext ctx) throws Exception {
  5.     //创建一条消息,发送给服务端
  6.     BaseMessage message = new BaseMessage(0, "message from client", new Date());
  7.     ByteBuf byteBuf = MessageUtils.getByteBuf(message);
  8.     ctx.writeAndFlush(byteBuf);
  9.     logger.info("send a message for server:" + JSON.toJSONString(message));
  10.   }
  11.   @Override//读取服务端的消息
  12.   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  13.     ByteBuf buf = (ByteBuf) msg;
  14.     BaseMessage message = MessageUtils.getBaseMessage(buf);
  15.     logger.info("received message form server:" + JSON,toJSONString(message));
  16.   }
  17. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

梦应逍遥

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表