netty构建udp服务器以及发送报文到客户端客户端详细案例 ...

打印 上一主题 下一主题

主题 547|帖子 547|积分 1641

目录

一、基于netty创建udp服务端以及对应通道设置关键
二、发送数据
三、netty中的ChannelOption常用参数说明
1、ChannelOption.SO_BACKLOG
2、ChannelOption.SO_REUSEADDR
3、ChannelOption.SO_KEEPALIVE
4、ChannelOption.SO_SNDBUF和ChannelOption.SO_RCVBUF
5、ChannelOption.SO_LINGER
6、ChannelOption.TCP_NODELAY


一、基于netty创建udp服务端以及对应通道设置关键

  1. @Configuration
  2. @RefreshScope
  3. public class NettyUdpServer {
  4.     @Value("${netty.server.udpPort}")
  5.     private int port;
  6.     private EventLoopGroup bossGroup;//主线程
  7.     private Channel channel;//通道
  8.     private ChannelFuture future; //回调
  9.     @Autowired
  10.     private DataCollector dataCollector;;
  11.     public Channel start() throws InterruptedException {
  12.         //判断是否支持Epoll模式,从而创建不同的线程组
  13.         bossGroup = Epoll.isAvailable()? new EpollEventLoopGroup() : new NioEventLoopGroup();
  14.         try {
  15.             Bootstrap b = new Bootstrap();
  16.             //linux平台下增加SO_REUSEPORT特性提高性能,支持多个进程或者线程绑定到同一个端口,提高服务器程序的吞吐性能
  17.             if(Epoll.isAvailable()) {
  18.                 //设置反应器线程组
  19.                 b.group(bossGroup)
  20.                  .handler(new EpollUdpServerInitializer(dataCollector))
  21.                 //设置nio类型的通道
  22.                  .channel(EpollDatagramChannel.class)
  23.                  .option(ChannelOption.SO_BROADCAST, true)
  24.                  .option(ChannelOption.SO_REUSEADDR, true)
  25.                  .option(ChannelOption.SO_RCVBUF, 1024 * 1024)
  26.                  .option(EpollChannelOption.SO_REUSEPORT, true);
  27.             }else{
  28.                 //设置反应器线程组
  29.                 b.group(bossGroup)
  30.                  .handler(new UdpServerInitializer(dataCollector))
  31.                  //设置nio类型的通道
  32.                  .channel(NioDatagramChannel.class)
  33.                  //设置通道的参数
  34.                  .option(ChannelOption.SO_BROADCAST, true)
  35.                  .option(ChannelOption.SO_REUSEADDR, true);
  36.             }
  37.             //Channel channel = server.bind(port).sync().channel();
  38.             //开始绑定服务器,通过调用sync()同步方法阻塞直到绑定成功
  39.             //ChannelFuture channelFuture = b.bind(port).sync();
  40.             //等待通道关闭的异步任务结束
  41.             //ChannelFuture closeFuture = channelFuture.channel().closeFuture();
  42.             //closeFuture.sync();
  43.             ChannelFuture f = b.bind(port).sync();
  44.             channel = f.channel();
  45.             if(f.isSuccess()){
  46.                 //MasterSelector registry = new MasterSelector("","netty-services",  port);
  47.                 System.out.println("UDP服务器启动,监听在端口:" + port);
  48.             }else {
  49.                 channel.closeFuture().sync();
  50.             }
  51.         } finally {
  52.             //bossGroup.shutdownGracefully().sync();
  53.         }
  54.         System.out.println("Udp服务器启动,监听在端口:"+port);
  55.         return channel;
  56.     }
  57. }
复制代码
以上代码中Epoll.isAvailable()用户判断是window照旧linux环境,linux环境默认采用Epoll相干通道,所以显式设置EpollDatagramChannel通道。在处理(handler)的设置中要根据差别的通道设置初始化的通道类型:
linux环境下EpollDatagramChannel通道设置 .handler(new EpollUdpServerInitializer(dataCollector))详细代码
  1. public class EpollUdpServerInitializer extends ChannelInitializer<EpollDatagramChannel> {
  2.     private final DataCollector dataCollector;
  3.     public EpollUdpServerInitializer(DataCollector dataCollector) {
  4.         this.dataCollector = dataCollector;
  5.     }
  6.     @Override
  7.     protected void initChannel(EpollDatagramChannel epollDatagramChannel) throws Exception {
  8.         epollDatagramChannel.pipeline()
  9.                 //添加netty空闲超时检查的支持
  10.                 .addLast(new UdpServerHandler(dataCollector));
  11.     }
复制代码
要使 通过服务器端通过EpollDatagramChannel通道发送数据,客户端能够正常接收到数据,下图中标红的泛型通道类要与服务器端设置的通道类一致

同意要支持Nio类型通道为NioDatagramChannel.class时,通道初始化为:
  1. public class UdpServerInitializer extends ChannelInitializer<NioDatagramChannel> {
  2.     private final DataCollector dataCollector;
  3.     public UdpServerInitializer(DataCollector dataCollector) {
  4.         this.dataCollector = dataCollector;
  5.     }
  6.     @Override
  7.     protected void initChannel(NioDatagramChannel nioDatagramChannel) throws Exception {
  8.         nioDatagramChannel.pipeline()
  9.                 //添加netty空闲超时检查的支持
  10.                 .addLast(new UdpServerHandler(dataCollector));
  11.     }
  12. }
复制代码
 要使 通过服务器端通过NioDatagramChannel通道发送数据,客户端能够正常接收到数据,下图中标红的泛型通道类要与服务器端设置的通道类一致

二、发送数据

关键代码,采用writeAndFlush发送数据,留意:要发送udp数据报,
  1. public class UdpServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
  2.     /**设置最大消息大小*/
  3.     private static final int MAX_MESSAGE_SIZE = 2048;
  4.     /**线程池*/
  5.     private ExecutorService executorService;
  6.     private final DataCollector dataCollector;
  7.     public UdpServerHandler(DataCollector dataCollector) {
  8.         this.dataCollector = dataCollector;
  9.         //根据当前系统可用的处理器数量创建一个固定长度的线程池
  10.         executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
  11.     }
  12.     @Override
  13.     protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket datagramPacket) throws Exception {
  14.         ByteBuf buffer = datagramPacket.content();
  15.         //确保不会超出最大消息大小
  16.         if(buffer.readableBytes() > MAX_MESSAGE_SIZE) {
  17.             buffer.release();
  18.             return;
  19.         }
  20.         UdpDatagram udpDatagram = parseUdpDatagram(buffer);
  21.         UdpDatagram respUdpDatagram = dataCollector.processUdpDatagram(udpDatagram);
  22.         if (null != respUdpDatagram) {
  23.              handleReceivedData(ctx, respUdpDatagram, datagramPacket);
  24.         }
  25.     }
  26.     /**
  27.      * 处理接收到的数据
  28.      * @param ctx
  29.      * @param udpDatagram
  30.      */
  31.     public void handleReceivedData(ChannelHandlerContext ctx, UdpDatagram udpDatagram, DatagramPacket datagramPacket) throws ExecutionException, InterruptedException {
  32.         Channel channel = ctx.channel();
  33.         if (log.isInfoEnabled()) {
  34.             log.info("received udp message: sessionId: {}, opCode: {}, short messageId: {}",
  35.                     ctx.channel().id(), udpDatagram.getMessageTypeEnum(), udpDatagram.getShortMessageId());
  36.         }
  37.         byte[] payloadBytes = udpDatagram.getPayloadBytes();
  38.         ByteBuf copiedBuffer = Unpooled.copiedBuffer(payloadBytes);
  39.         ChannelFuture channelFuture = channel.writeAndFlush(new DatagramPacket(copiedBuffer.retain(), datagramPacket.sender()));
  40.         channelFuture.addListener(new ChannelFutureListener() {
  41.             @Override
  42.             public void operationComplete(ChannelFuture channelFuture) throws Exception {
  43.                 if (channelFuture.isSuccess()) {
  44.                     // 数据发送成功
  45.                     log.info("数据发送成功:sender host: {}, sender port:{}, sender address:{}",datagramPacket.sender().getHostName(),datagramPacket.sender().getPort(), datagramPacket.sender().getAddress());
  46.                 } else {
  47.                     // 数据发送失败
  48.                     log.error("数据发送失败: {}",channelFuture.cause().getStackTrace());
  49.                     channelFuture.cause().printStackTrace();
  50.                 }
  51.             }
  52.         });
  53.     }
  54.     @Override
  55.     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  56.         dataCollector.tcpConnect(ctx.channel());
  57.     }
  58.     @Override
  59.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  60.         if (log.isWarnEnabled()) {
  61.             log.warn("udp session throw an exception, sessionId:{} exception message: {}",
  62.                     ctx.channel().id().asLongText(), cause.getMessage());
  63.         }
  64.     }
  65.     //当客户端关闭链接时关闭通道
  66.     @Override
  67.     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
  68.         dataCollector.tcpChannelDisconnect(ctx.channel());
  69.     }
  70. }
