马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
SSE
项目中遇到竞价功能,用户停留在竞价页面,服务端需要及时把竞价信息推送给每一个客户
什么是SSE
SSE是一种服务器推送技术,答应服务器及时向客户端发送数据流。它是一种轻量级的单向通讯机制,特别得当于及时性要求高的场景,如及时更新日志、及时新闻推送等。在Java中,SSE的实现依赖于Servlet 3.0及以上版本,通过使用Java的特定库和框架,可以轻松地向客户端推送及时数据。
业务上实现单向通讯有以下两种:
特性Spring WebFlux + SSESpring MVC + SseEmitter并发能力高(单线程支持数千毗连)低(受限于线程池巨细)线程占用无独占线程(EventLoop共享)每个推送占用线程池线程代码复杂度简单(内置广播机制)复杂(需手动管理毗连和线程)适用场景新项目/高并发需求旧项目/低并发需求Spring WebFlux + SSE 简单案例
1.接收器管理类
实际业务中,竞价不可能只是单一的一个物品竞价,可能同时存在多个物品竞价
ConcurrentMap dynamicSinks
一个物品竞价map就添加一条
业务结束(某一个竞价结束)调用closeSink
- package com.dem.framework.config.sse;
- import org.springframework.stereotype.Component;
- import reactor.core.publisher.Flux;
- import reactor.core.publisher.Sinks;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ConcurrentMap;
- /**
- * @program: dem-template-pc
- * @description: 接收器管理类
- * @author: lcb
- * @create: 2025-04-01
- **/
- @Component
- public class DynamicSinkManager {
- private final ConcurrentMap<String, Sinks.Many<String>> dynamicSinks = new ConcurrentHashMap<>();
- /**
- * 根据sinkKey获取Sink,如果不存在则创建一个
- * @param sinkKey Sink的唯一key
- * @author lcb
- * @date 2025/4/1
- * @return
- **/
- public Sinks.Many<String> getOrCreateSink(String sinkKey) {
- return dynamicSinks.computeIfAbsent(sinkKey,
- k -> Sinks.many().multicast().directBestEffort()
- );
- }
- /**
- * 关闭指定Sink
- * @param sinkKey
- *
- * @author lcb
- * @date 2025/4/1
- * @return
- *
- **/
- public void closeSink(String sinkKey) {
- Sinks.Many<String> sink = dynamicSinks.remove(sinkKey);
- if (sink != null) {
- sink.tryEmitComplete();
- }
- }
- public Flux<String> getFlux(String sinkKey) {
- return getOrCreateSink(sinkKey).asFlux();
- }
- }
复制代码 2.消息生产者
- package com.dem.framework.config.sse;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- import reactor.core.publisher.Flux;
- import reactor.core.publisher.Sinks;
- import java.util.List;
- /**
- * @program: dem-template-pc
- * @description: sse 消息生产者
- * @author: lcb
- * @create: 2025-04-01
- **/
- @Component
- @Slf4j
- public class MessageProducer {
- @Autowired
- private DynamicSinkManager dynamicSinkManager;
- /**
- * 订阅 广播模式
- * @param sinkKey 订阅一个
- *
- * @author lcb
- * @date 2025/4/1 15:26
- * @return
- **/
- public Flux<String> asFluxShare(String sinkKey) {
- return dynamicSinkManager.getOrCreateSink(sinkKey).asFlux().share();
- }
- //
- public void sendMessage(String sinkKey,String message) {
- Sinks.EmitResult result = dynamicSinkManager.getOrCreateSink(sinkKey).tryEmitNext(message);
- // 处理发送结果
- if (result.isFailure()) {
- // 根据业务需求处理失败情况
- handleEmitFailure(result, message);
- }
- }
- // 批量发送
- public void sendBatch(String sinkKey,List<String> messages) {
- messages.forEach(msg->sendMessage(sinkKey, msg));
- }
- // 关闭消息流
- public void shutdown(String sinkKey) {
- dynamicSinkManager.getOrCreateSink(sinkKey).tryEmitComplete();
- }
- private void handleEmitFailure(Sinks.EmitResult result, String message) {
- // 实现你的失败处理逻辑
- log.error("消息发送失败,原因: {}", result);
- }
- }
复制代码 3.后端测试入口
- package com.dem.web.controller.business;
- import com.dem.common.annotation.Anonymous;
- import com.dem.framework.config.sse.MessageProducer;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.http.MediaType;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.PathVariable;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
- import reactor.core.publisher.Flux;
- /**
- * @program: dem-template-pc
- * @description: 测试
- * @author: lcb
- * @create: 2025-03-28 10:21
- **/
- @RestController
- @RequestMapping("/sse")
- @Slf4j
- public class SSEController {
- @Autowired
- private MessageProducer messageProducer;
- /**
- * 与客户端建立链接
- *
- * @param sinkKey
- * @return
- * @author lcb
- * @date 2025/4/1
- **/
- @GetMapping(value = "/linkEvents/{sinkKey}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
- public Flux<String> linkEvents(@PathVariable String sinkKey) {
- return messageProducer.asFluxShare(sinkKey);
- }
- /**
- * 模拟添加数据
- *
- * @param sinkKey
- * @param str
- * @return
- * @author lcb
- * @date 2025/4/1 15:50
- **/
- @GetMapping(value = "/addData/{sinkKey}/{str}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
- public void addData(@PathVariable("sinkKey") String sinkKey, @PathVariable("str") String str) {
- /*
- * 处理业务。。。
- * */
- log.info("数据:{}", str);
- messageProducer.sendMessage(sinkKey, str);
- }
- /**
- * 模拟竞价结束
- *
- * @param sinkKey
- * @return
- * @author lcb
- * @date 2025/4/1 15:50
- **/
- @GetMapping(value = "/shutdown/{sinkKey}")
- public void guanb(@PathVariable String sinkKey) {
- /*
- * 竞价结束
- * */
- messageProducer.shutdown(sinkKey);
- }
- }
复制代码 4.测试
- 前端代码
- const eventSource = new EventSource('http://localhost:8080/dem-vue-template-api/sse/linkEvents/dem123');
- eventSource.onmessage = function(event) {
- console.log('Received:', event.data);
- // 在这里处理接收到的数据,比如更新UI
- };
复制代码 - 也可使用欣赏器直接哀求
其他参数
- Sinks.many():
- 创建一个可以发射多个元素的 Sink (接收器)
- 与 Sinks.one()(单元素)和 Sinks.empty()(无元素)相对
- .multicast():
- 指定这是一个多播 Sink
- 意味着多个订阅者可以订阅同一个源
- 与 .unicast()(单播)相对
- .directBestEffort():
由于这个业务是 竞价,后来订阅的用户只需要获取最新的数据即可
如果后来订阅的用户需要获取之前的汗青数据可以参考
.onBackpressureBuffer()
.replay()
- direct: 表现直接传递元素,不进行任何缓冲
- bestEffort: 表现"尽力而为"的传递策略,如果卑鄙跟不上,可能会抛弃元素
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |