SSE (Server-Sent Events) 服务器及时推送详解

打印 上一主题 下一主题

主题 998|帖子 998|积分 2994

一、什么是 SSE ?

Server-sent Events(SSE)是一种轻量级的服务器主动向客户端单向推送及时数据的技术。
与 WebSocket 差别的是,服务器发送变乱是单向的。数据消息只能从服务端到发送到客户端(如用户的欣赏器)。这使其成为不必要从客户端往服务器发送消息的情况下的最佳选择。二者的重要区别:
SSEWebSocket通信单向通信双向通信协议HTTPWebSocket主动重连支持不支持,必要客户端自行支持数据格式文本格式,如果必要二进制数据,必要自行编码默认二进制数据,支持文本格式欣赏器支持大部分支持,早期 Edge 欣赏器、IE欣赏器不支持主流欣赏器(包罗移动端)的支持较好 SSE 常见推送场景有:微信消息通知栏、消息推送、外卖状态 等等,我们自身的推送场景有:下载、连线哀求、直播提示 …
二、SSE 的工作原理

sse 的工作原理基于标准的 HTTP 协议,客户端通过发送一个特殊的 HTTP GET 哀求到服务器,哀求中包含 Accept: text/event-stream 头,表明客户端期望接收 SSE 数据流。
服务器相应后保持毗连打开,并可以持续向客户端推送数据。数据流由一系列变乱构成,每个变乱都包含变乱类型、数据内容和变乱 ID 等信息,客户端可以使用 JavaScript 中的 EventSource 接口来监听服务器发送的变乱,并进行相应的处置处罚。

三、SSE 的基本设置

1.HTTP 哀求和相应头设置

在 sse 中,客户端起首向服务器发送一个 HTTP GET 哀求,表明客户端准备接收 SSE 数据流,在服务器相应的时候,必要设置特定的相应头来告知客户端这是一个 SSE 流:


  • Content-Type : text/event-stream:告诉欣赏器这个相应是SSE流,欣赏器应该以这种方式处置处罚接收到的数据。
  • Character-Encoding : UTF-8:设置编码格式。
  • Cache-Control : no-cache:指示欣赏器不应该缓存此相应。对于SSE来说,这是很重要的,由于我们希望及时更新数据,而不希望欣赏器缓存旧的数据。
  • Connection : keep-alive:保持毗连打开,以便服务器可以持续发送数据。
2.SSE 字段介绍

SSE 数据流由一系列的字段构成,每个字段都以键值对的情势出现,字段之间用换行符分隔:


  • event: <event_name>:可选字段,用于指定变乱的名称,message是默认的变乱名称。
  • data:必须字段,包含变乱的数据内容,可以有多行,每行都以data:开头。
  • id:可选字段,提供一个唯一的标识符给变乱,可用于断线重连和消息追踪。
  • retry:可选字段,指定客户端在毗连断开后重连的隔断时间。
3.SSE 变乱数据流示例

  1.         //SSE测试
  2.     @GetMapping(value = "ssePush")
  3.     public void ssePush(HttpServletResponse response) throws IOException {
  4.         response.setContentType("text/event-stream");
  5.         response.setCharacterEncoding("UTF-8");
  6.         response.setHeader("Cache-Control", "no-cache");
  7.         response.setHeader("Connection", "keep-alive");
  8.         for (int i = 0; i < 10; i++) {
  9.             // 数据格式:
  10.             // id字段是可选的,用于指定事件的标识符;
  11.             // event字段是可选的,用于指定事件的名称;
  12.             // data字段是必须的,用于指定数据的内容;
  13.             // retry字段是可选的,用于指定客户端在连接断开后重新连接的时间间隔(以毫秒为单位)。
  14.             // 每个字段都必须以换行符(\n)结尾,并且每个消息都必须以两个换行符(\n\n)结尾。
  15.             String message = "Hello, world" + i;
  16.             response.getWriter().write("id:"+i+"\n");
  17.             response.getWriter().write("event:me\n");
  18.             response.getWriter().write("data:" + i + "\n\n");
  19.             response.getWriter().flush();
  20.             try {
  21.                 TimeUnit.SECONDS.sleep(1);
  22.             } catch (InterruptedException e) {
  23.                 e.printStackTrace();
  24.             }
  25.         }
  26.     }
复制代码

四、SseEmitter 的基本设置

SseEmitter是 Spring Framework 提供的一个类,用于实现 SSE(Server-Sent Events)。是一种基于 Servlet API 的机制,通过 HTTP 相应流(ResponseBody)来持续发送消息。
1.SseEmitter 介绍及用法



  • 构造方法

    • SseEmitter():创建一个新的 SseEmitter 实例,使用默认的超时值。
    • SseEmitter(Long timeout):创建一个新的 SseEmitter 实例,设置指定的超时时间(毫秒)。

  • 发送数据

    • send(Object data):发送数据到客户端。
    • send(Object data, MediaType mediaType):发送数据到客户端,并指定数据的媒体类型。
    • send(SseEvent event):发送一个 SseEvent 对象到客户端。

  • 关闭毗连

    • complete():正常完成变乱流,关闭毗连。
    • completeWithError(Throwable throwable):由于错误完成变乱流,并关闭毗连。
    • completeWithError(String message):由于错误完成变乱流,并关闭毗连,提供错误信息。

  • 毗连状态处置处罚

    • onCompletion(Runnable callback):注册毗连完成的回调函数。
    • onTimeout(Runnable callback):注册毗连超时的回调函数。

  • 获取超时时间

    • getTimeout():返回当前的超时时间(毫秒)。

  • 其他

    • isCompleted():检查 SseEmitter 是否已完成。
    • isExpired():检查 SseEmitter 是否已逾期。

2.使用 SseEmitter 示例1

1)编写核心 SSE Client


  • 创建 SSE 端点
    创建一个 SseEmitter,用 uid 进行标识,uid 可以是用户标识符,也可以是业务标识符。可以理解为通信信道标识。
  • 通过端点发送变乱
    可以定时或在时间发生是调用 SseEmitter.send() 方法来发送变乱。
  • 关闭端点毗连
  1. @Slf4j
  2. @Component
  3. public class SseClient {
  4.     private static final Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
  5.     /**
  6.      * 创建连接
  7.      */
  8.     public SseEmitter createSse(String uid) {
  9.         //默认30秒超时,设置为0L则永不超时
  10.         SseEmitter sseEmitter = new SseEmitter(0l);
  11.         //完成后回调
  12.         sseEmitter.onCompletion(() -> {
  13.             log.info("[{}]结束连接...................", uid);
  14.             sseEmitterMap.remove(uid);
  15.         });
  16.         //超时回调
  17.         sseEmitter.onTimeout(() -> {
  18.             log.info("[{}]连接超时...................", uid);
  19.         });
  20.         //异常回调
  21.         sseEmitter.onError(
  22.                 throwable -> {
  23.                     try {
  24.                         log.info("[{}]连接异常,{}", uid, throwable.toString());
  25.                         sseEmitter.send(SseEmitter.event()
  26.                                 .id(uid)
  27.                                 .name("发生异常!")
  28.                                 .data("发生异常请重试!")
  29.                                 .reconnectTime(3000));
  30.                         sseEmitterMap.put(uid, sseEmitter);
  31.                     } catch (IOException e) {
  32.                         e.printStackTrace();
  33.                     }
  34.                 }
  35.         );
  36.         try {
  37.             sseEmitter.send(SseEmitter.event().reconnectTime(5000));
  38.         } catch (IOException e) {
  39.             e.printStackTrace();
  40.         }
  41.         sseEmitterMap.put(uid, sseEmitter);
  42.         log.info("[{}]创建sse连接成功!", uid);
  43.         return sseEmitter;
  44.     }
  45.     /**
  46.      * 给指定用户发送消息
  47.      *
  48.      */
  49.     public boolean sendMessage(String uid,String messageId, String message) {
  50.         if (StrUtil.isBlank(message)) {
  51.             log.info("参数异常,msg为null", uid);
  52.             return false;
  53.         }
  54.         SseEmitter sseEmitter = sseEmitterMap.get(uid);
  55.         if (sseEmitter == null) {
  56.             log.info("消息推送失败uid:[{}],没有创建连接,请重试。", uid);
  57.             return false;
  58.         }
  59.         try {
  60.             sseEmitter.send(SseEmitter.event().id(messageId).reconnectTime(1*60*1000L).data(message));
  61.             log.info("用户{},消息id:{},推送成功:{}", uid,messageId, message);
  62.             return true;
  63.         }catch (Exception e) {
  64.             sseEmitterMap.remove(uid);
  65.             log.info("用户{},消息id:{},推送异常:{}", uid,messageId, e.getMessage());
  66.             sseEmitter.complete();
  67.             return false;
  68.         }
  69.     }
  70.     /**
  71.      * 断开
  72.      * @param uid
  73.      */
  74.     public void closeSse(String uid){
  75.         if (sseEmitterMap.containsKey(uid)) {
  76.             SseEmitter sseEmitter = sseEmitterMap.get(uid);
  77.             sseEmitter.complete();
  78.             sseEmitterMap.remove(uid);
  79.         }else {
  80.             log.info("用户{} 连接已关闭",uid);
  81.         }
  82.     }
  83. }
