qidao123.com技术社区-IT企服评测·应用市场

标题: SSE实现消息实时推送,前端渐进式学习、实践,真香 [打印本页]

作者: 愛在花開的季節    时间: 5 天前
标题: SSE实现消息实时推送,前端渐进式学习、实践,真香
一、SSE概念

SSE(Server Sent Event),直译为服务器发送事件,顾名思义,也就是客户端可以获取到服务器发送的事件。我们常见的 http 交互方式是客户端发起请求,服务端响应,然后一次请求完毕;但是在 sse 的场景下,客户端发起请求,连接一直保持,服务端有数据就可以返回数据给客户端,这个返回可以是多次间隔的方式
二、SSE应用场景

在web端消息推送功能中,由于传统的HTTP协议是由客户端主动发起请求,服务端才会响应。根本的ajax轮询技能便是云云。而在SSE中,浏览发送一个请求给服务端,通过响应头中的Content-Type:text/event-stream等向客户端申明这是一个长连接,发送的是流数据,这样客户端就不会关闭连接,一直等候服务端发送数据。
假如服务器返回的数据中包含了事件标识符,浏览器会记录最后一次接收的事件的标识符。假如与服务器的连接中断,当浏览器再次举行连接时,会通过头来声明最后一次接收的事件的标识符。服务器端可以通过浏览器发送的事件标识符来确定从哪个事件来继承连接
三、前端使用方法、问题

1、get方式

使用eventsource完成get请求
缺点:客户端无法通过一个get请求完成数据传递
参考文档:
EventSource - Web API 接口参考 | MDN
实现流程:
代码实现:
  1. const eventSourceRef = useRef<any>(null)
  2. const contact = async (messageData: any) => {
  3.   eventSourceRef.current = new EventSource(
  4.     `${API_BASE}/v1/model/stream?id=${id}`,
  5.   )
  6.   if (!eventSourceRef.current) return
  7.   // 监听 SSE 事件,因为后端定义了具名事件,所以这儿要用addEventListener监听,而不是onmessage
  8.   eventSourceRef.current.addEventListener('add', function (e: any) {
  9.         // 处理数据展示
  10.   })
  11.   eventSourceRef.current.addEventListener('finish', function (e: any) {
  12.    // 结束标识finish
  13.     eventSourceRef.current.close() // 关闭连接
  14.   })
  15.   eventSourceRef.current.addEventListener('error', function (e: any) {
  16.     if (e.status === 401) {
  17.       // 用户登录状态失效处理
  18.     }
  19.     // error报错处理
  20.     console.log('Error occurred:', e)
  21.     // 关闭连接
  22.     eventSourceRef.current.close()
  23.   })
  24. }
复制代码
2、post方式

使用fetch-event-source完成连接,仅需一个接口,支持添加请求头
缺点:在浏览器返回的text/eventstream里看不到具体返回,无法举行预览
参考文档:
@microsoft/fetch-event-source
实现流程:
具体实现:
  1.   const eventSourceRef = useRef<any>(null)
  2.   const [abortController, setAbortController] = useState(new AbortController())
  3.   // 通信事件
  4.   const contact = async (messageData: any) => {
  5.     messageData = { ...messageData, do_stream: modelArg.do_stream } // 请求参数
  6.     receivedDataRef.current = ''
  7.     const token: string = getLocal('AUTHCODE') || ''
  8.     fetchEventSource(`${MAAS_API_BASE}/v1/model_api/invoke`, {
  9.       method: 'POST',
  10.       // 添加请求头
  11.       headers: {
  12.         Authorization: token,
  13.         'Content-Type': 'application/json',
  14.       },
  15.        // 传参必须保证是json
  16.       body: JSON.stringify(messageData),
  17.       // abortController.signal 提供了一个信号对象给 fetchEventSource 函数。
  18.       // 如果在任何时候你想取消正在进行的 fetch 操作,你可以调用
  19.       // abortController.abort()。这会发出关联任务的信号,你可以使用
  20.       // AbortController 的信号来检查异步操作是否已被取消。
  21.       signal: abortController.signal,
  22.       openWhenHidden: true, // 切换标签页时连接不关闭
  23.       async onopen(resp) {
  24.         // 处理登录失效
  25.         if (resp.status === 401) {
  26.           message.warning('登录过期')
  27.           return
  28.         }
  29.       },
  30.       onmessage(msg: any) {
  31.         const eventType = msg.event // 监听event的具名事件
  32.         switch (eventType) {
  33.           case 'add':
  34.             // 流式输出事件,add每次会返回具体字符,前端负责拼接展示
  35.             break
  36.           case 'finish':
  37.             setStatu('finish') // 结束标识
  38.             break
  39.           case 'error':
  40.             if (msg.status === 401) {
  41.                message.warning('登录过期')
  42.             }
  43.             console.log('Error occurred:', e)
  44.             break
  45.         }
  46.       },
  47.       onerror(err) {
  48.         throw err // 连接遇到http错误时,如跨域等,必须要throw才能停止,不然会一直重连
  49.       },
  50.       onclose() {},
  51.     })
  52.   }
  53.   // 终止连接方法,比如在切换模型时,你可能有必要终止上一次连接来避免问答串联
  54.   const closeSSE = () => {
  55.     abortController.abort()
  56.     setAbortController(new AbortController())
  57.   }
复制代码
3、一种接口同时兼容流式/非流式

同上post方法
  1.     fetchEventSource(sseUrl, {
  2.       method: 'POST',
  3.       headers,
  4.       signal: abortController.signal,
  5.       body: JSON.stringify(customInferData),
  6.       openWhenHidden: true,
  7.     /**
  8.     *在onopen阶段处理
  9.     第一步:判断resp.headers.get('content-type'),如果不包含text/event-stream,
  10.     则代表非流式
  11.     第二步:需要在onopen阶段处理非流式返回,即json返回,读取json返回并渲染,注意异常也要处理
  12.     第三步:
  13.      */
  14.       async onopen(resp) {
  15.         const contentype = resp.headers.get('content-type') || ''
  16.         console.log('contentype =>', contentype)
  17.         console.log('resp.ok =>', resp.ok)
  18.         if (resp.ok && !contentype.includes('text/event-stream')) {
  19.           // 读取json数据
  20.           const responseData = await resp.json()
  21.           if (responseData.code !== 0) {
  22.              // 报错处理+关闭连接
  23.           } else {
  24.             //处理数据渲染+关闭连接
  25.             
  26.             stopSession()
  27.           }
  28.         } else if (resp.status === 401) {
  29.           message.warning('登录过期')
  30.           // 报错处理+关闭连接
  31.           stopSession()
  32.         
  33.         }
  34.       },
  35.       onmessage(msg: any) {
  36.         const eventType = msg.event
  37.         const messages: any = cloneDeep(chatState.sessionMessages)
  38.         let lastMessage: any = messages[messages.length - 1] || {}
  39.         switch (eventType) {
  40.           case 'add':
  41.             lastMessage = {
  42.               ...lastMessage,
  43.               text: `${lastMessage.text}${msg.data || ' '}`,
  44.               loading: false,
  45.             }
  46.             messages.splice(messages.length - 1, 1, lastMessage)
  47.             chatAction.updateSessionMessages(messages)
  48.             break
  49.           case 'finish':
  50.             console.log('finish lastMessage =>', lastMessage)
  51.             chatAction.updateSessionStatu(SessionStatuTypes.ready)
  52.             chatAction.updateContext(msg.data)
  53.             break
  54.           case 'info':
  55.             {
  56.               const messages: any = cloneDeep(chatState.sessionMessages)
  57.               let lastMessage: any = messages[messages.length - 1] || {}
  58.               lastMessage = {
  59.                 referenceDocs: JSON.parse(msg.data).reference_by_docs,
  60.                 ...lastMessage,
  61.               }
  62.               messages.splice(messages.length - 1, 1, lastMessage)
  63.               chatAction.updateSessionMessages(messages)
  64.             }
  65.             break
  66.           case 'error':
  67.             if (msg.status === 401) {
  68.               chatAction.updateSessionStatu(SessionStatuTypes.ready)
  69.             } else {
  70.               errorItemFn(msg?.msg || msg?.data || '抱歉,暂无法回答问题')
  71.             }
  72.             break
  73.         }
  74.       },
  75.       onerror(err: any) {
  76.         errorItemFn(err?.msg || '抱歉,暂无法回答该问题')
  77.         console.log('eventSource error: ', `${err}`)
  78.         throw err  // 连接遇到http错误时,如跨域等,必须要throw才能停止,不然会一直重连
  79.       },
  80.       onclose() {
  81.         console.log('eventSource close')
  82.       },
  83.     })
  84.   // 终止会话
  85.   const stopSession = () => {
  86.     abortController.abort()
  87.     setAbortController(new AbortController())
  88.   }
复制代码
四、常见问题汇总

1、无法添加请求头

应用fetch-event-source办理
2、一个方法需要同时兼容流式和非流式

应用fetch-event-source在onopen阶段处理非流式输出,如报错、接口json返回等
3、遇到跨域时候,请求一直连接

应用fetch-event-source在监听具名事件时,如error,将错误throw err,否则无法中断连接
4、使用SSE时,前端引入markdown渲染时发现后端response data里的前置空格被忽略

服务器响应状态码应该为 200,header 为Content-Type: text/event-stream,然后保持此连接并以一种特殊的格式写入消息,就像这样:
  1. data: Message 1
  2. data: Message 2
  3. data: Message 3
  4. data: of two lines
复制代码
data:后为消息文本,冒号反面的空格是可选的。
这个可选便是重要缘故原由。查询社区后,找到了两种办理办法:

具体表现,点击放大查察:

5、fetch方法如何停止

  1.   const stopSession = () => {
  2.     abortController.abort()
  3.     setAbortController(new AbortController())
  4.   }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 qidao123.com技术社区-IT企服评测·应用市场 (https://dis.qidao123.com/) Powered by Discuz! X3.4