通过 API 将Deepseek相应流式内容输出到前端

种地  论坛元老 | 2025-4-12 18:02:38 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 1714|帖子 1714|积分 5142

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
要实现通过 API 将流式内容输出到前端,可以采用以下技能方案(以 Python 后端 + 前端 JavaScript 为例):
方案一:使用 Server-Sent Events (SSE)

这是浏览器原生支持的流式传输方案,推荐首选
  1. # Flask 示例
  2. from flask import Response, stream_with_context
  3. @app.route('/stream')
  4. def stream_data():
  5.     def generate():
  6.         response = client.chat.completions.create(
  7.             model="deepseek-chat",
  8.             messages=messages,
  9.             stream=True
  10.         )
  11.         
  12.         for chunk in response:
  13.             if chunk.choices:
  14.                 content = chunk.choices[0].delta.content or ""
  15.                 # SSE 格式要求 data: 前缀和双换行符
  16.                 yield f"data: {json.dumps({'content': content})}\n\n"
  17.    
  18.     return Response(stream_with_context(generate()), mimetype='text/event-stream')
复制代码
  1. // 前端 JavaScript
  2. const eventSource = new EventSource('/stream');
  3. eventSource.onmessage = (event) => {
  4.     const data = JSON.parse(event.data);
  5.     document.getElementById('output').innerHTML += data.content;
  6. };
  7. eventSource.onerror = (err) => {
  8.     console.error('EventSource failed:', err);
  9.     eventSource.close();
  10. };
复制代码
方案二:使用流式 HTTP 相应(NDJSON)

更通用的流式传输方案,适合非浏览器客户端
  1. # FastAPI 示例
  2. from fastapi import APIRouter
  3. from fastapi.responses import StreamingResponse
  4. import json
  5. @app.get("/stream")
  6. async def stream_data():
  7.     async def generate():
  8.         response = client.chat.completions.create(
  9.             model="deepseek-chat",
  10.             messages=messages,
  11.             stream=True
  12.         )
  13.         
  14.         async for chunk in response:
  15.             if chunk.choices:
  16.                 content = chunk.choices[0].delta.content or ""
  17.                 yield json.dumps({"content": content}) + "\n"  # NDJSON 格式
  18.    
  19.     return StreamingResponse(generate(), media_type='application/x-ndjson')
复制代码
  1. // 前端 JavaScript 使用 Fetch API
  2. async function streamData() {
  3.     const response = await fetch('/stream');
  4.     const reader = response.body.getReader();
  5.     const decoder = new TextDecoder();
  6.    
  7.     while(true) {
  8.         const { done, value } = await reader.read();
  9.         if(done) break;
  10.         
  11.         const chunk = decoder.decode(value);
  12.         const data = JSON.parse(chunk);
  13.         document.getElementById('output').innerHTML += data.content;
  14.     }
  15. }
复制代码
关键设置说明


  • 相应头设置
    1. # Flask
    2. headers = {
    3.     'Cache-Control': 'no-cache',
    4.     'Connection': 'keep-alive'
    5. }
    6. # FastAPI 自动处理
    复制代码
  • 数据格式选择

    • SSE (text/event-stream):浏览器原生支持,自动重连
    • NDJSON (application/x-ndjson):更通用的流式 JSON 格式
    • 纯文本流:简单但结构化本领弱

  • 前端处理建议
    1. // 更健壮的读取方式(处理分块不完整情况)
    2. let buffer = '';
    3. async function processChunk(chunk) {
    4.     buffer += chunk;
    5.     while(buffer.includes('\n')) {
    6.         const lineEnd = buffer.indexOf('\n');
    7.         const line = buffer.slice(0, lineEnd);
    8.         buffer = buffer.slice(lineEnd + 1);
    9.         
    10.         try {
    11.             const data = JSON.parse(line);
    12.             // 处理数据...
    13.         } catch(e) {
    14.             console.error('解析错误:', e);
    15.         }
    16.     }
    17. }
    复制代码