复制代码
2)编写 Controller


  • 打开页面默认页面,通报端点标识
  • 毗连端点(/createSse),页面必要使用
  • 通过 ajax(/sendMsg),触发后端业务,向页面发送消息
  • 主动关闭毗连(/closeSse)
  1. @Controller
  2. public class IndexAction {
  3.     @Autowired
  4.     private SseClient sseClient;
  5.     @GetMapping("/")
  6.     public String index(ModelMap model) {
  7.         String uid = IdUtil.fastUUID();
  8.         model.put("uid",uid);
  9.         return "index";
  10.     }
  11.     @CrossOrigin
  12.     @GetMapping("/createSse")
  13.     public SseEmitter createConnect(String uid) {
  14.         return sseClient.createSse(uid);
  15.     }
  16.     @CrossOrigin
  17.     @GetMapping("/sendMsg")
  18.     @ResponseBody
  19.     public String sseChat(String uid) {
  20.         for (int i = 0; i < 10; i++) {
  21.             sseClient.sendMessage(uid, "no"+i,IdUtil.fastUUID());
  22.         }
  23.         return "ok";
  24.     }
  25.     /**
  26.      * 关闭连接
  27.      */
  28.     @CrossOrigin
  29.     @GetMapping("/closeSse")
  30.     public void closeConnect(String uid ){
  31.         sseClient.closeSse(uid);
  32.     }
  33. }
复制代码
3)前端接收与处置处罚

前端每接收到一次SSE推送的变乱,就会在id为"con"的元素中追加数据。
  1. <!DOCTYPE html>
  2. <html lang="en">
  3. <head>
  4.     <meta charset="UTF-8">
  5.     <title>Title</title>
  6. </head>
  7. <body>
  8. <div id="con"></div>
  9. <script>
  10.     let chat = document.getElementById("con");
  11.     if (window.EventSource) {
  12.         //创建sse
  13.          eventSource = new EventSource(`/createSse?uid=${uid}`);
  14.         eventSource.onopen = function (event) {
  15.             console.log('SSE链接成功');
  16.         }
  17.         eventSource.onmessage = function (event) {
  18.             if(event.data){
  19.                 chat.innerHTML += event.data + '<br/>';
  20.                 //console.log('后端返回的数据:', data.value);
  21.             }
  22.         }
  23.         eventSource.onerror = (error) => {
  24.             console.log('SSE链接失败');
  25.         };
  26.     } else {
  27.         alert("你的浏览器不支持SSE");
  28.     }
  29. </script>
  30. </body>
  31. </html>
复制代码
3.使用 SseEmitter 示例2

1)后端实现SSE

  1. @RestController
  2. public class SseController {
  3.     @GetMapping("/stream")
  4.     public SseEmitter handleSse(HttpServletResponse response) {
  5.         response.setContentType("text/event-stream");
  6.         response.setCharacterEncoding("UTF-8");
  7.         response.setHeader("Cache-Control", "no-cache");
  8.         response.setHeader("Connection", "keep-alive");
  9.         SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
  10.         
  11.         new Thread(() -> { // 创建一个新的线程
  12.             try {
  13.                 for (int i = 0; i < 10; i++) {
  14.                     String message = "Hello, world" + i;
  15.                     emitter.send(SseEmitter.event()
  16.                                     .id(i + "")
  17.                             .name("message")
  18.                             .data(message));
  19.                     Thread.sleep(1000); // 每秒发送一条消息
  20.                 }
  21.                 emitter.complete(); // 发送完毕后关闭连接
  22.             } catch (IOException | InterruptedException e) {
  23.                 emitter.completeWithError(e); // 错误完成事件流,并关闭连接
  24.             }
  25.         }).start();
  26.         return emitter;
  27.     }
  28. }
复制代码
2)前端接入SSE

  1. <!DOCTYPE html>
  2. <html lang="en">
  3. <head>
  4.     <meta charset="UTF-8">
  5.     <title>SSE Client</title>
  6. </head>
  7. <body>
  8.     <h1>Server-Sent Events Client</h1>
  9.     <pre id="output"></pre>
  10.     <button id="closeButton">Close Connection</button>
  11.     <script>
  12.         const eventSource = new EventSource('<http://localhost:3000/stream>');
  13.         const output = document.getElementById('output');
  14.         const closeButton = document.getElementById('closeButton');
  15.         eventSource.onmessage = function(event) {
  16.             output.textContent += event.data + '\\n';
  17.         };
  18.         eventSource.onerror = function(event) {
  19.             console.error('EventSource failed: ', event);
  20.             eventSource.close(); // 可以选择在发生错误时关闭连接
  21.         };
  22.         eventSource.addEventListener('end', function(event) {
  23.             console.log('Server closed the connection: ', event);
  24.             eventSource.close();
  25.         });
  26.         closeButton.addEventListener('click', function() {
  27.             eventSource.close();
  28.         });
  29.     </script>
  30. </body>
  31. </html>
复制代码
五、留意事项

1.断开毗连时

当客户端断开毗连时,SseEmitter 会抛出 IOException,所以务必捕获并处置处罚这种异常,通常情况下我们会调用 emitter.complete() 或 emitter.completeWithError() 来关闭 SseEmitter。
2.nginx设置

这里记载一个踩坑情况:在我没有设置 nginx 时,调用 SSE 接口,通过IP+端口访问与直接通过域名访问是不一样的。由于没有设置 nginx,域名访问的接口会等待全部消息发送后,全部一起在页面展示。而IP+端口则会一条条的展示消息。所以大家遇到雷怜悯况记得设置 nginx。如下:

3.广播推送

我们以「文件下载」功能进行说明,一般情况下,大文件的下载,服务端压力比较大、处置处罚时间也比较长,为了有更好的交互体验,我们可以使用异步处置处罚,服务端处置处罚完了之后主动通知 客户端。
下载完成后,我们必要将完成变乱推送给客户端。必要留意的是,由于服务是集群部署、SSE 毗连在节点本地 Map 维护,这就有可能导致当前客户端的 SSE毗连所在节点 与 变乱推送节点 是两个独立的节点。
因此,我们这里借助于 Redis 的发布/订阅能力,将消息广播出去,能匹配毗连的节点负责将消息推送至客户端、其他节点直接丢弃即可。效果图如下:

可否做到精准投递?
可以的,我们可以如许:
借助 Redis 做中心存储,存储 Map <用户, 节点IP> 如许的映射关系。
在推送消息之前,先通过映射关系找到该用户的 SSE 毗连所在节点,然后通过 RPC 调用,直接将消息投递到对应的服务节点,末了由该节点进行变乱推送。
一般情况下,我们可以用「广播」这种简单粗暴的方式应对大部分场景,毕竟「精准投递」必要中心化维护节点关系、应对节点变更等,处置处罚起来稍显贫苦。当然,具体视业务场景来做选择即可。
3.安全标题

1)防止 XSS 攻击

由于 SSE 允许服务器动态地向客户端页面发送数据,如果不精确处置处罚,可能会成为 XSS 攻击的载体。确保对全部接收到的数据进行适当的清理和编码,避免直接插入到 DOM 中。
  1. eventSource.onmessage = function(event) {
  2.     const safeData = encodeURI(event.data); // 对数据进行URL编码
  3.     const messageElement = document.createElement('div');
  4.     messageElement.textContent = safeData; // 安全地将数据添加到页面
  5.     document.getElementById('messages').appendChild(messageElement);
  6. };
复制代码
2)验证毗连哀求

验证全部SSE毗连哀求,确保它们来自可信的源。可以通过检查Referer头或使用身份验证令牌来实现。
  1. // 检查请求来源
  2. String refererHost = request.getHeader("Referer");
  3. if (refererHost == null || !refererHost.contains("trusted-domain.com")) {
  4.     response.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
  5.     return;
  6. }
复制代码
3)限制毗连数目

为了防止资源耗尽攻击,服务器应该限制每个客户端可以创建的SSE毗连数目。这可以通过在服务器端设置最大毗连数来实现。
4)限制毗连数目

为了防止资源耗尽攻击,服务器应该限制每个客户端可以创建的SSE毗连数目。这可以通过在服务器端设置最大毗连数来实现。
5)监控和日志记载

启用具体的日志记载和监控机制,以便在发生安全变乱时快速相应。记载全部SSE毗连的元数据,如IP所在、毗连时间等。
6)实行访问控制

使用适当的访问控制战略,确保只有授权用户才能接收敏感数据。这可能涉及到用户认证和授权机制。
4.服务端资源消耗

1)毗连开销

SSE通过保持HTTP毗连打开来实现服务器向客户端的持续数据推送。这意味着服务器必要为每个SSE毗连分配内存和资源,用于维护毗连状态和数据缓冲 在Java中,可以使用线程或异步处置处罚来管理SSE毗连,但必要留意资源的合理分配和回收。
  1. @GetMapping("/stream")
  2. public SseEmitter handleSseRequest(HttpServletRequest request) {
  3.     SseEmitter emitter = new SseEmitter();
  4.     // 添加资源清理逻辑
  5.     emitter.onCompletion(() -> {
  6.         // 清理资源
  7.     });
  8.     return emitter;
  9. }
复制代码
2)并发毗连

当大量客户端同时毗连到服务器时,服务器必要处置处罚的并发毗连数增长,这会显著增长CPU和内存的使用率。 可以使用线程池来控制并发量,例如在Spring框架中设置线程池:
  1. @Configuration
  2. public class AsyncConfig {
  3.     @Bean
  4.     public Executor taskExecutor() {
  5.         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  6.         executor.setCorePoolSize(2);
  7.         executor.setMaxPoolSize(10);
  8.         executor.setQueueCapacity(25);
  9.         executor.setThreadNamePrefix("SSE-Executor-");
  10.         return executor;
  11.     }
  12. }
复制代码
3)状态管理

服务器必要维护每个SSE毗连的状态,包罗发送的数据、重连尝试等。状态管理的复杂性随着毗连数的增长而增长。 可以使用数据库或缓存来存储和管理SSE毗连状态:
  1. // 伪代码,展示如何存储和检索SSE连接状态
  2. ConnectionState state = connectionStateRepository.findByConnectionId(connectionId);
  3. state.updateWithData(latestData);
  4. connectionStateRepository.save(state);
复制代码
4)内存走漏防护

长时间运行的SSE毗连可能会导致内存走漏,特殊是如果不精确地管理变乱监听器和相干资源。 确保在毗连关闭时清理全部资源:
  1. emitter.onCompletion(() -> {
  2.     // 清理内存,取消定时器,关闭数据库连接等
  3. });
复制代码
5)日志和监控

适当的日志记载和监控可以资助识别和解决资源消耗标题。 实现自定义的日志记载和监控逻辑:
  1. @GetMapping("/stream")
  2. public SseEmitter handleSseRequest(HttpServletRequest request) {
  3.     SseEmitter emitter = new SseEmitter();
  4.     emitter.onTimeout(() -> log.warn("SSE connection timed out"));
  5.     emitter.onCompletion(() -> log.info("SSE connection completed"));
  6.     return emitter;
  7. }
复制代码
6)优化战略



  • 「毗连复用」:尽可能复用现有的毗连,减少毗连创建和关闭的开销
  • 「批量发送」:如果可能,批量发送数据而不是单个变乱,减少数据包的数目
  • 「使用高效的序列化」:选择高效的数据序列化方法,减少数据传输的巨细
  • 「超时和主动重连」:合理设置超时时间和主动重连战略,避免不必要的资源浪费

好事定律:每件事末了都会是好事,如果不是好事,说明还没到末了。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

用户云卷云舒

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表