复制代码
处理类继续SimpleChannelInboundHandler类泛型类为DatagramPacket 
 writeAndFlush方法中发送的数据类型要是DatagramPacket

三、netty中的ChannelOption常用参数说明

1、ChannelOption.SO_BACKLOG

   ChannelOption.SO_BACKLOG对应的是tcp/ip协议listen函数中的backlog参数。函数listen(int socketfd, int backlog)用来初始化服务端可连接队列。
  服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等候处理,backlog参数指定了队列的大小。
  2、ChannelOption.SO_REUSEADDR

   ChanneOption.SO_REUSEADDR对应于套接字选项中的SO_REUSEADDR,这个参数表现允许重复利用本地地址和端口。
  好比,某个服务器进程占用了TCP的80端口举行监听,此时再次监听该端口就会返回错误,利用该参数就可以解决问题,该参数允许共用该端口,这个在服务器程序中比力常利用。
  好比某个进程非正常退出,该程序占用的端口可能要被占用一段时间才气允许其他进程利用,而且程序死掉以后,内核一需要一定的时间才气够开释此端口,不设置SO_REUSEADDR就无法正常利用该端口。
  3、ChannelOption.SO_KEEPALIVE

   Channeloption.SO_KEEPALIVE参数对应于套接字选项中的SO_KEEPALIVE,该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。
  当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个运动探测数据报文。
  4、ChannelOption.SO_SNDBUF和ChannelOption.SO_RCVBUF

   ChannelOption.SO_SNDBUF参数对应于套接字选项中的SO_SNDBUF,ChannelOption.SO_RCVBUF参数对应于套接字选项中的SO_RCVBUF这两个参数用于操作发送缓冲区大小和接受缓冲区大小。
  接收缓冲区用于生存网络协议站内收到的数据,直到应用程序读取乐成,发送缓冲区用于生存发送数据,直到发送乐成。
  5、ChannelOption.SO_LINGER

   ChannelOption.SO_LINGER参数对应于套接字选项中的SO_LINGER,Linux内核默认的处理方式是当用户调用close()方法的时候,函数返回,在可能的环境下,只管发送数据,不一定包管会发送剩余的数据,造成了数据的不确定性,利用SO_LINGER可以阻塞close()的调用时间,直到数据完全发送。
  6、ChannelOption.TCP_NODELAY

   ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的利用与Nagle算法有关。
  Nagle算法是将小的数据包组装为更大的帧然后举行发送,而不是输入一次发送一次,因此在数据包不敷的时候会等候其他数据的到来,组装成大的数据包举行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。
  而该参数的作用就是禁止利用Nagle算法,利用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,实用于文件传输。
  ChannelOption属性  SO_BROADCAST对应套接字层的套接字:SO_BROADCAST,将消息发送到广播地址。
