马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
在当今的互联网应用中,即时通讯功能已经变得不可或缺。无论是社交应用、在线客服还是实时数据推送,都必要高效的通讯框架来支持。TIO(Try-IO)是一个高性能的Java网络通讯框架,支持TCP/UDP/HTTP/WebSocket等多种协议,非常适合用于即时通讯场景。本文将介绍如何在Spring Boot项目中整合TIO,实现一个简朴的即时通讯功能。
1. TIO简介
TIO是一个基于Java的高性能网络通讯框架,具有以下特点:
支持多种协议:TCP、UDP、HTTP、WebSocket等。
高性能:基于NIO实现,支持高并发。
易用性:提供了丰富的API和机动的配置
2. 情况准备
在开始之前,请确保你已经安装了以下工具:
JDK 1.8及以上版本(本项目利用jdk 17版本)
Maven或Gradle
IntelliJ IDEA或Eclipse
4.添加依赖
在pom.xml文件中添加TIO的相关依赖
- <!-- t-io WebSocket依赖 -->
- <dependency>
- <groupId>org.t-io</groupId>
- <artifactId>tio-websocket-server</artifactId>
- <version>3.8.6.v20240801-RELEASE</version>
- </dependency>
- <!-- t-io 核心依赖 -->
- <dependency>
- <groupId>org.t-io</groupId>
- <artifactId>tio-core</artifactId>
- <version>3.8.6.v20240801-RELEASE</version>
- </dependency>
复制代码 5、创建TIO消息处置惩罚器
创建一个实现IWsMsgHandler接口的类,用于处置惩罚WebSocket消息:
- import com.alibaba.fastjson.JSON;
- import com.ruoyi.im.config.TioWebSocketAdapterConfig;
- import com.ruoyi.im.domain.ImMessage;
- import com.ruoyi.im.model.ChatMessage;
- import com.ruoyi.im.service.IImMessageService;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- import org.tio.core.ChannelContext;
- import org.tio.http.common.HttpRequest;
- import org.tio.http.common.HttpResponse;
- import org.tio.websocket.common.WsRequest;
- import org.tio.websocket.common.WsResponse;
- import org.tio.websocket.server.handler.IWsMsgHandler;
- import java.util.Date;
- import java.util.UUID;
- @Component
- public class WebSocketMessageHandler implements IWsMsgHandler {
-
- private static final Logger log = LoggerFactory.getLogger(WebSocketMessageHandler.class);
-
- /**
- * 从ChannelContext中获取参数
- */
- private String getParam(ChannelContext channelContext, String paramName) {
- try {
- // 尝试从channelContext的httpConfig或其他属性中获取参数
- Object requestObj = channelContext.getAttribute("httpRequest");
- if (requestObj != null && requestObj instanceof HttpRequest) {
- HttpRequest httpRequest = (HttpRequest) requestObj;
- return httpRequest.getParam(paramName);
- }
-
- // 从user属性中获取用户ID
- if ("userId".equals(paramName)) {
- return (String) channelContext.getAttribute("userId");
- }
-
- return null;
- } catch (Exception e) {
- log.error("获取参数失败: paramName={}", paramName, e);
- return null;
- }
- }
-
- @Override
- public HttpResponse handshake(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) {
- // 从请求中获取用户ID、token和设备类型
- String userId = httpRequest.getParam("userId");
- String token = httpRequest.getParam("token");
- String deviceTypeStr = httpRequest.getParam("deviceType");
-
- // 验证参数
- if (userId == null || token == null || deviceTypeStr == null) {
- log.error("握手参数不完整: userId={}, token={}, deviceType={}", userId, token, deviceTypeStr);
- return httpResponse;
- }
-
- // 验证token(这里应该调用若依的token验证服务)
- // TODO: 实现token验证逻辑
-
- // 解析设备类型
- UserSessionManager.DeviceType deviceType;
- try {
- deviceType = UserSessionManager.DeviceType.valueOf(deviceTypeStr.toUpperCase());
- } catch (IllegalArgumentException e) {
- log.error("无效的设备类型: {}", deviceTypeStr);
- return httpResponse;
- }
-
- // 保存参数到ChannelContext
- channelContext.setAttribute("httpRequest", httpRequest);
- channelContext.setAttribute("userId", userId);
-
- // 添加用户会话
- userSessionManager.addUserSession(userId, token, deviceType, channelContext);
-
- return httpResponse;
- }
-
- @Override
- public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) {
- String userId = httpRequest.getParam("userId");
- log.info("握手成功,userId: {}, channelId: {}", userId, channelContext.getId());
- }
-
- @Override
- public Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) {
- return null;
- }
-
- @Override
- public Object onClose(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) {
- // 从channelContext获取用户ID
- String userId = getParam(channelContext, "userId");
- // 移除用户会话
- if (userId != null) {
- userSessionManager.removeUserSession(userId);
- }
- log.info("连接关闭,userId: {}, channelId: {}", userId, channelContext.getId());
- return null;
- }
-
- @Override
- public Object onText(WsRequest wsRequest, String msg, ChannelContext channelContext) {
- // 从channelContext获取用户ID
- String userId = getParam(channelContext, "userId");
- log.info("收到消息: {}, userId: {}, channelId: {}", msg, userId, channelContext.getId());
-
- try {
- // 解析消息
- ChatMessage chatMessage = JSON.parseObject(msg, ChatMessage.class);
-
- // 设置消息ID和发送时间
- if (chatMessage.getMessageId() == null) {
- chatMessage.setMessageId(UUID.randomUUID().toString());
- }
- if (chatMessage.getSendTime() == null) {
- chatMessage.setSendTime(new Date());
- }
-
- // 设置发送者ID
- chatMessage.setFromUserId(userId);
-
- // 更新用户最后活动时间
- userSessionManager.updateUserLastActiveTime(userId);
-
- // 创建消息实体并持久化
- ImMessage imMessage = new ImMessage();
- imMessage.setMessageId(chatMessage.getMessageId());
- imMessage.setFromUserId(chatMessage.getFromUserId());
- imMessage.setToUserId(chatMessage.getToUserId());
- imMessage.setGroupId(chatMessage.getGroupId());
- imMessage.setContent(chatMessage.getContent());
- imMessage.setMessageType(chatMessage.getMessageType());
- imMessage.setStatus(0); // 0表示未读
- imMessage.setSendTime(chatMessage.getSendTime());
- imMessage.setCreateTime(chatMessage.getSendTime());
- imMessage.setUpdateTime(chatMessage.getSendTime());
- imMessage.setIsSent(1); // 1表示已发送
- imMessage.setRetryCount(0); // 初始重试次数为0
-
- // 保存消息到数据库
- imMessageService.insertImMessage(imMessage);
-
- // 处理消息
- if (chatMessage.getToUserId() != null) {
- // 私聊消息
- userSessionManager.sendMessageToUser(chatMessage.getToUserId(), JSON.toJSONString(chatMessage));
- } else if (chatMessage.getGroupId() != null) {
- // 群聊消息(需要实现群组管理)
- // TODO: 实现群聊消息处理
- } else {
- // 广播消息
- userSessionManager.broadcastMessage(JSON.toJSONString(chatMessage));
- }
-
- // 发送响应消息
- WsResponse wsResponse = WsResponse.fromText("SendMessageSuccess:" + msg, "UTF-8");
- TioWebSocketAdapterConfig.send(channelContext, wsResponse);
-
- } catch (Exception e) {
- log.error("处理消息失败: {}", msg, e);
- WsResponse wsResponse = WsResponse.fromText("处理消息失败: " + e.getMessage(), "UTF-8");
- TioWebSocketAdapterConfig.send(channelContext, wsResponse);
- }
-
- return null;
- }
- }
复制代码- import com.ruoyi.im.config.TioWebSocketAdapterConfig;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.stereotype.Component;
- import org.tio.core.ChannelContext;
- import org.tio.websocket.common.WsResponse;
- import java.util.Map;
- import java.util.Set;
- import java.util.concurrent.ConcurrentHashMap;
- @Component
- public class UserSessionManager {
-
- private static final Logger log = LoggerFactory.getLogger(UserSessionManager.class);
-
- // 用户ID -> 用户会话信息
- private final Map<String, UserSession> userSessions = new ConcurrentHashMap<>();
-
- // 设备类型枚举
- public enum DeviceType {
- WEB, // Web端
- MINI, // 小程序
- APP // APP
- }
-
- // 用户会话信息类
- public static class UserSession {
- private String userId;
- private String token;
- private DeviceType deviceType;
- private ChannelContext channelContext;
- private long lastActiveTime;
-
- public UserSession(String userId, String token, DeviceType deviceType, ChannelContext channelContext) {
- this.userId = userId;
- this.token = token;
- this.deviceType = deviceType;
- this.channelContext = channelContext;
- this.lastActiveTime = System.currentTimeMillis();
- }
-
- // Getters and setters
- public String getUserId() { return userId; }
- public void setUserId(String userId) { this.userId = userId; }
- public String getToken() { return token; }
- public void setToken(String token) { this.token = token; }
- public DeviceType getDeviceType() { return deviceType; }
- public void setDeviceType(DeviceType deviceType) { this.deviceType = deviceType; }
- public ChannelContext getChannelContext() { return channelContext; }
- public void setChannelContext(ChannelContext channelContext) { this.channelContext = channelContext; }
- public long getLastActiveTime() { return lastActiveTime; }
- public void setLastActiveTime(long lastActiveTime) { this.lastActiveTime = lastActiveTime; }
- }
-
- // 添加用户会话
- public void addUserSession(String userId, String token, DeviceType deviceType, ChannelContext channelContext) {
- UserSession session = new UserSession(userId, token, deviceType, channelContext);
- userSessions.put(userId, session);
-
- // 绑定用户ID到ChannelContext,使用适配器
- TioWebSocketAdapterConfig.bindUser(channelContext, userId);
-
- log.info("用户会话添加成功: userId={}, deviceType={}", userId, deviceType);
- }
-
- // 移除用户会话
- public void removeUserSession(String userId) {
- UserSession session = userSessions.remove(userId);
- if (session != null) {
- // 解绑用户ID,使用适配器
- TioWebSocketAdapterConfig.unbindUser(session.getChannelContext(), userId);
- log.info("用户会话移除成功: userId={}", userId);
- }
- }
-
- // 获取用户会话
- public UserSession getUserSession(String userId) {
- return userSessions.get(userId);
- }
-
- // 更新用户最后活动时间
- public void updateUserLastActiveTime(String userId) {
- UserSession session = userSessions.get(userId);
- if (session != null) {
- session.setLastActiveTime(System.currentTimeMillis());
- }
- }
-
- // 发送消息给指定用户
- public void sendMessageToUser(String userId, String message) {
- UserSession session = userSessions.get(userId);
- if (session != null && session.getChannelContext() != null) {
- WsResponse wsResponse = WsResponse.fromText(message, "UTF-8");
- // 使用适配器发送消息
- TioWebSocketAdapterConfig.send(session.getChannelContext(), wsResponse);
- } else {
- log.warn("用户不在线,无法发送消息: userId={}", userId);
- }
- }
-
- // 广播消息给所有用户
- public void broadcastMessage(String message) {
- if (userSessions.isEmpty()) {
- return;
- }
-
- WsResponse wsResponse = WsResponse.fromText(message, "UTF-8");
- for (UserSession session : userSessions.values()) {
- if (session.getChannelContext() != null) {
- // 使用适配器发送消息
- TioWebSocketAdapterConfig.send(session.getChannelContext(), wsResponse);
- }
- }
- }
-
- // 获取在线用户数量
- public int getOnlineUserCount() {
- return userSessions.size();
- }
-
- // 获取所有在线用户ID
- public Set<String> getOnlineUserIds() {
- return userSessions.keySet();
- }
- }
复制代码 6、创建TIO配置类和 TIO WebSocket适配器配置
- import com.ruoyi.im.websocket.WebSocketMessageHandler;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.tio.websocket.server.WsServerConfig;
- import org.tio.websocket.server.WsServerStarter;
- import java.io.IOException;
- @Configuration
- public class TioWebSocketConfig {
-
- @Autowired
- private WebSocketMessageHandler webSocketMessageHandler;
-
- @Bean
- public WsServerStarter wsServerStarter() throws IOException {
- // 配置t-io websocket服务器,指定端口
- WsServerConfig wsServerConfig = new WsServerConfig(9321);
-
- // 创建WebSocket服务器
- WsServerStarter wsServerStarter = new WsServerStarter(wsServerConfig, webSocketMessageHandler);
-
- // 这里不再获取和配置ServerTioConfig
-
- return wsServerStarter;
- }
- }
复制代码- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.context.annotation.Configuration;
- import org.tio.core.ChannelContext;
- import org.tio.core.Tio;
- import org.tio.core.intf.Packet;
- /**
- * t-io WebSocket适配器配置
- * 帮助处理不同版本的t-io兼容性问题
- */
- @Configuration
- public class TioWebSocketAdapterConfig {
-
- private static final Logger log = LoggerFactory.getLogger(TioWebSocketAdapterConfig.class);
- /**
- * 发送消息
- *
- * @param channelContext 通道上下文
- * @param packet 数据包
- */
- public static void send(ChannelContext channelContext, Packet packet) {
- if (channelContext == null || packet == null) {
- log.warn("发送消息失败:通道上下文或数据包为空");
- return;
- }
- try {
- // 直接调用 Tio 的 send 方法
- Tio.send(channelContext, packet);
- } catch (Exception e) {
- log.error("发送消息失败", e);
- }
- }
- /**
- * 绑定用户
- *
- * @param channelContext 通道上下文
- * @param userId 用户ID
- */
- public static void bindUser(ChannelContext channelContext, String userId) {
- if (channelContext == null || userId == null) {
- log.warn("绑定用户失败:通道上下文或用户ID为空");
- return;
- }
- try {
- // 直接调用 Tio 的 bindUser 方法
- Tio.bindUser(channelContext, userId);
- } catch (Exception e) {
- log.error("绑定用户失败", e);
- }
- }
- /**
- * 解绑用户
- *
- * @param channelContext 通道上下文
- * @param userId 用户ID
- */
- public static void unbindUser(ChannelContext channelContext, String userId) {
- if (channelContext == null) {
- log.warn("解绑用户失败:通道上下文为空");
- return;
- }
- try {
- // 直接调用 Tio 的 unbindUser 方法
- Tio.unbindUser(channelContext);
- } catch (Exception e) {
- log.error("解绑用户失败", e);
- }
- }
- }
复制代码 7、配置TIO启动类
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- import org.tio.websocket.server.WsServerStarter;
- import javax.annotation.PostConstruct;
- import javax.annotation.PreDestroy;
- import java.io.IOException;
- import java.lang.reflect.Method;
- @Component
- public class WebSocketServer {
-
- private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);
-
- @PostConstruct
- public void start() {
- try {
- wsServerStarter.start();
- log.info("WebSocket服务器启动成功,监听端口:9321");
- } catch (IOException e) {
- log.error("WebSocket服务器启动失败", e);
- }
- }
-
复制代码 启动springboot主类,即可启动tio服务
- public static void main(String[] args) {
- SpringApplication.run(RuoYiApplication.class, args);
- }
复制代码 8、测试即时通讯功能
可以利用websocket工具进行测试
- ##链接地址:
- ws://127.0.0.1:9321?userId=1&token=1&deviceType=MINI
- ##链接参数说明:
- userId:当前用户id
- token:token值
- deviceType:设备类型 WEB Web端、MINI 小程序 、APP
- ##发送消息示例:
- {
- "fromUserId": "1",发送者用户ID
- "toUserId": "2",//接收者用户ID
- "content": "测试消息内容",//消息内容
- "messageType": "1"//消息类型1=文本,2=图片,3=语音,4=视频
- }
- ##发送后响应示例:
- SendMessageSuccess:{ "fromUserId": "1", "toUserId": "2", "content": "测试消息内容", "messageType": "1" }
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |