Netty实现udp服务器

科技颠覆者  金牌会员 | 2024-6-28 23:01:50 | 来自手机 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 552|帖子 552|积分 1656

1、TCP与UDP通讯协议

网络传输层协议有两种,一种是TCP,别的一种是UDP。
TCP是一种面向连接的协议,提供可靠的数据传输。TCP通过三次握手建立连接,并通过确认和重传机制,包管数据的完备性和可靠性。TCP适用于对数据正确性要求较高、对实时性要求较低的应用场景,如网页浏览、文件传输等。
UDP是一种无连接的协议,不包管数据的可靠性传输。UDP通过尽力交付数据包的方式进行传输,不对数据包的传输状态进行确认和重传,因此速率较快。UDP适用于对实时性要求较高、对数据正确性要求较低的应用场景,如视频传输、语音通讯等。
那么,这两种有哪些区别呢?请看下面:


  • 连接方式:UDP是无连接的,TCP是面向连接的;
  • 可靠性:UDP不包管数据的可靠性传输,TCP包管数据的可靠传输;
  • 速率:UDP传输速率较快,TCP传输速率较慢;
  • 传输方式:UDP采用尽力交付的方式传输数据包,不进行确认和重传;TCP通过确认和重传机制包管数据的完备性和可靠性;
  • 开销:UDP的开销较低,TCP的开销较高。
总结:我们可以根据具体的应用场景和需求选择使用UDP或TCP进行数据传输。如果对数据的实时性要求较高,且对数据正确性要求较低,可以选择使用UDP。如果对数据的正确性要求较高,可以选择使用TCP。
2、游戏行业选择的通讯协议

游戏由于对数据的正确性(不允许丢包,乱序)非常高,一样平常都是选择基于TCP的socket通讯。但UDP的低时延,快速传输对实时性要求非常高的游戏类型也黑白常大的吸引力。据说,魔兽天下以及Dota2使用UDP开发。当然,使用udp通讯的游戏肯定在通讯层做了适配,包管关键数据不丢包,不乱序。
比年来,基于UDP协议的KCP(Kuai Control Protocol,快速可靠传输协议),也得到了快速的发展。据说,原神是使用KCP通讯的。
3、Netty使用UDP协议

netty使用udp协议,网上的例子都黑白常简朴的。都是两个类搞定。没有办理以下几个题目:


  • 如何与现有网络框架适配(支持私有协议栈,支持javabean,不单单是字符串)
  • 如何自动与客户端通讯(不再是客户端发一个消息,服务器直接推送一个回包)
本文重要就这两个题目进行案例说明。
3.1、netty数据包载体

udp是无连接的,这意味着通讯双方无需像TCP那般“三次握手四次释放”。只要知道对方的socket地址(Ip+Port),即可发送数据,发完即终止。在Netty里,udp的通讯载体叫做DatagramPacket,负责将数据(ByteBuf)从源头发送到目的地。
  1. public class DatagramPacket extends DefaultAddressedEnvelope<ByteBuf, InetSocketAddress> implements ByteBufHolder {
  2.     public DatagramPacket(ByteBuf data, InetSocketAddress recipient) {
  3.         super(data, recipient);
  4.     }
  5.     public DatagramPacket(ByteBuf data, InetSocketAddress recipient, InetSocketAddress sender) {
  6.         super(data, recipient, sender);
  7.     }
  8. }
复制代码
3.2私有协议栈编解码

3.2.1、通讯基类

而我们的网络底层通讯是基于javabean的,因此定义我们的消息基类如下:
  1. public class UdpMessage implements Message {
  2.     private String senderIp;
  3.     private int senderPort;
  4.     private String receiverIp;
  5.     private int receiverPort;
  6. }
复制代码
3.2.2、私有协议栈消息编码

