使用Netty搭建TCP服务器

打印 上一主题 下一主题

主题 1623|帖子 1623|积分 4869

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
1.添加pom文件,引入Netty依赖

  1.         <dependency>
  2.             <groupId>io.netty</groupId>
  3.             <artifactId>netty-all</artifactId>
  4.             <version>4.1.65.Final</version>
  5.         </dependency>
  6.         
复制代码
2.添加ChannelHandlerContextMap类,用来存储通道的标识和通道对象

在长连接中,我们可以给每个通道附带一些属性,也可以说是标识,当你拿到这个通道对象时,可以获取这些属性,不外这里的前提是,需要先拿到通道对象,所以,我们无法通过通道标识去拿到通道对象,这样非常的不友好,比方,我们需要获取某个用户的通道,然后向这个通道推送消息,这个时候我们就只能循环每个通道,然后判断当前循环的通道是不是对应用户的,加了这个类,可以让我们快速的找到对应用户的通道,这里使用ConcurrentHashMap类,ConcurrentHashMap类是线程安全的。
  1. import io.netty.channel.Channel;
  2. import java.util.Map;
  3. import java.util.Set;
  4. import java.util.concurrent.ConcurrentHashMap;
  5. public class ChannelHandlerContextMap {
  6.     private static ChannelHandlerContextMap instance;
  7.     private final Map<String, Channel> channelMap;
  8.     private ChannelHandlerContextMap() {
  9.         channelMap = new ConcurrentHashMap<>();
  10.     }
  11.     public static synchronized ChannelHandlerContextMap getInstance() {
  12.         if (instance == null) {
  13.             instance = new ChannelHandlerContextMap();
  14.         }
  15.         return instance;
  16.     }
  17.     public void put(String identifier, Channel ctx) {
  18.         channelMap.put(identifier, ctx);
  19.     }
  20.     public Channel get(String identifier) {
  21.         return channelMap.get(identifier);
  22.     }
  23.     public Channel remove(String identifier) {
  24.         return channelMap.remove(identifier);
  25.     }
  26.     public Boolean containsKey(String identifier) {
  27.         return channelMap.containsKey(identifier);
  28.     }
  29.     public Set<Map.Entry<String, Channel>> entrySet() {
  30.         return channelMap.entrySet();
  31.     }
  32.     public int size() {
  33.         return channelMap.size();
  34.     }
  35. }
复制代码
3.添加HeartbeatHandler类,用来处置惩罚通道在一定时间内,和服务器没有消息往来的处置惩罚方案

  1. import io.netty.channel.ChannelHandlerContext;
  2. import io.netty.handler.timeout.IdleState;
  3. import io.netty.handler.timeout.IdleStateEvent;
  4. import io.netty.handler.timeout.IdleStateHandler;
  5. public class HeartbeatHandler extends IdleStateHandler {
  6.     public HeartbeatHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
  7.         super(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds);
  8.     }
  9.     @Override
  10.     protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
  11.         String deviceNameAttr = ctx.channel().attr("deviceName").get();
  12.         if(!"Server".equals(deviceNameAttr)){
  13.             //LogUtil.LOGI(HeartbeatHandler.class.getName(),"客户端超时类型:"+evt.state()+";通道标识:"+deviceNameAttr);
  14.             if (evt.state() == IdleState.READER_IDLE) {
  15.                 // 一段时间内没有收到过消息
  16.                 LogUtil.LOGI(HeartbeatHandler.class.getName(),"客户端读取超时,关闭通道:"+deviceNameAttr);
  17.                 ctx.close();
  18.             }
  19. //            if (evt.state() == IdleState.WRITER_IDLE) {
  20. //                //一段时间内没有向外发送过消息
  21. //                LogUtil.LOGI(HeartbeatHandler.class.getName(),"客户端写入超时,关闭通道:"+deviceNameAttr);
  22. //                //ctx.close();
  23. //            }
  24. //            if (evt.state() == IdleState.ALL_IDLE) {
  25. //                //一段时间内既没收到过消息也没发送过消息
  26. //                LogUtil.LOGI(HeartbeatHandler.class.getName(),"客户端超时,关闭通道:"+deviceNameAttr);
  27. //                //ctx.close();
  28. //            }
  29.         }
  30.     }
  31. }
