马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
要实现通过 API 将流式内容输出到前端,可以采用以下技能方案(以 Python 后端 + 前端 JavaScript 为例):
方案一:使用 Server-Sent Events (SSE)
这是浏览器原生支持的流式传输方案,推荐首选
- # Flask 示例
- from flask import Response, stream_with_context
- @app.route('/stream')
- def stream_data():
- def generate():
- response = client.chat.completions.create(
- model="deepseek-chat",
- messages=messages,
- stream=True
- )
-
- for chunk in response:
- if chunk.choices:
- content = chunk.choices[0].delta.content or ""
- # SSE 格式要求 data: 前缀和双换行符
- yield f"data: {json.dumps({'content': content})}\n\n"
-
- return Response(stream_with_context(generate()), mimetype='text/event-stream')
复制代码- // 前端 JavaScript
- const eventSource = new EventSource('/stream');
- eventSource.onmessage = (event) => {
- const data = JSON.parse(event.data);
- document.getElementById('output').innerHTML += data.content;
- };
- eventSource.onerror = (err) => {
- console.error('EventSource failed:', err);
- eventSource.close();
- };
复制代码 方案二:使用流式 HTTP 相应(NDJSON)
更通用的流式传输方案,适合非浏览器客户端
- # FastAPI 示例
- from fastapi import APIRouter
- from fastapi.responses import StreamingResponse
- import json
- @app.get("/stream")
- async def stream_data():
- async def generate():
- response = client.chat.completions.create(
- model="deepseek-chat",
- messages=messages,
- stream=True
- )
-
- async for chunk in response:
- if chunk.choices:
- content = chunk.choices[0].delta.content or ""
- yield json.dumps({"content": content}) + "\n" # NDJSON 格式
-
- return StreamingResponse(generate(), media_type='application/x-ndjson')
复制代码- // 前端 JavaScript 使用 Fetch API
- async function streamData() {
- const response = await fetch('/stream');
- const reader = response.body.getReader();
- const decoder = new TextDecoder();
-
- while(true) {
- const { done, value } = await reader.read();
- if(done) break;
-
- const chunk = decoder.decode(value);
- const data = JSON.parse(chunk);
- document.getElementById('output').innerHTML += data.content;
- }
- }
复制代码 关键设置说明
- 相应头设置:
- # Flask
- headers = {
- 'Cache-Control': 'no-cache',
- 'Connection': 'keep-alive'
- }
- # FastAPI 自动处理
复制代码 - 数据格式选择:
- SSE (text/event-stream):浏览器原生支持,自动重连
- NDJSON (application/x-ndjson):更通用的流式 JSON 格式
- 纯文本流:简单但结构化本领弱
- 前端处理建议:
- // 更健壮的读取方式(处理分块不完整情况)
- let buffer = '';
- async function processChunk(chunk) {
- buffer += chunk;
- while(buffer.includes('\n')) {
- const lineEnd = buffer.indexOf('\n');
- const line = buffer.slice(0, lineEnd);
- buffer = buffer.slice(lineEnd + 1);
-
- try {
- const data = JSON.parse(line);
- // 处理数据...
- } catch(e) {
- console.error('解析错误:', e);
- }
- }
- }
复制代码 完整工作流程示例(FastAPI + React)
后端:
- # main.py
- from fastapi import FastAPI
- from fastapi.middleware.cors import CORSMiddleware
- app = FastAPI()
- app.add_middleware(
- CORSMiddleware,
- allow_origins=["*"],
- allow_methods=["*"],
- allow_headers=["*"],
- )
- @app.get("/chat")
- async def chat_stream(prompt: str):
- async def generate():
- response = client.chat.completions.create(
- model="deepseek-chat",
- messages=[{"role": "user", "content": prompt}],
- stream=True
- )
-
- async for chunk in response:
- if content := chunk.choices[0].delta.content:
- yield json.dumps({"content": content})
-
- return StreamingResponse(generate(), media_type="application/x-ndjson")
复制代码 前端 React 组件:
- // ChatComponent.jsx
- import { useState } from 'react';
- export default function ChatComponent() {
- const [output, setOutput] = useState('');
-
- const startStream = async () => {
- const response = await fetch('http://api/chat?prompt=你好');
- const reader = response.body.getReader();
- const decoder = new TextDecoder();
- let buffer = '';
-
- while(true) {
- const { done, value } = await reader.read();
- if(done) break;
-
- buffer += decoder.decode(value);
- while(buffer.includes('}')) {
- const endIndex = buffer.indexOf('}') + 1;
- const chunk = buffer.slice(0, endIndex);
- buffer = buffer.slice(endIndex);
-
- try {
- const data = JSON.parse(chunk);
- setOutput(prev => prev + data.content);
- } catch(e) {
- console.error('解析错误:', e);
- }
- }
- }
- };
-
- return (
- <div>
- <button onClick={startStream}>开始对话</button>
- <div id="output">{output}</div>
- </div>
- );
- }
复制代码 注意事项
- 毗连管理:
- 设置合理的超时时间(通常 30-60 秒)
- 处理客户端提前断开毗连的情况
- # FastAPI 示例
- try:
- async for chunk in response:
- # ...处理数据
- if await request.is_disconnected():
- break
- finally:
- await client.close() # 清理资源
复制代码
- 性能优化:
- 使用异步框架(FastAPI 性能优于 Flask)
- 启用相应压缩
- app = FastAPI()
- @app.middleware("http")
- async def add_compression(request, call_next):
- response = await call_next(request)
- response.headers["Content-Encoding"] = "gzip"
- return response
复制代码
- 安全考虑:
- from fastapi import Request
- from fastapi.middleware import Middleware
- from slowapi import Limiter
- from slowapi.util import get_remote_address
- limiter = Limiter(key_func=get_remote_address)
- app.state.limiter = limiter
- @app.get("/chat")
- @limiter.limit("10/minute")
- async def chat_stream(request: Request):
- # ...
复制代码
- 错误处理增强:
- async def generate():
- try:
- response = client.chat.completions.create(...)
- async for chunk in response:
- # 处理数据...
- except Exception as e:
- yield json.dumps({"error": str(e)})
- finally:
- await client.close() # 确保释放资源
复制代码 这些方案可根据具体需求组合使用,建议优先选择 SSE 方案(浏览器兼容性好),需要支持更复杂场景时可考虑 WebSocket,但后者实现成本较高。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |