道家人 发表于 2024-8-3 01:51:22

使用WebSocket实现log日志流的实时展示-从轮询到通知

场景介绍

最近开辟一个体系,其中一个模块必要展示实时的实验过程,过程日志大概比较多。从前的方案都是前端定时轮询,比如每秒查一次后端接口,将拉取回来的日志重新展示。轮询方案简单轻易实现,但是比较斲丧资源,后端没有数据的时候,会造成大量的无用轮询。以是这次我们采用长连接的方案,优化这块的逻辑,提升用户体验。
https://i-blog.csdnimg.cn/direct/a0fce847f7a443c89dca23c1e024e4c9.png
WebSocket介绍

   参考:https://liaoxuefeng.com/books/java/spring/web/websocket/
WebSocket 是一种基于 HTTP 的长连接技术。传统的 HTTP 协议采用请求-响应模子,浏览器不发送请求时,服务器无法主动推送数据给浏览器。因此,当必要定期或不定期向浏览器推送数据(例如股票行情或在线谈天)时,传统的 HTTP 协议只能通过浏览器的 JavaScript 定时轮询来实现。这种方法效率低下,且实时性不高。
由于 HTTP 本身基于 TCP 连接,WebSocket 在 HTTP 协议的基础上举行了简单的升级。创建 TCP 连接后,浏览器在发送请求时附带以下头部信息:
GET /chat HTTP/1.1
Host: www.example.com
Upgrade: websocket
Connection: Upgrade
这表示客户端希望升级为长连接的 WebSocket。服务器返回升级成功的响应:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
收到成功响应后,WebSocket 握手即告完成。这意味着代表 WebSocket 的 TCP 连接将不会被服务器关闭,而是保持开放状态,服务器和浏览器可以随时相互推送消息。这些消息既可以是文本,也可以是二进制数据。通常,大多数应用步伐会发送基于 JSON 的文本消息。
当代浏览器均已支持 WebSocket 协议,服务器端则必要底层框架的支持。Java 的 Servlet 规范从 3.1 开始支持 WebSocket,因此,必须选择支持 Servlet 3.1 或更高版本的容器,才气使用 WebSocket。最新版本的 Tomcat、Jetty 等开源服务器均已支持 WebSocket。
https://i-blog.csdnimg.cn/direct/f3a4a83361214647b5c3c8ff9d799d30.png
实践演示

Java后端

我们以现实代码来演示如何在Springboot项目中实现对Websocket的支持。
step1: 添加websocket依靠

      <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
      </dependency>
step2: 增长配置

这个配置的重要作用是主动启动使用了注解==@ServerEndpoint==的类
@Configuration
@EnableWebSocket
public class WebSocketConfiguration {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
      return new ServerEndpointExporter();
    }
}
step3: 创建一个ws endpoint

@ServerEndpoint(value = ChaosConst.CHAOS_WS_API + "/execute/log/{bizType}/{bizId}")
@Component
@Slf4j
public class LogWsEndpoint implements Consumer<ChaosLogEvent> {
    // 对话的标识
    private String bizKey;
    // 存储每个会话
    private static final ConcurrentHashMap<String, List<LogWsEndpoint>> endpointMap = new ConcurrentHashMap<>();
    // 将会话放入到线程池中,异步将数据返回给前端
    private static ThreadPoolExecutor wsThreadPoolExecutor;
    // 核心逻辑处理器
    private ChaosLogEventHandler handler;


        // 业务写和读log
    private static ChaosLogger chaosLogger;

    @Autowired
    @Qualifier("wsThreadPoolExecutor")
    public void setWsThreadPoolExecutor(ThreadPoolExecutor wsThreadPoolExecutor) {
      if (null != wsThreadPoolExecutor) {
            LogWsEndpoint.wsThreadPoolExecutor = wsThreadPoolExecutor;
      }
    }

    @Autowired
    public void setChaosLogger(ChaosLogger chaosLogger) {
      if (null != chaosLogger) {
            LogWsEndpoint.chaosLogger = chaosLogger;
      }
    }

    @OnOpen
    public void onOpen(Session session, @PathParam("bizType") String bizType, @PathParam("bizId") String bizId) {
      this.bizKey = String.format("%s-%s", bizType, bizId);
      log.info("连接建立中 ==> bizKey : {}", bizKey);
      this.handler = new ChaosLogEventHandler(chaosLogger, session);
      wsThreadPoolExecutor.submit(() -> flushMessage(bizType, bizId, true));

      endpointMap.compute(bizKey, (key, value) -> {
            List<LogWsEndpoint> ends = null == value ? new ArrayList<>() : value;
            ends.add(this);
            return ends;
      });
      log.info("连接建立成功: sessionId:{}, bizKey : {}",session.getId(), bizKey);
    }


    public void flushMessage(String bizType, String bizId, boolean force) {
      this.handler.flushMessage(bizType, bizId, force);
    }


    @OnClose
    public void onClose() {
      log.info("websocket log server close");
      if (StringUtils.isBlank(bizKey)) {
            return;
      }

      endpointMap.compute(bizKey, (key, endpoints) -> {
            if (null != endpoints) {
                endpoints.remove(this);
            }
            return endpoints;
      });
      log.info("连接关闭成功,关闭该连接信息:sessionId : {}, bizKey : {}。", handler.getSession().getId(), bizKey);
    }

    @OnMessage
    public void onMessage(String message, Session session) throws IOException {
      log.info("服务端收到客户端消息 ==> sessionId : {}, bizKey : {}, message : {}", handler.getSession().getId(), bizKey, message);
    }

    @OnError
    public void onError(Session session, Throwable error) {
      log.error("WebSocket发生错误,sessionId : {}, bizKey : {}", handler.getSession().getId(), bizKey);
    }

    @Override
    public void accept(ChaosLogEvent chaosLogEvent) {
      String contextId = String.format("%s-%s", chaosLogEvent.getBizType(), chaosLogEvent.getBizId());
      log.info("accept chaosLogEvent : {}", JSON.toJSONString(chaosLogEvent));
      List<LogWsEndpoint> logWsEndpoints = endpointMap.get(contextId);
      if (CollectionUtils.isEmpty(logWsEndpoints)) {
            return;
      }

      logWsEndpoints.forEach(endpoint -> wsThreadPoolExecutor.submit(() -> endpoint.flushMessage(chaosLogEvent.getBizType(), chaosLogEvent.getBizId(), true)));
    }
}
==留意:上面有个accept()==方法,这个方法后面也会讲到,重要就是用于触发已经创建连接Websocket发送消息。
核心逻辑实现, 这里读取的日志文件是存储在百度云的ois,ois读取逻辑忽略。
@Slf4j
public class ChaosLogEventHandler {
    private static final long READ_LOG_MOST_LEN = 1024 * 1024 * 5L; // 5M
    private final ChaosLogger chaosLogger;
    @Getter
    private final Session session;
    private final AtomicLong offset = new AtomicLong(-1L);
    private final AtomicBoolean hasTruncated = new AtomicBoolean(false);
    private final AtomicLong waitEventCnt = new AtomicLong(0L);
    private final Lock lock = new ReentrantLock();


    public ChaosLogEventHandler(ChaosLogger chaosLogger, Session session) {
      this.chaosLogger = chaosLogger;
      this.session = session;
    }

