用netty轻松实现一个高效稳定的TCP服务器

打印 上一主题 下一主题

主题 669|帖子 669|积分 2009

          随着物联网的发展,很多项目都开始涉及到了tcp毗连这块,在这里我们轻松用netty去实现,站在巨人的肩膀上。
关于netty包引用:
  1. <!-- TCP SERVER -->
  2.         <dependency>
  3.             <groupId>io.netty</groupId>
  4.             <artifactId>netty-all</artifactId>
  5.             <version>4.1.42.Final</version>
  6.             <scope>compile</scope>
  7.         </dependency>
复制代码

实现TCP服务器代码

依赖netty只需几行代码tcp服务:
  1. import io.netty.bootstrap.ServerBootstrap;
  2. import io.netty.channel.*;
  3. import io.netty.channel.nio.NioEventLoopGroup;
  4. import io.netty.channel.socket.SocketChannel;
  5. import io.netty.channel.socket.nio.NioServerSocketChannel;
  6. import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. import java.nio.ByteOrder;
  10. public class TcpServer {
  11.     private Logger log = LoggerFactory.getLogger(getClass());
  12.     //自定义tcp服务端口号
  13.     private int port=9000;
  14.     static TcpServer tcpServer;
  15.     //单例设计模式
  16.     private TcpServer(){
  17.     }
  18.     public static TcpServer getInstance(){
  19.         if(tcpServer==null){
  20.             tcpServer=new TcpServer();
  21.         }
  22.         return tcpServer;
  23.     };
  24. public void run() throws InterruptedException {
  25.     // 创建主线程组(接受连接)
  26.     EventLoopGroup bossGroup = new NioEventLoopGroup();
  27.     // 创建工作线程组(处理连接)
  28.     EventLoopGroup workerGroup = new NioEventLoopGroup(20); // 指定工作线程数量为20
  29.     // 创建ServerBootstrap实例,用于配置服务器
  30.     ServerBootstrap bootstrap = new ServerBootstrap();
  31.     // 配置主、工作线程组
  32.     bootstrap.group(bossGroup, workerGroup);
  33.     // 指定使用NIO进行网络传输
  34.     bootstrap.channel(NioServerSocketChannel.class);
  35.     // 设置子Channel的Socket选项,允许地址重用
  36.     bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
  37.     // 配置子Channel的处理器,这里使用ChannelInitializer
  38.     bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
  39.         @Override
  40.         public void initChannel(SocketChannel ch) throws Exception {
  41.             // 添加自定义的解码器,这里是处理协议
  42.             ch.pipeline().addLast(new YKCDecoderV1());
  43.             // 添加自定义的服务器处理器
  44.             ch.pipeline().addLast(new TCPServerHandler());
  45.         }
  46.     });
  47.     // 绑定端口并添加监听器,处理绑定操作的结果
  48.     bootstrap.bind(port).addListener((ChannelFutureListener) future -> {
  49.         // 在绑定成功后输出日志信息
  50.         log.info("bind success in port: " + port);
  51.     });
  52.     // 输出服务器启动成功信息
  53.     System.out.println("server started!");
  54. }
  55. }
复制代码
 业务处理代码(参考)

