springboot集成websokcet+H5开发聊天原型(一)

打印 上一主题 下一主题

主题 900|帖子 900|积分 2700

1. 团体思绪

群组聊天功能实现思绪



  • 需要为每个群组维护一个对应的集合(可以是 Set 等数据布局),用来存放该群组内所有在线用户的 WebSocketSession。当有消息发送到群组时,遍历该群组对应的集合,向其中的每个在线用户发送消息。
  • 在消息布局体中新增一个字段用于标识所属群组,以便后端根据这个字段来举行消息的广播分发。
离线用户处置惩罚及汗青消息推送思绪



  • 对于离线用户,当他们重新上线时,需要能够辨认出他们之前所在的群组(可以通过用户登录等操纵记录其关联群组信息)。
  • 后端要将该群组在其离线期间产生的汗青消息查询出来(这可能涉及到数据库操纵,将群组聊天消息存储到数据库中以便查询汗青记录),然后通过 WebSocket 连接将这些汗青消息逐一发送给重新上线的用户。
后端代码修改思绪

1. 群组管理与消息处置惩罚




  • 群组数据布局:使用符合的数据布局(如 Map)来存储群组相关信息,以群组 ID 作为键,对应的值可以是包含该群组内在线用户 WebSocketSession 列表以及群组汗青消息列表等信息的对象。
  • 消息格式定义:明确消息的格式,使其能区分是文字消息还是图片消息,并且包含须要的元数据,好比发送者、群组 ID、消息内容(文字内容或图片链接等)、时间戳等。
  • 消息分发逻辑:当吸收到消息时,根据消息中的群组 ID,找到对应的群组在线用户列表,然后将消息发送给这些用户。
2. 离线用户汗青消息处置惩罚




  • 用户与群组关联记录:维护用户与所属群组的关联关系,好比使用 Map 存储用户 ID 和其所属群组 ID 列表的对应关系,以便在用户重新上线时确定需要推送哪些群组的汗青消息。
  • 汗青消息存储与查询:将群组内的聊天消息持久化存储(现实应用中通常是存入数据库,这里可简单模拟存储布局),当离线用户重新上线时,从存储布局中查询出其所属群组的汗青消息并推送给他。

直接 springboot+websokcet,感觉比原生的websocket简单一点。

  •  集成websokcet
  •  设置文件
  •  handler
  • postman测试一下
  •  uniapp
pom添加依赖:
  1.         <dependency>
  2.             <groupId>org.springframework.boot</groupId>
  3.             <artifactId>spring-boot-starter-websocket</artifactId>
  4.         </dependency>
复制代码
application.yml中 端口设置(先复用应用的端口吧)
  1. server:
  2.   port: 7877
  3.   tomcat:
  4.     accept-count: 10000
  5.     threads:
  6.       max: 800
  7.       min-spare: 200
  8.   compression:
  9.     enabled: true
  10.   servlet:
  11.     context-path: /chat
复制代码
设置文件(application.yml)



  • 务器端口及相关设置

    • server.port 设置为 7877,指定了 Spring Boot 应用启动后监听的端口号。
    • server.tomcat.accept-count 设置为 10000,它表示当所有的处置惩罚线程都在使用时,能够放到处置惩罚队列中的连接哀求数量。server.tomcat.threads.max 设为 800 定义了最大线程数,min-spare 设为 200 则是最小备用线程数,这些设置用于优化 Tomcat 处置惩罚哀求的线程资源分配。
    • server.compression.enabled 设为 true,开启了服务器相应内容的压缩功能,有助于镌汰网络传输的数据量,进步性能。
    • server.servlet.context-path 设置为 /chat,意味着应用的上下文路径是 /chat,后续访问应用中的资源路径都是基于这个上下文路径来构建的。

