🎯 本文档具体先容了怎样利用WebSocket协议优化客户端与服务端之间的通讯,特殊是在处理惩罚异步订单创建关照的场景中。通过引入WebSocket取代传统的HTTP哀求-相应模式,实现了服务器主动向客户端推送数据的功能,极大地进步了及时性和服从。文中起首概述了WebSocket的上风,随后深入探究了其在分布式体系中的具体实现,包罗依靠管理、网关设置、WebSocket服务类的操持以及消息队列的利用等关键环节。特殊地,针对分布式架构下WebSocket毗连状态同步标题,提出了一种基于消息队列广播机制的办理方案,确保了体系的可扩展性和稳固性。同时,还夸大了心跳检测机制的告急性,以维护毗连的有效性。
🏠️ HelloDam/场快订(场馆预定 SaaS 平台)
前言
在时间段预定接口 V2 中,用户预定之后,会发送一个消息,让消息队列异步创建订单。此时客户端是无法知道服务端什么时间完成订单创建的,因此须要服务端告知客户端。但是以往都是客户端给服务端发 http 哀求,但是服务端怎样主动告知客户端呢?
这个时间就须要请出我们本日的主角 WebSocket 了
WebSocket 先容
WebSocket是一种在单个TCP毗连上举行全双工通讯的协议。它使得客户端和服务器之间的数据互换变得更加简单,允许服务器直接向客户端推送数据而不必由客户端发起哀求。这种特性让及时性要求较高的应用,如即时通讯工具、在线游戏以及及时买卖业务体系等,可以或许更加高效地举行数据交互。通过WebSocket,开发者可以构建相应更快、性能更高的网络应用,同时镌汰不须要的网络开销和延长。相比传统的HTTP哀求-相应模式,WebSocket提供了更低的延长和更高的服从,特殊是在须要频仍更新数据的应用场景中表现出色。
因此利用了 WebSocket ,一旦客户端和服务端创建了毗连,当订单创建乐成之后,服务端直接别订单数据推送给客户端即可。
流程图
user1、user2 和 user3 分别发起 WebSocket 毗连,起首颠末网关,毗连哀求被分发到差异的服务中。WebSocket 服务吸收到毗连哀求之后,对其举行登录校验,假如校验乐成,将其 Session 信息存储在服务器的内存中,假如校验失败,直接关闭 Session 。此中 user1、user2 的Session信息被存储在 WebSocket 服务1 中,user3 的Session信息被存储在 WebSocket 服务2 中。
当用户预定时间段,天生订单之后,场馆服务向消息队列中发生订单数据。接着消息队列将订单数据广播到 WebSocket 服务1 和 WebSocket 服务2中。WebSocket 服务2 发现本身的内存中存有 user3 的Session,因此将订单数据通过该 Session 发送给 user3 。
临时无法在飞书文档外展示此内容
具体实现
为相识耦 WebSocket 和其他服务,单独创建一个 WebSocket 服务。
依靠
- <dependencies>
- <dependency>
- <groupId>com.vrs</groupId>
- <artifactId>vrs-web</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.dam</groupId>
- <artifactId>vrs-rocketmq</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>com.vrs</groupId>
- <artifactId>vrs-common</artifactId>
- </dependency>
- <dependency>
- <groupId>com.vrs</groupId>
- <artifactId>vrs-idempotent</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>com.alibaba.cloud</groupId>
- <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
- </dependency>
- <!-- websocket -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-websocket</artifactId>
- </dependency>
- </dependencies>
复制代码 网关设置
当访问 /websocket/** 路径时,将哀求转化到 WebSocket 服务,留意,转发的时间添加了前缀ws:
- - id: vrs-websocket
- uri: lb:ws://vrs-websocket
- predicates:
- - Path=/websocket/**
- filters:
- - name: TokenValidate
- args:
- whitePathList:
- - /websocket/**
复制代码 【去除默认过滤器】
假如像如许全局设置了默认过滤器,DedupeResponseHeader过滤器的作用是对指定的相应头(在这个例子中为Vary、Access-Control-Allow-Origin和Access-Control-Allow-Credentials)举行去重。当有多个雷同名称的相应头时,它会按照给定的计谋保存此中的一个。这里的计谋是RETAIN_FIRST,意味着它将保存这些头部中第一次出现的谁人,而删除后续出现的重复头部。
- spring:
- cloud:
- gateway:
- default-filters:
- - DedupeResponseHeader=Vary Access-Control-Allow-Origin Access-Control-Allow-Credentials, RETAIN_FIRST
复制代码 发起 WebSocket 毗连的时间,会报如下错误,这是由于修改了只读的哀求头
- java.lang.UnsupportedOperationException: null
- at org.springframework.http.ReadOnlyHttpHeaders.set(ReadOnlyHttpHeaders.java:108) ~[spring-web-6.0.9.jar:6.0.9]
- Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
- Error has been observed at the following site(s):
- *__checkpoint ⇢ org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter [DefaultWebFilterChain]
- *__checkpoint ⇢ HTTP GET "/websocket/admin?token=eyJhbGciOiJIUzUxMiIsInppcCI6IkdaSVAifQ.H4sIAAAAAAAA_6tWKi5NUrJScgwN8dANDXYNUtJRSq0oULIyNDe2NDMyNrYw0lEqLU4t8kwBilmYmZgZm5sbG5mbGViYGpgYQyX9EnNTgYYkpuRm5ilBhEIqC4BCRrUAvgeVqmEAAAA.e7wanr0gKu4FD-Y_afO2MEIECxZ6oMKGlf8zarZp-GOmzqL5n354gasKr7GKKs4H3Pq0CYJQECO_Rv9ixGsvZQ" [ExceptionHandlingWebHandler]
- Original Stack Trace:
- at org.springframework.http.ReadOnlyHttpHeaders.set(ReadOnlyHttpHeaders.java:108) ~[spring-web-6.0.9.jar:6.0.9]
复制代码 因此须要将上述设置删除,假如还须要这些默认设置,可以到具体的路由下面设置,就像下面一样
- spring:
- cloud:
- gateway:
- routes:
- - id: vrs-admin
- uri: lb://vrs-admin
- predicates:
- - Path=/admin/**
- filters:
- - DedupeResponseHeader=Vary Access-Control-Allow-Origin Access-Control-Allow-Credentials, RETAIN_FIRST
- - name: TokenValidate
- args:
- whitePathList:
- - /admin/user/v1/login
- - /admin/user/v1/wechatLogin
- - ...
复制代码 WebSocket设置类
设置类 WebSocketConfig 重要用于设置和初始化 WebSocket 服务器端点,并处理惩罚与 WebSocket 毗连相干的操纵,具体功能如下:
- Spring Bean 注册:通过 @Configuration 注解标明这是一个 Spring 设置类。在该类中界说了一个 @Bean 方法 serverEndpointExporter(),它返回一个 ServerEndpointExporter 实例。这个实例的作用是主动注册利用了 @ServerEndpoint 注解声明的 WebSocket 端点对象到 Spring 容器中。
- 握手哀求修改:modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) 方法重写了父类中的同名方法,用于在创建 WebSocket 毗连前对握手哀求举行自界说修改。在这个例子中,方法实验从握手哀求参数中获取名为 “token” 的参数,并将其存储在 ServerEndpointConfig 对象的用户属性中(即 sec.getUserProperties().put("token", token);)。这使得后续逻辑可以通过访问端点设置对象来获取令牌信息。
- 端点实例化:getEndpointInstance(Class<T> clazz) 方法重写了父类的方法,用于提供自界说逻辑来实例化被 @ServerEndpoint 标注的 WebSocket 端点类。在这个实现中,它直接调用了父类的实现 super.getEndpointInstance(clazz) 来创建端点实例。通常环境下,除非须要特殊的实例化逻辑,否则可以直接利用父类的默认实现。
- package com.vrs.config;
- import jakarta.websocket.HandshakeResponse;
- import jakarta.websocket.server.HandshakeRequest;
- import jakarta.websocket.server.ServerEndpointConfig;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.web.socket.server.standard.ServerEndpointExporter;
- import java.util.List;
- import java.util.Map;
- /**
- * @Author dam
- * @create 2025/1/24 15:25
- */
- @Configuration
- public class WebSocketConfig extends ServerEndpointConfig.Configurator {
- /**
- * 这个bean会自动注册使用了@ServerEndpoint注解声明的对象
- *
- * @return
- */
- @Bean
- public ServerEndpointExporter serverEndpointExporter() {
- return new ServerEndpointExporter();
- }
- /**
- * 建立握手时,连接前的操作
- */
- @Override
- public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
- // 获取请求参数
- Map<String, List<String>> parameterMap = request.getParameterMap();
- List<String> tokenList = parameterMap.get("token");
- if (tokenList != null && !tokenList.isEmpty()) {
- String token = tokenList.get(0);
- sec.getUserProperties().put("token", token);
- }
- }
- /**
- * 初始化端点对象,也就是被@ServerEndpoint所标注的对象
- */
- @Override
- public <T> T getEndpointInstance(Class<T> clazz) throws InstantiationException {
- return super.getEndpointInstance(clazz);
- }
- }
复制代码 WebSocket服务类
WebSocketServer 类是为实现及时通讯而操持的,可以或许有效地管理多个客户端之间的双向通讯以及保持这些通讯的稳固性和可靠性。它通过 Spring 的 @Component 和 Jakarta WebSocket 的 @ServerEndpoint 注解被注册为一个 Spring Bean,并监听路径为 /websocket/{username} 的 WebSocket 哀求。该类利用一个静态的 ConcurrentHashMap 来存储每个用户的会话 (Session) 和末了一次活动时间,以跟踪在线用户和他们的生动状态。它实现了以下关键功能:
- 毗连管理:处理惩罚用户的毗连创建 (onOpen) 和关闭 (onClose) 事故,包罗校验用户提供的 token 是否有效。
- 消息处理惩罚:吸收来自客户端的消息 (onMessage) 并据此更新用户的末了活动时间,支持发送 PING/PONG 心跳消息来维持毗连。
- 心跳检测:通过定时任务每30秒查抄一次用户的心跳,若某用户凌驾60秒未活动,则主动断开其毗连,确保资源的有效利用。
- 消息发送:提供了一个方法用于向特定用户发送消息。
- package com.vrs.controller;
- import com.vrs.config.WebSocketConfig;
- import com.vrs.constant.RedisCacheConstant;
- import com.vrs.utils.JwtUtil;
- import jakarta.websocket.*;
- import jakarta.websocket.server.PathParam;
- import jakarta.websocket.server.ServerEndpoint;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.data.redis.core.StringRedisTemplate;
- import org.springframework.stereotype.Component;
- import org.springframework.util.StringUtils;
- import java.io.IOException;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.Executors;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.TimeUnit;
- /**
- * @Author dam
- * @create 2024/1/24 14:32
- */
- // 将WebSocketServer注册为spring的一个bean
- @ServerEndpoint(value = "/websocket/{username}", configurator = WebSocketConfig.class)
- @Component
- @Slf4j(topic = "WebSocketServer")
- public class WebSocketServer {
- /**
- * 心跳检查间隔时间(单位:秒)
- */
- private static final int HEARTBEAT_INTERVAL = 30;
- /**
- * 心跳超时时间(单位:秒)
- */
- private static final int HEARTBEAT_TIMEOUT = 60;
- /**
- * 记录当前在线连接的客户端的session
- */
- private static final Map<String, Session> usernameAndSessionMap = new ConcurrentHashMap<>();
- /**
- * 记录用户最后一次活动时间
- */
- private static final Map<String, Long> lastActivityTimeMap = new ConcurrentHashMap<>();
- /**
- * 直接通过 Autowired 注入的话,redisTemplate为null,因此使用这种引入方式
- */
- private static StringRedisTemplate redisTemplate;
- @Autowired
- public void setRabbitTemplate(StringRedisTemplate redisTemplate) {
- WebSocketServer.redisTemplate = redisTemplate;
- }
- /**
- * 定时任务线程池,用于心跳检查
- */
- private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
- // 初始化心跳检查任务
- static {
- scheduler.scheduleAtFixedRate(WebSocketServer::checkHeartbeat, HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
- }
- /**
- * 浏览器和服务端连接建立成功之后会调用这个方法
- */
- @OnOpen
- public void onOpen(Session session, @PathParam("username") String username, EndpointConfig config) {
- // 校验 token 是否有效
- String token = (String) config.getUserProperties().get("token");
- boolean validToken = validToken(token);
- if (!validToken) {
- try {
- session.close(new CloseReason(CloseReason.CloseCodes.VIOLATED_POLICY, "无效的token,请先登录"));
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- // 如果用户已存在,关闭旧连接
- if (usernameAndSessionMap.containsKey(username)) {
- Session oldSession = usernameAndSessionMap.get(username);
- if (oldSession != null && oldSession.isOpen()) {
- try {
- oldSession.close();
- } catch (IOException e) {
- log.error("关闭旧连接时发生错误", e);
- }
- }
- }
- // 记录新连接
- usernameAndSessionMap.put(username, session);
- // 记录用户活动时间
- lastActivityTimeMap.put(username, System.currentTimeMillis());
- log.info("有新用户加入,username={}, 当前在线人数为:{}", username, usernameAndSessionMap.size());
- }
- /**
- * 连接关闭调用的方法
- */
- @OnClose
- public void onClose(Session session, @PathParam("username") String username) throws IOException {
- try {
- if (session != null && session.isOpen()) {
- session.close();
- }
- } catch (IOException e) {
- log.error("关闭连接时发生错误", e);
- } finally {
- usernameAndSessionMap.remove(username);
- lastActivityTimeMap.remove(username);
- log.info("有一连接关闭,移除username={}的用户session, 当前在线人数为:{}", username, usernameAndSessionMap.size());
- }
- }
- /**
- * 发生错误的时候会调用这个方法
- */
- @OnError
- public void onError(Session session, Throwable error) {
- log.error("发生错误,原因:" + error.getMessage());
- error.printStackTrace();
- }
- /**
- * 收到客户端消息时调用
- */
- @OnMessage
- public void onMessage(String message, Session session, @PathParam("username") String username) {
- // 更新用户最后一次活动时间
- lastActivityTimeMap.put(username, System.currentTimeMillis());
- if ("PING".equals(message)) {
- log.debug("收到来自 {} 的心跳检测请求", username);
- } else {
- log.info("收到来自 {} 的消息: {}", username, message);
- }
- }
- /**
- * 服务端发送消息给客户端
- */
- public void sendMessage(String toUsername, String message) {
- try {
- Session toSession = usernameAndSessionMap.get(toUsername);
- if (toSession != null && toSession.isOpen()) {
- toSession.getBasicRemote().sendText(message);
- } else {
- log.warn("用户 {} 的会话已关闭或不存在", toUsername);
- }
- } catch (Exception e) {
- log.error("服务端发送消息给客户端失败", e);
- }
- }
- /**
- * 关闭心跳检测超时的 session
- */
- private static void checkHeartbeat() {
- long currentTime = System.currentTimeMillis();
- for (Map.Entry<String, Long> entry : lastActivityTimeMap.entrySet()) {
- String username = entry.getKey();
- long lastActivityTime = entry.getValue();
- if (currentTime - lastActivityTime > HEARTBEAT_TIMEOUT * 1000) {
- log.info("用户 {} 心跳超时,关闭连接", username);
- Session session = usernameAndSessionMap.get(username);
- if (session != null) {
- try {
- session.close();
- } catch (IOException e) {
- log.error("关闭连接时发生错误", e);
- }
- }
- usernameAndSessionMap.remove(username);
- lastActivityTimeMap.remove(username);
- }
- }
- }
- /**
- * 校验 token 有效
- *
- * @param token
- * @return
- */
- private boolean validToken(String token) {
- String userName = "";
- try {
- // 如果从 token 中解析用户名错误,说明 token 是捏造的,或者已经失效
- userName = JwtUtil.getUsername(token);
- } catch (Exception e) {
- return false;
- }
- if (StringUtils.hasText(userName) && StringUtils.hasText(token) &&
- (redisTemplate.opsForHash().get(RedisCacheConstant.USER_LOGIN_KEY + userName, token)) != null) {
- // --if-- 如果可以通过 token 从 Redis 中获取到用户的登录信息,说明通过校验
- return true;
- }
- return false;
- }
- }
复制代码 MQ斲丧者
- package com.vrs.rocketMq.listener;
- import com.vrs.constant.RocketMqConstant;
- import com.vrs.controller.WebSocketServer;
- import com.vrs.domain.dto.mq.WebsocketMqDTO;
- import com.vrs.templateMethod.MessageWrapper;
- import lombok.RequiredArgsConstructor;
- import lombok.SneakyThrows;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.rocketmq.spring.annotation.MessageModel;
- import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
- import org.apache.rocketmq.spring.annotation.SelectorType;
- import org.apache.rocketmq.spring.core.RocketMQListener;
- import org.springframework.stereotype.Component;
- /**
- * 执行预订流程 消费者
- *
- * @Author dam
- * @create 2024/9/20 21:30
- */
- @Slf4j(topic = RocketMqConstant.VENUE_TOPIC)
- @Component
- @RocketMQMessageListener(topic = RocketMqConstant.VENUE_TOPIC,
- consumerGroup = RocketMqConstant.VENUE_CONSUMER_GROUP + "-" + RocketMqConstant.WEBSOCKET_SEND_MESSAGE_TAG,
- // 需要使用广播模式
- messageModel = MessageModel.BROADCASTING,
- // 监听tag
- selectorType = SelectorType.TAG,
- selectorExpression = RocketMqConstant.WEBSOCKET_SEND_MESSAGE_TAG
- )
- @RequiredArgsConstructor
- public class WebSocketSendMessageListener implements RocketMQListener<MessageWrapper<WebsocketMqDTO>> {
- private final WebSocketServer webSocketServer;
- /**
- * 消费消息的方法
- * 方法报错就会拒收消息
- *
- * @param messageWrapper 消息内容,类型和上面的泛型一致。如果泛型指定了固定的类型,消息体就是我们的参数
- */
- @SneakyThrows
- @Override
- public void onMessage(MessageWrapper<WebsocketMqDTO> messageWrapper) {
- // 开头打印日志
,平常可 Debug 看任务参数,线上可报平安(比如消息是否消费,重新投递时获取参数等) - log.info("[消费者] websocket发生消息给{}", messageWrapper.getMessage().getToUsername());
- webSocketServer.sendMessage(messageWrapper.getMessage().getToUsername(), messageWrapper.getMessage().getMessage());
- }
- }
复制代码 启动类
- package com.vrs;
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
- /**
- * @Author dam
- * @create 2025/01/24 16:34
- */
- @SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
- public class VrsWebSocketApplication {
- public static void main(String[] args) {
- SpringApplication.run(VrsWebSocketApplication.class, args);
- }
- }
复制代码 设置文件
- server:
- port: 7054
- spring:
- profiles:
- active: dam
- application:
- name: vrs-websocket
- cloud:
- nacos:
- discovery:
- server-addr: 127.0.0.1:8848
- data:
- redis:
- host: 127.0.0.1
- port: 6379
- password: 12345678
- database: 0
- timeout: 1800000
- jedis:
- pool:
- max-active: 20 #最大连接数
- max-wait: -1 #最大阻塞等待时间(负数表示没限制)
- max-idle: 5 #最大空闲
- min-idle: 0 #最小空闲
- rocketmq:
- # rocketMq的nameServer地址
- name-server: 127.0.0.1:9876
- producer:
- # 生产者组别
- group: vrs-websocket-group
- # 消息发送的超时时间
- send-message-timeout: 10000
- # 异步消息发送失败重试次数
- retry-times-when-send-async-failed: 1
- # 发送消息的最大大小,单位字节,这里等于4M
- max-message-size: 999999999
复制代码 留意事项
登录验证
为了防止被人恶意发生大量 WebSocket 毗连,占用服务器资源,因此在创建毗连的时间,须要举行登录验证,用户登录了才可以创建 WebSocket 毗连。
由于创建 WebSocket 毗连时,无法像之前的 http 哀求一样在哀求头携带 token 信息,因此之前网关实现的登录校验机制不收效,须要我们针对 WebSocket 毗连额外实现一套登录验证方式。
假设前端发起 WebSocket 毗连的代码如下:
- new WebSocket("ws://localhost:7049/websocket/admin?token=dahidaho");
复制代码 WebSocket 设置类
在modifyHandshake中,将客户端发起毗连哀求时的 token 设置到属性中,如许背面就可以将 token 获取出来举行校验,假如说校验不通过,就关闭 WebSokcet 毗连
token校验
代码位于WebSocketServer类中,当调用validToken校验失败之后,通过session.close来关闭毗连
- /**
- * 校验 token 有效
- *
- * @param token
- * @return
- */
- private boolean validToken(String token) {
- String userName = "";
- try {
- // 如果从 token 中解析用户名错误,说明 token 是捏造的,或者已经失效
- userName = JwtUtil.getUsername(token);
- } catch (Exception e) {
- return false;
- }
- if (StringUtils.hasText(userName) && StringUtils.hasText(token) &&
- (redisTemplate.opsForHash().get(RedisCacheConstant.USER_LOGIN_KEY + userName, token)) != null) {
- // --if-- 如果可以通过 token 从 Redis 中获取到用户的登录信息,说明通过校验
- return true;
- }
- return false;
- }
- /**
- * 浏览器和服务端连接建立成功之后会调用这个方法
- */
- @OnOpen
- public void onOpen(Session session, @PathParam("username") String username, EndpointConfig config) {
- // 校验 token 是否有效
- String token = (String) config.getUserProperties().get("token");
- boolean validToken = validToken(token);
- if (!validToken) {
- try {
- session.close(new CloseReason(CloseReason.CloseCodes.VIOLATED_POLICY, "无效的token,请先登录"));
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- // 如果用户已存在,关闭旧连接
- if (usernameAndSessionMap.containsKey(username)) {
- Session oldSession = usernameAndSessionMap.get(username);
- if (oldSession != null && oldSession.isOpen()) {
- try {
- oldSession.close();
- } catch (IOException e) {
- log.error("关闭旧连接时发生错误", e);
- }
- }
- }
- // 记录新连接
- usernameAndSessionMap.put(username, session);
- // 记录用户活动时间
- lastActivityTimeMap.put(username, System.currentTimeMillis());
- log.info("有新用户加入,username={}, 当前在线人数为:{}", username, usernameAndSessionMap.size());
- }
复制代码 分布式 WebSocket
由于我们的项目是分布式架构的,假如vrs-websocket启动多个服务的话,须要处理惩罚如下标题:
WebSocketServer中的用户名及其对应的session信息usernameAndSessionMap是存储在当地的,假设发起毗连的时间,session被存储在呆板 1 上面。后续服务端要关照客户端时,怎么知道当前用户的信息是存储在呆板1、呆板 2 照旧呆板 3 呢?
由于 Session 无法直接序列化存储到 Redis 中,为相识决这个标题,本文通过借助消息队列来办理。
服务端要发送消息给客户端时,先将消息发送至消息队列中,消息设置为广播模式。后续多台摆设了vrs-websocket的呆板去消息队列中获取消息来斲丧,假如呆板查抄到了这条消息的吸收者 session 就在呆板上,则实验发送,否则直接 return 即可。
【消息生产者】
- package com.vrs.rocketMq.producer;
- import cn.hutool.core.util.StrUtil;
- import com.vrs.constant.RocketMqConstant;
- import com.vrs.domain.dto.mq.WebsocketMqDTO;
- import com.vrs.templateMethod.AbstractCommonSendProduceTemplate;
- import com.vrs.templateMethod.BaseSendExtendDTO;
- import com.vrs.templateMethod.MessageWrapper;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.rocketmq.common.message.MessageConst;
- import org.springframework.messaging.Message;
- import org.springframework.messaging.support.MessageBuilder;
- import org.springframework.stereotype.Component;
- import java.util.UUID;
- /**
- * websocket发送消息 生产者
- *
- * @Author dam
- * @create 2024/9/20 16:00
- */
- @Slf4j
- @Component
- public class WebsocketSendMessageProducer extends AbstractCommonSendProduceTemplate<WebsocketMqDTO> {
- @Override
- protected BaseSendExtendDTO buildBaseSendExtendParam(WebsocketMqDTO messageSendEvent) {
- return BaseSendExtendDTO.builder()
- .eventName("执行时间段预定")
- .topic(RocketMqConstant.VENUE_TOPIC)
- .tag(RocketMqConstant.WEBSOCKET_SEND_MESSAGE_TAG)
- .sentTimeout(2000L)
- .build();
- }
- @Override
- protected Message<?> buildMessage(WebsocketMqDTO messageSendEvent, BaseSendExtendDTO requestParam) {
- String keys = StrUtil.isEmpty(requestParam.getKeys()) ? UUID.randomUUID().toString() : requestParam.getKeys();
- return MessageBuilder
- .withPayload(new MessageWrapper(keys, messageSendEvent))
- .setHeader(MessageConst.PROPERTY_KEYS, keys)
- .setHeader(MessageConst.PROPERTY_TAGS, requestParam.getTag())
- .build();
- }
- }
复制代码 【消息斲丧者】
斲丧者的代码就在具体实现中,这里不重复放
【利用】
- // 通过 websocket 发送消息,通知前端
- websocketSendMessageProducer.sendMessage(WebsocketMqDTO.builder()
- .toUsername(orderDO.getUserName())
- .message(JSON.toJSONString(orderDO))
- .build());
复制代码 心跳检测
用户创建 WebSocket 毗连之后的 session 数据是存储在服务器当地的,随着毗连数目标增长,session会占用大量的内存,心跳检测是为了定期整理那些无效的毗连。
在WebSocketServer中,通过定时任务每30秒查抄一次客户端的心跳状态,记录每个用户的末了活动时间。假如当前时间与某用户末了活动时间之差凌驾60秒,则以为该用户心跳超时,服务端将关闭其WebSocket毗连并整理相干记录。客户端需定期向服务端发送" ING"消息以维持毗连生动,确保不会因超时而被服务端断开。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |