java netty 实现 websocket 服务端和客户端双向通信 实现心跳和断线重连 完 ...

打印 上一主题 下一主题

主题 865|帖子 865|积分 2595

java netty 实现 websocket 服务端和客户端双向通信 实现心跳和断线重连 完整示例
maven依赖
  1. <dependency>
  2.     <groupId>io.netty</groupId>
  3.     <artifactId>netty-all</artifactId>
  4.     <version>4.1.97.Final</version>
  5. </dependency>
复制代码
服务端

一个接口 IGetHandshakeFuture
  1. package com.sux.demo.websocket2;
  2. import io.netty.channel.ChannelPromise;
  3. public interface IGetHandshakeFuture {
  4.     ChannelPromise getHandshakeFuture();
  5. }
复制代码
服务端心跳 ServerHeartbeatHandler
  1. package com.sux.demo.websocket2;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.channel.ChannelInboundHandlerAdapter;
  4. import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
  5. import io.netty.handler.timeout.IdleState;
  6. import io.netty.handler.timeout.IdleStateEvent;
  7. public class ServerHeartbeatHandler extends ChannelInboundHandlerAdapter {
  8.     @Override
  9.     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  10.         if (evt instanceof IdleStateEvent) {
  11.             IdleStateEvent event = (IdleStateEvent) evt;
  12.             if (event.state() == IdleState.READER_IDLE) { // 读空闲
  13.                 System.out.println("关闭客户端连接, channel id=" + ctx.channel().id());
  14.                 ctx.channel().close();
  15.             } else if (event.state() == IdleState.WRITER_IDLE) { // 写空闲
  16.                 System.out.println("服务端向客户端发送心跳");
  17.                 ctx.writeAndFlush(new PongWebSocketFrame());
  18.             } else if (event.state() == IdleState.ALL_IDLE) { // 读写空闲
  19.             }
  20.         }
  21.     }
  22. }
复制代码
服务端封装 WebSocketServer
  1. package com.sux.demo.websocket2;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.*;
  4. import io.netty.channel.nio.NioEventLoopGroup;
  5. import io.netty.channel.socket.SocketChannel;
  6. import io.netty.channel.socket.nio.NioServerSocketChannel;
  7. import io.netty.handler.codec.http.HttpObjectAggregator;
  8. import io.netty.handler.codec.http.HttpServerCodec;
  9. import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
  10. import io.netty.handler.timeout.IdleStateHandler;
  11. import java.util.concurrent.TimeUnit;
  12. public class WebSocketServer {
  13.     private EventLoopGroup bossGroup;
  14.     private EventLoopGroup workerGroup;
  15.     public WebSocketServer() {
  16.         //创建两个线程组 boosGroup、workerGroup
  17.         bossGroup = new NioEventLoopGroup();
  18.         workerGroup = new NioEventLoopGroup();
  19.     }
  20.     public void start(int port, WebSocketServerHandler handler, String name) {
  21.         try {
  22.             //创建服务端的启动对象,设置参数
  23.             ServerBootstrap bootstrap = new ServerBootstrap();
  24.             //设置两个线程组boosGroup和workerGroup
  25.             bootstrap.group(bossGroup, workerGroup)
  26.                     //设置服务端通道实现类型
  27.                     .channel(NioServerSocketChannel.class)
  28.                     //设置线程队列得到连接个数
  29.                     .option(ChannelOption.SO_BACKLOG, 128)
  30.                     //设置保持活动连接状态
  31.                     .childOption(ChannelOption.SO_KEEPALIVE, true)
  32.                     //使用匿名内部类的形式初始化通道对象
  33.                     .childHandler(new ChannelInitializer<SocketChannel>() {
  34.                         @Override
  35.                         protected void initChannel(SocketChannel socketChannel) throws Exception {
  36.                             //给pipeline管道设置处理器
  37.                             socketChannel.pipeline().addLast(new HttpServerCodec());
  38.                             socketChannel.pipeline().addLast(new HttpObjectAggregator(65536));
  39.                             socketChannel.pipeline().addLast(new WebSocketServerProtocolHandler("/websocket", null, false, 65536, false, false, false, 10000));
  40.                             socketChannel.pipeline().addLast(new IdleStateHandler(5, 2, 0, TimeUnit.SECONDS));
  41.                             socketChannel.pipeline().addLast(new ServerHeartbeatHandler());
  42.                             socketChannel.pipeline().addLast(handler);
  43.                         }
  44.                     });//给workerGroup的EventLoop对应的管道设置处理器
  45.             //绑定端口号,启动服务端
  46.             ChannelFuture channelFuture = bootstrap.bind(port).sync();
  47.             System.out.println(name + " 已启动");
  48.             //对通道关闭进行监听
  49.             channelFuture.channel().closeFuture().sync();
  50.         } catch (InterruptedException e) {
  51.             e.printStackTrace();
  52.         } finally {
  53.         }
  54.     }
  55. }