复制代码
当一定时间内,通道和服务器没有消息往来时,会进入channelIdle方法,这里的一定时间,后面的代码中会设置;在channelIdle方法中,我们可以通过ctx.channel()来获取触发条件的通道,通过evt.state()来获取触发的范例,触发范例一共有三种:
IdleState.READER_IDLE:服务端一段时间内,没有收到客户端的消息,触发
IdleState.WRITER_IDLE:服务端一段时间内,没有向客户端发送消息,触发
IdleState.ALL_IDLE:服务端一段时间内,既没有收到客户端的消息,也没有向客户端发送消息,触发
可以在这里做本身的逻辑,我这边是直接关闭通道,防止通道占用过多
3.添加NettyHandler类,处置惩罚服务器收到的消息

  1. import com.alibaba.fastjson.JSONArray;
  2. import com.alibaba.fastjson.JSONObject;
  3. import io.netty.channel.Channel;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.channel.ChannelInboundHandlerAdapter;
  6. import io.netty.channel.group.ChannelGroup;
  7. import io.netty.channel.group.DefaultChannelGroup;
  8. import io.netty.util.Attribute;
  9. import io.netty.util.concurrent.GlobalEventExecutor;
  10. import org.springframework.stereotype.Component;
  11. import java.util.Map;
  12. import java.util.Set;
  13. @Component
  14. public class NettyHandler extends ChannelInboundHandlerAdapter {
  15.     public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
  16.     @Override
  17.     public void channelActive(ChannelHandlerContext ctx) {
  18.         LogUtil.LOGI(NettyHandler.class.getName(),"客户端["+ctx.channel().remoteAddress()+"]已连接");
  19.     }
  20.     @Override
  21.     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  22.         String deviceName = ctx.channel().attr("deviceName").get();
  23.         if(deviceName!=null && ChannelHandlerContextMap.getInstance().containsKey(deviceName)){
  24.             ChannelHandlerContextMap.getInstance().remove(deviceName);
  25.             LogUtil.LOGI(NettyHandler.class.getName(),"客户端["+deviceName+"]断开连接");
  26.         }else{
  27.             LogUtil.LOGI(NettyHandler.class.getName(),"客户端["+ctx.channel().remoteAddress()+"]断开连接");
  28.         }
  29.     }
  30.     @Override
  31.     public void channelRead(ChannelHandlerContext ctx, Object msg){
  32.    
  33.         LogUtil.LOGI(NettyHandler.class.getName(),"源消息:"+message);
  34.         
  35.     }
  36.    
  37.     public void sendMessgae(Channel ctx,String message){
  38.         ctx.writeAndFlush(message);
  39.     }
  40.     @Override
  41.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  42.         cause.printStackTrace();
  43.         ctx.close();
  44.     }
  45.     public String buildReturnPackage(int type,Object content){
  46.         JSONObject jsonObject = new JSONObject();
  47.         jsonObject.put("type",type);
  48.         jsonObject.put("typeName",ConnectCodeUtil.MESSAGE_TYPE.get(type));
  49.         jsonObject.put("content",content);
  50.         return jsonObject+"##@##";
  51.     }   
  52. }
复制代码
channelActive:当客户端连接上服务端,会触发这个方法
channelInactive:当客户端断开服务端,会触发这个方法
exceptionCaught:当出现非常时,会触发这个方法
channelRead:当收到消息,会触发这个方法
我们需要在channelInactivechannelRead这两个方法中,管理好我们第二步创建的Map集合
channelInactive方法中,可以看到,我并不是直接在Map中操作remove,是需要先判断该通道是否包罗标识,存在标识我才remove,这和我们的业务逻辑干系,我们的业务逻辑存在客户端连上了服务端,不外还没有认证(就是认证时,会给通道打上标识,而且添加到Map中)的环境。这里根据你们的业务逻辑进行处置惩罚。
因为我这边的逻辑是,当存在同一个标识的通道出现时,我会将之前的通道给关闭,这个场景应该很常见,关于这个场景有一个标题,当这个场景出现,我会在channelRead方法中将Map中的通道覆盖,然后在channelInactive方法中移除Map中的通道数据,这样就会造成存在挤号时,Map中的数据是小于channelGroup中的数据的,这个你们根据本身的业务需求解决就好了
buildReturnPackage方法,是我构建发送消息的一个通用方法,消息最后的##@##是分隔符,后面会讲到。
4.添加NettyChannelInitializer类

  1. import io.netty.buffer.Unpooled;
  2. import io.netty.channel.ChannelInitializer;
  3. import io.netty.channel.socket.SocketChannel;
  4. import io.netty.handler.codec.DelimiterBasedFrameDecoder;
  5. import io.netty.handler.codec.string.StringDecoder;
  6. import io.netty.handler.codec.string.StringEncoder;
  7. import io.netty.handler.logging.LogLevel;
  8. import io.netty.handler.logging.LoggingHandler;
  9. import io.netty.util.CharsetUtil;
  10. public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {
  11.     @Override
  12.     protected void initChannel(SocketChannel channel) throws Exception {
  13.         //channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
  14.         String delimiter = "##@##";
  15.         // 对服务端返回的消息通过_$进行分隔,并且每次查找的最大大小为1024字节
  16.         channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,
  17.                 Unpooled.wrappedBuffer(delimiter.getBytes())));
  18.         // 将分隔之后的字节数据转换为字符串
  19.         channel.pipeline().addLast(new StringDecoder());
  20.         channel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
  21.         channel.pipeline().addLast("heartbeatHandler", new HeartbeatHandler(90, 90, 90)); // 添加心跳处理器
  22.         channel.pipeline().addLast(new NettyHandler());
  23.         //channel.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
  24.     }
  25. }
复制代码
这个类就是将自界说的NettyHandlerHeartbeatHandler都添加进来,HeartbeatHandler的参数就是HeartbeatHandler类的构造方法的入参,分别是:
读取超时时间:IdleState.READER_IDLE
写入超时时间:IdleState.WRITER_IDLE
读写超时时间:IdleState.ALL_IDLE
单位秒
这里的##@##的界说分割符解码器,因为在Netty传输消息中,可能会存在粘包和拆包的标题,界说分隔符解码器就是,让Netty服务器将两个分隔符中心的文本,当做一条消息,从而解决粘包和拆包的标题。
解码器尚有多种范例:
LineBasedFrameDecoder:界说一条消息已\r\n结尾
FixedLengthFrameDecoder:界说一条消息的固定长度
DelimiterBasedFrameDecoder:自界说分隔符
5.添加配置文件

  1. server.port=8473
  2. netty.port = 5555
  3. netty.url= 127.0.0.1
复制代码
我这里是.properties文件
6.添加启动类

  1. import io.netty.bootstrap.ServerBootstrap;
  2. import io.netty.channel.ChannelFuture;
  3. import io.netty.channel.ChannelOption;
  4. import io.netty.channel.EventLoopGroup;
  5. import io.netty.channel.nio.NioEventLoopGroup;
  6. import io.netty.channel.socket.nio.NioServerSocketChannel;
  7. import org.springframework.stereotype.Component;
  8. import java.net.InetSocketAddress;
  9. @Component
  10. public class NettyServer {
  11.     // 开启服务端口
  12.     public static void start(InetSocketAddress address) {
  13.         EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  14.         EventLoopGroup workerGroup = new NioEventLoopGroup();
  15.         try {
  16.             ServerBootstrap bootstrap = new ServerBootstrap()
  17.                     .group(bossGroup, workerGroup)
  18.                     .channel(NioServerSocketChannel.class)
  19.                     .localAddress(address)
  20.                     .childHandler(new NettyChannelInitializer())
  21.                     .option(ChannelOption.SO_BACKLOG, 102400)
  22.                     .childOption(ChannelOption.SO_KEEPALIVE, true);
  23.             // 绑定端口,开始接收进来的连接
  24.             ChannelFuture future = bootstrap.bind(address).sync();
  25.             LogUtil.LOGI(NettyServer.class.getName(),"开启Netty服务端口:" + address.getPort());
  26.             future.channel().closeFuture().sync();
  27.         } catch (Exception e) {
  28.             e.printStackTrace();
  29.             bossGroup.shutdownGracefully();
  30.             workerGroup.shutdownGracefully();
  31.         }
  32.     }
  33. }
复制代码
然后在springboot的启动类,添加下面代码,就能启动了
  1.         InetSocketAddress address = new InetSocketAddress(URL, PORT);
  2.         server.start(address);
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

用户国营

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表