UPD与TCP对比:
UDP是无连接的协议,也不保证可靠交付,只在IP数据报服务之上增长了很少的功能,主要是复用和分用以及差错检测的功能。这实用于要求源主机以恒定速率发送数据,允许网络拥塞时丢失数据,却不允许数据有太大时延的及时应用。TCP则是面向连接的传输层协议,提供可靠的交付服务。TCP把连接作为最基本的抽象,连接的端点叫做套接字(socket)。每一条TCP连接唯一地被通讯两头的两个端点(即套接字对 socket pair)所确定,即每一条TCP连接只能是点对点的 [2]。应用程序在利用TCP协议之前必须先建立TCP连接,在传送数据完毕后必须开释已经建立的TCP连接。TCP提供全双工通讯,允许通讯双方的应用进程在任何时候都能发送数据,TCP连接的两头都设有发送缓存和接收缓存,用到临时存放双向通讯的数据。与面向报文的UDP不同,TCP是面向字节流的。这里的“流”(stream)指的是流入到进程或者从进程流出的字节序列。虽然应用程序和TCP的交互是一次一个大小不等的数据块,但TCP把应用程序交下来的数据仅仅看成是一连串的无结构的字节流,并不知道所传送的字节流的含义。TCP实用于需要有序、可靠地传输数据流的应用程序。UPD的利用限定:
在 IPv4 中,IP 首部包含一个 16 位的总长度字段,因此 IPv4 数据包的最大长度为65536 字节,不过这其中包含了 IP 首部(通常为 20 字节)和 UDP 首部(8 字节),所以实际可用于 UDP 数据的最大长度为 65507 字节。
然而,在实际网络环境中,由于不同网络设备(如路由器)可能有不同的最大传输单元(MTU,Maximum Transmission Unit),数据包大小可能会受到进一步限定。以太网的 MTU 通常为 1500 字节,假如 UDP 数据包大小凌驾 MTU,就需要举行分片处理。分片会增长网络传输的复杂性和堕落的可能性,因此在实际应用中,为了避免分片,建议将 UDP 数据包大小控制在 MTU 以下。在 UTF - 8 编码下,1500 字节大约能存储 500 个常见的中文。
总结:
1、500字基本满足文字类聊天所需;
代码部分:
- package tangzeqi.com.service;
- import com.alibaba.fastjson.JSON;
- import org.apache.commons.lang.time.DateUtils;
- import org.apache.commons.lang3.ObjectUtils;
- import tangzeqi.com.project.MyProject;
- import tangzeqi.com.stroge.BaseMessage;
- import tangzeqi.com.stroge.BaseUser;
- import tangzeqi.com.stroge.TextMessage;
- import tangzeqi.com.stroge.UPDMessage;
- import tangzeqi.com.utils.Md5Utils;
- import tangzeqi.com.utils.MessageUtils;
- import java.io.IOException;
- import java.net.*;
- import java.nio.charset.StandardCharsets;
- import java.util.Date;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.atomic.AtomicLong;
- /**
- * 局域网广播通讯,UPD
- */
- public class UPDService {
- private volatile boolean doing = false;
- private volatile AtomicLong msgIndesx = new AtomicLong(0);
- private final String project;
- /**
- * 广播模式
- */
- private volatile DatagramSocket socket;
- private final DatagramPacket packet = new DatagramPacket("".getBytes(), "".getBytes().length, InetAddress.getByName("255.255.255.255"), 0);
- private volatile ConcurrentHashMap<String, UPDInetSocketAddress> addresses = new ConcurrentHashMap<>();
- public UPDService(String project) throws SocketException, UnknownHostException {
- this.project = project;
- }
- public void start() {
- try {
- socket = new DatagramSocket();
- socket.setBroadcast(true);
- scan();
- } catch (SocketException e) {
- throw new RuntimeException(e);
- }
- }
- public void scan() {
- if (doing) {
- } else {
- doing = true;
- //感知
- MyProject.cache(project).executor.execute(() -> {
- MyProject.cache(project).executor.execute(new Scaner(0, 5000));
- MyProject.cache(project).executor.execute(new Scaner(5001, 10000));
- MyProject.cache(project).executor.execute(new Scaner(10001, 15000));
- MyProject.cache(project).executor.execute(new Scaner(15001, 20000));
- MyProject.cache(project).executor.execute(new Scaner(20001, 25000));
- MyProject.cache(project).executor.execute(new Scaner(25001, 30000));
- MyProject.cache(project).executor.execute(new Scaner(30001, 35000));
- MyProject.cache(project).executor.execute(new Scaner(35001, 40000));
- MyProject.cache(project).executor.execute(new Scaner(40001, 45000));
- MyProject.cache(project).executor.execute(new Scaner(45001, 50000));
- MyProject.cache(project).executor.execute(new Scaner(50001, 55000));
- MyProject.cache(project).executor.execute(new Scaner(55001, 60000));
- MyProject.cache(project).executor.execute(new Scaner(60001, 65535));
- });
- //感知、信息、保活
- MyProject.cache(project).executor.execute(() -> {
- byte[] receiveBuffer = new byte[1500];
- final DatagramPacket receivePacket = new DatagramPacket(receiveBuffer, receiveBuffer.length);
- while (!socket.isClosed()) {
- try {
- socket.receive(receivePacket);
- String receivedData = new String(receivePacket.getData(), 0, receivePacket.getLength(), StandardCharsets.UTF_8);
- BaseMessage message = MessageUtils.resolve(receivedData);
- if (message.getMessage() instanceof UPDMessage) {
- if (check(((UPDMessage) message.getMessage()).getToken())) {
- if (!addresses.containsKey(receivePacket.getAddress() + ":" + receivePacket.getPort())) {
- MyProject.cache(project).sysMessage("发现::" + receivePacket.getAddress() + ":" + receivePacket.getPort());
- }
- addresses.put(receivePacket.getAddress() + ":" + receivePacket.getPort(), new UPDInetSocketAddress(receivePacket.getAddress(), receivePacket.getPort()));
- }
- } else if (message.getMessage() instanceof TextMessage) {
- if (msgIndesx.get() < message.getId() && msgIndesx.compareAndSet(msgIndesx.get(), message.getId())) {
- MyProject.cache(project).chatMessage(((TextMessage) message.getMessage()).getMessage(), message.getMessage().getName());
- }
- }
- } catch (IOException e) {
- } catch (Exception e) {
- }
- }
- });
- }
- }
- public <T extends BaseUser> void send(T o) {
- byte[] message = new byte[0];
- message = JSON.toJSONString(BaseMessage.builder().id(System.nanoTime()).message(o).build()).getBytes(StandardCharsets.UTF_8);
- packet.setData(message);
- packet.setLength(message.length);
- long time = new Date().getTime();
- for (UPDInetSocketAddress address : addresses.values()) {
- if (address.outTime >= time) {
- packet.setSocketAddress(address);
- try {
- socket.send(packet);
- } catch (IOException e) {
- }
- }
- }
- }
- private boolean check(String token) {
- if (ObjectUtils.isEmpty(token)) return false;
- return Md5Utils.getMD5(new Date().getTime() / 1000000 + "", StandardCharsets.UTF_8.name()).equalsIgnoreCase(token);
- }
- public void shutDowm() {
- socket.close();
- doing = false;
- addresses.clear();
- }
- private class UPDInetSocketAddress extends InetSocketAddress {
- public final long outTime = DateUtils.addMinutes(new Date(), 1).getTime();
- public UPDInetSocketAddress(int port) {
- super(port);
- }
- public UPDInetSocketAddress(InetAddress addr, int port) {
- super(addr, port);
- }
- public UPDInetSocketAddress(String hostname, int port) {
- super(hostname, port);
- }
- }
- private class Scaner implements Runnable {
- private final DatagramPacket scanPacket;
- {
- try {
- scanPacket = new DatagramPacket("".getBytes(), "".getBytes().length, InetAddress.getByName("255.255.255.255"), 0);
- } catch (UnknownHostException e) {
- throw new RuntimeException(e);
- }
- }
- int start;
- int end;
- public Scaner(int start, int end) {
- this.start = start;
- this.end = end;
- }
- @Override
- public void run() {
- while (!socket.isClosed()) {
- for (int portD = (start + end) / 2, portU = ((start + end) / 2) + 1; portU <= end && portD >= start && !socket.isClosed(); portD--, portU++) {
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- }
- try {
- BaseMessage<BaseUser> build = BaseMessage.builder().message(UPDMessage.builder().token(Md5Utils.getMD5(new Date().getTime() / 1000000 + "", StandardCharsets.UTF_8.name())).build()).build();
- String string = JSON.toJSONString(build);
- byte[] bytes = string.getBytes(StandardCharsets.UTF_8);
- scanPacket.setData(bytes);
- scanPacket.setLength(bytes.length);
- scanPacket.setPort(portU);
- socket.send(scanPacket);
- // MyProject.cache(project).config.updconnectStatus(true,"开始感知端口:"+portU);
- // MyProject.cache(project).sysMessage("开始感知端口:"+portU);
- scanPacket.setPort(portD);
- socket.send(scanPacket);
- // MyProject.cache(project).config.updconnectStatus(true,"开始感知端口:"+portD);
- // MyProject.cache(project).sysMessage("开始感知端口:"+portD);
- } catch (IOException e) {
- }
- }
- }
- }
- }
- }
复制代码 代码介绍:
start(): 构建一个基础的UPD类,并设置为广播模式。
InetAddress.getByName("255.255.255.255"):在 IPv4 网络中,255.255.255.255 是一个特殊的 IP 地址,被称作有限广播地址(Limited Broadcast Address)。当数据包被发送到这个地址时,它会在本地网络(也就是当前子网)内举行广播,意味着本地网络中的全部设备都会接收到这个数据包。
scan():开启线程任务,循环向局域网内广播一段有特殊含义的字符串。
Md5Utils.getMD5(new Date().getTime() / 1000000):这是一个简单的小盘算,主要到场check()方法中的校验,当其他UPD服务接收到后也会接纳同样的算法并与之匹配,假如匹配成功则加入信息组队列。后期可以扩展成任何算法,也可以利用这个模式给房间分组。
UPDMessage:广播信息体
TextMessage:平凡聊天信息体
send(): 当需要发送平凡聊天信息时,构建TextMessage,并给感知到的有效信息构成员发送信息。
UPDInetSocketAddress:简单封装过的InetSocketAddress,在原基础上加入了失效机制,目前为1分钟后失效,当感知到成员时会更新失效时间,当发送聊天信息时会验证有效性。理论上目前全部端口的感知频率在一分钟以内,所以省略了保活线程,后期可以加上。其实也可以接纳本地超时缓存,不过考虑开销问题,这次代码中就倒霉用了。
效果图(来自本人上架idea插件商城的Wchat):
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |