使用Netty框架完成客户端和服务端收发Protobuf消息

打印 上一主题 下一主题

主题 1033|帖子 1033|积分 3099

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
前言

本周继续学习尼恩编著的《Netty、Redis、ZooKeeper高并发实战》,一些资源也贴在这里,自己以后想看还可以找到,这个是在博客园的一个入口https://www.cnblogs.com/crazymakercircle/p/9904544.html。
这周主要学习了Netty客户端和服务端通信,书是由浅入深的在进行,从Socket NOI通信到 Reactor反应器模式,再到Netty框架,示例代码都在https://gitee.com/crazymaker/netty_redis_zookeeper_source_code.git 中可以看到,书结合源代码,自己在动手试验一下,感觉还是有些收获。 今天的示例代码就是实践出一个客户端和服务端传递protobuf的例子。
Netty 服务端

先来看一下服务端代码,
  1. @Slf4j
  2. public class ProtoBufServer {
  3.     private final int serverPort;
  4.     ServerBootstrap b = new ServerBootstrap();
  5.     public ProtoBufServer(int port) {
  6.         this.serverPort = port;
  7.     }
  8.     public void runServer() {
  9.         //创建reactor 线程组
  10.         EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
  11.         EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
  12.         try {
  13.             //1 设置reactor 线程组
  14.             b.group(bossLoopGroup, workerLoopGroup);
  15.             //2 设置nio类型的channel
  16.             b.channel(NioServerSocketChannel.class);
  17.             //3 设置监听端口
  18.             b.localAddress(serverPort);
  19.             //4 设置通道的参数
  20.             b.option(ChannelOption.SO_KEEPALIVE, true);
  21.             b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
  22.             b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
  23.             //5 装配子通道流水线
  24.             b.childHandler(new ChannelInitializer<SocketChannel>() {
  25.                 //有连接到达时会创建一个channel
  26.                 protected void initChannel(SocketChannel ch) throws Exception {
  27.                     // pipeline管理子通道channel中的Handler
  28.                     // 向子channel流水线添加3个handler处理器
  29.                     // protobufDecoder仅仅负责编码,并不支持读半包,所以在之前,一定要有读半包的处理器。
  30.                     // 有三种方式可以选择:
  31.                     // 使用netty提供ProtobufVarint32FrameDecoder
  32.                     // 继承netty提供的通用半包处理器 LengthFieldBasedFrameDecoder
  33.                     // 继承ByteToMessageDecoder类,自己处理半包
  34.                     // 半包的处理
  35.                     ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
  36.                     // 需要解码的目标类
  37.                     ch.pipeline().addLast(new ProtobufDecoder(MarketPriceProto.MarketPrice.getDefaultInstance()));
  38.                     ch.pipeline().addLast(new ProtobufBussinessHandler());
  39.                 }
  40.             });
  41.             // 6 开始绑定server
  42.             // 通过调用sync同步方法阻塞直到绑定成功
  43.             ChannelFuture channelFuture = b.bind().sync();
  44.             log.info(" 服务器启动成功,监听端口: " +
  45.                     channelFuture.channel().localAddress());
  46.             // 7 等待通道关闭的异步任务结束
  47.             // 服务监听通道会一直等待通道关闭的异步任务结束
  48.             ChannelFuture closeFuture = channelFuture.channel().closeFuture();
  49.             closeFuture.sync();
  50.         } catch (Exception e) {
  51.             e.printStackTrace();
  52.         } finally {
  53.             // 8 优雅关闭EventLoopGroup,
  54.             // 释放掉所有资源包括创建的线程
  55.             workerLoopGroup.shutdownGracefully();
  56.             bossLoopGroup.shutdownGracefully();
  57.         }
  58.     }
  59.     //服务器端业务处理器
  60.     static class ProtobufBussinessHandler extends ChannelInboundHandlerAdapter {
  61.         @Override
  62.         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  63.             MarketPriceProto.MarketPrice protoMsg = (MarketPriceProto.MarketPrice) msg;
  64.             //经过pipeline的各个decoder,到此Person类型已经可以断定
  65.             log.info("收到一个 MsgProtos.Msg 数据包 =》");
  66.             log.info("protoMsg.getId():=" + protoMsg.getId());
  67.             log.info("protoMsg.getClose():=" + protoMsg.getClose());
  68.         }
  69.     }
  70.     public static void main(String[] args) throws InterruptedException {
  71.         int port = SERVER_PORT;
  72.         new ProtoBufServer(port).runServer();
  73.     }
  74. }
复制代码
代码中有注释解释,我在这里加一下说明
代码中有两个EventLoopGroup bossLoopGroup和EventLoopGroup workerLoopGroup,使用两个是什么原因呢? 一个是负责处理连接监听事件, 一个负责处理数据IO事件和Handler业务处理,通俗点解释就是一个负责接客,一个负责服务客户。如果只有一个人就会忙不过来,让后面的人等很久。
b.childHandler这个就是我们具体的如何处理接收到的消息,他们都继承ChannelInboundHandlerAdapter,通过PipeLine把消息进行处理。我们从通道里面拿到的都是字节码,那么要转成我们需要的Protobuf类,就需要用到这些处理类
前两个处理类ProtobufVarint32FrameDecoder和ProtobufDecoder都是Netty提供的,一个是为了解决半包问题,半包问题是因为在TCP传输的时候对数据包进行了拆包或者分包,收到的时候如果直接处理,就会有问题,需要我们在应用层进行二次封装。 在这个示例中如果我们不使用ProtobufVarint32FrameDecoder,客户端也不用,那么就会出现有的可以解析出来,有的报错的情况: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
可以用搬家来打比分,我们把一个家从一个地方搬到另外一个地方,还要求布局一样,汽车运输的时候可能要分好几次,那么我们可以先记住位置,然后随便装车搬过去,过去后先暂存,再按记录的位置进行还原,这样来保证一模一样。
ProtobufBussinessHandler 是我们自定义的Handler继承了ChannelInboundHandlerAdapter,通过它我们拿到channel里面的数据,转换成我们的具体的Protobuf对象。这里只是简单的打印出来。 如果要把它继续传下去,需要调用 super.channelRead(ctx,msg)传递下去。
客户端
  1. @Slf4j
  2. public class ProtoBufScanClient {
  3.     private int serverPort;
  4.     private String serverIp;
  5.     Bootstrap b = new Bootstrap();
  6.     public ProtoBufScanClient(String ip, int port) {
  7.         this.serverPort = port;
  8.         this.serverIp = ip;
  9.     }
  10.     public void runClient() {
  11.         //创建reactor 线程组
  12.         EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
  13.         try {
  14.             //1 设置reactor 线程组
  15.             b.group(workerLoopGroup);
  16.             //2 设置nio类型的channel
  17.             b.channel(NioSocketChannel.class);
  18.             //3 设置监听端口
  19.             b.remoteAddress(serverIp, serverPort);
  20.             //4 设置通道的参数
  21.             b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
  22.             //5 装配通道流水线
  23.             b.handler(new ChannelInitializer<SocketChannel>() {
  24.                 //初始化客户端channel
  25.                 protected void initChannel(SocketChannel ch) throws Exception {
  26.                     // 客户端channel流水线添加2个handler处理器
  27.                     ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
  28.                     ch.pipeline().addLast(new ProtobufEncoder());
  29.                 }
  30.             });
  31.             ChannelFuture f = b.connect();
  32.             f.addListener((ChannelFuture futureListener) ->
  33.             {
  34.                 if (futureListener.isSuccess()) {
  35.                     log.info("EchoClient客户端连接成功!");
  36.                 } else {
  37.                     log.info("EchoClient客户端连接失败!");
  38.                 }
  39.             });
  40.             // 阻塞,直到连接完成
  41.             f.sync();
  42.             Channel channel = f.channel();
  43.             Scanner scanner = new Scanner(System.in);
  44.             log.info("请输入发送内容:");
  45.             GenericFutureListener sendCallBack = new GenericFutureListener() {
  46.                 @Override
  47.                 public void operationComplete(Future future) throws Exception {
  48.                     if (future.isSuccess()) {
  49.                         log.info("发送成功!");
  50.                     } else {
  51.                         log.info("发送失败!");
  52.                     }
  53.                 }
  54.             };
  55.             while (scanner.hasNext()) {
  56.                 //获取输入的内容
  57.                 String next = scanner.next();
  58.                 String[] values = next.split(",");
  59.                 if(values.length != 5)
  60.                 {
  61.                     log.info("格式不正确!");
  62.                 }
  63.                 else {
  64.                     MarketPriceProto.MarketPrice msg = build(values);
  65.                     ChannelFuture writeAndFlushFuture = channel.writeAndFlush(msg);
  66.                     writeAndFlushFuture.addListener(sendCallBack);
  67.                 }
  68.                 log.info("请输入发送内容:");
  69.             }
  70.             channel.flush();
  71.             // 7 等待通道关闭的异步任务结束
  72.             // 服务监听通道会一直等待通道关闭的异步任务结束
  73.             ChannelFuture closeFuture = channel.closeFuture();
  74.             closeFuture.sync();
  75.         } catch (Exception e) {
  76.             e.printStackTrace();
  77.         } finally {
  78.             // 优雅关闭EventLoopGroup,
  79.             // 释放掉所有资源包括创建的线程
  80.             workerLoopGroup.shutdownGracefully();
  81.         }
  82.     }
  83.     //构建ProtoBuf对象
  84.     public MarketPriceProto.MarketPrice build(String[] values) {
  85.         MarketPriceProto.MarketPrice.Builder builder = MarketPriceProto.MarketPrice.newBuilder();
  86.         builder.setId(values[0]);
  87.         builder.setOpen(Double.valueOf(values[1]));
  88.         builder.setHigh(Double.valueOf(values[2]));
  89.         builder.setLow(Double.valueOf(values[3]));
  90.         builder.setClose(Double.valueOf(values[4]));
  91.         return builder.build();
  92.     }
  93.     public static void main(String[] args) throws InterruptedException {
  94.         int port = SERVER_PORT;
  95.         String ip = SOCKET_SERVER_IP;
  96.         new ProtoBufScanClient(ip, port).runClient();
  97.     }
  98. }
复制代码
Netty客户端代码和服务端很接近,这里它只用了一个线程组,客户端只有它一个使用,和服务端模式不一样,一个就够了。
客户端读取控制台输入数据,然后构造成MarketPriceProto.MarketPrice,它的Proto定义如下
  1. // [开始声明]
  2. syntax = "proto3";
  3. //定义protobuf的包名称空间
  4. package com.ken.netty.protocol;
  5. // [结束声明]
  6. // [开始 java 选项配置]
  7. option java_package = "com.ken.netty.protocol";
  8. option java_outer_classname = "MarketPriceProto";
  9. // [结束 java 选项配置]
  10. // [开始 消息定义]
  11. message MarketPrice {
  12.   string id = 1;
  13.   double open = 2;
  14.   double high = 3;
  15.   double low = 4;
  16.   double close = 5;
  17. }
复制代码
channel.writeAndFlush(msg),  这里我们直接把Protobuf对象传递进channel, ProtobufEncoder会对它进行编码。
总结

代码比较简单,这是在有书的帮助和源代码帮助下实现的,不通过书这个也不是这么容易理解的。 作者还实现了一个及时通信的例子https://gitee.com/crazymaker/SimpleCrayIM。  也需要花些时间来学习一下。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

张春

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表