WebSocket 设置类(WebSocketConfig)



  • 这个类实现了 WebSocketConfigurer 接口,用于设置 WebSocket 相关的处置惩罚。
  • 在 registerWebSocketHandlers 方法中,将自定义的 MyWebSocketHandler 注册到了 WebSocket 处置惩罚器注册表 WebSocketHandlerRegistry 中,并且将 WebSocket 的端点路径设置为 /websocket,同时答应来自任意源(setAllowedOrigins("*"))的连接访问该 WebSocket 端点。
  1. package com.edwin.java.config;
  2. import com.edwin.java.config.interceptor.GroupChatInterceptor;
  3. import com.edwin.java.util.MyWebSocketHandler;
  4. import org.springframework.context.annotation.Configuration;
  5. import org.springframework.web.socket.config.annotation.EnableWebSocket;
  6. import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
  7. import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
  8. @Configuration
  9. @EnableWebSocket
  10. public class WebSocketConfig implements WebSocketConfigurer {
  11.     /**
  12.      * registerWebSocketHandlers 是一个函数或方法,通常用于在 Web 应用程序中注册 WebSocket 处理程序。
  13.      * WebSocket 是一种基于 TCP 的协议,可以实现客户端和服务器之间的双向通信,可以用于实时应用程序,如聊天应用、游戏、实时更新等。在 Java Web 应用程序中,可以使用 Spring 框架提供的 WebSocket 支持来处理 WebSocket 连接。
  14.      * registerWebSocketHandlers 方法是 Spring WebSocket 的一个 API,它允许开发人员在应用程序中注册 WebSocket 处理程序,并将其映射到特定的 URI。在调用 registerWebSocketHandlers 方法时,需要传递一个 WebSocketHandler 实例和一个 URI 路径作为参数。当客户端请求与该 URI 路径对应的 WebSocket 连接时,Spring 将调用相应的 WebSocket 处理程序来处理连接。
  15.      */
  16.     @Override
  17.     public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
  18.         //参数1:注册我们自定义的MyWebSocketHandler类
  19.         //参数2:路径【UniApp中建立连接的路径】如:我的ip是192.168.1.8:8099则UniApp需要输入的url是ws://192.168.1.8:8099/websocket
  20.         //参数3:setAllowedOrigins("*")设置允许全部来源【在WebSocket中,浏览器会发送一个带有Origin头部的HTTP请求来请求建立WebSocket连接。服务器可以使用setAllowedOrigins方法来设置允许的来源,即允许建立WebSocket连接的域名或IP地址。这样,服务器就可以限制建立WebSocket连接的客户端,避免来自不信任的域名或IP地址的WebSocket连接。】
  21.         registry.addHandler(new MyWebSocketHandler(), "/websocket").setAllowedOrigins("*").addInterceptors(new GroupChatInterceptor());;
  22.     }
  23. }