    public void flushMessage(String bizType, String bizId, boolean force) {
      String bizKey = bizType + "-" + bizId;
      if (!lock.tryLock()) {
            waitEventCnt.incrementAndGet();
            log.info("获取锁失败,直接返回 ==> sessionId : {}, bizKey:{}", session.getId(), bizKey);
            return;
      }
      try {
            if (!force && waitEventCnt.getAndSet(0L) < 1) {
                log.info("没有待处理事件,直接返回 ==> sessionId : {}, bizKey:{}", session.getId(), bizKey);
                // 没有待处理的事件
                return;
            }
            log.info("向客户端刷新数据 ==> sessionId : {}, bizKey : {}, offset : {}", session.getId(), bizKey, offset.get());
            if (offset.get() < 0) {
                long contentLength = chaosLogger.getLogContentLength(bizType, bizId);
                log.info("contentLength = {} for bizLogKey {}", contentLength, bizKey);
                if (contentLength == 0) {
                  return;
                }
                if (contentLength > READ_LOG_MOST_LEN) {
                  offset.set(contentLength - READ_LOG_MOST_LEN);
                  hasTruncated.set(true);
                  log.info("文件过大,截取最后10M ==> sessionId : {}, bizKey : {} contentLength={} offset={}", session.getId(), bizKey, contentLength, offset.get());
                } else {
                  offset.set(0L);
                }
            } else if (!force) {
                long contentLength = chaosLogger.getLogContentLength(bizType, bizId);
                if (contentLength <= offset.get()) {
                  log.info("文件长度小于offset,无需刷新 ==> sessionId : {}, bizKey : {} contentLength={} offset={}", session.getId(), bizKey, contentLength, offset.get());
                  return;
                }
            }

            // 读取日志内容
            BosObject bosObject = chaosLogger.readLogObject(bizType, bizId, offset.get(), Long.MAX_VALUE);
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(bosObject.getObjectContent()))) {
                String line = null;
                while (null != (line = reader.readLine())) {
                  if (hasTruncated.get()) {
                        hasTruncated.set(false);
                        log.info("hasTruncated changed to false");
                  } else {
                        log.info("send ws msg:{}", line);
                        try {
                            session.getBasicRemote().sendText(line + "\n");
                        } catch (IllegalStateException e) {
                            log.info("发送消息过程中连接状态异常,跳过", e);
                        }
                  }
                  // +1是因为每一行结尾会有一个回车
                  offset.addAndGet(line.getBytes(StandardCharsets.UTF_8).length + 1);
                }
            } catch (IOException e) {
                log.error("", e);
            }
      } catch (NotFoundException e) {
            log.info("未找到数据,无需向客户端同步,bizKey:{}", bizKey, e);
      } catch (RuntimeException e) {
            log.error("", e);
      } finally {
            lock.unlock();
      }
      log.info("向客户端刷新数据,完成 ==> sessionId : {}, bizKey : {}", session.getId(), bizKey);
      // 在处理过程中,可能又有新的事件,所以再次尝试刷新数据
      flushMessage(bizType, bizKey, false);
    }
}
stept5: 广播事件,全局监听

前后端创建连接的时候,绑定了后端一台机器,但是配景一样平常都是多台服务器,如果事件传递到其他服务器,那么已经创建的连接如何监听到并返回内呢?
这里使用了rocketmq的机制,每台机器都会监听到事件的变革,从而触发当前机器将变更内容发回到前端。
@Component
@RocketMQMessageListener(topic = "EXECUTE_FLOW_LOG", selectorExpression = "log", consumerGroup = "flow-log", messageModel = MessageModel.BROADCASTING)
@Slf4j
public class ChaosLogEventConsumer implements RocketMQListener<String> {
    @Autowired(required = false)
    private List<Consumer<ChaosLogEvent>> chaosLogEventConsumers = Collections.emptyList();

    @Override
    public void onMessage(String message) {
      log.info("receive ChaosLogEvent message:{}", message);
      ChaosLogEvent event = JsonUtils.fromJson(message, ChaosLogEvent.class);
      for (Consumer<ChaosLogEvent> consumer : chaosLogEventConsumers) {
            try {
                consumer.accept(event);
            } catch (RuntimeException e) {
                log.error(" failed consume ChaosLogEvent message,consumer:" + consumer.getClass(), e);
            }
      }
    }
}
前端代码实现

以react为例,仅供参考:
export const fetchExecuteLogs = (bizType: string, bizId: any, logsRef: any, setLogs: any) => {
if (!bizType || !bizId) {
    console.log('fetchLogs: logContextToken or node is null')
    return
}
setLogs([])
if (logsRef.current) {
    console.log('close ws')
    logsRef.current.close()
}
let host = wsHost ? wsHost : window.location.host
let protocol = window.location.protocol === 'https:' ? 'wss' : 'ws'
let client = new WebSocket(`${protocol}://${host}/ws/ark/chaos/execute/log/${bizType}/${bizId}`)
logsRef.current = ]
// 报错的回调函数
client.onerror = (e: any) => {
    console.log('Connection Error')
    console.log(e)
}
//链接打开的回调函数
client.onopen = () => {
    console.log('WebSocket Client Connected')
}
//链接关闭的回调函数
client.onclose = () => {
    console.log('echo-protocol Client Closed')
}
//收到消息的处理函数
client.onmessage = (e: any) => {
    if (logsRef.current === client) {
      if (typeof e.data === 'string') {
      let newLogs = [...logsRef.current, e.data]
      if (newLogs.length > 250) {
          newLogs = newLogs.slice(200)
      }
      setLogs(newLogs)
      logsRef.current =
      }
    } else {
      client.close()
    }
}
const sendPing = () => {
    if (logsRef.current === client) {
      const data = { message: 'heartbeat' }
      client.send(JSON.stringify(data))
      setTimeout(sendPing, 10000)
    }
}
setTimeout(sendPing, 10000)
}

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 使用WebSocket实现log日志流的实时展示-从轮询到通知