以下是处理报文业务类可参考,注意代码未优化:
  1. import io.netty.buffer.ByteBuf;
  2. import io.netty.buffer.ByteBufUtil;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.handler.codec.ByteToMessageDecoder;
  5. import java.net.InetSocketAddress;
  6. import java.util.List;
  7. import java.util.regex.Matcher;
  8. import java.util.regex.Pattern;
  9. /**
  10. * 正则解析版
  11. */
  12. public class YKCDecoderV1 extends ByteToMessageDecoder {
  13.     final static String reg = "^68.{14,332}";//单指令解析 根据业务协议报文定最短和最长
  14.     final static Pattern pattern1 = Pattern.compile(reg);
  15.     @Override
  16.     protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf bufferIn, List<Object> list) throws Exception {
  17.         // 获取可读字节数
  18.         int leng = bufferIn.readableBytes();
  19.         // 如果可读字节数小于8,输出错误信息并跳过这部分数据
  20.         if (leng < 8) {
  21.             System.out.println("err! cmd len < 8 .");
  22.             String s = ByteBufUtil.hexDump(bufferIn);
  23.             System.out.println(s);
  24.             bufferIn.skipBytes(leng);
  25.             return;
  26.         } else {
  27.             
  28.             String s = ByteBufUtil.hexDump(bufferIn);
  29.             Matcher matcher1 = pattern1.matcher(s);
  30.             if (matcher1.find()) {
  31.                 String cmd = matcher1.group();
  32.                 //单指令
  33.                 System.out.println("sign cmd: " + cmd);
  34.                 String lenStr = cmd.substring(2, 4);
  35.                 int len = (Integer.parseInt(lenStr, 16) + 4) * 2;
  36.                 int cmdLen = cmd.length();
  37.                 if (cmdLen == len) {
  38.                     JFYChargeProtocol jfyChargeProtocol = new JFYChargeProtocol(cmd);
  39.                     list.add(jfyChargeProtocol);
  40.                     bufferIn.skipBytes(leng);
  41.                 } else if (cmdLen > len) {
  42.                     multiHand(cmd, list);
  43.                     bufferIn.skipBytes(leng);
  44.                 }
  45.             } else {
  46.                 logErr(channelHandlerContext, s);
  47.                 System.out.println("err! cmd format invalid: " + s);
  48.                 bufferIn.skipBytes(leng);
  49.             }
  50.         }
  51.     }
  52.     private void multiHand(String cmd, List<Object> list) {
  53.         if (cmd.length() < 8) {
  54.             return;
  55.         }
  56.         String lenStr = cmd.substring(2, 4);
  57.         int len = (Integer.parseInt(lenStr, 16) + 4) * 2;
  58.         if (len > cmd.length()) {
  59.             return;
  60.         }
  61.         String newCmd = cmd.substring(0, len);
  62.         if (newCmd.length() == len) {
  63.             System.out.println("multi cmd-> " + newCmd);
  64.             JFYChargeProtocol jfyChargeProtocol = new JFYChargeProtocol(newCmd);
  65.             list.add(jfyChargeProtocol);
  66.         }
  67.         if (cmd.length() > len) {
  68.             System.out.println("multi xxx-> " + cmd);
  69.             String two = cmd.substring(len);
  70.             if(two.startsWith("68")){
  71.                 multiHand(two, list);
  72.             }
  73.         }
  74.     }
  75.     private int checkSignCmd(String cmd) {
  76.         int cmd_len = getCmdLen(cmd);
  77.         return cmd.length() - cmd_len;
  78.     }
  79.     private int getCmdLen(String cmd) {
  80.         String leng = cmd.substring(28, 30) + cmd.substring(26, 28);
  81.         int dec_num = Integer.parseInt(leng, 16);
  82.         return (dec_num * 2) + 34;
  83.     }
  84.     private void logErr(ChannelHandlerContext ctx, String msg) {
  85.         InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
  86.         String clientIP = insocket.getAddress().getHostAddress();
  87.         System.out.println(clientIP + " :: " + msg);
  88.     }
复制代码
  1. public class JFYChargeProtocol {
  2.     private int length;
  3.     private byte[] raw;
  4.     private String rawStr;
  5.     public JFYChargeProtocol(int length,byte[] raw){
  6.         this.length=length;
  7.         this.raw=raw;
  8.     }
  9.     public JFYChargeProtocol(String raw){
  10.         this.rawStr=raw;
  11.     }
  12.    
  13.     public int getLength() {
  14.         return length;
  15.     }
  16.     public void setLength(int length) {
  17.         this.length = length;
  18.     }
  19.     public byte[] getRaw() {
  20.         return raw;
  21.     }
  22.     public void setRaw(byte[] raw) {
  23.         this.raw = raw;
  24.     }
  25.     public String getRawStr() {
  26.         return rawStr;
  27.     }
  28.     public void setRawStr(String rawStr) {
  29.         this.rawStr = rawStr;
  30.     }
  31. }
