qidao123.com技术社区-IT企服评测·应用市场

标题: 基于UPD的快速局域网聊天室 [打印本页]

作者: 铁佛    时间: 2025-4-9 08:38
标题: 基于UPD的快速局域网聊天室
UPD与TCP对比:

UDP是无连接的协议,也不保证可靠交付,只在IP数据报服务之上增长了很少的功能,主要是复用和分用以及差错检测的功能。这实用于要求源主机以恒定速率发送数据,允许网络拥塞时丢失数据,却不允许数据有太大时延的及时应用。TCP则是面向连接的传输层协议,提供可靠的交付服务。TCP把连接作为最基本的抽象,连接的端点叫做套接字(socket)。每一条TCP连接唯一地被通讯两头的两个端点(即套接字对 socket pair)所确定,即每一条TCP连接只能是点对点的 [2]。应用程序在利用TCP协议之前必须先建立TCP连接,在传送数据完毕后必须开释已经建立的TCP连接。TCP提供全双工通讯,允许通讯双方的应用进程在任何时候都能发送数据,TCP连接的两头都设有发送缓存和接收缓存,用到临时存放双向通讯的数据。与面向报文的UDP不同,TCP是面向字节流的。这里的“流”(stream)指的是流入到进程或者从进程流出的字节序列。虽然应用程序和TCP的交互是一次一个大小不等的数据块,但TCP把应用程序交下来的数据仅仅看成是一连串的无结构的字节流,并不知道所传送的字节流的含义。TCP实用于需要有序、可靠地传输数据流的应用程序。UPD的利用限定:

在 IPv4 中,IP 首部包含一个 16 位的总长度字段,因此 IPv4 数据包的最大长度为65536 字节,不过这其中包含了 IP 首部(通常为 20 字节)和 UDP 首部(8 字节),所以实际可用于 UDP 数据的最大长度为 65507 字节
然而,在实际网络环境中,由于不同网络设备(如路由器)可能有不同的最大传输单元(MTU,Maximum Transmission Unit),数据包大小可能会受到进一步限定。以太网的 MTU 通常为 1500 字节,假如 UDP 数据包大小凌驾 MTU,就需要举行分片处理。分片会增长网络传输的复杂性和堕落的可能性,因此在实际应用中,为了避免分片,建议将 UDP 数据包大小控制在 MTU 以下。在 UTF - 8 编码下,1500 字节大约能存储 500 个常见的中文。
总结:

