Spring Boot整合T-IO实现即时通讯

王柳  论坛元老 | 2025-4-18 11:27:58 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 1890|帖子 1890|积分 5670

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
在当今的互联网应用中,即时通讯功能已经变得不可或缺。无论是社交应用、在线客服还是实时数据推送,都必要高效的通讯框架来支持。TIO(Try-IO)是一个高性能的Java网络通讯框架,支持TCP/UDP/HTTP/WebSocket等多种协议,非常适合用于即时通讯场景。本文将介绍如何在Spring Boot项目中整合TIO,实现一个简朴的即时通讯功能。
1. TIO简介

TIO是一个基于Java的高性能网络通讯框架,具有以下特点:
支持多种协议:TCP、UDP、HTTP、WebSocket等。
高性能:基于NIO实现,支持高并发。
易用性:提供了丰富的API和机动的配置
2. 情况准备

在开始之前,请确保你已经安装了以下工具:
JDK 1.8及以上版本(本项目利用jdk 17版本)
Maven或Gradle
IntelliJ IDEA或Eclipse
4.添加依赖

在pom.xml文件中添加TIO的相关依赖
  1.         <!-- t-io WebSocket依赖 -->
  2.         <dependency>
  3.             <groupId>org.t-io</groupId>
  4.             <artifactId>tio-websocket-server</artifactId>
  5.             <version>3.8.6.v20240801-RELEASE</version>
  6.         </dependency>
  7.         <!-- t-io 核心依赖 -->
  8.         <dependency>
  9.             <groupId>org.t-io</groupId>
  10.             <artifactId>tio-core</artifactId>
  11.             <version>3.8.6.v20240801-RELEASE</version>
  12.         </dependency>
复制代码
5、创建TIO消息处置惩罚器

