使用WebSocket实现log日志流的实时展示-从轮询到通知

道家人  金牌会员 | 2024-8-3 01:51:22 | 来自手机 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 576|帖子 576|积分 1728

场景介绍

最近开辟一个体系,其中一个模块必要展示实时的实验过程,过程日志大概比较多。从前的方案都是前端定时轮询,比如每秒查一次后端接口,将拉取回来的日志重新展示。轮询方案简单轻易实现,但是比较斲丧资源,后端没有数据的时候,会造成大量的无用轮询。以是这次我们采用长连接的方案,优化这块的逻辑,提升用户体验。

WebSocket介绍

   参考:https://liaoxuefeng.com/books/java/spring/web/websocket/
  WebSocket 是一种基于 HTTP 的长连接技术。传统的 HTTP 协议采用请求-响应模子,浏览器不发送请求时,服务器无法主动推送数据给浏览器。因此,当必要定期或不定期向浏览器推送数据(例如股票行情或在线谈天)时,传统的 HTTP 协议只能通过浏览器的 JavaScript 定时轮询来实现。这种方法效率低下,且实时性不高。
由于 HTTP 本身基于 TCP 连接,WebSocket 在 HTTP 协议的基础上举行了简单的升级。创建 TCP 连接后,浏览器在发送请求时附带以下头部信息:
  1. GET /chat HTTP/1.1
  2. Host: www.example.com
  3. Upgrade: websocket
  4. Connection: Upgrade
复制代码
这表示客户端希望升级为长连接的 WebSocket。服务器返回升级成功的响应:
  1. HTTP/1.1 101 Switching Protocols
  2. Upgrade: websocket
  3. Connection: Upgrade
复制代码
收到成功响应后,WebSocket 握手即告完成。这意味着代表 WebSocket 的 TCP 连接将不会被服务器关闭,而是保持开放状态,服务器和浏览器可以随时相互推送消息。这些消息既可以是文本,也可以是二进制数据。通常,大多数应用步伐会发送基于 JSON 的文本消息。
当代浏览器均已支持 WebSocket 协议,服务器端则必要底层框架的支持。Java 的 Servlet 规范从 3.1 开始支持 WebSocket,因此,必须选择支持 Servlet 3.1 或更高版本的容器,才气使用 WebSocket。最新版本的 Tomcat、Jetty 等开源服务器均已支持 WebSocket。

实践演示

Java后端

我们以现实代码来演示如何在Springboot项目中实现对Websocket的支持。
step1: 添加websocket依靠

  1.         <dependency>
  2.             <groupId>org.springframework.boot</groupId>
  3.             <artifactId>spring-boot-starter-websocket</artifactId>
  4.         </dependency>
复制代码
step2: 增长配置

这个配置的重要作用是主动启动使用了注解==@ServerEndpoint==的类
  1. @Configuration
  2. @EnableWebSocket
  3. public class WebSocketConfiguration {
  4.     @Bean
  5.     public ServerEndpointExporter serverEndpointExporter() {
  6.         return new ServerEndpointExporter();
  7.     }
  8. }