如果目的中指定的接口支持广播数据包,则启用此选项可让应用程序发送广播消息。SO_KEEPALIVE对应套接字层的套接字:SO_KEEPALIVE,保持连接。
在空闲套接字上发送探测,以验证套接字是否仍处于运动状态。SO_SNDBUF对应套接字层的套接字:SO_SNDBUF,设置发送缓冲区的大小。SO_RCVBUF对应套接字层的套接字:SO_RCVBUF,获取接收缓冲区的大小。SO_REUSEADDR对应套接字层的套接字:SO_REUSEADDR,本地地址复用。
启用此选项允许绑定已利用的本地地址。SO_LINGER对应套接字层的套接字:SO_LINGER,耽误关闭连接。
启用此选项,在调用close时如果存在未发送的数据时,在close期间将阻止调用应用程序,直到数据被传输或连接超时。SO_BACKLOG 对应TCP/IP协议中<font color=red>backlog</font>参数,<font color=red>backlog</font>即连接队列,设置TCP中的连接队列大小。如果队列满了,会发送一个ECONNREFUSED错误信息给C端,即“ Connection refused”。
SO_TIMEOUT等候客户连接的超时时间。IP_TOS对应套接字层的套接字:IP_TOS,在IP标头中设置服务类型(TOS)和优先级。IP_MULTICAST_ADDR对应IP层的套接字选项:IP_MULTICAST_IF,设置应发送多播数据报的传出接口。IP_MULTICAST_IF对应IP层的套接字选项:IP_MULTICAST_IF2,设置应发送多播数据报的IPV6传出接口。IP_MULTICAST_TTL对应IP层的套接字选项:IP_MULTICAST_TTL,在传出的 多播数据报的IP头中设置生存时间(TTL)。IP_MULTICAST_LOOP_DISABLED取消 指定应将 传出的多播数据报的副本 回传到发送主机,只要它是多播组的成员即可。TCP_NODELAY 对应TCP层的套接字选项:TCP_NODELAY,指定TCP是否遵循<font color=#35b998>Nagle算法</font> 决定何时发送数据。Nagle算法代表通过淘汰必须发送包的个数来增加网络软件体系的服从。即尽可能发送大块数据避免网络中充斥着大量的小数据块。如果要追求高及时性,需要设置关闭Nagle算法;如果需要追求淘汰网络交互次数,则设置开启Nagle算法。

  

  

























 ChannelOption通用配置
参数说明
ALLOCATORByteBuf的分配器,默认值为ByteBufAllocator.DEFAULT。
RCVBUF_ALLOCATOR 用于Channel分配接受Buffer的分配器,默认值为AdaptiveRecvByteBufAllocator.DEFAULT,是一个自适应的接受缓冲区分配器,能根据接受到的数据自动调治大小。可选值为FixedRecvByteBufAllocator,固定大小的接受缓冲区分配器。
MESSAGE_SIZE_ESTIMATOR 消息大小估算器,默认为DefaultMessageSizeEstimator.DEFAULT。估算ByteBuf、ByteBufHolder和FileRegion的大小,此中ByteBuf和ByteBufHolder为实际大小,FileRegion估算值为0。该值估算的字节数在计算水位时利用,FileRegion为0可知FileRegion不影响高低水位。
CONNECT_TIMEOUT_MILLIS连接超时毫秒数,默认值30000毫秒即30秒。
WRITE_SPIN_COUNT一个Loop写操作执行的最大次数,默认值为16。也就是说,对于大数据量的写操作至多举行16次,如果16次仍没有全部写完数据,此时会提交一个新的写任务给EventLoop,任务将在下次调度继续执行。这样,其他的写请求才气被响应不会由于单个大数据量写请求而耽误。
WRITE_BUFFER_WATER_MARK
ALLOW_HALF_CLOSURE一个连接的远端关闭时本地端是否关闭,默认值为False。值为False时,连接自动关闭;为True时,触发ChannelInboundHandler的userEventTriggered()方法,事件为ChannelInputShutdownEvent。
AUTO_READ自动读取,默认值为True。Netty只在须要的时候才设置关心相应的I/O事件。对于读操作,需要调用channel.read()设置关心的I/O事件为OP_READ,这样如有数据到达才气读取以供用户处理。该值为True时,每次读操作完毕后会自动调用channel.read(),从而有数据到达便能读取;否则,需要用户手动调用channel.read()。需要留意的是:当调用config.setAutoRead(boolean)方法时,如果状态由false变为true,将会调用channel.read()方法读取数据;由true变为false,将调用config.autoReadCleared()方法终止数据读取。






免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

怀念夏天

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表