创建一个实现IWsMsgHandler接口的类,用于处置惩罚WebSocket消息:
  1. import com.alibaba.fastjson.JSON;
  2. import com.ruoyi.im.config.TioWebSocketAdapterConfig;
  3. import com.ruoyi.im.domain.ImMessage;
  4. import com.ruoyi.im.model.ChatMessage;
  5. import com.ruoyi.im.service.IImMessageService;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.stereotype.Component;
  10. import org.tio.core.ChannelContext;
  11. import org.tio.http.common.HttpRequest;
  12. import org.tio.http.common.HttpResponse;
  13. import org.tio.websocket.common.WsRequest;
  14. import org.tio.websocket.common.WsResponse;
  15. import org.tio.websocket.server.handler.IWsMsgHandler;
  16. import java.util.Date;
  17. import java.util.UUID;
  18. @Component
  19. public class WebSocketMessageHandler implements IWsMsgHandler {
  20.    
  21.     private static final Logger log = LoggerFactory.getLogger(WebSocketMessageHandler.class);
  22.    
  23.     /**
  24.      * 从ChannelContext中获取参数
  25.      */
  26.     private String getParam(ChannelContext channelContext, String paramName) {
  27.         try {
  28.             // 尝试从channelContext的httpConfig或其他属性中获取参数
  29.             Object requestObj = channelContext.getAttribute("httpRequest");
  30.             if (requestObj != null && requestObj instanceof HttpRequest) {
  31.                 HttpRequest httpRequest = (HttpRequest) requestObj;
  32.                 return httpRequest.getParam(paramName);
  33.             }
  34.             
  35.             // 从user属性中获取用户ID
  36.             if ("userId".equals(paramName)) {
  37.                 return (String) channelContext.getAttribute("userId");
  38.             }
  39.             
  40.             return null;
  41.         } catch (Exception e) {
  42.             log.error("获取参数失败: paramName={}", paramName, e);
  43.             return null;
  44.         }
  45.     }
  46.    
  47.     @Override
  48.     public HttpResponse handshake(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) {
  49.         // 从请求中获取用户ID、token和设备类型
  50.         String userId = httpRequest.getParam("userId");
  51.         String token = httpRequest.getParam("token");
  52.         String deviceTypeStr = httpRequest.getParam("deviceType");
  53.         
  54.         // 验证参数
  55.         if (userId == null || token == null || deviceTypeStr == null) {
  56.             log.error("握手参数不完整: userId={}, token={}, deviceType={}", userId, token, deviceTypeStr);
  57.             return httpResponse;
  58.         }
  59.         
  60.         // 验证token(这里应该调用若依的token验证服务)
  61.         // TODO: 实现token验证逻辑
  62.         
  63.         // 解析设备类型
  64.         UserSessionManager.DeviceType deviceType;
  65.         try {
  66.             deviceType = UserSessionManager.DeviceType.valueOf(deviceTypeStr.toUpperCase());
  67.         } catch (IllegalArgumentException e) {
  68.             log.error("无效的设备类型: {}", deviceTypeStr);
  69.             return httpResponse;
  70.         }
  71.         
  72.         // 保存参数到ChannelContext
  73.         channelContext.setAttribute("httpRequest", httpRequest);
  74.         channelContext.setAttribute("userId", userId);
  75.         
  76.         // 添加用户会话
  77.         userSessionManager.addUserSession(userId, token, deviceType, channelContext);
  78.         
  79.         return httpResponse;
  80.     }
  81.    
  82.     @Override
  83.     public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) {
  84.         String userId = httpRequest.getParam("userId");
  85.         log.info("握手成功,userId: {}, channelId: {}", userId, channelContext.getId());
  86.     }
  87.    
  88.     @Override
  89.     public Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) {
  90.         return null;
  91.     }
  92.    
  93.     @Override
  94.     public Object onClose(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) {
  95.         // 从channelContext获取用户ID
  96.         String userId = getParam(channelContext, "userId");
  97.         // 移除用户会话
  98.         if (userId != null) {
  99.             userSessionManager.removeUserSession(userId);
  100.         }
  101.         log.info("连接关闭,userId: {}, channelId: {}", userId, channelContext.getId());
  102.         return null;
  103.     }
  104.    
  105.     @Override
  106.     public Object onText(WsRequest wsRequest, String msg, ChannelContext channelContext) {
  107.         // 从channelContext获取用户ID
  108.         String userId = getParam(channelContext, "userId");
  109.         log.info("收到消息: {}, userId: {}, channelId: {}", msg, userId, channelContext.getId());
  110.         
  111.         try {
  112.             // 解析消息
  113.             ChatMessage chatMessage = JSON.parseObject(msg, ChatMessage.class);
  114.             
  115.             // 设置消息ID和发送时间
  116.             if (chatMessage.getMessageId() == null) {
  117.                 chatMessage.setMessageId(UUID.randomUUID().toString());
  118.             }
  119.             if (chatMessage.getSendTime() == null) {
  120.                 chatMessage.setSendTime(new Date());
  121.             }
  122.             
  123.             // 设置发送者ID
  124.             chatMessage.setFromUserId(userId);
  125.             
  126.             // 更新用户最后活动时间
  127.             userSessionManager.updateUserLastActiveTime(userId);
  128.             
  129.             // 创建消息实体并持久化
  130.             ImMessage imMessage = new ImMessage();
  131.             imMessage.setMessageId(chatMessage.getMessageId());
  132.             imMessage.setFromUserId(chatMessage.getFromUserId());
  133.             imMessage.setToUserId(chatMessage.getToUserId());
  134.             imMessage.setGroupId(chatMessage.getGroupId());
  135.             imMessage.setContent(chatMessage.getContent());
  136.             imMessage.setMessageType(chatMessage.getMessageType());
  137.             imMessage.setStatus(0); // 0表示未读
  138.             imMessage.setSendTime(chatMessage.getSendTime());
  139.             imMessage.setCreateTime(chatMessage.getSendTime());
  140.             imMessage.setUpdateTime(chatMessage.getSendTime());
  141.             imMessage.setIsSent(1); // 1表示已发送
  142.             imMessage.setRetryCount(0); // 初始重试次数为0
  143.             
  144.             // 保存消息到数据库
  145.             imMessageService.insertImMessage(imMessage);
  146.             
  147.             // 处理消息
  148.             if (chatMessage.getToUserId() != null) {
  149.                 // 私聊消息
  150.                 userSessionManager.sendMessageToUser(chatMessage.getToUserId(), JSON.toJSONString(chatMessage));
  151.             } else if (chatMessage.getGroupId() != null) {
  152.                 // 群聊消息(需要实现群组管理)
  153.                 // TODO: 实现群聊消息处理
  154.             } else {
  155.                 // 广播消息
  156.                 userSessionManager.broadcastMessage(JSON.toJSONString(chatMessage));
  157.             }
  158.             
  159.             // 发送响应消息
  160.             WsResponse wsResponse = WsResponse.fromText("SendMessageSuccess:" + msg, "UTF-8");
  161.             TioWebSocketAdapterConfig.send(channelContext, wsResponse);
  162.             
  163.         } catch (Exception e) {
  164.             log.error("处理消息失败: {}", msg, e);
  165.             WsResponse wsResponse = WsResponse.fromText("处理消息失败: " + e.getMessage(), "UTF-8");
  166.             TioWebSocketAdapterConfig.send(channelContext, wsResponse);
  167.         }
  168.         
  169.         return null;
  170.     }
  171. }
复制代码
  1. import com.ruoyi.im.config.TioWebSocketAdapterConfig;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.stereotype.Component;
  5. import org.tio.core.ChannelContext;
  6. import org.tio.websocket.common.WsResponse;
  7. import java.util.Map;
  8. import java.util.Set;
  9. import java.util.concurrent.ConcurrentHashMap;
  10. @Component
  11. public class UserSessionManager {
  12.    
  13.     private static final Logger log = LoggerFactory.getLogger(UserSessionManager.class);
  14.    
  15.     // 用户ID -> 用户会话信息
  16.     private final Map<String, UserSession> userSessions = new ConcurrentHashMap<>();
  17.    
  18.     // 设备类型枚举
  19.     public enum DeviceType {
  20.         WEB,    // Web端
  21.         MINI,   // 小程序
  22.         APP     // APP
  23.     }
  24.    
  25.     // 用户会话信息类
  26.     public static class UserSession {
  27.         private String userId;
  28.         private String token;
  29.         private DeviceType deviceType;
  30.         private ChannelContext channelContext;
  31.         private long lastActiveTime;
  32.         
  33.         public UserSession(String userId, String token, DeviceType deviceType, ChannelContext channelContext) {
  34.             this.userId = userId;
  35.             this.token = token;
  36.             this.deviceType = deviceType;
  37.             this.channelContext = channelContext;
  38.             this.lastActiveTime = System.currentTimeMillis();
  39.         }
  40.         
  41.         // Getters and setters
  42.         public String getUserId() { return userId; }
  43.         public void setUserId(String userId) { this.userId = userId; }
  44.         public String getToken() { return token; }
  45.         public void setToken(String token) { this.token = token; }
  46.         public DeviceType getDeviceType() { return deviceType; }
  47.         public void setDeviceType(DeviceType deviceType) { this.deviceType = deviceType; }
  48.         public ChannelContext getChannelContext() { return channelContext; }
  49.         public void setChannelContext(ChannelContext channelContext) { this.channelContext = channelContext; }
  50.         public long getLastActiveTime() { return lastActiveTime; }
  51.         public void setLastActiveTime(long lastActiveTime) { this.lastActiveTime = lastActiveTime; }
  52.     }
  53.    
  54.     // 添加用户会话
  55.     public void addUserSession(String userId, String token, DeviceType deviceType, ChannelContext channelContext) {
  56.         UserSession session = new UserSession(userId, token, deviceType, channelContext);
  57.         userSessions.put(userId, session);
  58.         
  59.         // 绑定用户ID到ChannelContext,使用适配器
  60.         TioWebSocketAdapterConfig.bindUser(channelContext, userId);
  61.         
  62.         log.info("用户会话添加成功: userId={}, deviceType={}", userId, deviceType);
  63.     }
  64.    
  65.     // 移除用户会话
  66.     public void removeUserSession(String userId) {
  67.         UserSession session = userSessions.remove(userId);
  68.         if (session != null) {
  69.             // 解绑用户ID,使用适配器
  70.             TioWebSocketAdapterConfig.unbindUser(session.getChannelContext(), userId);
  71.             log.info("用户会话移除成功: userId={}", userId);
  72.         }
  73.     }
  74.    
  75.     // 获取用户会话
  76.     public UserSession getUserSession(String userId) {
  77.         return userSessions.get(userId);
  78.     }
  79.    
  80.     // 更新用户最后活动时间
  81.     public void updateUserLastActiveTime(String userId) {
  82.         UserSession session = userSessions.get(userId);
  83.         if (session != null) {
  84.             session.setLastActiveTime(System.currentTimeMillis());
  85.         }
  86.     }
  87.    
  88.     // 发送消息给指定用户
  89.     public void sendMessageToUser(String userId, String message) {
  90.         UserSession session = userSessions.get(userId);
  91.         if (session != null && session.getChannelContext() != null) {
  92.             WsResponse wsResponse = WsResponse.fromText(message, "UTF-8");
  93.             // 使用适配器发送消息
  94.             TioWebSocketAdapterConfig.send(session.getChannelContext(), wsResponse);
  95.         } else {
  96.             log.warn("用户不在线,无法发送消息: userId={}", userId);
  97.         }
  98.     }
  99.    
  100.     // 广播消息给所有用户
  101.     public void broadcastMessage(String message) {
  102.         if (userSessions.isEmpty()) {
  103.             return;
  104.         }
  105.         
  106.         WsResponse wsResponse = WsResponse.fromText(message, "UTF-8");
  107.         for (UserSession session : userSessions.values()) {
  108.             if (session.getChannelContext() != null) {
  109.                 // 使用适配器发送消息
  110.                 TioWebSocketAdapterConfig.send(session.getChannelContext(), wsResponse);
  111.             }
  112.         }
  113.     }
  114.    
  115.     // 获取在线用户数量
  116.     public int getOnlineUserCount() {
  117.         return userSessions.size();
  118.     }
  119.    
  120.     // 获取所有在线用户ID
  121.     public Set<String> getOnlineUserIds() {
  122.         return userSessions.keySet();
  123.     }
  124. }
复制代码
6、创建TIO配置类和 TIO WebSocket适配器配置

  1. import com.ruoyi.im.websocket.WebSocketMessageHandler;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import org.tio.websocket.server.WsServerConfig;
  6. import org.tio.websocket.server.WsServerStarter;
  7. import java.io.IOException;
  8. @Configuration
  9. public class TioWebSocketConfig {
  10.    
  11.     @Autowired
  12.     private WebSocketMessageHandler webSocketMessageHandler;
  13.    
  14.     @Bean
  15.     public WsServerStarter wsServerStarter() throws IOException {
  16.         // 配置t-io websocket服务器,指定端口
  17.         WsServerConfig wsServerConfig = new WsServerConfig(9321);
  18.         
  19.         // 创建WebSocket服务器
  20.         WsServerStarter wsServerStarter = new WsServerStarter(wsServerConfig, webSocketMessageHandler);
  21.         
  22.         // 这里不再获取和配置ServerTioConfig
  23.         
  24.         return wsServerStarter;
  25.     }
  26. }
复制代码
  1. import org.slf4j.Logger;
  2. import org.slf4j.LoggerFactory;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.tio.core.ChannelContext;
  5. import org.tio.core.Tio;
  6. import org.tio.core.intf.Packet;
  7. /**
  8. * t-io WebSocket适配器配置
  9. * 帮助处理不同版本的t-io兼容性问题
  10. */
  11. @Configuration
  12. public class TioWebSocketAdapterConfig {
  13.    
  14.     private static final Logger log = LoggerFactory.getLogger(TioWebSocketAdapterConfig.class);
  15.     /**
  16.      * 发送消息
  17.      *
  18.      * @param channelContext 通道上下文
  19.      * @param packet 数据包
  20.      */
  21.     public static void send(ChannelContext channelContext, Packet packet) {
  22.         if (channelContext == null || packet == null) {
  23.             log.warn("发送消息失败:通道上下文或数据包为空");
  24.             return;
  25.         }
  26.         try {
  27.             // 直接调用 Tio 的 send 方法
  28.             Tio.send(channelContext, packet);
  29.         } catch (Exception e) {
  30.             log.error("发送消息失败", e);
  31.         }
  32.     }
  33.     /**
  34.      * 绑定用户
  35.      *
  36.      * @param channelContext 通道上下文
  37.      * @param userId 用户ID
  38.      */
  39.     public static void bindUser(ChannelContext channelContext, String userId) {
  40.         if (channelContext == null || userId == null) {
  41.             log.warn("绑定用户失败:通道上下文或用户ID为空");
  42.             return;
  43.         }
  44.         try {
  45.             // 直接调用 Tio 的 bindUser 方法
  46.             Tio.bindUser(channelContext, userId);
  47.         } catch (Exception e) {
  48.             log.error("绑定用户失败", e);
  49.         }
  50.     }
  51.     /**
  52.      * 解绑用户
  53.      *
  54.      * @param channelContext 通道上下文
  55.      * @param userId 用户ID
  56.      */
  57.     public static void unbindUser(ChannelContext channelContext, String userId) {
  58.         if (channelContext == null) {
  59.             log.warn("解绑用户失败:通道上下文为空");
  60.             return;
  61.         }
  62.         try {
  63.             // 直接调用 Tio 的 unbindUser 方法
  64.             Tio.unbindUser(channelContext);
  65.         } catch (Exception e) {
  66.             log.error("解绑用户失败", e);
  67.         }
  68.     }
  69. }
复制代码
7、配置TIO启动类

  1. import org.slf4j.Logger;
  2. import org.slf4j.LoggerFactory;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Component;
  5. import org.tio.websocket.server.WsServerStarter;
  6. import javax.annotation.PostConstruct;
  7. import javax.annotation.PreDestroy;
  8. import java.io.IOException;
  9. import java.lang.reflect.Method;
  10. @Component
  11. public class WebSocketServer {
  12.    
  13.     private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);
  14.    
  15.     @PostConstruct
  16.     public void start() {
  17.         try {
  18.             wsServerStarter.start();
  19.             log.info("WebSocket服务器启动成功,监听端口:9321");
  20.         } catch (IOException e) {
  21.             log.error("WebSocket服务器启动失败", e);
  22.         }
  23.     }
  24.    
复制代码
启动springboot主类,即可启动tio服务
  1.   public static void main(String[] args) {
  2.         SpringApplication.run(RuoYiApplication.class, args);
  3.     }
复制代码
8、测试即时通讯功能

可以利用websocket工具进行测试
  1. ##链接地址:
  2. ws://127.0.0.1:9321?userId=1&token=1&deviceType=MINI
  3. ##链接参数说明:
  4. userId:当前用户id
  5. token:token值
  6. deviceType:设备类型  WEB Web端、MINI 小程序 、APP
  7. ##发送消息示例:
  8. {
  9.     "fromUserId": "1",发送者用户ID
  10.     "toUserId": "2",//接收者用户ID
  11.     "content": "测试消息内容",//消息内容
  12.     "messageType": "1"//消息类型1=文本,2=图片,3=语音,4=视频
  13. }
  14. ##发送后响应示例:
  15. SendMessageSuccess:{ "fromUserId": "1", "toUserId": "2", "content": "测试消息内容", "messageType": "1" }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

王柳

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表