如何在FastAPI中玩转WebSocket消息处置处罚?

[复制链接]
发表于 2025-7-8 00:05:23 | 显示全部楼层 |阅读模式
扫描二维码
关注大概微信搜一搜:编程智域 前端至全栈交流与成长
发现1000+提升效率与开发的AI工具和实用程序https://tools.cmdragon.cn/
一、文本消息接收与发送
  1. # 运行环境:Python 3.8+
  2. # 安装依赖:pip install fastapi==0.68.0 uvicorn==0.15.0 websockets==10.3 pydantic==1.10.7
  3. from fastapi import FastAPI, WebSocket
  4. app = FastAPI()
  5. @app.websocket("/ws/chat")
  6. async def websocket_chat(websocket: WebSocket):
  7.     await websocket.accept()
  8.     try:
  9.         while True:
  10.             # 接收文本消息
  11.             client_msg = await websocket.receive_text()
  12.             # 处理消息(示例:添加时间戳)
  13.             server_response = f"[{datetime.now()}] Server received: {client_msg}"
  14.             # 发送文本响应
  15.             await websocket.send_text(server_response)
  16.     except WebSocketDisconnect:
  17.         print("Client disconnected")
复制代码
消息处置处罚流程:


  • 客户端通过ws://协议创建WebSocket毗连
  • 服务端使用await websocket.accept()接受毗连
  • 进入接收循环处置处罚receive_text()和send_text()
  • 非常处置处罚自动断开毗连
sequenceDiagram    participant C as Client    participant S as Server        C->>S: Establish WebSocket connection (ws://)    activate S    S->>S: await websocket.accept() to accept connection    activate C    loop Receiving and Sending Messages        C->>S: receive_text()        S-->>C: send_text()    end    alt Exception Handling        S->>C: Automatically disconnect    end    deactivate S    deactivate C应用场景:及时谈天室、协同编辑体系、及时日志日志监控监控
二、二进制数据传输处置处罚
  1. @app.websocket("/ws/file-transfer")
  2. async def websocket_file(websocket: WebSocket):
  3.     await websocket.accept()
  4.     try:
  5.         while True:
  6.             # 接收二进制数据
  7.             binary_data = await websocket.receive_bytes()
  8.             # 保存文件示例
  9.             with open("received_file.bin", "wb") as f:
  10.                 f.write(binary_data)
  11.             # 发送确认消息
  12.             await websocket.send_bytes(b"FILE_RECEIVED")
  13.     except WebSocketDisconnect:
  14.         print("File transfer interrupted")
复制代码
二进制处置处罚要点:


  • 使用receive_bytes()和send_bytes()方法
  • 适合传输图片、音频、视频等二进制格式
  • 建议分块传输大文件(联合消息头协议)
三、JSON消息序列化与自动剖析
  1. from pydantic import BaseModel
  2. class MessageModel(BaseModel):
  3.     user: str
  4.     content: str
  5.     timestamp: float
  6. @app.websocket("/ws/json-demo")
  7. async def websocket_json(websocket: WebSocket):
  8.     await websocket.accept()
  9.     try:
  10.         while True:
  11.             json_data = await websocket.receive_json()
  12.             # 自动验证JSON结构
  13.             message = MessageModel(**json_data)
  14.             # 处理业务逻辑
  15.             processed_data = message.dict()
  16.             processed_data["status"] = "PROCESSED"
  17.             # 返回处理结果
  18.             await websocket.send_json(processed_data)
  19.     except ValidationError as e:
  20.         await websocket.send_json({"error": str(e)})
复制代码
自动验证流程:


  • 接收原始JSON数据
  • 通过Pydantic模型举行数据洗濯
  • 自动类型转换和字段验证
  • 结构化错误响应返回
四、消息接收循环与超时控制
  1. from websockets.exceptions import ConnectionClosed
  2. @app.websocket("/ws/with-timeout")
  3. async def websocket_timeout(websocket: WebSocket):
  4.     await websocket.accept()
  5.     try:
  6.         while True:
  7.             try:
  8.                 # 设置10秒接收超时
  9.                 data = await asyncio.wait_for(websocket.receive_text(), timeout=10)
  10.                 await process_message(data)
  11.             except asyncio.TimeoutError:
  12.                 # 发送心跳包保持连接
  13.                 await websocket.send_text("HEARTBEAT")
  14.     except ConnectionClosed:
  15.         print("Connection closed normally")
复制代码
超时控制计谋:


  • 使用asyncio.wait_for设置单次接收超时
  • 定期发送心跳包维持毗连
  • 非常分类处置处罚(正常关闭/非常断开)
课后Quiz

Q1:如何处置处罚同时接收文本和二进制消息的场景?
A:通过receive()方法获取消息类型判断:
  1. message = await websocket.receive()
  2. if message["type"] == "websocket.receive.text":
  3.     handle_text(message["text"])
  4. elif message["type"] == "websocket.receive.bytes":
  5.     handle_bytes(message["bytes"])
复制代码
Q2:为什么推荐使用Pydantic举行JSON验证?
A:① 自动类型转换 ② 字段约束检查 ③ 防御无效数据 ④ 生成API文档
常见报错解决方案

422 Validation Error
  1. {
  2.   "detail": [
  3.     {
  4.       "loc": [
  5.         "body",
  6.         "timestamp"
  7.       ],
  8.       "msg": "field required",
  9.       "type": "value_error.missing"
  10.     }
  11.   ]
  12. }
复制代码
解决方法:

  • 检查客户端发送的JSON字段是否完整
  • 验证时间戳是否为数字类型
  • 添加默认值字段:timestamp: float = None
WebSocketTimeoutException

  • 成因:长时间未发送/接收消息
  • 解决:调整wait_for超时参数,添加心跳机制
余下文章内容请点击跳转至 个人博客页面 大概 扫码关注大概微信搜一搜:编程智域 前端至全栈交流与成长
,阅读完整的文章:如何在FastAPI中玩转WebSocket消息处置处罚?
往期文章归档:

免费好用的热门在线工具


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
继续阅读请点击广告

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
回复

使用道具 举报

快速回复 返回顶部 返回列表