我们将私有协议栈定义为 包头(消息类型id),包体(具体消息经编码后的字节数组)。将自定义消息UdpMessage转为DatagramPacket。
  1. public class UdpProtocolEncoder extends MessageToMessageEncoder<UdpMessage> {
  2.     private static final Logger logger = LoggerFactory.getLogger("socketserver");
  3.     private final MessageFactory messageFactory;
  4.     private final MessageCodec messageCodec;
  5.     public UdpProtocolEncoder(MessageFactory messageFactory, MessageCodec messageCodec) {
  6.         this.messageFactory = messageFactory;
  7.         this.messageCodec = messageCodec;
  8.     }
  9.     @Override
  10.     protected void encode(ChannelHandlerContext ctx, UdpMessage message, List<Object> out) throws Exception {
  11.         // ----------------protocol pattern-------------------------
  12.         // packetLength | cmd | body
  13.         // int int byte[]
  14.         int  cmd = messageFactory.getMessageId(message.getClass());
  15.         try {
  16.             byte[] body = messageCodec.encode(message);
  17.             //消息内容长度
  18.             ByteBuf buf = Unpooled.buffer(body.length+4);
  19.             // 写入cmd类型
  20.             buf.writeInt(cmd);
  21.             buf.writeBytes(body);
  22.             out.add(new DatagramPacket(buf, new InetSocketAddress(message.getReceiverIp(), message.getReceiverPort())));
  23.         } catch (Exception e) {
  24.             logger.error("wrote message {} failed", cmd, e);
  25.         }
  26.     }
  27. }
复制代码
这里消息pojo编码,使用了jforgame的组件,根据javabean的字段元信息,自动编码为byte数组。
依靠说明如下:
  1. <dependency>
  2.     <groupId>io.github.jforgame</groupId>
  3.     <artifactId>jforgame-codec-struct</artifactId>
  4.     <version>1.1.0</version>
  5. </dependency>
复制代码
3.2.3、私有协议栈消息解码

私有协议栈解码负责将数据包DatagramPacket转为UdpMessage。将底层数据流转为ByteBuf之后,还必要将字节数据进行解码,才可以转换为应用程序熟悉的消息。
  1. public class UdpProtocolDecoder extends MessageToMessageDecoder<DatagramPacket> {
  2.     private final MessageFactory messageFactory;
  3.     private final MessageCodec messageCodec;
  4.     public UdpProtocolDecoder(MessageFactory messageFactory, MessageCodec messageCodec) {
  5.         this.messageFactory = messageFactory;
  6.         this.messageCodec = messageCodec;
  7.     }
  8.     @Override
  9.     protected void decode(ChannelHandlerContext ctx, DatagramPacket msg, List<Object> out) throws Exception {
  10.         ByteBuf in = msg.content();
  11.         int length = in.readableBytes();
  12.         int cmd = in.readInt();
  13.         byte[] body = new byte[length - 4];
  14.         in.readBytes(body);
  15.         Class<?> msgClazz = messageFactory.getMessage(cmd);
  16.         out.add(messageCodec.decode(msgClazz, body));
  17.     }
  18. }
复制代码
4、服务端代码

4.1、会话管理

服务端会话管理(建立及摧毁),以及消息担当(下文的channelRead方法)。
  1. @ChannelHandler.Sharable
  2. public class UdpChannelIoHandler extends ChannelInboundHandlerAdapter {
  3.     private final static Logger logger = LoggerFactory.getLogger("socketserver");
  4.     /** 消息分发器 */
  5.     private final SocketIoDispatcher messageDispatcher;
  6.     public UdpChannelIoHandler(SocketIoDispatcher messageDispatcher) {
  7.         super();
  8.         this.messageDispatcher = messageDispatcher;
  9.     }
  10.     @Override
  11.     public void channelActive(ChannelHandlerContext ctx) throws Exception {
  12.         Channel channel = ctx.channel();
  13.         ChannelUtils.duplicateBindingSession(ctx.channel(), new NSession(channel));
  14.         SessionManager.getInstance().buildSession(ChannelUtils.getSessionBy(channel));
  15.         System.out.println("socket register " + channel);
  16.     }
  17.     @Override
  18.     public void channelRead(ChannelHandlerContext context, Object packet) throws Exception {
  19.         logger.debug("receive pact, content is {}", packet.getClass().getSimpleName());
  20.         final Channel channel = context.channel();
  21.         IdSession session = ChannelUtils.getSessionBy(channel);
  22.         messageDispatcher.dispatch(session, packet);
  23.     }
  24.     @Override
  25.     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  26.         Channel channel = ctx.channel();
  27.         System.out.println("socket inactive " + channel);
  28.         IdSession userSession = ChannelUtils.getSessionBy(channel);
  29.         messageDispatcher.onSessionClosed(userSession);
  30.     }
  31. }
复制代码
必要注意的是,UDP Socket Server的链接建立,只在启动的时间触发一次。之后,无论有多少个客户端,上文的channelActive都不会再次触发。也就是说,客户端的链接不是一对一的,全局只有一个服务端链接。这点与tcp不同。那么,怎么区分不同的客户端呢?后文例子发表。
4.2、消息路由

具体的消息处理(通过消息路由及消息处理器注解)。网关担当到消息之后,自动把消息分发到对应的处理器。类似与springmvc的Controller以及RequestMapper功能。
  1. public class MessageIoDispatcher extends ChainedMessageDispatcher {
  2.     private MessageHandlerRegister handlerRegister;
  3.     MessageFactory messageFactory = GameMessageFactory.getInstance();
  4.     private MessageParameterConverter msgParameterConverter= new DefaultMessageParameterConverter(messageFactory);
  5.     public MessageIoDispatcher() {
  6.         LoginRouter router = new LoginRouter();
  7.         this.handlerRegister = new CommonMessageHandlerRegister(Collections.singletonList(router), messageFactory);
  8.         MessageHandler messageHandler = (session, message) -> {
  9.             int cmd = GameMessageFactory.getInstance().getMessageId(message.getClass());
  10.             MessageExecutor cmdExecutor = handlerRegister.getMessageExecutor(cmd);
  11.             if (cmdExecutor == null) {
  12.                 logger.error("message executor missed,  cmd={}", cmd);
  13.                 return true;
  14.             }
  15.             Object[] params = msgParameterConverter.convertToMethodParams(session, cmdExecutor.getParams(), message);
  16.             Object controller = cmdExecutor.getHandler();
  17.             MessageTask task = MessageTask.valueOf(session, session.hashCode(), controller, cmdExecutor.getMethod(), params);
  18.             task.setRequest(message);
  19.             // 丢到任务消息队列,不在io线程进行业务处理
  20.             GameServer.getMonitorGameExecutor().accept(task);
  21.             return true;
  22.         };
  23.         addMessageHandler(messageHandler);
  24.     }
  25.     @Override
  26.     public void onSessionCreated(IdSession session) {
  27.     }
  28.     @Override
  29.     public void onSessionClosed(IdSession session) {
  30.     }
  31. }
复制代码
4.3、服务端启动代码

  1. public class UdpSocketServer implements ServerNode {
  2.     private static final Logger logger = LoggerFactory.getLogger("socketserver");
  3.     private EventLoopGroup group = new NioEventLoopGroup();
  4.     protected HostAndPort nodesConfig = HostAndPort.valueOf(8088);
  5.     public SocketIoDispatcher socketIoDispatcher;
  6.     public MessageFactory messageFactory;
  7.     public MessageCodec messageCodec;
  8.     @Override
  9.     public void start() throws Exception {
  10.         try {
  11.             SessionManager.getInstance().schedule();
  12.             
  13.             Bootstrap bootstrap = new Bootstrap();
  14.             bootstrap.group(group)
  15.                     .channel(NioDatagramChannel.class)
  16.                     .handler(new LoggingHandler(LogLevel.DEBUG))
  17.                     .handler(new ChannelInitializer<DatagramChannel>() {
  18.                         @Override
  19.                         public void initChannel(DatagramChannel ch) throws Exception {
  20.                             ChannelPipeline pipeline = ch.pipeline();
  21.                             pipeline.addLast("protocolDecoder", new UdpProtocolDecoder(messageFactory, messageCodec));
  22.                             pipeline.addLast("protocolEncoder", new UdpProtocolEncoder(messageFactory, messageCodec));
  23.                             pipeline.addLast(new UdpChannelIoHandler(socketIoDispatcher));
  24.                         }
  25.                     });
  26.             logger.info("socket server is listening at " + nodesConfig.getPort() + "......");
  27.             bootstrap.bind(nodesConfig.getPort()).sync().channel().closeFuture().sync();
  28.         } catch (Exception e) {
  29.             logger.error("", e);
  30.             group.shutdownGracefully();
  31.         }
  32.     }
  33.     @Override
  34.     public void shutdown() throws Exception {
  35.         group.shutdownGracefully();
  36.     }
  37.     public static void main(String[] args) throws Exception {
  38.         UdpSocketServer udpSocketServer = new UdpSocketServer();
  39.         udpSocketServer.messageFactory = GameMessageFactory.getInstance();
  40.         udpSocketServer.messageCodec = new StructMessageCodec();
  41.         udpSocketServer.socketIoDispatcher = new MessageIoDispatcher();
  42.         udpSocketServer.start();
  43.     }
  44. }