1、500字基本满足文字类聊天所需;
代码部分:
  1. package tangzeqi.com.service;
  2. import com.alibaba.fastjson.JSON;
  3. import org.apache.commons.lang.time.DateUtils;
  4. import org.apache.commons.lang3.ObjectUtils;
  5. import tangzeqi.com.project.MyProject;
  6. import tangzeqi.com.stroge.BaseMessage;
  7. import tangzeqi.com.stroge.BaseUser;
  8. import tangzeqi.com.stroge.TextMessage;
  9. import tangzeqi.com.stroge.UPDMessage;
  10. import tangzeqi.com.utils.Md5Utils;
  11. import tangzeqi.com.utils.MessageUtils;
  12. import java.io.IOException;
  13. import java.net.*;
  14. import java.nio.charset.StandardCharsets;
  15. import java.util.Date;
  16. import java.util.concurrent.ConcurrentHashMap;
  17. import java.util.concurrent.atomic.AtomicLong;
  18. /**
  19. * 局域网广播通讯,UPD
  20. */
  21. public class UPDService {
  22.     private volatile boolean doing = false;
  23.     private volatile AtomicLong msgIndesx = new AtomicLong(0);
  24.     private final String project;
  25.     /**
  26.      * 广播模式
  27.      */
  28.     private volatile DatagramSocket socket;
  29.     private final DatagramPacket packet = new DatagramPacket("".getBytes(), "".getBytes().length, InetAddress.getByName("255.255.255.255"), 0);
  30.     private volatile ConcurrentHashMap<String, UPDInetSocketAddress> addresses = new ConcurrentHashMap<>();
  31.     public UPDService(String project) throws SocketException, UnknownHostException {
  32.         this.project = project;
  33.     }
  34.     public void start() {
  35.         try {
  36.             socket = new DatagramSocket();
  37.             socket.setBroadcast(true);
  38.             scan();
  39.         } catch (SocketException e) {
  40.             throw new RuntimeException(e);
  41.         }
  42.     }
  43.     public void scan() {
  44.         if (doing) {
  45.         } else {
  46.             doing = true;
  47.             //感知
  48.             MyProject.cache(project).executor.execute(() -> {
  49.                 MyProject.cache(project).executor.execute(new Scaner(0, 5000));
  50.                 MyProject.cache(project).executor.execute(new Scaner(5001, 10000));
  51.                 MyProject.cache(project).executor.execute(new Scaner(10001, 15000));
  52.                 MyProject.cache(project).executor.execute(new Scaner(15001, 20000));
  53.                 MyProject.cache(project).executor.execute(new Scaner(20001, 25000));
  54.                 MyProject.cache(project).executor.execute(new Scaner(25001, 30000));
  55.                 MyProject.cache(project).executor.execute(new Scaner(30001, 35000));
  56.                 MyProject.cache(project).executor.execute(new Scaner(35001, 40000));
  57.                 MyProject.cache(project).executor.execute(new Scaner(40001, 45000));
  58.                 MyProject.cache(project).executor.execute(new Scaner(45001, 50000));
  59.                 MyProject.cache(project).executor.execute(new Scaner(50001, 55000));
  60.                 MyProject.cache(project).executor.execute(new Scaner(55001, 60000));
  61.                 MyProject.cache(project).executor.execute(new Scaner(60001, 65535));
  62.             });
  63.             //感知、信息、保活
  64.             MyProject.cache(project).executor.execute(() -> {
  65.                 byte[] receiveBuffer = new byte[1500];
  66.                 final DatagramPacket receivePacket = new DatagramPacket(receiveBuffer, receiveBuffer.length);
  67.                 while (!socket.isClosed()) {
  68.                     try {
  69.                         socket.receive(receivePacket);
  70.                         String receivedData = new String(receivePacket.getData(), 0, receivePacket.getLength(), StandardCharsets.UTF_8);
  71.                         BaseMessage message = MessageUtils.resolve(receivedData);
  72.                         if (message.getMessage() instanceof UPDMessage) {
  73.                             if (check(((UPDMessage) message.getMessage()).getToken())) {
  74.                                 if (!addresses.containsKey(receivePacket.getAddress() + ":" + receivePacket.getPort())) {
  75.                                     MyProject.cache(project).sysMessage("发现::" + receivePacket.getAddress() + ":" + receivePacket.getPort());
  76.                                 }
  77.                                 addresses.put(receivePacket.getAddress() + ":" + receivePacket.getPort(), new UPDInetSocketAddress(receivePacket.getAddress(), receivePacket.getPort()));
  78.                             }
  79.                         } else if (message.getMessage() instanceof TextMessage) {
  80.                             if (msgIndesx.get() < message.getId() && msgIndesx.compareAndSet(msgIndesx.get(), message.getId())) {
  81.                                 MyProject.cache(project).chatMessage(((TextMessage) message.getMessage()).getMessage(), message.getMessage().getName());
  82.                             }
  83.                         }
  84.                     } catch (IOException e) {
  85.                     } catch (Exception e) {
  86.                     }
  87.                 }
  88.             });
  89.         }
  90.     }
  91.     public <T extends BaseUser> void send(T o) {
  92.         byte[] message = new byte[0];
  93.         message = JSON.toJSONString(BaseMessage.builder().id(System.nanoTime()).message(o).build()).getBytes(StandardCharsets.UTF_8);
  94.         packet.setData(message);
  95.         packet.setLength(message.length);
  96.         long time = new Date().getTime();
  97.         for (UPDInetSocketAddress address : addresses.values()) {
  98.             if (address.outTime >= time) {
  99.                 packet.setSocketAddress(address);
  100.                 try {
  101.                     socket.send(packet);
  102.                 } catch (IOException e) {
  103.                 }
  104.             }
  105.         }
  106.     }
  107.     private boolean check(String token) {
  108.         if (ObjectUtils.isEmpty(token)) return false;
  109.         return Md5Utils.getMD5(new Date().getTime() / 1000000 + "", StandardCharsets.UTF_8.name()).equalsIgnoreCase(token);
  110.     }
  111.     public void shutDowm() {
  112.         socket.close();
  113.         doing = false;
  114.         addresses.clear();
  115.     }
  116.     private class UPDInetSocketAddress extends InetSocketAddress {
  117.         public final long outTime = DateUtils.addMinutes(new Date(), 1).getTime();
  118.         public UPDInetSocketAddress(int port) {
  119.             super(port);
  120.         }
  121.         public UPDInetSocketAddress(InetAddress addr, int port) {
  122.             super(addr, port);
  123.         }
  124.         public UPDInetSocketAddress(String hostname, int port) {
  125.             super(hostname, port);
  126.         }
  127.     }
  128.     private class Scaner implements Runnable {
  129.         private final DatagramPacket scanPacket;
  130.         {
  131.             try {
  132.                 scanPacket = new DatagramPacket("".getBytes(), "".getBytes().length, InetAddress.getByName("255.255.255.255"), 0);
  133.             } catch (UnknownHostException e) {
  134.                 throw new RuntimeException(e);
  135.             }
  136.         }
  137.         int start;
  138.         int end;
  139.         public Scaner(int start, int end) {
  140.             this.start = start;
  141.             this.end = end;
  142.         }
  143.         @Override
  144.         public void run() {
  145.             while (!socket.isClosed()) {
  146.                 for (int portD = (start + end) / 2, portU = ((start + end) / 2) + 1; portU <= end && portD >= start && !socket.isClosed(); portD--, portU++) {
  147.                     try {
  148.                         Thread.sleep(1);
  149.                     } catch (InterruptedException e) {
  150.                     }
  151.                     try {
  152.                         BaseMessage<BaseUser> build = BaseMessage.builder().message(UPDMessage.builder().token(Md5Utils.getMD5(new Date().getTime() / 1000000 + "", StandardCharsets.UTF_8.name())).build()).build();
  153.                         String string = JSON.toJSONString(build);
  154.                         byte[] bytes = string.getBytes(StandardCharsets.UTF_8);
  155.                         scanPacket.setData(bytes);
  156.                         scanPacket.setLength(bytes.length);
  157.                         scanPacket.setPort(portU);
  158.                         socket.send(scanPacket);
  159. //                        MyProject.cache(project).config.updconnectStatus(true,"开始感知端口:"+portU);
  160. //                        MyProject.cache(project).sysMessage("开始感知端口:"+portU);
  161.                         scanPacket.setPort(portD);
  162.                         socket.send(scanPacket);
  163. //                        MyProject.cache(project).config.updconnectStatus(true,"开始感知端口:"+portD);
  164. //                        MyProject.cache(project).sysMessage("开始感知端口:"+portD);
  165.                     } catch (IOException e) {
  166.                     }
  167.                 }
  168.             }
  169.         }
  170.     }
  171. }
复制代码
代码介绍:

start(): 构建一个基础的UPD类,并设置为广播模式。
InetAddress.getByName("255.255.255.255"):在 IPv4 网络中,255.255.255.255 是一个特殊的 IP 地址,被称作有限广播地址(Limited Broadcast Address)。当数据包被发送到这个地址时,它会在本地网络(也就是当前子网)内举行广播,意味着本地网络中的全部设备都会接收到这个数据包。
scan():开启线程任务,循环向局域网内广播一段有特殊含义的字符串。
Md5Utils.getMD5(new Date().getTime() / 1000000):这是一个简单的小盘算,主要到场check()方法中的校验,当其他UPD服务接收到后也会接纳同样的算法并与之匹配,假如匹配成功则加入信息组队列。后期可以扩展成任何算法,也可以利用这个模式给房间分组。
UPDMessage:广播信息体
TextMessage:平凡聊天信息体
send(): 当需要发送平凡聊天信息时,构建TextMessage,并给感知到的有效信息构成员发送信息。
UPDInetSocketAddress:简单封装过的InetSocketAddress,在原基础上加入了失效机制,目前为1分钟后失效,当感知到成员时会更新失效时间,当发送聊天信息时会验证有效性。理论上目前全部端口的感知频率在一分钟以内,所以省略了保活线程,后期可以加上。其实也可以接纳本地超时缓存,不过考虑开销问题,这次代码中就倒霉用了。
 效果图(来自本人上架idea插件商城的Wchat):

 

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 qidao123.com技术社区-IT企服评测·应用市场 (https://dis.qidao123.com/) Powered by Discuz! X3.4