java netty 实现 websocket 服务端和客户端双向通信 实现心跳和断线重连 完整示例
maven依赖
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
- <version>4.1.97.Final</version>
- </dependency>
复制代码 服务端
一个接口 IGetHandshakeFuture
- package com.sux.demo.websocket2;
- import io.netty.channel.ChannelPromise;
- public interface IGetHandshakeFuture {
- ChannelPromise getHandshakeFuture();
- }
复制代码 服务端心跳 ServerHeartbeatHandler
- package com.sux.demo.websocket2;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
- import io.netty.handler.timeout.IdleState;
- import io.netty.handler.timeout.IdleStateEvent;
- public class ServerHeartbeatHandler extends ChannelInboundHandlerAdapter {
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- if (evt instanceof IdleStateEvent) {
- IdleStateEvent event = (IdleStateEvent) evt;
- if (event.state() == IdleState.READER_IDLE) { // 读空闲
- System.out.println("关闭客户端连接, channel id=" + ctx.channel().id());
- ctx.channel().close();
- } else if (event.state() == IdleState.WRITER_IDLE) { // 写空闲
- System.out.println("服务端向客户端发送心跳");
- ctx.writeAndFlush(new PongWebSocketFrame());
- } else if (event.state() == IdleState.ALL_IDLE) { // 读写空闲
- }
- }
- }
- }
复制代码 服务端封装 WebSocketServer
- package com.sux.demo.websocket2;
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.*;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.codec.http.HttpObjectAggregator;
- import io.netty.handler.codec.http.HttpServerCodec;
- import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
- import io.netty.handler.timeout.IdleStateHandler;
- import java.util.concurrent.TimeUnit;
- public class WebSocketServer {
- private EventLoopGroup bossGroup;
- private EventLoopGroup workerGroup;
- public WebSocketServer() {
- //创建两个线程组 boosGroup、workerGroup
- bossGroup = new NioEventLoopGroup();
- workerGroup = new NioEventLoopGroup();
- }
- public void start(int port, WebSocketServerHandler handler, String name) {
- try {
- //创建服务端的启动对象,设置参数
- ServerBootstrap bootstrap = new ServerBootstrap();
- //设置两个线程组boosGroup和workerGroup
- bootstrap.group(bossGroup, workerGroup)
- //设置服务端通道实现类型
- .channel(NioServerSocketChannel.class)
- //设置线程队列得到连接个数
- .option(ChannelOption.SO_BACKLOG, 128)
- //设置保持活动连接状态
- .childOption(ChannelOption.SO_KEEPALIVE, true)
- //使用匿名内部类的形式初始化通道对象
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- //给pipeline管道设置处理器
- socketChannel.pipeline().addLast(new HttpServerCodec());
- socketChannel.pipeline().addLast(new HttpObjectAggregator(65536));
- socketChannel.pipeline().addLast(new WebSocketServerProtocolHandler("/websocket", null, false, 65536, false, false, false, 10000));
- socketChannel.pipeline().addLast(new IdleStateHandler(5, 2, 0, TimeUnit.SECONDS));
- socketChannel.pipeline().addLast(new ServerHeartbeatHandler());
- socketChannel.pipeline().addLast(handler);
- }
- });//给workerGroup的EventLoop对应的管道设置处理器
- //绑定端口号,启动服务端
- ChannelFuture channelFuture = bootstrap.bind(port).sync();
- System.out.println(name + " 已启动");
- //对通道关闭进行监听
- channelFuture.channel().closeFuture().sync();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- }
- }
- }
复制代码 服务端消息处理惩罚 WebSocketServerHandler
- package com.sux.demo.websocket2;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.*;
- import io.netty.handler.codec.http.websocketx.*;
- import io.netty.util.CharsetUtil;
- import java.util.ArrayList;
- import java.util.List;
- @ChannelHandler.Sharable
- public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
- private List<Channel> channelList;
- public WebSocketServerHandler() {
- channelList = new ArrayList<>();
- }
- public boolean hasClient() {
- return channelList.size() > 0;
- }
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
- if (msg instanceof PingWebSocketFrame) {
- System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发来的心跳:PING");
- }
- if (msg instanceof PongWebSocketFrame) {
- System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发来的心跳:PONG");
- }
- if (msg instanceof TextWebSocketFrame) {
- TextWebSocketFrame frame = (TextWebSocketFrame) msg;
- System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发来的消息:" + frame.text());
- /*for (Channel channel : channelList) {
- if (!ctx.channel().id().toString().equals(channel.id().toString())) {
- channel.writeAndFlush(new TextWebSocketFrame(Unpooled.copiedBuffer(frame.text(), CharsetUtil.UTF_8)));
- System.out.println("服务端向客户端 " + channel.id().toString() + " 转发消息:" + frame.text());
- }
- }*/
- }
- }
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- channelList.add(ctx.channel());
- System.out.println("客户端连接:" + ctx.channel().id().toString());
- }
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- channelList.remove(ctx.channel());
- }
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
- ctx.flush();
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- ctx.close();
- }
- public void send(String text) {
- for (Channel channel : channelList) {
- channel.writeAndFlush(new TextWebSocketFrame(Unpooled.copiedBuffer(text, CharsetUtil.UTF_8)));
- }
- }
- }
复制代码 服务端测试主机 WebSocketClientHost
- package com.sux.demo.websocket2;
- public class WebSocketServerHost {
- public static void main(String[] args) {
- WebSocketServerHandler handler = new WebSocketServerHandler();
- WebSocketServer webSocketServer = new WebSocketServer();
- SendDataToClientThread thread = new SendDataToClientThread(handler);
- thread.start();
- webSocketServer.start(40005, handler, "WebSocket服务端");
- }
- }
- class SendDataToClientThread extends Thread {
- private WebSocketServerHandler handler;
- private int index = 1;
- public SendDataToClientThread(WebSocketServerHandler handler) {
- this.handler = handler;
- }
- @Override
- public void run() {
- try {
- while (index <= 5) {
- if (handler.hasClient()) {
- String msg = "服务端发送的测试消息, index = " + index;
- handler.send(msg);
- index++;
- }
- Thread.sleep(1000);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
复制代码 客户端消息处理惩罚 WebSocketClientHandler
- package com.sux.demo.websocket2;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
- import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
- import io.netty.handler.timeout.IdleState;
- import io.netty.handler.timeout.IdleStateEvent;
- public class ClientHeartbeatHandler extends ChannelInboundHandlerAdapter {
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
- if (evt instanceof IdleStateEvent) {
- IdleStateEvent event = (IdleStateEvent) evt;
- if (event.state() == IdleState.READER_IDLE) { // 读空闲
- System.out.println("断线重连");
- ctx.channel().close();
- } else if (event.state() == IdleState.WRITER_IDLE) { // 写空闲
- System.out.println("客户端向服务端发送心跳");
- ctx.writeAndFlush(new PongWebSocketFrame());
- // ctx.writeAndFlush(new PingWebSocketFrame());
- // ctx.writeAndFlush(new TextWebSocketFrame(Unpooled.copiedBuffer("PING", CharsetUtil.UTF_8)));
- } else if (event.state() == IdleState.ALL_IDLE) { // 读写空闲
- }
- }
- }
- }
复制代码 客户端测试主机 WebSocketServerHost
[code]package com.sux.demo.websocket2;import io.netty.buffer.Unpooled;import io.netty.channel.Channel;import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;import io.netty.util.CharsetUtil;public class WebSocketClientHost { public static void main(String[] args) { WebSocketClient webSocketClient = new WebSocketClient(); SendDataToServerThread thread = new SendDataToServerThread(webSocketClient); thread.start(); webSocketClient.connect("127.0.0.1", 40005, "WebSocket客户端"); }}class SendDataToServerThread extends Thread { private WebSocketClient webSocketClient; private int index = 1; public SendDataToServerThread(WebSocketClient webSocketClient) { this.webSocketClient = webSocketClient; } @Override public void run() { try { while (index |