SSE

打印 上一主题 下一主题

主题 1711|帖子 1711|积分 5133

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
SSE

项目中遇到竞价功能,用户停留在竞价页面,服务端需要及时把竞价信息推送给每一个客户
什么是SSE

SSE是一种服务器推送技术,答应服务器及时向客户端发送数据流。它是一种轻量级的单向通讯机制,特别得当于及时性要求高的场景,如及时更新日志、及时新闻推送等。在Java中,SSE的实现依赖于Servlet 3.0及以上版本,通过使用Java的特定库和框架,可以轻松地向客户端推送及时数据。
业务上实现单向通讯有以下两种:
特性Spring WebFlux + SSESpring MVC + SseEmitter并发能力高(单线程支持数千毗连)低(受限于线程池巨细)线程占用无独占线程(EventLoop共享)每个推送占用线程池线程代码复杂度简单(内置广播机制)复杂(需手动管理毗连和线程)适用场景新项目/高并发需求旧项目/低并发需求Spring WebFlux + SSE 简单案例

1.接收器管理类

实际业务中,竞价不可能只是单一的一个物品竞价,可能同时存在多个物品竞价
ConcurrentMap dynamicSinks
一个物品竞价map就添加一条
业务结束(某一个竞价结束)调用closeSink
  1. package com.dem.framework.config.sse;
  2. import org.springframework.stereotype.Component;
  3. import reactor.core.publisher.Flux;
  4. import reactor.core.publisher.Sinks;
  5. import java.util.concurrent.ConcurrentHashMap;
  6. import java.util.concurrent.ConcurrentMap;
  7. /**
  8. * @program: dem-template-pc
  9. * @description: 接收器管理类
  10. * @author: lcb
  11. * @create: 2025-04-01
  12. **/
  13. @Component
  14. public class DynamicSinkManager {
  15.     private final ConcurrentMap<String, Sinks.Many<String>> dynamicSinks = new ConcurrentHashMap<>();
  16.     /**
  17.      * 根据sinkKey获取Sink,如果不存在则创建一个
  18.      * @param sinkKey Sink的唯一key
  19.      * @author lcb
  20.      * @date 2025/4/1
  21.      * @return
  22.      **/
  23.     public Sinks.Many<String> getOrCreateSink(String sinkKey) {
  24.         return dynamicSinks.computeIfAbsent(sinkKey,
  25.                 k -> Sinks.many().multicast().directBestEffort()
  26.         );
  27.     }
  28.     /**
  29.      * 关闭指定Sink
  30.      * @param sinkKey
  31.      *
  32.      * @author lcb
  33.      * @date 2025/4/1
  34.      * @return
  35.      *
  36.      **/
  37.     public void closeSink(String sinkKey) {
  38.         Sinks.Many<String> sink = dynamicSinks.remove(sinkKey);
  39.         if (sink != null) {
  40.             sink.tryEmitComplete();
  41.         }
  42.     }
  43.     public Flux<String> getFlux(String sinkKey) {
  44.         return getOrCreateSink(sinkKey).asFlux();
  45.     }
  46. }
复制代码
2.消息生产者
  1. package com.dem.framework.config.sse;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Component;
  5. import reactor.core.publisher.Flux;
  6. import reactor.core.publisher.Sinks;
  7. import java.util.List;
  8. /**
  9. * @program: dem-template-pc
  10. * @description: sse 消息生产者
  11. * @author: lcb
  12. * @create: 2025-04-01
  13. **/
  14. @Component
  15. @Slf4j
  16. public class MessageProducer {
  17.     @Autowired
  18.     private DynamicSinkManager dynamicSinkManager;
  19.     /**
  20.      * 订阅 广播模式
  21.      * @param sinkKey 订阅一个
  22.      *
  23.      * @author lcb
  24.      * @date 2025/4/1 15:26
  25.      * @return
  26.      **/
  27.     public Flux<String> asFluxShare(String sinkKey) {
  28.         return dynamicSinkManager.getOrCreateSink(sinkKey).asFlux().share();
  29.     }
  30.     //
  31.     public void sendMessage(String sinkKey,String message) {
  32.         Sinks.EmitResult result = dynamicSinkManager.getOrCreateSink(sinkKey).tryEmitNext(message);
  33.         // 处理发送结果
  34.         if (result.isFailure()) {
  35.             // 根据业务需求处理失败情况
  36.             handleEmitFailure(result, message);
  37.         }
  38.     }
  39.     // 批量发送
  40.     public void sendBatch(String sinkKey,List<String> messages) {
  41.         messages.forEach(msg->sendMessage(sinkKey, msg));
  42.     }
  43.     // 关闭消息流
  44.     public void shutdown(String sinkKey) {
  45.         dynamicSinkManager.getOrCreateSink(sinkKey).tryEmitComplete();
  46.     }
  47.     private void handleEmitFailure(Sinks.EmitResult result, String message) {
  48.         // 实现你的失败处理逻辑
  49.         log.error("消息发送失败,原因: {}", result);
  50.     }
  51. }
复制代码
3.后端测试入口
  1. package com.dem.web.controller.business;
  2. import com.dem.common.annotation.Anonymous;
  3. import com.dem.framework.config.sse.MessageProducer;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.http.MediaType;
  7. import org.springframework.web.bind.annotation.GetMapping;
  8. import org.springframework.web.bind.annotation.PathVariable;
  9. import org.springframework.web.bind.annotation.RequestMapping;
  10. import org.springframework.web.bind.annotation.RestController;
  11. import reactor.core.publisher.Flux;
  12. /**
  13. * @program: dem-template-pc
  14. * @description: 测试
  15. * @author: lcb
  16. * @create: 2025-03-28 10:21
  17. **/
  18. @RestController
  19. @RequestMapping("/sse")
  20. @Slf4j
  21. public class SSEController {
  22.     @Autowired
  23.     private MessageProducer messageProducer;
  24.     /**
  25.      * 与客户端建立链接
  26.      *
  27.      * @param sinkKey
  28.      * @return
  29.      * @author lcb
  30.      * @date 2025/4/1
  31.      **/
  32.     @GetMapping(value = "/linkEvents/{sinkKey}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  33.     public Flux<String> linkEvents(@PathVariable String sinkKey) {
  34.         return messageProducer.asFluxShare(sinkKey);
  35.     }
  36.     /**
  37.      * 模拟添加数据
  38.      *
  39.      * @param sinkKey
  40.      * @param str
  41.      * @return
  42.      * @author lcb
  43.      * @date 2025/4/1 15:50
  44.      **/
  45.     @GetMapping(value = "/addData/{sinkKey}/{str}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  46.     public void addData(@PathVariable("sinkKey") String sinkKey, @PathVariable("str") String str) {
  47.         /*
  48.          * 处理业务。。。
  49.          * */
  50.         log.info("数据:{}", str);
  51.         messageProducer.sendMessage(sinkKey, str);
  52.     }
  53.     /**
  54.      * 模拟竞价结束
  55.      *
  56.      * @param sinkKey
  57.      * @return
  58.      * @author lcb
  59.      * @date 2025/4/1 15:50
  60.      **/
  61.     @GetMapping(value = "/shutdown/{sinkKey}")
  62.     public void guanb(@PathVariable String sinkKey) {
  63.         /*
  64.          * 竞价结束
  65.          * */
  66.         messageProducer.shutdown(sinkKey);
  67.     }
  68. }
复制代码
4.测试


  • 前端代码
    1. const eventSource = new EventSource('http://localhost:8080/dem-vue-template-api/sse/linkEvents/dem123');
    2. eventSource.onmessage = function(event) {
    3.   console.log('Received:', event.data);
    4.   // 在这里处理接收到的数据,比如更新UI
    5. };
    复制代码
  • 也可使用欣赏器直接哀求
其他参数


  • Sinks.many():

    • 创建一个可以发射多个元素的 Sink (接收器)
    • 与 Sinks.one()(单元素)和 Sinks.empty()(无元素)相对

  • .multicast():

    • 指定这是一个多播 Sink
    • 意味着多个订阅者可以订阅同一个源
    • 与 .unicast()(单播)相对

  • .directBestEffort():
    由于这个业务是 竞价,后来订阅的用户只需要获取最新的数据即可
    如果后来订阅的用户需要获取之前的汗青数据可以参考
    .onBackpressureBuffer()
    .replay()


    • direct: 表现直接传递元素,不进行任何缓冲
    • bestEffort: 表现"尽力而为"的传递策略,如果卑鄙跟不上,可能会抛弃元素


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

何小豆儿在此

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表