Fastapi集成SSE服务后端自动推送消息到前端

打印 上一主题 下一主题

主题 853|帖子 853|积分 2561

sse技术简介

SSE(Server-Sent Events)是一种允许服务器向客户端浏览器推送信息的技术。它是 HTML5 的一部分,专门用于建立一个单向的从服务器到客户端的通信毗连。SSE的使用场景非常广泛,包括实时消息推送、实时关照更新等。
严格地说,HTTP 无法做到服务器自动推送信息。但是,有一种变通方法,就是服务器向客户端声明,接下来要发送的是流信息(streaming)。
也就是说,发送的不是一次性的数据包,而是一个数据流,会连续不断地发送过来。这时,客户端不会关闭毗连,会一直等着服务器发过来的新的数据流,视频播放就是这样的例子。本质上,这种通信就是以流信息的方式,完成一次用时很长的下载。
SSE 就是利用这种机制,使用流信息向浏览器推送信息。
FastAPI集成SSE功能

设置环境
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple sse-starlette fastapi[all]
代码实现:
  1. try:
  2.     from sse_starlette.sse import EventSourceResponse
  3.     import asyncio
  4. except:
  5.     import os
  6.     os.system('pip install -i https://pypi.tuna.tsinghua.edu.cn/simple sse-starlette')
  7.     from sse_starlette.sse import EventSourceResponse
  8.     import asyncio
  9. @router.get("/sse")
  10. async def root(request: Request):
  11.     async def event_generator(request: Request):
  12.         while True:
  13.             if await request.is_disconnected():
  14.                 print("连接已中断")
  15.                 break
  16.             sse_test_data = gl.gl_Redis.hgetall('sse:test:data')
  17.             sse_test_data['warn_site'] = sse_test_data['warn_site'].split(',') if sse_test_data.get('warn_site') else []
  18.             yield {
  19.                 "event": "message",
  20.                 # "retry": 15000,
  21.                 "data": demjson.encode(sse_test_data)
  22.             }
  23.             await asyncio.sleep(10)
  24.     g = event_generator(request)
  25.     return EventSourceResponse(g)
复制代码
SEE客户端测试脚本
  1. import aiohttp
  2. import asyncio
  3. async def test():
  4.     headers = {'Content-Type': 'text/event-stream'}
  5.     sseresp = aiohttp.request("GET", r"http://127.0.0.1:9000/api/sse", headers=headers)
  6.     async with sseresp as r:
  7.         async for chunk in r.content.iter_any():
  8.             print(chunk.decode())
  9. if __name__ == '__main__':
  10.     loop = asyncio.get_event_loop()
  11.     loop.run_until_complete(test())
复制代码
【特殊提示】在现在大多数webviews框架打包APP时,sse服务无效!!!
本文由博客一文多发平台 OpenWrite 发布!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

拉不拉稀肚拉稀

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表