复制代码
服务端消息处理惩罚 WebSocketServerHandler
  1. package com.sux.demo.websocket2;
  2. import io.netty.buffer.Unpooled;
  3. import io.netty.channel.*;
  4. import io.netty.handler.codec.http.websocketx.*;
  5. import io.netty.util.CharsetUtil;
  6. import java.util.ArrayList;
  7. import java.util.List;
  8. @ChannelHandler.Sharable
  9. public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
  10.     private List<Channel> channelList;
  11.     public WebSocketServerHandler() {
  12.         channelList = new ArrayList<>();
  13.     }
  14.     public boolean hasClient() {
  15.         return channelList.size() > 0;
  16.     }
  17.     @Override
  18.     protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
  19.         if (msg instanceof PingWebSocketFrame) {
  20.             System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发来的心跳:PING");
  21.         }
  22.         if (msg instanceof PongWebSocketFrame) {
  23.             System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发来的心跳:PONG");
  24.         }
  25.         if (msg instanceof TextWebSocketFrame) {
  26.             TextWebSocketFrame frame = (TextWebSocketFrame) msg;
  27.             System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发来的消息:" + frame.text());
  28.             /*for (Channel channel : channelList) {
  29.                 if (!ctx.channel().id().toString().equals(channel.id().toString())) {
  30.                     channel.writeAndFlush(new TextWebSocketFrame(Unpooled.copiedBuffer(frame.text(), CharsetUtil.UTF_8)));
  31.                     System.out.println("服务端向客户端 " + channel.id().toString() + " 转发消息:" + frame.text());
  32.                 }
  33.             }*/
  34.         }
  35.     }
  36.     @Override
  37.     public void channelActive(ChannelHandlerContext ctx) throws Exception {
  38.         channelList.add(ctx.channel());
  39.         System.out.println("客户端连接:" + ctx.channel().id().toString());
  40.     }
  41.     @Override
  42.     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  43.         channelList.remove(ctx.channel());
  44.     }
  45.     @Override
  46.     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  47.         ctx.flush();
  48.     }
  49.     @Override
  50.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  51.         ctx.close();
  52.     }
  53.     public void send(String text) {
  54.         for (Channel channel : channelList) {
  55.             channel.writeAndFlush(new TextWebSocketFrame(Unpooled.copiedBuffer(text, CharsetUtil.UTF_8)));
  56.         }
  57.     }
  58. }
复制代码
服务端测试主机 WebSocketClientHost
  1. package com.sux.demo.websocket2;
  2. public class WebSocketServerHost {
  3.     public static void main(String[] args) {
  4.         WebSocketServerHandler handler = new WebSocketServerHandler();
  5.         WebSocketServer webSocketServer = new WebSocketServer();
  6.         SendDataToClientThread thread = new SendDataToClientThread(handler);
  7.         thread.start();
  8.         webSocketServer.start(40005, handler, "WebSocket服务端");
  9.     }
  10. }
  11. class SendDataToClientThread extends Thread {
  12.     private WebSocketServerHandler handler;
  13.     private int index = 1;
  14.     public SendDataToClientThread(WebSocketServerHandler handler) {
  15.         this.handler = handler;
  16.     }
  17.     @Override
  18.     public void run() {
  19.         try {
  20.             while (index <= 5) {
  21.                 if (handler.hasClient()) {
  22.                     String msg = "服务端发送的测试消息, index = " + index;
  23.                     handler.send(msg);
  24.                     index++;
  25.                 }
  26.                 Thread.sleep(1000);
  27.             }
  28.         } catch (Exception e) {
  29.             e.printStackTrace();
  30.         }
  31.     }
  32. }
复制代码
客户端消息处理惩罚 WebSocketClientHandler
  1. package com.sux.demo.websocket2;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.channel.ChannelInboundHandlerAdapter;
  4. import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
  5. import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
  6. import io.netty.handler.timeout.IdleState;
  7. import io.netty.handler.timeout.IdleStateEvent;
  8. public class ClientHeartbeatHandler extends ChannelInboundHandlerAdapter {
  9.     @Override
  10.     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
  11.         if (evt instanceof IdleStateEvent) {
  12.             IdleStateEvent event = (IdleStateEvent) evt;
  13.             if (event.state() == IdleState.READER_IDLE) { // 读空闲
  14.                 System.out.println("断线重连");
  15.                 ctx.channel().close();
  16.             } else if (event.state() == IdleState.WRITER_IDLE) { // 写空闲
  17.                 System.out.println("客户端向服务端发送心跳");
  18.                 ctx.writeAndFlush(new PongWebSocketFrame());
  19.                 // ctx.writeAndFlush(new PingWebSocketFrame());
  20.                 // ctx.writeAndFlush(new TextWebSocketFrame(Unpooled.copiedBuffer("PING", CharsetUtil.UTF_8)));
  21.             } else if (event.state() == IdleState.ALL_IDLE) { // 读写空闲
  22.             }
  23.         }
  24.     }
  25. }
复制代码
客户端测试主机 WebSocketServerHost

[code]package com.sux.demo.websocket2;import io.netty.buffer.Unpooled;import io.netty.channel.Channel;import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;import io.netty.util.CharsetUtil;public class WebSocketClientHost {    public static void main(String[] args) {        WebSocketClient webSocketClient = new WebSocketClient();        SendDataToServerThread thread = new SendDataToServerThread(webSocketClient);        thread.start();        webSocketClient.connect("127.0.0.1", 40005, "WebSocket客户端");    }}class SendDataToServerThread extends Thread {    private WebSocketClient webSocketClient;    private int index = 1;    public SendDataToServerThread(WebSocketClient webSocketClient) {        this.webSocketClient = webSocketClient;    }    @Override    public void run() {        try {            while (index
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

半亩花草

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表