复制代码
WebSocket 处置惩罚器类(MyWebSocketHandler)



  • 连接建立

    • afterConnectionEstablished 方法在 WebSocket 连接建立后被调用,会记录连接乐成的日志信息,并将对应的 WebSocketSession 添加到 sessions 列表中,用于后续管理连接会话。

  • 消息处置惩罚

    • handleMessage 方法吸收到消息时,会记录消息内容日志,然后遍历所有已连接的会话,实验向每个客户端发送一条固定格式的消息(这里只是简单示例性子的消息)。
    • handleTextMessage 方法针对文本消息做更具体的处置惩罚,会对收到的哀求消息举行转义和记录日志,然后构造相应消息并发送回对应的客户端会话。

  • 定时消息发送

    • 通过 @Scheduled(fixedRate = 10000) 注解定义了一个定时任务,每隔 10000 毫秒(即 10 秒)会遍历所有连接会话,假如会话处于打开状态,就向其发送一条包含当前时间的广播消息。

  • 连接关闭及其他

    • afterConnectionClosed 方法在 WebSocket 连接关闭时被调用,负责从 sessions 列表中移除对应的会话,并记录连接关闭的日志。
    • supportsPartialMessages 方法返回 false,表示不支持部门消息处置惩罚。
    • handleTransportError 方法用于处置惩罚 WebSocket 传输过程中的错误,会记录相应的错误日志。

  1. package edu.yzu.testspringboot002.handler;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.scheduling.annotation.Scheduled;
  5. import org.springframework.stereotype.Component;
  6. import org.springframework.web.socket.*;
  7. import org.springframework.web.socket.handler.TextWebSocketHandler;
  8. import java.io.IOException;
  9. import java.util.List;
  10. import java.util.concurrent.CopyOnWriteArrayList;
  11. import java.time.LocalDateTime;
  12. import java.util.ArrayList;
  13. import java.util.HashMap;
  14. import java.util.Map;
  15. import org.springframework.web.socket.WebSocketSession;
  16. import com.fasterxml.jackson.databind.ObjectMapper;
  17. @Component
  18. public class MyWebSocketHandler extends TextWebSocketHandler {
  19.     private static final Logger LOGGER = LoggerFactory.getLogger(MyWebSocketHandler.class);
  20.     // 用于存储群组信息,键为群组ID,值包含在线用户会话列表和历史消息列表
  21.     private Map<String, GroupInfo> groupInfos = new HashMap<>();
  22.     // 用于存储用户与群组的关联关系,键为用户ID,值为群组ID列表  一个用户可以加入多个群组   它是一个Map,键是用户ID,值是群组ID列表
  23.     private Map<String, List<String>> userGroups = new HashMap<>();
  24.     private Map<String,String> sessionId_userId =new HashMap<>(); //存放  sessionId  与  userId 的map
  25.     private ObjectMapper objectMapper = new ObjectMapper();
  26.     private final List<WebSocketSession> sessions = new CopyOnWriteArrayList<>();
  27.     //
  28.     public int checkUserIdAndSessionId(String sessionId,String userId ) {
  29.         // 遍历userId_SessionId(虽然在这个场景中并不需要遍历)
  30.         for (Map.Entry<String, String> entry : sessionId_userId.entrySet()) {
  31.             String   storedSessionId = entry.getKey();
  32.             String storedUserId = entry.getValue();
  33.             // 检查userId是否存在
  34.             if (storedUserId.equals(userId)) {
  35.                 // 如果存在,进一步判断对应的sessionId是否和参数sessionId相等
  36.                 if (storedSessionId.equals(sessionId)) {
  37.                     return 0; // 相等返回0
  38.                 } else {
  39.                     return 2; // 不等返回2
  40.                 }
  41.             }
  42.         }
  43.         // 其他情况返回3(即userId不存在)
  44.         return 3;
  45.     }
  46.     /**
  47.      * afterConnectionEstablished 是一个 WebSocket API 中的回调函数,它是在建立 WebSocket 连接之后被调用的。
  48.      * 当 WebSocket 连接建立成功后,浏览器会发送一个握手请求给服务器端,如果服务器成功地接受了该请求,那么连接就会被建立起来
  49.      */
  50.     @Override
  51.     public void afterConnectionEstablished(WebSocketSession session) throws Exception {
  52.         LOGGER.info("WebSocket已连接: {}", session.getId());
  53.         // 假设从WebSocket连接的属性或者请求参数中获取用户ID和群组ID列表(实际需按业务逻辑调整获取方式)
  54.         // 理想情况下 应该是从请求参数中获取userId和groupId   我们在每次连接的时候都会传递userId和groupId   但是但是 我们先不在这里传递,每次收到消息的时候再传递
  55. //        String userId = (String) session.getAttributes().get("userId");
  56. //        List<String> groupIds = (List<String>) session.getAttributes().get("groupIds");
  57. //
  58. //        if (userId!= null && groupIds!= null) {
  59. //            userGroups.put(userId, groupIds);
  60. //            for (String groupId : groupIds) {
  61. //                // 用于存储群组信息,键为群组ID,值包含在线用户会话列表和历史消息列表
  62. //                groupInfos.computeIfAbsent(groupId, k -> new GroupInfo()).addSession(session);
  63. //            }
  64. //        }
  65.        //System.out.println("groupInfos: " + groupInfos);
  66.     }
  67.     /**
  68.      * handleMessage 是 WebSocket API 中的回调函数,它是用来处理从客户端接收到的 WebSocket 消息的。
  69.      * 当客户端通过 WebSocket 连接发送消息到服务器端时,服务器端会自动调用 handleMessage 函数并传递收到的消息作为参数,你可以在该函数中处理这个消息,并根据需要向客户端发送一些响应消息。
  70.      */
  71.     @Override
  72.     public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
  73.         LOGGER.info("WebSocket收到的消息: {}", message.getPayload());
  74.         // 将消息反序列化,假设消息是JSON格式,这里解析为Message对象(下面定义)
  75.         Message msg = objectMapper.readValue(message.getPayload().toString(), Message.class);
  76.         String groupId = msg.getGroupId();
  77.         String userId=msg.getUserId();
  78.         int flag=this.checkUserIdAndSessionId(session.getId(),userId);  //检查userId和sessionId是否匹配
  79.         //flag==0 说明userId和sessionId匹配 ,不做更多的操作 直接执行下面的代码
  80.         if(flag==2){   //userId一致 但是 sessionId不一致,说明用户重新建立了session,要把原来该用户相关的session等全部删除掉
  81.               //找到这个用户下原来所有的session,然后全部删除掉
  82.             for (Map.Entry<String, String> entry : sessionId_userId.entrySet()) {
  83.                 String storedSessionId = entry.getKey();
  84.                 String storedUserId = entry.getValue();
  85.                 if (storedUserId.equals(userId)) {
  86.                     sessionId_userId.remove(storedSessionId);
  87.                 }
  88.                 //by userId remove all the groupInfo
  89.                 List<String> groupIds = userGroups.get(userId);
  90.                 if (groupIds!= null) {
  91.                     for (String groupId1 : groupIds) {
  92.                         GroupInfo groupInfo = groupInfos.get(groupId1);
  93.                         if (groupInfo!= null) {
  94.                             groupInfo.removeSession(session);
  95.                         }
  96.                     }
  97.                     userGroups.remove(userId);
  98.                 }
  99.             }
  100.         }else if(flag==3){  //userId不存在  session不存在 ,用户第一次建立连接
  101.           //do nothing   放在这里就是为了说明flag==3的情况
  102.         }
  103.         if(flag==2 || flag==3){
  104.             sessionId_userId.put(session.getId(),userId);    //sessionId 与 userId 建立映射  是1对1的关系
  105.             List<String> groupIds = userGroups.computeIfAbsent(userId, k -> new ArrayList<>());  // 获取用户的群组ID列表
  106.             groupIds.add(groupId);
  107.         }
  108.         GroupInfo groupInfo = groupInfos.get(groupId);
  109.         if (groupInfo == null) {
  110.             // 如果群组信息不存在,则创建新的群组信息,并添加当前用户的WebSocketSession
  111.             groupInfo = new GroupInfo();
  112.             groupInfo.addSession(session);
  113.             groupInfos.put(groupId, groupInfo);
  114.             // 同时,假设这里从WebSocket连接的属性或者请求参数中获取用户ID(实际需按业务逻辑调整获取方式)
  115.             //String userId = (String) session.getAttributes().get("userId");
  116. //            List<String> groupIds = userGroups.computeIfAbsent(userId, k -> new ArrayList<>());  // 获取用户的群组ID列表
  117. //            groupIds.add(groupId);
  118.         }else {
  119.             groupInfo.addSession(session);  //addSession()方法会检查是否已经存在,如果存在就不会再添加
  120.         }
  121.         // 将消息添加到群组历史消息列表
  122.         groupInfo.addHistoryMessage(msg);
  123.         // 向群组内所有在线用户发送消息
  124.         List<WebSocketSession> sessions = groupInfo.getSessions();
  125.         for (WebSocketSession s : sessions) {
  126.             try {
  127.                 s.sendMessage(new TextMessage(objectMapper.writeValueAsString(msg)));
  128.             } catch (IOException e) {
  129.                 LOGGER.error("无法发送WebSocket消息", e);
  130.             }
  131.         }
  132.     }
  133.     @Override
  134.     public void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {  // 处理文本消息
  135.         handleMessage(session, message);
  136.     }
  137.     @Scheduled(fixedRate = 10000)
  138.     void sendPeriodicMessages() throws IOException {
  139.         // 这里可扩展定时向群组推送系统消息等功能,暂不做详细修改
  140.         for (GroupInfo groupInfo : groupInfos.values()) {
  141.             List<WebSocketSession> sessions = groupInfo.getSessions();
  142.             for (WebSocketSession s : sessions) {
  143.                 if (s.isOpen()) {
  144.                     String broadcast = "server periodic message " + LocalDateTime.now();
  145.                     LOGGER.info("Server sends: {}", broadcast);
  146.                     s.sendMessage(new TextMessage(broadcast));
  147.                 }
  148.             }
  149.         }
  150.     }
  151.     // 处理用户重新上线,推送历史消息的方法
  152.     public void handleUserReconnect(String userId, WebSocketSession session) {
  153.         List<String> groupIds = userGroups.get(userId);
  154.         if (groupIds!= null) {
  155.             for (String groupId : groupIds) {
  156.                 GroupInfo groupInfo = groupInfos.get(groupId);
  157.                 if (groupInfo!= null) {
  158.                     List<Message> historyMessages = groupInfo.getHistoryMessages();
  159.                     for (Message historyMessage : historyMessages) {
  160.                         try {
  161.                             session.sendMessage(new TextMessage(objectMapper.writeValueAsString(historyMessage)));
  162.                         } catch (IOException e) {
  163.                             LOGGER.error("无法发送历史消息给重新上线用户", e);
  164.                         }
  165.                     }
  166.                 }
  167.             }
  168.         }
  169.     }
  170.     /**
  171.      * afterConnectionClosed 是 WebSocket API 中的回调函数,它是在 WebSocket 连接关闭后被调用的。
  172.      * 当客户端或服务器端主动关闭 WebSocket 连接时,afterConnectionClosed 回调函数会被调用,你可以在该函数中执行一些资源释放、清理工作等操作,比如关闭数据库连接、清理缓存等。
  173.      */
  174.     @Override
  175.     public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
  176.         LOGGER.info("WebSocket已断开连接: {}", session.getId());
  177.         //应该根据sessionid 出发,找到userId,且把该用户名下的所有的GroupInfo  中的 session 全部删除掉
  178.         // 移除用户会话在各个群组中的关联
  179.         //String userId = (String) session.getAttributes().get("userId");
  180.         //query userId by sessionId
  181.         String userId = sessionId_userId.get(session.getId());
  182.         List<String> groupIds = userGroups.get(userId);
  183.         if (groupIds!= null) {
  184.             for (String groupId : groupIds) {
  185.                 GroupInfo groupInfo = groupInfos.get(groupId);
  186.                 if (groupInfo!= null) {
  187.                     groupInfo.removeSession(session);
  188.                 }
  189.             }
  190.             userGroups.remove(userId);
  191.         }
  192.     }
  193.     /**
  194.      * supportsPartialMessages 是 WebSocket API 中的方法,它用来指示 WebSocket 消息是否支持分段传输。
  195.      * WebSocket 消息可以分段传输,也就是说一个消息可以被分成多个部分依次传输,这对于大型数据传输和流媒体传输非常有用。当消息被分成多个部分传输时,WebSocket 会自动将这些部分合并成完整的消息。
  196.      * supportsPartialMessages 方法用来指示服务器是否支持分段消息传输,如果支持,则可以在接收到部分消息时开始处理消息,否则需要等待接收到完整消息后才能开始处理。
  197.      */
  198.     @Override
  199.     public boolean supportsPartialMessages() {
  200.         return false;
  201.     }
  202.     /**
  203.      * handleTransportError 是 WebSocket API 中的回调函数,它用来处理 WebSocket 传输层出现错误的情况。
  204.      *当 WebSocket 传输层出现错误,比如网络中断、协议错误等,WebSocket 会自动调用 handleTransportError 函数,并传递相应的错误信息。在该函数中,我们可以处理这些错误,比如关闭 WebSocket 连接、记录错误日志等。
  205.      */
  206.     @Override
  207.     public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
  208.         LOGGER.error("WebSocket错误", exception);
  209.     }
  210.     // 在MyWebSocketHandler类内部定义
  211.     private class GroupInfo {
  212.         // 存储群组内所有在线用户的WebSocketSession列表
  213.         private List<WebSocketSession> sessions = new ArrayList<>();
  214.         // 存储群组的历史消息列表,消息以自定义的Message对象形式存储(前面代码中已定义Message类)
  215.         private List<Message> historyMessages = new ArrayList<>();
  216.         // 添加一个用户的WebSocketSession到群组的在线用户列表中
  217.         public void addSession(WebSocketSession session) {
  218.             if(0==sessions.size()){
  219.                 sessions.add(session);
  220.             }else {
  221.                 //iterator sessions, if the session is already in the list, do not add it again,else add it
  222.                 for (WebSocketSession s : sessions) {
  223.                     if (s.getId().equals(session.getId())) {
  224.                         return;
  225.                     }
  226.                 }
  227.                 sessions.add(session);
  228.             }
  229.         }
  230.         // 从群组的在线用户列表中移除指定用户的WebSocketSession
  231.         public void removeSession(WebSocketSession session) {
  232.             sessions.remove(session);
  233.         }
  234.         // 向群组的历史消息列表中添加一条消息
  235.         public void addHistoryMessage(Message message) {
  236.             historyMessages.add(message);
  237.         }
  238.         // 获取群组内所有在线用户的WebSocketSession列表
  239.         public List<WebSocketSession> getSessions() {
  240.             return sessions;
  241.         }
  242.         // 获取群组的历史消息列表
  243.         public List<Message> getHistoryMessages() {
  244.             return historyMessages;
  245.         }
  246.     }
  247.     // 定义消息类,包含必要的消息属性,可根据实际需求扩展
  248.     private static class Message {
  249.         private String type; // 消息类型,如 "text" 或 "image"
  250.         private String groupId; // 群组ID
  251.         private String sender; // 发送者(可根据实际情况完善,比如用户ID等)
  252.         private String content; // 消息内容,文字或图片链接等
  253.         public String getUserId() {
  254.             return userId;
  255.         }
  256.         public void setUserId(String userId) {
  257.             this.userId = userId;
  258.         }
  259.         private String userId; // 新增字段
  260.         // private LocalDateTime timestamp = LocalDateTime.now(); // 时间戳
  261.         // 生成必要的Getter和Setter方法(可使用Lombok简化代码,此处为清晰展示手动编写)
  262.         public String getType() {
  263.             return type;
  264.         }
  265.         public void setType(String type) {
  266.             this.type = type;
  267.         }
  268.         public String getGroupId() {
  269.             return groupId;
  270.         }
  271.         public void setGroupId(String groupId) {
  272.             this.groupId = groupId;
  273.         }
  274.         public String getSender() {
  275.             return sender;
  276.         }
  277.         public void setSender(String sender) {
  278.             this.sender = sender;
  279.         }
  280.         public String getContent() {
  281.             return content;
  282.         }
  283.         public void setContent(String content) {
  284.             this.content = content;
  285.         }
  286. //        public LocalDateTime getTimestamp() {
  287. //            return timestamp;
  288. //        }
  289. //
  290. //        public void setTimestamp(LocalDateTime timestamp) {
  291. //            this.timestamp = timestamp;
  292. //        }
  293.     }
  294. }
复制代码


 postman验证:

注意,路径中要加chat ,由于application.yml中设置了

  背景message布局:
  1.     private static class Message {
  2.         private String type; // 消息类型,如 "text" 或 "image"
  3.         private String groupId; // 群组ID
  4.         private String sender; // 发送者(可根据实际情况完善,比如用户ID等)
  5.         private String content; // 消息内容,文字或图片链接等
  6.         ......
  7.     }
复制代码
前端发送消息也要和这个对应:
  1.   sendMessage(message) {
  2.       this.socket.send(JSON.stringify({
  3.           type: 'message',
  4.           userId: this.userId,
  5.           groupId:this.groupId,   // todo
  6.         ...message,
  7.       }));
  8.   }
复制代码
 服务端socket每次收到消息,先执行如下游程,(这个流程可以到websocket拦截器中去执行?)。




免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

商道如狼道

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表