IT评测·应用市场-qidao123.com技术社区
标题:
Spring Boot整合T-IO实现即时通讯
[打印本页]
作者:
王柳
时间:
2025-4-18 11:27
标题:
Spring Boot整合T-IO实现即时通讯
在当今的互联网应用中,即时通讯功能已经变得不可或缺。无论是社交应用、在线客服还是实时数据推送,都必要高效的通讯框架来支持。TIO(Try-IO)是一个高性能的Java网络通讯框架,支持TCP/UDP/HTTP/WebSocket等多种协议,非常适合用于即时通讯场景。本文将介绍如何在Spring Boot项目中整合TIO,实现一个简朴的即时通讯功能。
1. TIO简介
TIO是一个基于Java的高性能网络通讯框架,具有以下特点:
支持多种协议:TCP、UDP、HTTP、WebSocket等。
高性能:基于NIO实现,支持高并发。
易用性:提供了丰富的API和机动的配置
2. 情况准备
在开始之前,请确保你已经安装了以下工具:
JDK 1.8及以上版本(本项目利用jdk 17版本)
Maven或Gradle
IntelliJ IDEA或Eclipse
4.添加依赖
在pom.xml文件中添加TIO的相关依赖
<!-- t-io WebSocket依赖 -->
<dependency>
<groupId>org.t-io</groupId>
<artifactId>tio-websocket-server</artifactId>
<version>3.8.6.v20240801-RELEASE</version>
</dependency>
<!-- t-io 核心依赖 -->
<dependency>
<groupId>org.t-io</groupId>
<artifactId>tio-core</artifactId>
<version>3.8.6.v20240801-RELEASE</version>
</dependency>
复制代码
5、创建TIO消息处置惩罚器
创建一个实现IWsMsgHandler接口的类,用于处置惩罚WebSocket消息:
import com.alibaba.fastjson.JSON;
import com.ruoyi.im.config.TioWebSocketAdapterConfig;
import com.ruoyi.im.domain.ImMessage;
import com.ruoyi.im.model.ChatMessage;
import com.ruoyi.im.service.IImMessageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.tio.core.ChannelContext;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.websocket.common.WsRequest;
import org.tio.websocket.common.WsResponse;
import org.tio.websocket.server.handler.IWsMsgHandler;
import java.util.Date;
import java.util.UUID;
@Component
public class WebSocketMessageHandler implements IWsMsgHandler {
private static final Logger log = LoggerFactory.getLogger(WebSocketMessageHandler.class);
/**
* 从ChannelContext中获取参数
*/
private String getParam(ChannelContext channelContext, String paramName) {
try {
// 尝试从channelContext的httpConfig或其他属性中获取参数
Object requestObj = channelContext.getAttribute("httpRequest");
if (requestObj != null && requestObj instanceof HttpRequest) {
HttpRequest httpRequest = (HttpRequest) requestObj;
return httpRequest.getParam(paramName);
}
// 从user属性中获取用户ID
if ("userId".equals(paramName)) {
return (String) channelContext.getAttribute("userId");
}
return null;
} catch (Exception e) {
log.error("获取参数失败: paramName={}", paramName, e);
return null;
}
}
@Override
public HttpResponse handshake(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) {
// 从请求中获取用户ID、token和设备类型
String userId = httpRequest.getParam("userId");
String token = httpRequest.getParam("token");
String deviceTypeStr = httpRequest.getParam("deviceType");
// 验证参数
if (userId == null || token == null || deviceTypeStr == null) {
log.error("握手参数不完整: userId={}, token={}, deviceType={}", userId, token, deviceTypeStr);
return httpResponse;
}
// 验证token(这里应该调用若依的token验证服务)
// TODO: 实现token验证逻辑
// 解析设备类型
UserSessionManager.DeviceType deviceType;
try {
deviceType = UserSessionManager.DeviceType.valueOf(deviceTypeStr.toUpperCase());
} catch (IllegalArgumentException e) {
log.error("无效的设备类型: {}", deviceTypeStr);
return httpResponse;
}
// 保存参数到ChannelContext
channelContext.setAttribute("httpRequest", httpRequest);
channelContext.setAttribute("userId", userId);
// 添加用户会话
userSessionManager.addUserSession(userId, token, deviceType, channelContext);
return httpResponse;
}
@Override
public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) {
String userId = httpRequest.getParam("userId");
log.info("握手成功,userId: {}, channelId: {}", userId, channelContext.getId());
}
@Override
public Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) {
return null;
}
@Override
public Object onClose(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) {
// 从channelContext获取用户ID
String userId = getParam(channelContext, "userId");
// 移除用户会话
if (userId != null) {
userSessionManager.removeUserSession(userId);
}
log.info("连接关闭,userId: {}, channelId: {}", userId, channelContext.getId());
return null;
}
@Override
public Object onText(WsRequest wsRequest, String msg, ChannelContext channelContext) {
// 从channelContext获取用户ID
String userId = getParam(channelContext, "userId");
log.info("收到消息: {}, userId: {}, channelId: {}", msg, userId, channelContext.getId());
try {
// 解析消息
ChatMessage chatMessage = JSON.parseObject(msg, ChatMessage.class);
// 设置消息ID和发送时间
if (chatMessage.getMessageId() == null) {
chatMessage.setMessageId(UUID.randomUUID().toString());
}
if (chatMessage.getSendTime() == null) {
chatMessage.setSendTime(new Date());
}
// 设置发送者ID
chatMessage.setFromUserId(userId);
// 更新用户最后活动时间
userSessionManager.updateUserLastActiveTime(userId);
// 创建消息实体并持久化
ImMessage imMessage = new ImMessage();
imMessage.setMessageId(chatMessage.getMessageId());
imMessage.setFromUserId(chatMessage.getFromUserId());
imMessage.setToUserId(chatMessage.getToUserId());
imMessage.setGroupId(chatMessage.getGroupId());
imMessage.setContent(chatMessage.getContent());
imMessage.setMessageType(chatMessage.getMessageType());
imMessage.setStatus(0); // 0表示未读
imMessage.setSendTime(chatMessage.getSendTime());
imMessage.setCreateTime(chatMessage.getSendTime());
imMessage.setUpdateTime(chatMessage.getSendTime());
imMessage.setIsSent(1); // 1表示已发送
imMessage.setRetryCount(0); // 初始重试次数为0
// 保存消息到数据库
imMessageService.insertImMessage(imMessage);
// 处理消息
if (chatMessage.getToUserId() != null) {
// 私聊消息
userSessionManager.sendMessageToUser(chatMessage.getToUserId(), JSON.toJSONString(chatMessage));
} else if (chatMessage.getGroupId() != null) {
// 群聊消息(需要实现群组管理)
// TODO: 实现群聊消息处理
} else {
// 广播消息
userSessionManager.broadcastMessage(JSON.toJSONString(chatMessage));
}
// 发送响应消息
WsResponse wsResponse = WsResponse.fromText("SendMessageSuccess:" + msg, "UTF-8");
TioWebSocketAdapterConfig.send(channelContext, wsResponse);
} catch (Exception e) {
log.error("处理消息失败: {}", msg, e);
WsResponse wsResponse = WsResponse.fromText("处理消息失败: " + e.getMessage(), "UTF-8");
TioWebSocketAdapterConfig.send(channelContext, wsResponse);
}
return null;
}
}
复制代码
import com.ruoyi.im.config.TioWebSocketAdapterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.tio.core.ChannelContext;
import org.tio.websocket.common.WsResponse;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class UserSessionManager {
private static final Logger log = LoggerFactory.getLogger(UserSessionManager.class);
// 用户ID -> 用户会话信息
private final Map<String, UserSession> userSessions = new ConcurrentHashMap<>();
// 设备类型枚举
public enum DeviceType {
WEB, // Web端
MINI, // 小程序
APP // APP
}
// 用户会话信息类
public static class UserSession {
private String userId;
private String token;
private DeviceType deviceType;
private ChannelContext channelContext;
private long lastActiveTime;
public UserSession(String userId, String token, DeviceType deviceType, ChannelContext channelContext) {
this.userId = userId;
this.token = token;
this.deviceType = deviceType;
this.channelContext = channelContext;
this.lastActiveTime = System.currentTimeMillis();
}
// Getters and setters
public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
public String getToken() { return token; }
public void setToken(String token) { this.token = token; }
public DeviceType getDeviceType() { return deviceType; }
public void setDeviceType(DeviceType deviceType) { this.deviceType = deviceType; }
public ChannelContext getChannelContext() { return channelContext; }
public void setChannelContext(ChannelContext channelContext) { this.channelContext = channelContext; }
public long getLastActiveTime() { return lastActiveTime; }
public void setLastActiveTime(long lastActiveTime) { this.lastActiveTime = lastActiveTime; }
}
// 添加用户会话
public void addUserSession(String userId, String token, DeviceType deviceType, ChannelContext channelContext) {
UserSession session = new UserSession(userId, token, deviceType, channelContext);
userSessions.put(userId, session);
// 绑定用户ID到ChannelContext,使用适配器
TioWebSocketAdapterConfig.bindUser(channelContext, userId);
log.info("用户会话添加成功: userId={}, deviceType={}", userId, deviceType);
}
// 移除用户会话
public void removeUserSession(String userId) {
UserSession session = userSessions.remove(userId);
if (session != null) {
// 解绑用户ID,使用适配器
TioWebSocketAdapterConfig.unbindUser(session.getChannelContext(), userId);
log.info("用户会话移除成功: userId={}", userId);
}
}
// 获取用户会话
public UserSession getUserSession(String userId) {
return userSessions.get(userId);
}
// 更新用户最后活动时间
public void updateUserLastActiveTime(String userId) {
UserSession session = userSessions.get(userId);
if (session != null) {
session.setLastActiveTime(System.currentTimeMillis());
}
}
// 发送消息给指定用户
public void sendMessageToUser(String userId, String message) {
UserSession session = userSessions.get(userId);
if (session != null && session.getChannelContext() != null) {
WsResponse wsResponse = WsResponse.fromText(message, "UTF-8");
// 使用适配器发送消息
TioWebSocketAdapterConfig.send(session.getChannelContext(), wsResponse);
} else {
log.warn("用户不在线,无法发送消息: userId={}", userId);
}
}
// 广播消息给所有用户
public void broadcastMessage(String message) {
if (userSessions.isEmpty()) {
return;
}
WsResponse wsResponse = WsResponse.fromText(message, "UTF-8");
for (UserSession session : userSessions.values()) {
if (session.getChannelContext() != null) {
// 使用适配器发送消息
TioWebSocketAdapterConfig.send(session.getChannelContext(), wsResponse);
}
}
}
// 获取在线用户数量
public int getOnlineUserCount() {
return userSessions.size();
}
// 获取所有在线用户ID
public Set<String> getOnlineUserIds() {
return userSessions.keySet();
}
}
复制代码
6、创建TIO配置类和 TIO WebSocket适配器配置
import com.ruoyi.im.websocket.WebSocketMessageHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.tio.websocket.server.WsServerConfig;
import org.tio.websocket.server.WsServerStarter;
import java.io.IOException;
@Configuration
public class TioWebSocketConfig {
@Autowired
private WebSocketMessageHandler webSocketMessageHandler;
@Bean
public WsServerStarter wsServerStarter() throws IOException {
// 配置t-io websocket服务器,指定端口
WsServerConfig wsServerConfig = new WsServerConfig(9321);
// 创建WebSocket服务器
WsServerStarter wsServerStarter = new WsServerStarter(wsServerConfig, webSocketMessageHandler);
// 这里不再获取和配置ServerTioConfig
return wsServerStarter;
}
}
复制代码
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.core.intf.Packet;
/**
* t-io WebSocket适配器配置
* 帮助处理不同版本的t-io兼容性问题
*/
@Configuration
public class TioWebSocketAdapterConfig {
private static final Logger log = LoggerFactory.getLogger(TioWebSocketAdapterConfig.class);
/**
* 发送消息
*
* @param channelContext 通道上下文
* @param packet 数据包
*/
public static void send(ChannelContext channelContext, Packet packet) {
if (channelContext == null || packet == null) {
log.warn("发送消息失败:通道上下文或数据包为空");
return;
}
try {
// 直接调用 Tio 的 send 方法
Tio.send(channelContext, packet);
} catch (Exception e) {
log.error("发送消息失败", e);
}
}
/**
* 绑定用户
*
* @param channelContext 通道上下文
* @param userId 用户ID
*/
public static void bindUser(ChannelContext channelContext, String userId) {
if (channelContext == null || userId == null) {
log.warn("绑定用户失败:通道上下文或用户ID为空");
return;
}
try {
// 直接调用 Tio 的 bindUser 方法
Tio.bindUser(channelContext, userId);
} catch (Exception e) {
log.error("绑定用户失败", e);
}
}
/**
* 解绑用户
*
* @param channelContext 通道上下文
* @param userId 用户ID
*/
public static void unbindUser(ChannelContext channelContext, String userId) {
if (channelContext == null) {
log.warn("解绑用户失败:通道上下文为空");
return;
}
try {
// 直接调用 Tio 的 unbindUser 方法
Tio.unbindUser(channelContext);
} catch (Exception e) {
log.error("解绑用户失败", e);
}
}
}
复制代码
7、配置TIO启动类
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.tio.websocket.server.WsServerStarter;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.lang.reflect.Method;
@Component
public class WebSocketServer {
private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);
@PostConstruct
public void start() {
try {
wsServerStarter.start();
log.info("WebSocket服务器启动成功,监听端口:9321");
} catch (IOException e) {
log.error("WebSocket服务器启动失败", e);
}
}
复制代码
启动springboot主类,即可启动tio服务
public static void main(String[] args) {
SpringApplication.run(RuoYiApplication.class, args);
}
复制代码
8、测试即时通讯功能
可以利用websocket工具进行测试
##链接地址:
ws://127.0.0.1:9321?userId=1&token=1&deviceType=MINI
##链接参数说明:
userId:当前用户id
token:token值
deviceType:设备类型 WEB Web端、MINI 小程序 、APP
##发送消息示例:
{
"fromUserId": "1",发送者用户ID
"toUserId": "2",//接收者用户ID
"content": "测试消息内容",//消息内容
"messageType": "1"//消息类型1=文本,2=图片,3=语音,4=视频
}
##发送后响应示例:
SendMessageSuccess:{ "fromUserId": "1", "toUserId": "2", "content": "测试消息内容", "messageType": "1" }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 IT评测·应用市场-qidao123.com技术社区 (https://dis.qidao123.com/)
Powered by Discuz! X3.4