复制代码
5、客户端代码

5.1、客户端启动代码

  1. public class UdpSocketClient extends AbstractSocketClient {
  2.     private final EventLoopGroup group = new NioEventLoopGroup(1);
  3.     private HostAndPort nativeHostPort;
  4.     public UdpSocketClient(SocketIoDispatcher messageDispatcher, MessageFactory messageFactory, MessageCodec messageCodec, HostAndPort hostPort) {
  5.         this.ioDispatcher = messageDispatcher;
  6.         this.messageFactory = messageFactory;
  7.         this.messageCodec = messageCodec;
  8.         this.targetAddress = hostPort;
  9.     }
  10.     @Override
  11.     public IdSession openSession() throws IOException {
  12.         try {
  13.             final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
  14.             Bootstrap bootstrap = new Bootstrap();
  15.             bootstrap.channel(NioDatagramChannel.class);
  16.             bootstrap.group(nioEventLoopGroup);
  17.             bootstrap.handler(new LoggingHandler(LogLevel.INFO));
  18.             bootstrap.handler(new UdpProtoBufClientChannelInitializer());
  19.             ChannelFuture f = bootstrap.connect(new InetSocketAddress(targetAddress.getHost(), targetAddress.getPort()),
  20.                     new InetSocketAddress(nativeHostPort.getHost(), nativeHostPort.getPort())).sync();
  21.             IdSession session = new NSession(f.channel());
  22.             this.session = session;
  23.             return session;
  24.         } catch (Exception e) {
  25.             group.shutdownGracefully();
  26.             throw new IOException(e);
  27.         }
  28.     }
  29.     @Override
  30.     public void close() throws IOException {
  31.         this.session.close();
  32.     }
  33.     public void send(UdpMessage message) {
  34.         message.setSenderIp(nativeHostPort.getHost());
  35.         message.setSenderPort(nativeHostPort.getPort());
  36.         message.setReceiverIp(targetAddress.getHost());
  37.         message.setReceiverPort(targetAddress.getPort());
  38.         session.send(message);
  39.     }
  40.     class UdpProtoBufClientChannelInitializer extends ChannelInitializer<NioDatagramChannel> {
  41.         @Override
  42.         protected void initChannel(NioDatagramChannel ch) throws Exception {
  43.             ChannelPipeline pipeline = ch.pipeline();
  44.             pipeline.addLast("protocolDecoder", new UdpProtocolDecoder(messageFactory, messageCodec));
  45.             pipeline.addLast("protocolEncoder", new UdpProtocolEncoder(messageFactory, messageCodec));
  46.             pipeline.addLast(new UdpChannelIoHandler(ioDispatcher));
  47.         }
  48.     }
  49. }
复制代码
这里必要注意下:
客户端必要指定端口与服务器通讯,这样才气方便消息包携带本身的地址信息。
5.2、测试代码

客户端测试代码如下:
模拟10个玩家登录。读者可自行修改程序。udp是不可靠链接,不包管交付。如果在外网跑,是会出现消息丢包,或者乱序。在当地跑,是不太可能出现这种环境。读者可自行测试,同一个角色发送一大堆消息(附加上次序字段),看服务器收到的消息序号是否完备有序。
  1. private static AtomicLong idFactory = new AtomicLong(1000);
  2.     public static void main(String[] args) throws Exception {
  3.         MessageCodec messageCodec = new StructMessageCodec();
  4.         GameMessageFactory.getInstance().registeredClassTypes().forEach(Codec::getSerializer);
  5.         for (int i = 0; i < 10; i++) {
  6.             System.out.println("----------i=" + i);
  7.             UdpSocketClient socketClient = new UdpSocketClient(new SocketIoDispatcherAdapter() {
  8.                 @Override
  9.                 public void dispatch(IdSession session, Object message) {
  10.                     System.out.println("receive package ---------" + JsonUtil.object2String(message));
  11.                 }
  12.             }, GameMessageFactory.getInstance(), messageCodec, HostAndPort.valueOf(8088));
  13.             socketClient.nativeHostPort = HostAndPort.valueOf(8099 + i);
  14.             socketClient.openSession();
  15.             for (int j = 0; j < 1; j++) {
  16.                 ReqLogin req = new ReqLogin();
  17.                 req.setPlayerId(idFactory.getAndIncrement());
  18.                 socketClient.send(req);
  19.             }
  20.         }
  21.     }
复制代码
6、游戏服务器示例功能

6.1、demo逻辑

我们以上面的代码,实现一个简朴的游戏逻辑。

  • 玩家根据服务端的地址进行udp通讯。链接建立之后,模拟账号登录逻辑。
  • 服务器吸取到登录消息之后,立马推送登录乐成的协议。
  • 服务器每隔一段时间,向全部注册的客户端推送一个消息包。
6.2、登录哀求/响应包

  1. @MessageMeta(cmd = 55555)
  2. public class ReqLogin extends UdpMessage {
  3.     private long playerId;
  4. }
复制代码
  1. @MessageMeta(cmd = 55556)
  2. public class ResPlayerLogin extends UdpMessage {
  3.     private long playerId;
  4. }
复制代码
6.3、登录路由

  1. @MessageRoute
  2. public class LoginRouter {
  3.     @RequestHandler
  4.     public void reqTime(IdSession session, ReqLogin req) {
  5.         long playerId = req.getPlayerId();
  6.         System.out.println("player login" + playerId);
  7.         Player player = new Player();
  8.         player.setId(playerId);
  9.         player.setRemoteAddr(HostAndPort.valueOf(req.getSenderIp(), req.getSenderPort()));
  10.         SessionManager.getInstance().register(playerId, player);
  11.         ResPlayerLogin resp = new ResPlayerLogin();
  12.         resp.setPlayerId(playerId);
  13.         player.receive(session, resp);
  14.     }
  15. }
复制代码
此中SessionManager 类缓存服务器的全局session,以及各个客户端环境的通讯地址。以及,每隔一段时间自动向客户端推送消息。
  1. public class SessionManager {
  2.     private static SessionManager inst = new SessionManager();
  3.     private ConcurrentMap<Long, Player> id2Players = new ConcurrentHashMap<>();
  4.     private IdSession serverSession;
  5.     public static SessionManager getInstance() {
  6.         return inst;
  7.     }
  8.     public void register(long playerId, Player player) {
  9.         id2Players.put(playerId, player);
  10.     }
  11.     public void buildSession(IdSession session) {
  12.         serverSession = session;
  13.     }
  14.     public void schedule() {
  15.         SchedulerManager.getInstance().scheduleAtFixedRate(()->{
  16.             id2Players.forEach((key, value) -> {
  17.                 ResWelcome push = new ResWelcome();
  18.                 push.setTime(System.currentTimeMillis());
  19.                 value.receive(serverSession, push);
  20.             });
  21.         }, TimeUtil.MILLIS_PER_MINUTE, 10*TimeUtil.MILLIS_PER_SECOND);
  22.     }
  23. }
复制代码
必要注意的是,客户端只有在登录乐成之后,服务器才气绑定玩家与对应的客户端地址,才气自动推送消息。也就是说,在登录之前,服务器也是无法自动推送消息的,在业务上来说,也是没有意义的。
6.4、客户端测试代码输出


7、总结

udp是一种无连接的协议,t提供不可靠性传输。UDP通过尽力交付数据包的方式进行传输,不对数据包的传输状态进行确认和重传,因此速率较快。UDP适用于对实时性要求较高、对数据正确性要求较低的应用场景。比方,如果语音视频服务。如果游戏服务器确实必要使用udp协议的话,必要在应用层办理丢包乱序题目。
对于乱序,担当方可以先把数据都缓存起来,等一段窗口期的数据担当完毕后再分发给业务层。对于丢包,无论是发送方还是担当方,都必要缓存最近发送的消息。以便用于丢包重传。当然,这只是基本的思路,现实处理起来可能会非常复杂。如果考虑到UDP协议的话,可能KCP会是更好的选择。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

科技颠覆者

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表