前言
信赖很多同学都用过websocket来实现服务端主动向客户端推送消息吧,基本上全部的管理类系统都会有这个功能。由于有websocket的存在,使得前后的主动交互变得容易和低本钱。其实在JAVA领域用SpringBoot框架集成Websoket照旧很简单的,本日我们重点不是集成而是通过Redis的发布订阅实现Websocket集群通讯,当然有条件的也可以用MQ代替。
技术积累
什么是Websocket
WebSocket是一种在单个TCP连接上举行全双工通讯的协议。WebSocket通讯协议于2011年被IETF定为标准RFC 6455,并由RFC7936增补规范。WebSocket API也被W3C定为标准。
WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只必要完成一次握手,两者之间就直接可以创建持久性的连接,并举行双向数据传输。
什么是Redis发布订阅
Redis 的发布订阅(Pub/Sub)是一种消息通讯模式:发送者(pub)发送消息,订阅者(sub)接收消息。Redis 客户端可以订阅恣意数目的频道。当有新消息通过 PUBLISH 下令发送给频道时,这个消息会被发送给订阅它的全部客户端。
Redis发布订阅与消息队列的区别
Redis的发布订阅(Pub/Sub)和消息队列是两种不同的消息通报模式,它们的紧张区别在于消息的处理方式和使用场景。
消息的处理方式:
在 Redis 的发布订阅模式中,消息是即时的,也就是说,当消息发布后,只有当前在线且订阅了该频道的客户端才能收到这个消息,消息不会被存储,一旦发布,当前没有在线的客户端将无法接收到这个消息。
在消息队列中,消息是持久化的,消息被发送到队列后,会一直在队列中等候被消费,即使没有在线的消费者,消息也不会丢失,消费者下次上线后可以继承从队列中获取到消息。
实战演示
本次演示采用demo情势,仅仅提供演示Websocket集群的实现方式,以及解决消息负载均衡的问题。演示案例重点偏后端实现Websoket集群通讯,不涉及前后端心跳检测,如果应用在生产环境前端必要增加心跳检测与重复创建。
本实战采用原生spring websocket,后续有时间再提供netty版本,以及产线版本。有条件的同学可以自己实现,原理都差不多。
SpringBoot整合Websoket
1、项目结构
2、springcloud 版本
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.3.12.RELEASE</version>
- <relativePath/> <!-- lookup parent from repository -->
- </parent>
- <properties>
- <java.version>8</java.version>
- <spring-cloud.version>Hoxton.SR12</spring-cloud.version>
- </properties>
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-dependencies</artifactId>
- <version>${spring-cloud.version}</version>
- <type>pom</type>
- <scope>import</scope>
- </dependency>
- </dependencies>
- </dependencyManagement>
复制代码 3、maven依赖
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>1.2.68</version>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-websocket</artifactId>
- </dependency>
- <!-- 整合thymeleaf前端页面 -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-thymeleaf</artifactId>
- </dependency>
复制代码 4、设置文件
- server:
- port: 8888
- spring:
- profiles:
- active: dev
- mvc:
- pathmatch:
- # Springfox使用的路径匹配是基于AntPathMatcher的,而Spring Boot 2.6.X使用的是PathPatternMatcher
- matching-strategy: ant_path_matcher
- thymeleaf:
- mode: HTML
- encoding: UTF-8
- content-type: text/html
- cache: false
- prefix: classpath:/templates/
复制代码 5、thymeleaf 页面
websocket.html
- <!DOCTYPE html>
- <html>
- <head>
- <meta charset="utf-8">
- <title>websocket通讯</title>
- </head>
- <script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.js"></script>
- <script>
- var socket;
- function openSocket() {
- if (typeof (WebSocket) == "undefined") {
- console.log("您的浏览器不支持WebSocket");
- } else {
- console.log("您的浏览器支持WebSocket");
- //实现化WebSocket对象,指定要连接的服务器地址与端口 建立连接
- //等同于socket = new WebSocket("ws://localhost:8888/xxxx/im/25");
- //var socketUrl="${request.contextPath}/im/"+$("#userId").val();
- var socketUrl = "ws://192.168.1.4:7777/ws/" + $("#userId").val();
- socketUrl = socketUrl.replace("https", "ws").replace("http", "ws");
- console.log(socketUrl);
- if (socket != null) {
- socket.close();
- socket = null;
- }
- socket = new WebSocket(socketUrl);
- //打开事件
- socket.onopen = function () {
- console.log("websocket已打开");
- //socket.send("这是来自客户端的消息" + location.href + new Date());
- };
- //获得消息事件
- socket.onmessage = function (msg) {
- console.log("接收消息为:"+msg.data);
- };
- //关闭事件
- socket.onclose = function () {
- console.log("websocket已关闭");
- };
- //发生了错误事件
- socket.onerror = function () {
- console.log("websocket发生了错误");
- }
- }
- }
- //心跳检测与重复验证自己实现
- function sendMessage() {
- if (typeof (WebSocket) == "undefined") {
- console.log("您的浏览器不支持WebSocket");
- } else {
- console.log("您的浏览器支持WebSocket");
- console.log('发送消息为:{"fromUserId":"' + $("#userId").val() + '","toUserId":"' + $("#toUserId").val() + '","contentText":"' + $("#contentText").val() + '"}');
- socket.send('{"fromUserId":"' + $("#userId").val() + '","toUserId":"' + $("#toUserId").val() + '","contentText":"' + $("#contentText").val() + '"}');
- }
- }
- </script>
- <body>
- <p>【userId】:<div><input id="userId" name="userId" type="text" value="10"></div>
- <p>【toUserId】:<div><input id="toUserId" name="toUserId" type="text" value="20"></div>
- <p>【内容】:<div><input id="contentText" name="contentText" type="text" value="hello websocket"></div>
- <p>【操作】:<div><button onclick="openSocket()">开启socket</button></div>
- <p>【操作】:<div><button onclick="sendMessage()">发送消息</button></div>
- </body>
- </html>
复制代码 6、消息实体
Message.java
- import lombok.Data;
- /**
- * Message
- * @author senfel
- * @version 1.0
- * @date 2024/5/17 14:39
- */
- @Data
- public class Message {
- /**
- * 消息编码
- */
- private String code;
- /**
- * 来自(保证唯一)
- */
- private String fromUserId;
- /**
- * 去自(保证唯一)
- */
- private String toUserId;
- /**
- * 内容
- */
- private String contentText;
- }
复制代码 7、Websocket设置类
WebSocketConfig.java
- import org.springframework.context.annotation.Bean;
- import org.springframework.stereotype.Component;
- import org.springframework.web.socket.server.standard.ServerEndpointExporter;
- /**
- * WebSocketConfig
- * @author senfel
- * @version 1.0
- * @date 2024/5/16 16:51
- */
- @Component
- public class WebSocketConfig {
- @Bean
- public ServerEndpointExporter serverEndpointExporter() {
- System.out.println("启动websocket支持");
- return new ServerEndpointExporter();
- }
- }
- Websocket 服务类
- WebSocketServer.java
- import com.example.ccedemo.config.SpringUtil;
- import org.apache.commons.lang3.StringUtils;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
- import org.springframework.data.redis.core.StringRedisTemplate;
- import org.springframework.stereotype.Component;
- import javax.websocket.*;
- import javax.websocket.server.PathParam;
- import javax.websocket.server.ServerEndpoint;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.CopyOnWriteArraySet;
- /**
- * WebSocketServer
- * @author senfel
- * @version 1.0
- * @date 2024/5/16 16:59
- */
- @ConditionalOnClass(value = WebSocketConfig.class)
- @Component
- @ServerEndpoint("/ws/{deviceId}")
- public class WebSocketServer {
- protected Logger logger = LoggerFactory.getLogger(this.getClass());
- /**与某个客户端的连接会话,需要通过它来给客户端发送数据*/
- private Session session;
- /**
- * 设备ID
- */
- private String deviceId;
- /**concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
- 虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。
- 注:底下WebSocket是当前类名*/
- private static CopyOnWriteArraySet<WebSocketServer> webSockets =new CopyOnWriteArraySet<>();
- /**用来存在线连接用户信息*/
- private static ConcurrentHashMap<String,Session> sessionPool = new ConcurrentHashMap<String,Session>();
- /**
- * 链接成功调用的方法
- */
- @OnOpen
- public void onOpen(Session session, @PathParam(value="deviceId")String deviceId) {
- try {
- if(StringUtils.isEmpty(deviceId)||deviceId.equals("undefined")){
- return;
- }
- this.session = session;
- this.deviceId = deviceId;
- webSockets.add(this);
- sessionPool.put(deviceId, session);
- logger.info("【websocket消息】有新的连接,总数为:"+webSockets.size());
- StringBuffer stringBuffer = new StringBuffer();
- sessionPool.forEach((key, value) -> {
- stringBuffer.append(key).append(";");
- });
- logger.info("当前服务器连接有客户端有:"+stringBuffer.toString());
- } catch (Exception e) {
- }
- }
- /**
- * 链接关闭调用的方法
- */
- @OnClose
- public void onClose() {
- try {
- webSockets.remove(this);
- sessionPool.remove(this.deviceId);
- logger.info("【websocket消息】连接断开,总数为:"+webSockets.size());
- StringBuffer stringBuffer = new StringBuffer();
- sessionPool.forEach((key, value) -> {
- stringBuffer.append(key).append(";");
- });
- logger.info("当前服务器连接有客户端有:"+stringBuffer.toString());
- } catch (Exception e) {
- }
- }
- /**
- * 收到客户端消息后调用的方法
- * @param message
- */
- @OnMessage
- public void onMessage(String message) {
- logger.info("【websocket消息】收到客户端消息:"+message);
- SpringUtil.getBean(StringRedisTemplate.class).convertAndSend("webSocketMsgPush",message);
- }
- /** 发送错误时的处理
- * @param session
- * @param error
- */
- @OnError
- public void onError(Session session, Throwable error) {
- logger.error("用户错误,原因:"+error.getMessage());
- error.printStackTrace();
- }
- /**
- * 广播消息
- * @author senfel
- * @date 2024/5/17 17:10
- * @return void
- */
- public void sendAllMessage(String message) {
- logger.info("【websocket消息】广播消息:"+message);
- for(WebSocketServer webSocket : webSockets) {
- try {
- if(webSocket.session.isOpen()) {
- webSocket.session.getAsyncRemote().sendText(message);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- /**
- * 单点消息 单人
- * @param deviceId
- * @param message
- * @author senfel
- * @date 2024/5/17 17:10
- * @return void
- */
- public void sendOneMessage(String deviceId, String message) {
- Session session = sessionPool.get(deviceId);
- if (session != null&&session.isOpen()) {
- try {
- logger.info("【websocket消息】 单点消息:"+message);
- session.getAsyncRemote().sendText(message);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- /**
- * 单点消息
- * @param deviceId
- * @param object
- * @author senfel
- * @date 2024/5/17 17:10
- * @return void
- */
- public void sendOneObject(String deviceId, Object object) {
- Session session = sessionPool.get(deviceId);
- if (session != null&&session.isOpen()) {
- try {
- logger.info("【websocket消息】 单点消息(对象):"+object);
- session.getAsyncRemote().sendObject(object);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- /**
- * 单点消息(多人)
- * @param deviceIds
- * @param message
- * @author senfel
- * @date 2024/5/17 17:11
- * @return void
- */
- public void sendMoreMessage(String[] deviceIds, String message) {
- for(String deviceId:deviceIds) {
- Session session = sessionPool.get(deviceId);
- if (session != null&&session.isOpen()) {
- try {
- logger.info("【websocket消息】 单点消息:"+message);
- session.getAsyncRemote().sendText(message);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
复制代码 8、controller提供接口
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
- import org.springframework.web.servlet.ModelAndView;
- /**
- * TestController
- * @author senfel
- * @version 1.0
- * @date 2024/5/17 17:49
- */
- @RestController
- @RequestMapping("/api/websocket")
- public class BaseController {
- @GetMapping("page")
- public ModelAndView page(Long userId){
- ModelAndView websocket = new ModelAndView("websocket");
- websocket.addObject("userId",userId);
- return websocket;
- }
- }
复制代码 Websoket集群负载均衡
大家都知道Websoket是一个长链接,在不断开的情况下服务端与客户端是可以自由通讯的,这是由于服务端缓存了会话。
如果我们后端采用集群部署,那么可能多个用户的缓存会话会分散在各个服务器上。在我们给指定用户推送消息时就有可能调用服务器上并没有这个用户的会话。
以是,我们引入Redis发布订阅,将消息举行转发到全部的服务端,只有有会话缓存的服务端才会乐成推送消息。讲到这里就比力明显了吧,完美解决Websoket负载均衡的问题。
1、maven引入Redis
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-data-redis</artifactId>
- </dependency>
复制代码 2、设置文件
- spring:
- redis:
- host: 127.0.0.1
- port: 6379
复制代码 3、Redis设置类
RedisConfig.java
- import com.fasterxml.jackson.annotation.JsonAutoDetect;
- import com.fasterxml.jackson.annotation.PropertyAccessor;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import org.springframework.cache.annotation.CachingConfigurerSupport;
- import org.springframework.cache.annotation.EnableCaching;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.annotation.Primary;
- import org.springframework.data.redis.connection.RedisConnectionFactory;
- import org.springframework.data.redis.core.RedisTemplate;
- import org.springframework.data.redis.listener.PatternTopic;
- import org.springframework.data.redis.listener.RedisMessageListenerContainer;
- import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
- import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
- import org.springframework.data.redis.serializer.StringRedisSerializer;
- /**
- * RedisConfig
- * @author senfel
- * @version 1.0
- * @date 2024/5/17 14:31
- */
- @Configuration
- @EnableCaching
- public class RedisConfig extends CachingConfigurerSupport {
- @Bean
- @Primary
- public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
- RedisTemplate<String, Object> template = new RedisTemplate<>();
- template.setConnectionFactory(factory);
- Jackson2JsonRedisSerializer jacksonSeial = new Jackson2JsonRedisSerializer(Object.class);
- StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
- ObjectMapper om = new ObjectMapper();
- om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
- om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
- jacksonSeial.setObjectMapper(om);
- template.setValueSerializer(jacksonSeial);
- template.setKeySerializer(stringRedisSerializer);
- template.setHashKeySerializer(stringRedisSerializer);
- template.setHashValueSerializer(jacksonSeial);
- template.afterPropertiesSet();
- return template;
- }
- @Bean
- RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
- MessageListenerAdapter topicAdapter) {
- RedisMessageListenerContainer container = new RedisMessageListenerContainer();
- container.setConnectionFactory(connectionFactory);
- //订阅了主题 webSocketMsgPush
- container.addMessageListener(topicAdapter, new PatternTopic("webSocketMsgPush"));
- return container;
- }
- /**
- * 消息监听器适配器,绑定消息处理器
- *
- * @return
- */
- @Bean
- MessageListenerAdapter topicAdapter() {
- return new MessageListenerAdapter(new RedisListener());
- }
- }
复制代码 4、Redis订阅监听
RedisListener.java
- import com.alibaba.fastjson.JSONObject;
- import com.example.ccedemo.config.SpringUtil;
- import org.springframework.data.redis.connection.Message;
- import org.springframework.data.redis.connection.MessageListener;
- /**
- * RedisListener
- * @author senfel
- * @version 1.0
- * @date 2024/5/17 14:37
- */
- public class RedisListener implements MessageListener {
- @Override
- public void onMessage(Message msg, byte[] bytes) {
- System.out.println(".监听到需要进行负载转发的消息:" + msg.toString());
- com.example.ccedemo.redissocket.Message message = JSONObject.parseObject(msg.toString(), com.example.ccedemo.redissocket.Message.class);
- SpringUtil.getBean(WebSocketServer.class).sendOneMessage(message.getToUserId(), message.getContentText());
- }
- }
复制代码 实战测试
我们本地启动两个服务,分别开启端口8888、9999,然后用nginx袒露7777端口做一个负载均衡。
IDEA启动两台服务端
设置nginx负载均衡
- #服务器url变量定义
- upstream api_service1 {
- server 192.168.1.4:8888;
- server 192.168.1.4:9999;
- }
- #nginx配置websocket
- map $http_upgrade $connection_upgrade {
- default upgrade;
- '' close;
- }
- server {
- listen 7777;
- large_client_header_buffers 4 16k;
- client_max_body_size 300m;
- client_body_buffer_size 128k;
- proxy_connect_timeout 600;
- proxy_read_timeout 600;
- proxy_send_timeout 600;
- proxy_buffer_size 64k;
- proxy_buffers 4 32k;
- proxy_busy_buffers_size 64k;
- proxy_temp_file_write_size 64k;
- proxy_http_version 1.1;
-
- root /demo/page/dist;
- index index.html;
-
-
- #api
- location /api/ {
- proxy_pass http://api_service1/api/;
- proxy_set_header Host $http_host;
- }
-
-
- #nginx配置websocket
- location /ws/ {
- proxy_http_version 1.1;
- proxy_pass http://api_service1/ws/;
- proxy_redirect off;
- proxy_set_header Host $host;
- proxy_set_header X-Real-IP $remote_addr;
- proxy_read_timeout 3600s;
- proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
- proxy_set_header Upgrade $http_upgrade;
- proxy_set_header Connection $connection_upgrade;
- }
-
- #解决页面刷新404
- location / {
- try_files $uri $uri/ @req;
- index index.html;
- }
- location @req {
- rewrite ^.*$ /index.html last;
- }
- }
复制代码 浏览器访问模拟对话
1、浏览器开启多个无痕界面
http://192.168.1.4:7777/api/websocket/page
模拟对话:
多个界面的用户ID互补
2、分别开启soket,由于nginx轮询策略会分别注册在两个服务端上
3、客户端相互发送消息验证
由以上图片可知,我们两个客户端相互对话能够接收到对方推送的消息。那么,由此也可以证明我们后端Websocke集群使用Redis发布订阅的方式搭建乐成。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |