随着物联网的发展,很多项目都开始涉及到了tcp毗连这块,在这里我们轻松用netty去实现,站在巨人的肩膀上。
关于netty包引用:
- <!-- TCP SERVER -->
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
- <version>4.1.42.Final</version>
- <scope>compile</scope>
- </dependency>
复制代码
实现TCP服务器代码
依赖netty只需几行代码tcp服务:
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.*;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.nio.ByteOrder;
- public class TcpServer {
- private Logger log = LoggerFactory.getLogger(getClass());
- //自定义tcp服务端口号
- private int port=9000;
- static TcpServer tcpServer;
- //单例设计模式
- private TcpServer(){
- }
- public static TcpServer getInstance(){
- if(tcpServer==null){
- tcpServer=new TcpServer();
- }
- return tcpServer;
- };
- public void run() throws InterruptedException {
- // 创建主线程组(接受连接)
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- // 创建工作线程组(处理连接)
- EventLoopGroup workerGroup = new NioEventLoopGroup(20); // 指定工作线程数量为20
- // 创建ServerBootstrap实例,用于配置服务器
- ServerBootstrap bootstrap = new ServerBootstrap();
- // 配置主、工作线程组
- bootstrap.group(bossGroup, workerGroup);
- // 指定使用NIO进行网络传输
- bootstrap.channel(NioServerSocketChannel.class);
- // 设置子Channel的Socket选项,允许地址重用
- bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
- // 配置子Channel的处理器,这里使用ChannelInitializer
- bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel ch) throws Exception {
- // 添加自定义的解码器,这里是处理协议
- ch.pipeline().addLast(new YKCDecoderV1());
- // 添加自定义的服务器处理器
- ch.pipeline().addLast(new TCPServerHandler());
- }
- });
- // 绑定端口并添加监听器,处理绑定操作的结果
- bootstrap.bind(port).addListener((ChannelFutureListener) future -> {
- // 在绑定成功后输出日志信息
- log.info("bind success in port: " + port);
- });
- // 输出服务器启动成功信息
- System.out.println("server started!");
- }
- }
复制代码 业务处理代码(参考)
以下是处理报文业务类可参考,注意代码未优化:
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.ByteBufUtil;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.ByteToMessageDecoder;
- import java.net.InetSocketAddress;
- import java.util.List;
- import java.util.regex.Matcher;
- import java.util.regex.Pattern;
- /**
- * 正则解析版
- */
- public class YKCDecoderV1 extends ByteToMessageDecoder {
- final static String reg = "^68.{14,332}";//单指令解析 根据业务协议报文定最短和最长
- final static Pattern pattern1 = Pattern.compile(reg);
- @Override
- protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf bufferIn, List<Object> list) throws Exception {
- // 获取可读字节数
- int leng = bufferIn.readableBytes();
- // 如果可读字节数小于8,输出错误信息并跳过这部分数据
- if (leng < 8) {
- System.out.println("err! cmd len < 8 .");
- String s = ByteBufUtil.hexDump(bufferIn);
- System.out.println(s);
- bufferIn.skipBytes(leng);
- return;
- } else {
-
- String s = ByteBufUtil.hexDump(bufferIn);
- Matcher matcher1 = pattern1.matcher(s);
- if (matcher1.find()) {
- String cmd = matcher1.group();
- //单指令
- System.out.println("sign cmd: " + cmd);
- String lenStr = cmd.substring(2, 4);
- int len = (Integer.parseInt(lenStr, 16) + 4) * 2;
- int cmdLen = cmd.length();
- if (cmdLen == len) {
- JFYChargeProtocol jfyChargeProtocol = new JFYChargeProtocol(cmd);
- list.add(jfyChargeProtocol);
- bufferIn.skipBytes(leng);
- } else if (cmdLen > len) {
- multiHand(cmd, list);
- bufferIn.skipBytes(leng);
- }
- } else {
- logErr(channelHandlerContext, s);
- System.out.println("err! cmd format invalid: " + s);
- bufferIn.skipBytes(leng);
- }
- }
- }
- private void multiHand(String cmd, List<Object> list) {
- if (cmd.length() < 8) {
- return;
- }
- String lenStr = cmd.substring(2, 4);
- int len = (Integer.parseInt(lenStr, 16) + 4) * 2;
- if (len > cmd.length()) {
- return;
- }
- String newCmd = cmd.substring(0, len);
- if (newCmd.length() == len) {
- System.out.println("multi cmd-> " + newCmd);
- JFYChargeProtocol jfyChargeProtocol = new JFYChargeProtocol(newCmd);
- list.add(jfyChargeProtocol);
- }
- if (cmd.length() > len) {
- System.out.println("multi xxx-> " + cmd);
- String two = cmd.substring(len);
- if(two.startsWith("68")){
- multiHand(two, list);
- }
- }
- }
- private int checkSignCmd(String cmd) {
- int cmd_len = getCmdLen(cmd);
- return cmd.length() - cmd_len;
- }
- private int getCmdLen(String cmd) {
- String leng = cmd.substring(28, 30) + cmd.substring(26, 28);
- int dec_num = Integer.parseInt(leng, 16);
- return (dec_num * 2) + 34;
- }
- private void logErr(ChannelHandlerContext ctx, String msg) {
- InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
- String clientIP = insocket.getAddress().getHostAddress();
- System.out.println(clientIP + " :: " + msg);
- }
复制代码- public class JFYChargeProtocol {
- private int length;
- private byte[] raw;
- private String rawStr;
- public JFYChargeProtocol(int length,byte[] raw){
- this.length=length;
- this.raw=raw;
- }
- public JFYChargeProtocol(String raw){
- this.rawStr=raw;
- }
-
- public int getLength() {
- return length;
- }
- public void setLength(int length) {
- this.length = length;
- }
- public byte[] getRaw() {
- return raw;
- }
- public void setRaw(byte[] raw) {
- this.raw = raw;
- }
- public String getRawStr() {
- return rawStr;
- }
- public void setRawStr(String rawStr) {
- this.rawStr = rawStr;
- }
- }
复制代码
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import org.apache.logging.log4j.LogManager;
- import org.apache.logging.log4j.Logger;
- import org.springframework.stereotype.Service;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- @Service
- public class TCPServerHandler extends ChannelInboundHandlerAdapter {
- private static final Logger logger = LogManager.getLogger(TCPServerHandler.class);
- static Map<String,ChannelHandlerContext> inList=new ConcurrentHashMap<String,ChannelHandlerContext>();
- /**
- * 新连接
- * @param ctx
- */
- @Override
- public void channelActive(ChannelHandlerContext ctx) {
- String channelName=getChannelName(ctx);
- inList.put(channelName,ctx);
- logger.info("dev new conn > " +channelName);
- }
- private String getChannelName(ChannelHandlerContext ctx) {
- return "ykc".concat(ctx.channel().remoteAddress().toString());
- }
- /**
- * 连接下线
- * @param ctx
- * @throws Exception
- */
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- String channelName=getChannelName(ctx);
- logger.info("dev close conn > " + channelName);
- inList.remove(channelName);
- ctx.fireChannelInactive();
- }
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- JFYChargeProtocol in = (JFYChargeProtocol) msg;
- String readMsg= in.getRawStr();
- logger.info("read dev <= " + readMsg);
- String channelName=getChannelName(ctx);
- readMsg=channelName+"$$"+readMsg;
- PackageHandlerImpl.getInstance().doHandle(readMsg);
- //ctx.writeAndFlush(in);
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- cause.printStackTrace();
- ctx.close();
- }
- /**
- * 回复信息给设备
- * @param hex
- */
- public static boolean RepDev(String hex){
- String[] kv= hex.split("\\$\\$");
- if(kv.length==2){
- String key=kv[0];
- ChannelHandlerContext context=inList.get(key);
- if(context!=null){
- byte[] bytes= ByteUtil.hexString2Bytes(kv[1]);
- ByteBuf byteBuf= Unpooled.copiedBuffer(bytes);
- context.writeAndFlush(byteBuf);
- return true;
- }else{
- logger.error("dev offline="+key);
- }
- }else{
- logger.error("cmd format err");
- }
- return false;
- }
- }
复制代码- import java.util.ArrayList;
- import java.util.List;
- public class PackageHandlerImpl implements PackageHandler {
- public static List<PackageHandler> packageHandlers= new ArrayList<PackageHandler>();
- static PackageHandlerImpl packageHandler;
- protected PackageHandlerImpl(){
- super();
- System.out.println("init PackageHandlerImpl");
- }
- public static PackageHandlerImpl getInstance(){
- if(packageHandler==null){
- packageHandler=new PackageHandlerImpl();
- }
- return packageHandler;
- }
- @Override
- public void doHandle(String hex) {
- for(PackageHandler f : packageHandlers){
- f.doHandle(hex);
- }
- }
- public PackageHandlerImpl addHandle(PackageHandler f){
- packageHandlers.add(f);
- return this;
- }
- }
复制代码- /**
- * 包处理
- */
- public interface PackageHandler {
- void doHandle(String hex) ;
- }
复制代码- import org.apache.activemq.command.ActiveMQQueue;
- import org.apache.logging.log4j.LogManager;
- import org.apache.logging.log4j.Logger;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.jms.annotation.JmsListener;
- import org.springframework.jms.core.JmsMessagingTemplate;
- import org.springframework.stereotype.Service;
- import javax.jms.Destination;
- @Service
- public class TranServiceImpl {
- private static final Logger logger = LogManager.getLogger(DevServiceApplication.class);
- /**
- * 接受服务器 返回数据
- */
- private final String out_name="ykc_out";
- /**
- * 也可以注入JmsTemplate,JmsMessagingTemplate对JmsTemplate进行了封装
- */
- @Autowired
- private JmsMessagingTemplate jmsTemplate;
- /**
- * 发送消息 采用系统配置类型
- *
- * @param queueName 是发送到的队列名称
- * @param message 是发送到的队列
- */
- public void sendMessage(String queueName, final String message) {
- jmsTemplate.convertAndSend(queueName, message);
- }
- /**
- * 发送消息 采用指定队列类型
- *
- * @param queueName 是发送到的队列
- * @param message 是发送到的队列
- */
- public void sendMessageByQueue(String queueName, final String message) {
- Destination destination = new ActiveMQQueue(queueName);
- jmsTemplate.convertAndSend(destination, message);
- }
- @JmsListener(destination = out_name)
- public void receiveQueue(String text) {
- System.out.println("to dev => "+text);
- if(!TCPServerHandler.RepDev(text)){
- logger.error("write mq fail ==> "+text);
- }
- }
- }
复制代码 运行
当前是集成到 springboot2框架在这里即可运行,或实现实现 org.springframework.boot.ApplicationRunner 或 org.springframework.boot.CommandLineRunner 的接口,即启动后实行的任务,不用框架的在main方法也可以直接运行。
- /**
- * tcp服务在框架启动后 跟着启动即可
- */
- @SpringBootApplication
- public class DevServiceApplication {
- private static final Logger logger = LogManager.getLogger(DevServiceApplication.class);
- public static void main(String[] args) {
- TcpServer tcpServer = TcpServer.getInstance();
- try {
- tcpServer.run();
- PackageHandlerImpl packageHandler = PackageHandlerImpl.getInstance();
- packageHandler.addHandle(new PackageHandlerByMQ());
- } catch (InterruptedException e) {
- e.printStackTrace();
- logger.error("TCP服务错误", e);
- throw new RuntimeException();
- }
- }
复制代码 总结
看看服务器上的tcp服务运行情况
运行天数:
流量状态图:
站在netty巨人的肩膀上,这个tcp服务实现方式简单,运行更是稳定。服务器运行时就摆设了不绝到今天共运行1235天了,900多个装备同步在线,配了2g的jvm运行内存,cpu占用5.6(top截图等了很久才出来5.6是个峰值,平常不到1)确保某个市的充电桩装备。中心由于客户的充电桩装备协议题目更新过几次,刚时开始是使用netty官网的解码LengthFieldBasedFrameDecoder做处理,可以说非常高效,但随后发现有几个产商的装备报文头部有特别字符,而且刚好和协议头有些重叠,再考虑到示来的产商协议的不确定性,为了兼容这些产家不得以并以正则的方法行止理。
扩展部分
Netty 官方提供的编解码器
- 字符串编解码器:
- StringEncoder:将字符串编码为字节。
- StringDecoder:将字节解码为字符串。
- 字节流编解码器:
- ByteArrayEncoder:将字节数组编码为字节。
- ByteArrayDecoder:将字节解码为字节数组。
- 对象序列化编解码器:
- ObjectEncoder:将对象序列化为字节。
- ObjectDecoder:将字节反序列化为对象。
- 长度字段编解码器:
- LengthFieldPrepender:在消息头部添加表现消息长度的字段。
- LengthFieldBasedFrameDecoder:根据长度字段解码消息,用于处理拆包和粘包题目。
- 行分隔符编解码器:
- LineBasedFrameDecoder:按行切分消息,通常用于处理文本协议。
- DelimiterBasedFrameDecoder:
- DelimiterBasedFrameDecoder:按照指定的分隔符切分消息,用于处理自定义分隔符的协议。
- Protobuf 编解码器:
- ProtobufEncoder:将 Protobuf 对象编码为字节。
- ProtobufDecoder:将字节解码为 Protobuf 对象。
- HTTP 编解码器:
- HttpRequestEncoder:将 HTTP 哀求编码为字节。
- HttpResponseDecoder:将字节解码为 HTTP 响应。
- HttpRequestDecoder:将字节解码为 HTTP 哀求。
- HttpResponseEncoder:将 HTTP 响应编码为字节。
- WebSocket 编解码器:
- WebSocketServerProtocolHandler:处理 WebSocket 握手以及帧的编解码。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |