Langchain 流式输出到前端(真正解决方法,附最佳实践的完备代码) ...

打印 上一主题 下一主题

主题 595|帖子 595|积分 1785

Langchain 流式输出

当我们深入使用Langchain时,我们都会思量如何进行流式输出。只管官方网站提供了一些流式输出的示例,但这些示例只能在控制台中输出,并不能获取我们所需的生成器。而网上的许多教程也只是伪流式输出,即先完全生成结束,再进行流式输出。
以下是我为大家提供的真正的流式输出示例代码:

这里是2023/12/08 最新增补的方法,简单快捷,好理解,猛烈推荐。
(已封装为 fastapi 接口,可供前端调用,前端如何使用,则需要前端工程师了解一下 流式输出 SSE的知识,有相干的前端库)

大语言模型的摆设(glm3、qwen等)可以使用我的开源项目:https://github.com/shell-nlp/gpt_server

最新的流式输出方式(LCEL语法的特性) 推荐指数 : ※ ※ ※ ※ ※ ※※※※※:

  1. import json
  2. from dotenv import load_dotenv
  3. from langchain_community.chat_models.openai import ChatOpenAI
  4. from langchain_core.prompts import ChatPromptTemplate
  5. from fastapi import FastAPI
  6. from fastapi.responses import StreamingResponse
  7. app = FastAPI()
  8. load_dotenv()
  9. llm = ChatOpenAI(model="chatglm3", streaming=True, max_tokens=2048)
  10. prompt = ChatPromptTemplate.from_messages(
  11.     [("system", "你是一个专业的AI助手。"), ("human", "{query}")]
  12. )
  13. llm_chain = prompt | llm
  14. @app.post("/chat_stream")
  15. def chat_stream(query: str = "你是谁"):
  16.     ret = llm_chain.stream({"query": query})
  17.     def predict():
  18.         text = ""
  19.         for _token in ret:
  20.             token = _token.content
  21.             js_data = {"code": "200", "msg": "ok", "data": token}
  22.             yield f"data: {json.dumps(js_data,ensure_ascii=False)}\n\n"
  23.             text += token
  24.         print(text)
  25.     generate = predict()
  26.     return StreamingResponse(generate, media_type="text/event-stream")
  27. if __name__ == "__main__":
  28.     import uvicorn
  29.     uvicorn.run(app=app, host="0.0.0.0", port=10088)
复制代码
方法一 推荐指数 : ※ ※ ※ ※ ※ :

本方法是开发新的线程的方法(固然也可以是新的历程的方式) ,然后联合 langchain的callback方法
为什么推荐使用 线程的方法,而不是 方法二中的异步操纵,因为在实际使用过程中,许多外部的方法是不支持异步操纵的,要想让程序run起来,必须把一些方法(比如 langchain中的 某些检索器,官方代码只帮你写了 同步的方法,而没有实现异步的方法) 重写为异步方法,而在 重写的过程中会遇到许多 的 问题,令人头疼,而所有的代码都是支持同步的,以是开发新线程的方式是好的方法。
注: 此方法就是我在 使用方法二的异步方式的过程,遇到了难以解决的问题(本人小白)才找到了此种方式。
缺点: 开发线程要比 协程 更加耗费计算机资源,因此将来的实现 尽可能还是使用 方法二,但是对于不会异步编程的人来说方法一更加简单。
但是如果对异步操纵非常熟悉,那么方法二 是最能提升性能的方式。

代码实现不表明了,手疼,需要的人自然可以看得懂。
至于 ChatOpenAI 所需要的 key,需要自己想办法。
  1. import os
  2. from dotenv import load_dotenv
  3. from langchain.chat_models import ChatOpenAI
  4. from langchain.callbacks import StreamingStdOutCallbackHandler
  5. from fastapi import FastAPI
  6. from sse_starlette.sse import EventSourceResponse
  7. from typing import Generator
  8. import threading
  9. import uvicorn
  10. os.system('clear')
  11. app = FastAPI()
  12. load_dotenv()
  13. class My_StreamingStdOutCallbackHandler(StreamingStdOutCallbackHandler):
  14.     # def __init__(self):
  15.     tokens = []
  16.     # 记得结束后这里置true
  17.     finish = False
  18.     def on_llm_new_token(self, token: str, **kwargs) -> None:
  19.         self.tokens.append(token)
  20.     def on_llm_end(self, response, **kwargs) -> None:
  21.         self.finish = True
  22.     def on_llm_error(self, error: Exception, **kwargs) -> None:
  23.         self.tokens.append(str(error))
  24.     def generate_tokens(self) -> Generator:
  25.         while not self.finish:  # or self.tokens:
  26.             if self.tokens:
  27.                 token = self.tokens.pop(0)
  28.                 yield {'data': token}
  29.             else:
  30.                 pass
  31.                 # time.sleep(0.02)  # wait for a new token
  32. # 用于在另一个 线程中运行的方法
  33. def f(llm, query):
  34.     llm.predict(query)
  35. @app.post('/qa')
  36. def test(query='你好'):
  37.     callback = My_StreamingStdOutCallbackHandler()
  38.     llm = ChatOpenAI(model='chatglm3',
  39.                      streaming=True,
  40.                      callbacks=[callback],
  41.                      max_tokens=1024)
  42.     thread = threading.Thread(target=f, args=(llm, query))
  43.     thread.start()
  44.     return EventSourceResponse(callback.generate_tokens(), media_type="text/event-stream")
  45. if __name__ == '__main__':
  46.     uvicorn.run(app=app, host='0.0.0.0')
复制代码
方法二: 基于 python 异步机制实现 推荐指数 : ※ ※ ※ ※

为了方便展示,我直接使用gradio写一个小webUI,因为流式输出的场景就是用于Web的展示。直接进行python输出也是可以的。
值得注意的是 方法 一定要加上 async ,这里对于小白可能看不懂, 因为这里涉及到,异步协程等概念。
  1. import gradio as gr
  2. import asyncio
  3. from langchain.chat_models import ChatOpenAI
  4. #使用 异步的 Callback   AsyncIteratorCallbackHandler
  5. from langchain.callbacks import AsyncIteratorCallbackHandler
  6. async def f():
  7.    callback = AsyncIteratorCallbackHandler()
  8.    llm = ChatOpenAI(engine='GPT-35',streaming=True,callbacks=[callback])
  9.    coro = llm.apredict("写一个1000字的修仙小说")  # 这里如果是 LLMChain的话 可以 换成  chain.acall()
  10.    asyncio.create_task(coro)
  11.    text = ""
  12.    async for token in callback.aiter():
  13.        text = text+token
  14.        yield gr.TextArea.update(value=text)
  15. with gr.Blocks() as demo:
  16.     with gr.Column():
  17.          摘要汇总 = gr.TextArea(value="",label="摘要总结",)
  18.          bn = gr.Button("触发", variant="primary")
  19.     bn.click(f,[],[摘要汇总])
  20. demo.queue().launch(share=False, inbrowser=False, server_name="0.0.0.0", server_port=8001)
复制代码
方法来之不易,如果您有所收获,请点赞,收藏,关注

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

自由的羽毛

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表