完整工作流程示例(FastAPI + React)

后端
  1. # main.py
  2. from fastapi import FastAPI
  3. from fastapi.middleware.cors import CORSMiddleware
  4. app = FastAPI()
  5. app.add_middleware(
  6.     CORSMiddleware,
  7.     allow_origins=["*"],
  8.     allow_methods=["*"],
  9.     allow_headers=["*"],
  10. )
  11. @app.get("/chat")
  12. async def chat_stream(prompt: str):
  13.     async def generate():
  14.         response = client.chat.completions.create(
  15.             model="deepseek-chat",
  16.             messages=[{"role": "user", "content": prompt}],
  17.             stream=True
  18.         )
  19.         
  20.         async for chunk in response:
  21.             if content := chunk.choices[0].delta.content:
  22.                 yield json.dumps({"content": content})
  23.    
  24.     return StreamingResponse(generate(), media_type="application/x-ndjson")
复制代码
前端 React 组件
  1. // ChatComponent.jsx
  2. import { useState } from 'react';
  3. export default function ChatComponent() {
  4.     const [output, setOutput] = useState('');
  5.    
  6.     const startStream = async () => {
  7.         const response = await fetch('http://api/chat?prompt=你好');
  8.         const reader = response.body.getReader();
  9.         const decoder = new TextDecoder();
  10.         let buffer = '';
  11.         
  12.         while(true) {
  13.             const { done, value } = await reader.read();
  14.             if(done) break;
  15.             
  16.             buffer += decoder.decode(value);
  17.             while(buffer.includes('}')) {
  18.                 const endIndex = buffer.indexOf('}') + 1;
  19.                 const chunk = buffer.slice(0, endIndex);
  20.                 buffer = buffer.slice(endIndex);
  21.                
  22.                 try {
  23.                     const data = JSON.parse(chunk);
  24.                     setOutput(prev => prev + data.content);
  25.                 } catch(e) {
  26.                     console.error('解析错误:', e);
  27.                 }
  28.             }
  29.         }
  30.     };
  31.    
  32.     return (
  33.         <div>
  34.             <button onClick={startStream}>开始对话</button>
  35.             <div id="output">{output}</div>
  36.         </div>
  37.     );
  38. }
复制代码
注意事项


  • 毗连管理

    • 设置合理的超时时间(通常 30-60 秒)
    • 处理客户端提前断开毗连的情况
    1. # FastAPI 示例
    2. try:
    3.     async for chunk in response:
    4.         # ...处理数据
    5.         if await request.is_disconnected():
    6.             break
    7. finally:
    8.     await client.close()  # 清理资源
    复制代码

  • 性能优化

    • 使用异步框架(FastAPI 性能优于 Flask)
    • 启用相应压缩
    1. app = FastAPI()
    2. @app.middleware("http")
    3. async def add_compression(request, call_next):
    4.     response = await call_next(request)
    5.     response.headers["Content-Encoding"] = "gzip"
    6.     return response
    复制代码

  • 安全考虑

    • 限定最大并发毗连数
    • 实施速率限定
    1. from fastapi import Request
    2. from fastapi.middleware import Middleware
    3. from slowapi import Limiter
    4. from slowapi.util import get_remote_address
    5. limiter = Limiter(key_func=get_remote_address)
    6. app.state.limiter = limiter
    7. @app.get("/chat")
    8. @limiter.limit("10/minute")
    9. async def chat_stream(request: Request):
    10.     # ...
    复制代码

  • 错误处理增强
    1. async def generate():
    2.     try:
    3.         response = client.chat.completions.create(...)
    4.         async for chunk in response:
    5.             # 处理数据...
    6.     except Exception as e:
    7.         yield json.dumps({"error": str(e)})
    8.     finally:
    9.         await client.close()  # 确保释放资源
    复制代码
这些方案可根据具体需求组合使用,建议优先选择 SSE 方案(浏览器兼容性好),需要支持更复杂场景时可考虑 WebSocket,但后者实现成本较高。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

种地

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表