IT评测·应用市场-qidao123.com技术社区
标题:
通过 API 将Deepseek相应流式内容输出到前端
[打印本页]
作者:
种地
时间:
2025-4-12 18:02
标题:
通过 API 将Deepseek相应流式内容输出到前端
要实现通过 API 将流式内容输出到前端,可以采用以下技能方案(以 Python 后端 + 前端 JavaScript 为例):
方案一:使用 Server-Sent Events (SSE)
这是浏览器原生支持的流式传输方案,推荐首选
# Flask 示例
from flask import Response, stream_with_context
@app.route('/stream')
def stream_data():
def generate():
response = client.chat.completions.create(
model="deepseek-chat",
messages=messages,
stream=True
)
for chunk in response:
if chunk.choices:
content = chunk.choices[0].delta.content or ""
# SSE 格式要求 data: 前缀和双换行符
yield f"data: {json.dumps({'content': content})}\n\n"
return Response(stream_with_context(generate()), mimetype='text/event-stream')
复制代码
// 前端 JavaScript
const eventSource = new EventSource('/stream');
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
document.getElementById('output').innerHTML += data.content;
};
eventSource.onerror = (err) => {
console.error('EventSource failed:', err);
eventSource.close();
};
复制代码
方案二:使用流式 HTTP 相应(NDJSON)
更通用的流式传输方案,适合非浏览器客户端
# FastAPI 示例
from fastapi import APIRouter
from fastapi.responses import StreamingResponse
import json
@app.get("/stream")
async def stream_data():
async def generate():
response = client.chat.completions.create(
model="deepseek-chat",
messages=messages,
stream=True
)
async for chunk in response:
if chunk.choices:
content = chunk.choices[0].delta.content or ""
yield json.dumps({"content": content}) + "\n" # NDJSON 格式
return StreamingResponse(generate(), media_type='application/x-ndjson')
复制代码
// 前端 JavaScript 使用 Fetch API
async function streamData() {
const response = await fetch('/stream');
const reader = response.body.getReader();
const decoder = new TextDecoder();
while(true) {
const { done, value } = await reader.read();
if(done) break;
const chunk = decoder.decode(value);
const data = JSON.parse(chunk);
document.getElementById('output').innerHTML += data.content;
}
}
复制代码
关键设置说明
相应头设置
:
# Flask
headers = {
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
# FastAPI 自动处理
复制代码
数据格式选择
:
SSE (text/event-stream):浏览器原生支持,自动重连
NDJSON (application/x-ndjson):更通用的流式 JSON 格式
纯文本流:简单但结构化本领弱
前端处理建议
:
// 更健壮的读取方式(处理分块不完整情况)
let buffer = '';
async function processChunk(chunk) {
buffer += chunk;
while(buffer.includes('\n')) {
const lineEnd = buffer.indexOf('\n');
const line = buffer.slice(0, lineEnd);
buffer = buffer.slice(lineEnd + 1);
try {
const data = JSON.parse(line);
// 处理数据...
} catch(e) {
console.error('解析错误:', e);
}
}
}
复制代码
完整工作流程示例(FastAPI + React)
后端
:
# main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/chat")
async def chat_stream(prompt: str):
async def generate():
response = client.chat.completions.create(
model="deepseek-chat",
messages=[{"role": "user", "content": prompt}],
stream=True
)
async for chunk in response:
if content := chunk.choices[0].delta.content:
yield json.dumps({"content": content})
return StreamingResponse(generate(), media_type="application/x-ndjson")
复制代码
前端 React 组件
:
// ChatComponent.jsx
import { useState } from 'react';
export default function ChatComponent() {
const [output, setOutput] = useState('');
const startStream = async () => {
const response = await fetch('http://api/chat?prompt=你好');
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while(true) {
const { done, value } = await reader.read();
if(done) break;
buffer += decoder.decode(value);
while(buffer.includes('}')) {
const endIndex = buffer.indexOf('}') + 1;
const chunk = buffer.slice(0, endIndex);
buffer = buffer.slice(endIndex);
try {
const data = JSON.parse(chunk);
setOutput(prev => prev + data.content);
} catch(e) {
console.error('解析错误:', e);
}
}
}
};
return (
<div>
<button onClick={startStream}>开始对话</button>
<div id="output">{output}</div>
</div>
);
}
复制代码
注意事项
毗连管理
:
设置合理的超时时间(通常 30-60 秒)
处理客户端提前断开毗连的情况
# FastAPI 示例
try:
async for chunk in response:
# ...处理数据
if await request.is_disconnected():
break
finally:
await client.close() # 清理资源
复制代码
性能优化
:
使用异步框架(FastAPI 性能优于 Flask)
启用相应压缩
app = FastAPI()
@app.middleware("http")
async def add_compression(request, call_next):
response = await call_next(request)
response.headers["Content-Encoding"] = "gzip"
return response
复制代码
安全考虑
:
限定最大并发毗连数
实施速率限定
from fastapi import Request
from fastapi.middleware import Middleware
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
@app.get("/chat")
@limiter.limit("10/minute")
async def chat_stream(request: Request):
# ...
复制代码
错误处理增强
:
async def generate():
try:
response = client.chat.completions.create(...)
async for chunk in response:
# 处理数据...
except Exception as e:
yield json.dumps({"error": str(e)})
finally:
await client.close() # 确保释放资源
复制代码
这些方案可根据具体需求组合使用,建议优先选择 SSE 方案(浏览器兼容性好),需要支持更复杂场景时可考虑 WebSocket,但后者实现成本较高。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 IT评测·应用市场-qidao123.com技术社区 (https://dis.qidao123.com/)
Powered by Discuz! X3.4