复制代码

  1. import io.netty.buffer.ByteBuf;
  2. import io.netty.buffer.Unpooled;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.ChannelInboundHandlerAdapter;
  5. import org.apache.logging.log4j.LogManager;
  6. import org.apache.logging.log4j.Logger;
  7. import org.springframework.stereotype.Service;
  8. import java.util.Map;
  9. import java.util.concurrent.ConcurrentHashMap;
  10. @Service
  11. public class TCPServerHandler extends ChannelInboundHandlerAdapter {
  12.     private static final Logger logger = LogManager.getLogger(TCPServerHandler.class);
  13.     static Map<String,ChannelHandlerContext> inList=new ConcurrentHashMap<String,ChannelHandlerContext>();
  14.     /**
  15.      * 新连接
  16.      * @param ctx
  17.      */
  18.     @Override
  19.     public void channelActive(ChannelHandlerContext ctx) {
  20.         String channelName=getChannelName(ctx);
  21.         inList.put(channelName,ctx);
  22.         logger.info("dev new conn > " +channelName);
  23.     }
  24.     private String getChannelName(ChannelHandlerContext ctx) {
  25.         return "ykc".concat(ctx.channel().remoteAddress().toString());
  26.     }
  27.     /**
  28.      * 连接下线
  29.      * @param ctx
  30.      * @throws Exception
  31.      */
  32.     @Override
  33.     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  34.         String channelName=getChannelName(ctx);
  35.         logger.info("dev close conn > " + channelName);
  36.         inList.remove(channelName);
  37.         ctx.fireChannelInactive();
  38.     }
  39.     @Override
  40.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  41.         JFYChargeProtocol in = (JFYChargeProtocol) msg;
  42.         String readMsg= in.getRawStr();
  43.         logger.info("read dev <= " + readMsg);
  44.         String channelName=getChannelName(ctx);
  45.         readMsg=channelName+"$$"+readMsg;
  46.         PackageHandlerImpl.getInstance().doHandle(readMsg);
  47.         //ctx.writeAndFlush(in);
  48.     }
  49.     @Override
  50.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
  51.          cause.printStackTrace();
  52.          ctx.close();
  53.     }
  54.     /**
  55.      * 回复信息给设备
  56.      * @param hex
  57.      */
  58.     public static boolean RepDev(String hex){
  59.         String[] kv= hex.split("\\$\\$");
  60.         if(kv.length==2){
  61.             String key=kv[0];
  62.             ChannelHandlerContext context=inList.get(key);
  63.             if(context!=null){
  64.                 byte[] bytes= ByteUtil.hexString2Bytes(kv[1]);
  65.                 ByteBuf byteBuf= Unpooled.copiedBuffer(bytes);
  66.                 context.writeAndFlush(byteBuf);
  67.                 return true;
  68.             }else{
  69.                 logger.error("dev offline="+key);
  70.             }
  71.         }else{
  72.             logger.error("cmd format err");
  73.         }
  74.       return false;
  75.     }
  76. }
复制代码
  1. import java.util.ArrayList;
  2. import java.util.List;
  3. public  class PackageHandlerImpl implements PackageHandler {
  4.     public static List<PackageHandler> packageHandlers= new ArrayList<PackageHandler>();
  5.     static PackageHandlerImpl packageHandler;
  6.     protected PackageHandlerImpl(){
  7.         super();
  8.         System.out.println("init PackageHandlerImpl");
  9.     }
  10.     public static PackageHandlerImpl getInstance(){
  11.         if(packageHandler==null){
  12.             packageHandler=new PackageHandlerImpl();
  13.         }
  14.         return packageHandler;
  15.     }
  16.     @Override
  17.     public void doHandle(String hex) {
  18.         for(PackageHandler f : packageHandlers){
  19.             f.doHandle(hex);
  20.         }
  21.     }
  22.     public PackageHandlerImpl addHandle(PackageHandler f){
  23.         packageHandlers.add(f);
  24.         return this;
  25.     }
  26. }
复制代码
  1. /**
  2. * 包处理
  3. */
  4. public interface PackageHandler {
  5.      void doHandle(String hex) ;
  6. }
复制代码
  1. import org.apache.activemq.command.ActiveMQQueue;
  2. import org.apache.logging.log4j.LogManager;
  3. import org.apache.logging.log4j.Logger;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.jms.annotation.JmsListener;
  7. import org.springframework.jms.core.JmsMessagingTemplate;
  8. import org.springframework.stereotype.Service;
  9. import javax.jms.Destination;
  10. @Service
  11. public class TranServiceImpl {
  12.     private static final Logger logger = LogManager.getLogger(DevServiceApplication.class);
  13.     /**
  14.      * 接受服务器 返回数据
  15.      */
  16.     private final String out_name="ykc_out";
  17.     /**
  18.      * 也可以注入JmsTemplate,JmsMessagingTemplate对JmsTemplate进行了封装
  19.      */
  20.     @Autowired
  21.     private JmsMessagingTemplate jmsTemplate;
  22.     /**
  23.      * 发送消息 采用系统配置类型
  24.      *
  25.      * @param queueName 是发送到的队列名称
  26.      * @param message   是发送到的队列
  27.      */
  28.     public void sendMessage(String queueName, final String message) {
  29.         jmsTemplate.convertAndSend(queueName, message);
  30.     }
  31.     /**
  32.      * 发送消息 采用指定队列类型
  33.      *
  34.      * @param queueName 是发送到的队列
  35.      * @param message   是发送到的队列
  36.      */
  37.     public void sendMessageByQueue(String queueName, final String message) {
  38.         Destination destination = new ActiveMQQueue(queueName);
  39.         jmsTemplate.convertAndSend(destination, message);
  40.     }
  41.     @JmsListener(destination = out_name)
  42.     public void receiveQueue(String text) {
  43.         System.out.println("to dev => "+text);
  44.         if(!TCPServerHandler.RepDev(text)){
  45.             logger.error("write mq fail ==> "+text);
  46.         }
  47.     }
  48. }