复制代码
step3: 创建一个ws endpoint

  1. @ServerEndpoint(value = ChaosConst.CHAOS_WS_API + "/execute/log/{bizType}/{bizId}")
  2. @Component
  3. @Slf4j
  4. public class LogWsEndpoint implements Consumer<ChaosLogEvent> {
  5.     // 对话的标识
  6.     private String bizKey;
  7.     // 存储每个会话
  8.     private static final ConcurrentHashMap<String, List<LogWsEndpoint>> endpointMap = new ConcurrentHashMap<>();
  9.     // 将会话放入到线程池中,异步将数据返回给前端
  10.     private static ThreadPoolExecutor wsThreadPoolExecutor;
  11.     // 核心逻辑处理器
  12.     private ChaosLogEventHandler handler;
  13.         // 业务写和读log
  14.     private static ChaosLogger chaosLogger;
  15.     @Autowired
  16.     @Qualifier("wsThreadPoolExecutor")
  17.     public void setWsThreadPoolExecutor(ThreadPoolExecutor wsThreadPoolExecutor) {
  18.         if (null != wsThreadPoolExecutor) {
  19.             LogWsEndpoint.wsThreadPoolExecutor = wsThreadPoolExecutor;
  20.         }
  21.     }
  22.     @Autowired
  23.     public void setChaosLogger(ChaosLogger chaosLogger) {
  24.         if (null != chaosLogger) {
  25.             LogWsEndpoint.chaosLogger = chaosLogger;
  26.         }
  27.     }
  28.     @OnOpen
  29.     public void onOpen(Session session, @PathParam("bizType") String bizType, @PathParam("bizId") String bizId) {
  30.         this.bizKey = String.format("%s-%s", bizType, bizId);
  31.         log.info("[ws-chaos-log]连接建立中 ==> bizKey : {}", bizKey);
  32.         this.handler = new ChaosLogEventHandler(chaosLogger, session);
  33.         wsThreadPoolExecutor.submit(() -> flushMessage(bizType, bizId, true));
  34.         endpointMap.compute(bizKey, (key, value) -> {
  35.             List<LogWsEndpoint> ends = null == value ? new ArrayList<>() : value;
  36.             ends.add(this);
  37.             return ends;
  38.         });
  39.         log.info("[ws-chaos-log]连接建立成功: sessionId:{}, bizKey : {}",session.getId(), bizKey);
  40.     }
  41.     public void flushMessage(String bizType, String bizId, boolean force) {
  42.         this.handler.flushMessage(bizType, bizId, force);
  43.     }
  44.     @OnClose
  45.     public void onClose() {
  46.         log.info("websocket log server close");
  47.         if (StringUtils.isBlank(bizKey)) {
  48.             return;
  49.         }
  50.         endpointMap.compute(bizKey, (key, endpoints) -> {
  51.             if (null != endpoints) {
  52.                 endpoints.remove(this);
  53.             }
  54.             return endpoints;
  55.         });
  56.         log.info("[ws-chaos-log]连接关闭成功,关闭该连接信息:sessionId : {}, bizKey : {}。", handler.getSession().getId(), bizKey);
  57.     }
  58.     @OnMessage
  59.     public void onMessage(String message, Session session) throws IOException {
  60.         log.info("[ws-chaos-log]服务端收到客户端消息 ==> sessionId : {}, bizKey : {}, message : {}", handler.getSession().getId(), bizKey, message);
  61.     }
  62.     @OnError
  63.     public void onError(Session session, Throwable error) {
  64.         log.error("[ws-chaos-log]WebSocket发生错误,sessionId : {}, bizKey : {}", handler.getSession().getId(), bizKey);
  65.     }
  66.     @Override
  67.     public void accept(ChaosLogEvent chaosLogEvent) {
  68.         String contextId = String.format("%s-%s", chaosLogEvent.getBizType(), chaosLogEvent.getBizId());
  69.         log.info("accept chaosLogEvent : {}", JSON.toJSONString(chaosLogEvent));
  70.         List<LogWsEndpoint> logWsEndpoints = endpointMap.get(contextId);
  71.         if (CollectionUtils.isEmpty(logWsEndpoints)) {
  72.             return;
  73.         }
  74.         logWsEndpoints.forEach(endpoint -> wsThreadPoolExecutor.submit(() -> endpoint.flushMessage(chaosLogEvent.getBizType(), chaosLogEvent.getBizId(), true)));
  75.     }
  76. }