复制代码
 运行

当前是集成到 springboot2框架在这里即可运行,或实现实现 org.springframework.boot.ApplicationRunner 或 org.springframework.boot.CommandLineRunner 的接口,即启动后实行的任务,不用框架的在main方法也可以直接运行。
  1. /**
  2. * tcp服务在框架启动后 跟着启动即可
  3. */
  4. @SpringBootApplication
  5. public class DevServiceApplication {
  6.     private static final Logger logger = LogManager.getLogger(DevServiceApplication.class);
  7.     public static void main(String[] args) {
  8.         TcpServer tcpServer = TcpServer.getInstance();
  9.         try {
  10.             tcpServer.run();
  11.             PackageHandlerImpl packageHandler = PackageHandlerImpl.getInstance();
  12.             packageHandler.addHandle(new PackageHandlerByMQ());
  13.         } catch (InterruptedException e) {
  14.             e.printStackTrace();
  15.             logger.error("TCP服务错误", e);
  16.             throw new RuntimeException();
  17.         }
  18.     }
复制代码
总结

看看服务器上的tcp服务运行情况
运行天数:

流量状态图:


            站在netty巨人的肩膀上,这个tcp服务实现方式简单,运行更是稳定。服务器运行时就摆设了不绝到今天共运行1235天了,900多个装备同步在线,配了2g的jvm运行内存,cpu占用5.6(top截图等了很久才出来5.6是个峰值,平常不到1)确保某个市的充电桩装备。中心由于客户的充电桩装备协议题目更新过几次,刚时开始是使用netty官网的解码LengthFieldBasedFrameDecoder做处理,可以说非常高效,但随后发现有几个产商的装备报文头部有特别字符,而且刚好和协议头有些重叠,再考虑到示来的产商协议的不确定性,为了兼容这些产家不得以并以正则的方法行止理。

扩展部分

Netty 官方提供的编解码器


  • 字符串编解码器:

    • StringEncoder:将字符串编码为字节。
    • StringDecoder:将字节解码为字符串。

  • 字节流编解码器:

    • ByteArrayEncoder:将字节数组编码为字节。
    • ByteArrayDecoder:将字节解码为字节数组。

  • 对象序列化编解码器:

    • ObjectEncoder:将对象序列化为字节。
    • ObjectDecoder:将字节反序列化为对象。

  • 长度字段编解码器:

    • LengthFieldPrepender:在消息头部添加表现消息长度的字段。
    • LengthFieldBasedFrameDecoder:根据长度字段解码消息,用于处理拆包和粘包题目。

  • 行分隔符编解码器:

    • LineBasedFrameDecoder:按行切分消息,通常用于处理文本协议。

  • DelimiterBasedFrameDecoder:

    • DelimiterBasedFrameDecoder:按照指定的分隔符切分消息,用于处理自定义分隔符的协议。

  • Protobuf 编解码器:

    • ProtobufEncoder:将 Protobuf 对象编码为字节。
    • ProtobufDecoder:将字节解码为 Protobuf 对象。

  • HTTP 编解码器:

    • HttpRequestEncoder:将 HTTP 哀求编码为字节。
    • HttpResponseDecoder:将字节解码为 HTTP 响应。
    • HttpRequestDecoder:将字节解码为 HTTP 哀求。
    • HttpResponseEncoder:将 HTTP 响应编码为字节。

  • WebSocket 编解码器:

    • WebSocketServerProtocolHandler:处理 WebSocket 握手以及帧的编解码。


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

刘俊凯

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表