复制代码
==留意:上面有个accept()==方法,这个方法后面也会讲到,重要就是用于触发已经创建连接Websocket发送消息。
核心逻辑实现, 这里读取的日志文件是存储在百度云的ois,ois读取逻辑忽略。
  1. @Slf4j
  2. public class ChaosLogEventHandler {
  3.     private static final long READ_LOG_MOST_LEN = 1024 * 1024 * 5L; // 5M
  4.     private final ChaosLogger chaosLogger;
  5.     @Getter
  6.     private final Session session;
  7.     private final AtomicLong offset = new AtomicLong(-1L);
  8.     private final AtomicBoolean hasTruncated = new AtomicBoolean(false);
  9.     private final AtomicLong waitEventCnt = new AtomicLong(0L);
  10.     private final Lock lock = new ReentrantLock();
  11.     public ChaosLogEventHandler(ChaosLogger chaosLogger, Session session) {
  12.         this.chaosLogger = chaosLogger;
  13.         this.session = session;
  14.     }
  15.     public void flushMessage(String bizType, String bizId, boolean force) {
  16.         String bizKey = bizType + "-" + bizId;
  17.         if (!lock.tryLock()) {
  18.             waitEventCnt.incrementAndGet();
  19.             log.info("[WS]获取锁失败,直接返回 ==> sessionId : {}, bizKey:{}", session.getId(), bizKey);
  20.             return;
  21.         }
  22.         try {
  23.             if (!force && waitEventCnt.getAndSet(0L) < 1) {
  24.                 log.info("[ws-chaos-log]没有待处理事件,直接返回 ==> sessionId : {}, bizKey:{}", session.getId(), bizKey);
  25.                 // 没有待处理的事件
  26.                 return;
  27.             }
  28.             log.info("[ws-chaos-log]向客户端刷新数据 ==> sessionId : {}, bizKey : {}, offset : {}", session.getId(), bizKey, offset.get());
  29.             if (offset.get() < 0) {
  30.                 long contentLength = chaosLogger.getLogContentLength(bizType, bizId);
  31.                 log.info("[ws-chaos-log]contentLength = {} for bizLogKey {}", contentLength, bizKey);
  32.                 if (contentLength == 0) {
  33.                     return;
  34.                 }
  35.                 if (contentLength > READ_LOG_MOST_LEN) {
  36.                     offset.set(contentLength - READ_LOG_MOST_LEN);
  37.                     hasTruncated.set(true);
  38.                     log.info("[ws-chaos-log]文件过大,截取最后10M ==> sessionId : {}, bizKey : {} contentLength={} offset={}", session.getId(), bizKey, contentLength, offset.get());
  39.                 } else {
  40.                     offset.set(0L);
  41.                 }
  42.             } else if (!force) {
  43.                 long contentLength = chaosLogger.getLogContentLength(bizType, bizId);
  44.                 if (contentLength <= offset.get()) {
  45.                     log.info("[ws-chaos-log]文件长度小于offset,无需刷新 ==> sessionId : {}, bizKey : {} contentLength={} offset={}", session.getId(), bizKey, contentLength, offset.get());
  46.                     return;
  47.                 }
  48.             }
  49.             // 读取日志内容
  50.             BosObject bosObject = chaosLogger.readLogObject(bizType, bizId, offset.get(), Long.MAX_VALUE);
  51.             try (BufferedReader reader = new BufferedReader(new InputStreamReader(bosObject.getObjectContent()))) {
  52.                 String line = null;
  53.                 while (null != (line = reader.readLine())) {
  54.                     if (hasTruncated.get()) {
  55.                         hasTruncated.set(false);
  56.                         log.info("[ws-chaos-log]hasTruncated changed to false");
  57.                     } else {
  58.                         log.info("[ws-chaos-log]send ws msg:{}", line);
  59.                         try {
  60.                             session.getBasicRemote().sendText(line + "\n");
  61.                         } catch (IllegalStateException e) {
  62.                             log.info("[ws-chaos-log]发送消息过程中连接状态异常,跳过", e);
  63.                         }
  64.                     }
  65.                     // +1是因为每一行结尾会有一个回车
  66.                     offset.addAndGet(line.getBytes(StandardCharsets.UTF_8).length + 1);
  67.                 }
  68.             } catch (IOException e) {
  69.                 log.error("", e);
  70.             }
  71.         } catch (NotFoundException e) {
  72.             log.info("[ws-chaos-log]未找到数据,无需向客户端同步,bizKey:{}", bizKey, e);
  73.         } catch (RuntimeException e) {
  74.             log.error("", e);
  75.         } finally {
  76.             lock.unlock();
  77.         }
  78.         log.info("[ws-chaos-log]向客户端刷新数据,完成 ==> sessionId : {}, bizKey : {}", session.getId(), bizKey);
  79.         // 在处理过程中,可能又有新的事件,所以再次尝试刷新数据
  80.         flushMessage(bizType, bizKey, false);
  81.     }
  82. }
复制代码
stept5: 广播事件,全局监听

前后端创建连接的时候,绑定了后端一台机器,但是配景一样平常都是多台服务器,如果事件传递到其他服务器,那么已经创建的连接如何监听到并返回内呢?
这里使用了rocketmq的机制,每台机器都会监听到事件的变革,从而触发当前机器将变更内容发回到前端。
  1. @Component
  2. @RocketMQMessageListener(topic = "EXECUTE_FLOW_LOG", selectorExpression = "log", consumerGroup = "flow-log", messageModel = MessageModel.BROADCASTING)
  3. @Slf4j
  4. public class ChaosLogEventConsumer implements RocketMQListener<String> {
  5.     @Autowired(required = false)
  6.     private List<Consumer<ChaosLogEvent>> chaosLogEventConsumers = Collections.emptyList();
  7.     @Override
  8.     public void onMessage(String message) {
  9.         log.info("[MQ]receive ChaosLogEvent message:{}", message);
  10.         ChaosLogEvent event = JsonUtils.fromJson(message, ChaosLogEvent.class);
  11.         for (Consumer<ChaosLogEvent> consumer : chaosLogEventConsumers) {
  12.             try {
  13.                 consumer.accept(event);
  14.             } catch (RuntimeException e) {
  15.                 log.error("[MQ] failed consume ChaosLogEvent message,consumer:" + consumer.getClass(), e);
  16.             }
  17.         }
  18.     }
  19. }
复制代码
前端代码实现

以react为例,仅供参考:
  1. export const fetchExecuteLogs = (bizType: string, bizId: any, logsRef: any, setLogs: any) => {
  2.   if (!bizType || !bizId) {
  3.     console.log('fetchLogs: logContextToken or node is null')
  4.     return
  5.   }
  6.   setLogs([])
  7.   if (logsRef.current[0]) {
  8.     console.log('close ws')
  9.     logsRef.current[0].close()
  10.   }
  11.   let host = wsHost ? wsHost : window.location.host
  12.   let protocol = window.location.protocol === 'https:' ? 'wss' : 'ws'
  13.   let client = new WebSocket(`${protocol}://${host}/ws/ark/chaos/execute/log/${bizType}/${bizId}`)
  14.   logsRef.current = [client, []]
  15.   // 报错的回调函数
  16.   client.onerror = (e: any) => {
  17.     console.log('Connection Error')
  18.     console.log(e)
  19.   }
  20.   //链接打开的回调函数
  21.   client.onopen = () => {
  22.     console.log('WebSocket Client Connected')
  23.   }
  24.   //链接关闭的回调函数
  25.   client.onclose = () => {
  26.     console.log('echo-protocol Client Closed')
  27.   }
  28.   //收到消息的处理函数
  29.   client.onmessage = (e: any) => {
  30.     if (logsRef.current[0] === client) {
  31.       if (typeof e.data === 'string') {
  32.         let newLogs = [...logsRef.current[1], e.data]
  33.         if (newLogs.length > 250) {
  34.           newLogs = newLogs.slice(200)
  35.         }
  36.         setLogs(newLogs)
  37.         logsRef.current = [client, newLogs]
  38.       }
  39.     } else {
  40.       client.close()
  41.     }
  42.   }
  43.   const sendPing = () => {
  44.     if (logsRef.current[0] === client) {
  45.       const data = { message: 'heartbeat' }
  46.       client.send(JSON.stringify(data))
  47.       setTimeout(sendPing, 10000)
  48.     }
  49.   }
  50.   setTimeout(sendPing, 10000)
  51. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

道家人

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表