FastAPI、langchain搭建chatbot,langgraph实现汗青记录

打印 上一主题 下一主题

主题 852|帖子 852|积分 2556

情况:openEuler、python 3.11.6、Azure openAi、langchain  0.3.3、langgraph  0.2.38
背景:基于FastAPI、langchain实现一个QA系统,要求实现汗青记录以及存储特征信息
时间:20241022
阐明:在汗青记录的存储中,增加自定义的存储内容,后期还要以流的形式实现
官方文档地点:Add summary of the conversation history
源码地点:尚无
1、情况搭建(略)

2、功能案例

1、实现记录和总结(langgraph)

概要:使用langgraph实现对话系统,该系统具备谈天汗青记录和总结功能
官方文档地点:Add summary of the conversation history
 实现具有总结和汗青记录功能的chatbot
  1. from typing import Literal
  2. from langchain_core.messages import SystemMessage, RemoveMessage
  3. from langgraph.checkpoint.memory import MemorySaver
  4. from langgraph.graph import MessagesState, StateGraph, START, END
  5. from langchain_core.messages import HumanMessage
  6. from langchain_openai import AzureChatOpenAI
  7. # 加载环境中的api_key等信息
  8. from dotenv import load_dotenv
  9. load_dotenv(".env")
  10. # 创建memory,用于记录state信息
  11. memory = MemorySaver()
  12. # 在state中新增summary参数
  13. class State(MessagesState):
  14.     summary: str
  15. # 使用Azure作为chain
  16. model = AzureChatOpenAI(deployment_name="aicontent-validation")
  17. # 定义访问模型的逻辑,这是node
  18. def call_model(state: State):
  19.     # summary是否存在,影响下一次的回答内容
  20.     summary = state.get("summary", "")
  21.     if summary:
  22.         system_message = f"Summary of conversation earlier: {summary}"
  23.         messages = [SystemMessage(content=system_message)] + state["messages"]
  24.     else:
  25.         messages = state["messages"]
  26.     response = model.invoke(messages)
  27.     # 将内容以列表的形式返回,会添加到state的列表中
  28.     return {"messages": [response]}
  29. # 确认结束还是执行summarize_conversation
  30. def should_continue(state: State) -> Literal["summarize_conversation", END]:
  31.     """Return the next node to execute."""
  32.     messages = state["messages"]
  33.     # 如果对话内容超过5条,则进行总结(转向执行summarize_conversation)
  34.     if len(messages) > 5:
  35.         return "summarize_conversation"
  36.     # 这个graph结束了
  37.     return END
  38. def summarize_conversation(state: State):
  39.     # 在state中获取summary的内容,没有则定义为空字符串
  40.     summary = state.get("summary", "")
  41.     if summary:
  42.         # summary_message可以认为是一个prompt,是否存在summary_message,直接影响总结
  43.         summary_message = (
  44.             f"This is summary of the conversation to date: {summary}\n\n"
  45.             "Extend the summary by taking into account the new messages above:"
  46.         )
  47.     else:
  48.         summary_message = "Create a summary of the conversation above:"
  49.     messages = state["messages"] + [HumanMessage(content=summary_message)]
  50.     # response即为总结内容
  51.     response = model.invoke(messages)
  52.     # 总结后,仅保留两条消息记录
  53.     delete_messages = [RemoveMessage(id=m.id) for m in state["messages"][:-2]]
  54.     # delete_messages是最终保留的内容
  55.     return {"summary": response.content, "messages": delete_messages}
  56. # 定义一个新的graph
  57. workflow = StateGraph(State)
  58. # 定义节点,会话和总结两个node
  59. workflow.add_node("conversation", call_model)
  60. workflow.add_node(summarize_conversation)
  61. # 定义边,起始节点为conversation,即call_model函数
  62. workflow.add_edge(START, "conversation")
  63. # 添加条件边
  64. workflow.add_conditional_edges(
  65.     # 使用conversation,作为起始的node,意味着所有的边都在conversion节点后被访问
  66.     "conversation",
  67.     # 通过该function,来确认到底执行哪个节点
  68.     should_continue,
  69. )
  70. # summarize_conversation到END是单向边
  71. workflow.add_edge("summarize_conversation", END)
  72. # 编译,添加参数checkpointer,指定memory
  73. app = workflow.compile(checkpointer=memory)
  74. # 获取信息
  75. def print_update(update):
  76.     for k, v in update.items():
  77.         for m in v["messages"]:
  78.             m.pretty_print()
  79.         if "summary" in v:
  80.             print(v["summary"])
  81. # 定义配置文件,用于后期查询历史
  82. config = {"configurable": {"thread_id": "4"}}
复制代码
 调用graph实现对话,当大于6条则总结
  1. # 下面三个是问题以及输出
  2. input_message = HumanMessage(content="hi! I'm bob")
  3. input_message.pretty_print()
  4. for event in app.stream({"messages": [input_message]}, config, stream_mode="updates"):
  5.     print_update(event)
  6. input_message = HumanMessage(content="what's my name?")
  7. input_message.pretty_print()
  8. for event in app.stream({"messages": [input_message]}, config, stream_mode="updates"):
  9.     print_update(event)
  10. input_message = HumanMessage(content="i like the celtics!")
  11. input_message.pretty_print()
  12. for event in app.stream({"messages": [input_message]}, config, stream_mode="updates"):
  13.     print_update(event)
复制代码
 输出:
  1. [root@Laptop-latitude-7300 chatbot_fastapi]# python test.py
  2. ================================ Human Message =================================
  3. hi! I'm bob
  4. ================================== Ai Message ==================================
  5. Hello Bob! How can I assist you today?
  6. ================================ Human Message =================================
  7. what's my name?
  8. ================================== Ai Message ==================================
  9. Your name is Bob.
  10. ================================ Human Message =================================
  11. i like the celtics!
  12. ================================== Ai Message ==================================
  13. That's great! The Boston Celtics are indeed a fantastic basketball team. They have a rich history and a large fanbase. Are you excited about the upcoming games?
  14. ================================ Remove Message ================================
  15. ================================ Remove Message ================================
  16. ================================ Remove Message ================================
  17. ================================ Remove Message ================================
  18. In this conversation, the user introduces himself as Bob. He also expresses his liking for the Boston Celtics, a professional basketball team. The assistant acknowledges his interest and asks if he is excited about the upcoming games.
复制代码
 其中pretty_print、print_update是graph格式化的输出,示例如下:
input_message.pretty_print()实现了如下输出
  1. ================================ Human Message =================================
  2. i like the celtics!
复制代码
print_update(event)输出如下:
  1. ================================== Ai Message ==================================
  2. Your name is Bob.
复制代码

 
  1. ================================ Remove Message ================================
复制代码
2、FastAPI、langgraph实现chatbot(非流式)

 基于上述代码,实现非流式输出的问答系统
安装相应的模块
  1. pip install fastapi python-dotenv langchain_core langchain langgraph uvicorn langchain_openai
复制代码
 通过fastapi实现接口
  1. # main.py
  2. from dotenv import load_dotenv
  3. load_dotenv(".env")
  4. from fastapi import FastAPI, Request, WebSocket
  5. from pydantic import BaseModel
  6. from json import loads
  7. from chat import generate_response, get_chat_history
  8. app = FastAPI()
  9. # 定义请求和响应模型
  10. class ChatRequest(BaseModel):
  11.     user_id: str
  12.     input: str
  13. class ChatResponse(BaseModel):
  14.     response: str
  15. class ChatHistoryResponse(BaseModel):
  16.     history: list
  17. # 问答接口
  18. @app.get("/chat")
  19. async def websocket_ping(query: str, user_id: str):   
  20.     response = await generate_response(query, user_id)   
  21.     return {"response": response}
  22. # 获取历史记录接口
  23. @app.get("/chat_history/{user_id}")
  24. async def get_history_endpoint(user_id: str):
  25.     # 获取聊天历史记录
  26.     chat_history = get_chat_history(user_id)
  27.    
  28.     return {"history": chat_history}
  29. if __name__ == "__main__":
  30.     import uvicorn
  31.     uvicorn.run(app, host="0.0.0.0", port=8000)
复制代码
通过调用generate_response函数,返反响应
 天生响应以及获取汗青记录的函数
  1. from langchain_core.messages import HumanMessage, AIMessage
  2. from langgraph_test import graphapp
  3. from langchain_core.output_parsers import StrOutputParser
  4. from langgraph.checkpoint.serde import jsonplus
  5. from ignore_socket import ignore_msgpack_default
  6. jsonplus._msgpack_default = ignore_msgpack_default
  7. # 转换函数, 将数据中的AIMessage,HumanMessage转换为字符串
  8. def message_to_dict(message):
  9.     if isinstance(message, HumanMessage):
  10.         return {
  11.             'content': message.content,
  12.             'type': 'human',
  13.             'additional_kwargs': message.additional_kwargs,
  14.             'response_metadata': message.response_metadata,
  15.             'id': message.id
  16.         }
  17.     elif isinstance(message, AIMessage):
  18.         return {
  19.             'content': message.content,
  20.             'type': 'ai',
  21.             'additional_kwargs': message.additional_kwargs,
  22.             'response_metadata': message.response_metadata,
  23.             'id': message.id,
  24.             'usage_metadata': message.usage_metadata
  25.         }
  26.     else:
  27.         raise ValueError(f"Unsupported message type: {type(message)}")
  28. # 定义生成对话的函数
  29. async def generate_response(input_text: str, user_id: str):
  30.     # 定义用户配置文件
  31.     config = {"configurable": {"thread_id": user_id}}
  32.     # 实例化用户输入的消息
  33.     input_message = HumanMessage(content=input_text)
  34.     # 处理消息
  35.     event =  await graphapp.ainvoke({"messages":input_message}, config=config, stream_mode="values")
  36.     # 返回信息内容
  37.     return event.get("messages")[-1].content
  38. # 获取聊天历史记录
  39. def get_chat_history(user_id: str):
  40.     # 通过config获取相关用户聊天信息
  41.     values = graphapp.get_state({"configurable": {"thread_id": user_id}}).values
  42.     try:
  43.         # 仅获取messages中的content、type数据
  44.         values['messages'] = [message_to_dict(message) for message in values['messages']]
  45.         values["messages"] = [{k: v for k, v in message.items() if k in ["content", "type"]} for message in values["messages"]]
  46.     except:
  47.         values["messages"] = []
  48.     finally:
  49.         return values
复制代码
 使用langgraph实现对话功能
  1. from langchain.prompts import PromptTemplate, ChatPromptTemplate
  2. from langchain_core.output_parsers import StrOutputParser
  3. from typing import Literal
  4. from langchain_core.messages import HumanMessage
  5. from langchain_openai import AzureChatOpenAI
  6. from langchain_core.messages import SystemMessage, RemoveMessage
  7. from langgraph.checkpoint.memory import MemorySaver
  8. from langgraph.graph import StateGraph, START, END
  9. from langgraph.graph.message import add_messages
  10. from typing_extensions import TypedDict, Annotated
  11. from langchain_core.messages import AnyMessage
  12.    
  13. memory = MemorySaver()
  14. # 我们将使用此模型进行对话和总结
  15. llm = AzureChatOpenAI(deployment_name="aicontent-validation")
  16. # 定义 Prompt 模板(仅用于输出字符串)
  17. prompt_template = PromptTemplate(
  18.     input_variables=["query"],
  19.     template="You are a helpful assistant. User: {query}. Assistant:"
  20. )
  21. # 创建 LangChain 的处理链
  22. chain = prompt_template | llm | StrOutputParser()
  23. # 我们将添加一个 'summary' 属性
  24. class State(TypedDict):
  25.     messages: Annotated[list[AnyMessage], add_messages]
  26.     summary: str
  27. # 定义调用模型的逻辑
  28. async def call_model(state: State):
  29.     # 如果存在摘要,我们会将其作为系统消息添加
  30.     summary = state.get("summary", "")
  31.     if summary:
  32.         system_message = f"Summary of conversation earlier: {summary}"
  33.         messages = [SystemMessage(content=system_message)] + state["messages"]
  34.     else:
  35.         messages = state["messages"]
  36.     # 两种调用方式,参数支持{"input":messages}、messages
  37.     # response = chain.invoke({"input":messages})
  38.     response =await chain.ainvoke(messages)
  39.     # 我们返回一个列表,因为它将被添加到现有列表中
  40.     return {"messages": [response]}
  41. # 现在,我们定义用于确定是结束还是总结对话的逻辑
  42. async def should_continue(state: State) -> Literal["summarize_conversation", END]:
  43.     """Return the next node to execute."""
  44.     messages = state["messages"]
  45.     # 如果消息超过 6 条,则我们汇总对话
  46.     if len(messages) > 1:
  47.         return "summarize_conversation"
  48.     # 否则我们就可以结束
  49.     return END
  50. async def summarize_conversation(state: State):
  51.     # 首先,获取总结内容
  52.     summary = state.get("summary", "")
  53.     if summary:
  54.         # 如果 summary 已经存在,我们使用不同的系统提示符总结它
  55.         summary_message = (
  56.             f"This is summary of the conversation to date: {summary}\n\n"
  57.             "Extend the summary by taking into account the new messages above:"
  58.         )
  59.     else:
  60.         summary_message = "Create a summary of the conversation above:"
  61.     # 历史信息和总结信息
  62.     messages = state["messages"] + [HumanMessage(content=summary_message)]
  63.     # 获取总结信息
  64.     response = await chain.ainvoke(messages)
  65.     # 我们现在需要删除我们不想再显示的消息,将删除除最后两条消息之外的所有消息
  66.     delete_messages = [RemoveMessage(id=m.id) for m in state["messages"][:-2]]
  67.     return {"summary": response, "messages": delete_messages}
  68. # 定义新的graph
  69. workflow = StateGraph(State)
  70. # 定义 conversation 节点和 summarize 节点
  71. workflow.add_node("conversation", call_model)
  72. workflow.add_node(summarize_conversation)
  73. # 将入口点设置为对话
  74. workflow.add_edge(START, "conversation")
  75. # 现在,我们添加一条条件边
  76. workflow.add_conditional_edges(
  77.     # 首先,我们定义起始节点。我们使用 'conversation'。
  78.     # 这意味着这些是调用 'conversation' 节点后采用的边。
  79.     "conversation",
  80.     # 接下来,我们传入一个函数,该函数将确定接下来调用哪个节点。
  81.     should_continue,
  82. )
  83. # 我们现在添加一条从 'summarize_conversation' 到 END 的法线边。
  84. # 这意味着在调用 'summarize_conversation' 之后,我们结束。
  85. workflow.add_edge("summarize_conversation", END)
  86. # 最后,我们编译它!
  87. graphapp = workflow.compile(checkpointer=memory)
复制代码
测试功能
启动服务:uvicorn main:app --reload --host 0.0.0.0 --port 8000
浏览器访问:http://172.26.20.199:8000/docs
 
 打开chat接口的折叠,点击try it out,在输入框输入信息,点击下面的执行,稍微等待,如下:

再次执行输入:what is my name,用来测试是否具备影象功能
 
 测试总结功能:使用上述定义的123

在summary中存在总结(择要),成功 
3、FastAPI、langgraph实现chatbot(流式)

基于上述代码,实现基于FastAPI、langchain、langgraph的流式传输,
FastAPI官方文档:WebSocket - FastAPI 中文
 主要鉴戒官方文档,在此基础上实现了流式的传输
安装相应的模块
  1. pip install fastapi python-dotenv langchain_core langchain langgraph uvicorn websockets langchain_openai
复制代码
 1、计划html文件,实现websocket请求,基于FastAPI文档魔改

  1. // websocket_test.html
  2. <!DOCTYPE html>
  3. <html lang="en">
  4. <head>
  5. <meta charset="UTF-8">
  6. <meta name="viewport" content="width=device-width, initial-scale=1.0">
  7. <title>pc</title>
  8. <body>
  9. <div id="rowAiChatbot">
  10. </div>
  11.     <script>
  12.         var ws = new WebSocket('ws://localhost:8000/chat');
  13.         ws.onopen = function () {
  14.             console.log('ws onopen');
  15.             ws.send('{"query":"my name is jack", "user_id":"123"}');
  16.         };
  17.         ws.onmessage = function (e) {
  18.             console.log('ws onmessage');
  19.             console.log('from server: ' , e.data);
  20.         };
  21. </script>
  22. </body>
  23. </html>
复制代码
2、计划FastAPI主程序,实现接收参数并执行

  1. # main.py
  2. from dotenv import load_dotenv
  3. load_dotenv(".env")
  4. from fastapi import FastAPI, WebSocket
  5. from json import loads
  6. from chat import generate_response, get_chat_history
  7. app = FastAPI()
  8. @app.websocket("/chat")
  9. async def websocket_ping(websocket: WebSocket):
  10.     await websocket.accept()
  11.    
  12.     try:
  13.         while True:
  14.             # 处理消息等逻辑
  15.             data = await websocket.receive_text()
  16.             jsondata = loads(data)
  17.             # 生成响应
  18.             await generate_response(jsondata.get("query"), jsondata.get("user_id"), websocket)
复制代码
3、实现调用graph,实现流式输出

鉴戒langgraph文档:LangGraph:Stream events from the final node
  1. # chat.py
  2. from langchain_core.messages import HumanMessage, AIMessage
  3. from langgraph_test import graphapp
  4. # 定义生成对话的函数
  5. async def generate_response(input_text: str, user_id: str, websocket: object):
  6.     # 定义用户配置文件
  7.     config = {"configurable": {"thread_id": user_id}}
  8.     # 实例化用户输入的消息
  9.     input_message = HumanMessage(content=input_text)
  10.     # 异步流处理消息
  11.     async for msg, metadata in graphapp.astream({"messages":input_message}, config, stream_mode="messages"):
  12.         # 仅消息存在且节点为conversation的才返回给前端
  13.         if msg.content and metadata.get("langgraph_node") == 'conversation' and metadata.get("ls_provider"):
  14.    
  15.             # print(msg.content, end=" ")
  16.             # 使用websocket实现流式传输到前端
  17.             await websocket.send_text(msg.content)
复制代码
4、langgraph实现总结、记录

  1. from langchain.prompts import PromptTemplate
  2. from langchain_core.output_parsers import StrOutputParser
  3. from typing import Literal
  4. from langchain_core.messages import HumanMessage, AIMessage
  5. from langchain_openai import AzureChatOpenAI
  6. from langchain_core.messages import SystemMessage, RemoveMessage
  7. from langgraph.checkpoint.memory import MemorySaver
  8. from langgraph.graph import MessagesState, StateGraph, START, END
  9. memory = MemorySaver()
  10. # 我们将使用此模型进行对话和总结
  11. llm = AzureChatOpenAI(deployment_name="aicontent-validation")
  12. # 定义 Prompt 模板(仅用于输出字符串)
  13. prompt_template = PromptTemplate(
  14.     input_variables=["query"],
  15.     template="You are a helpful assistant. User: {query}. Assistant:"
  16. )
  17. # 创建 LangChain 的处理链
  18. chain = prompt_template | llm | StrOutputParser()
  19. # 我们将添加一个 'summary' 属性(除了MessagesState已经有 'messages' 键之外)
  20. class State(MessagesState):
  21.     summary: str
  22. # 定义调用模型的逻辑
  23. async def call_model(state: State):
  24.     # 如果存在摘要,我们会将其作为系统消息添加
  25.     summary = state.get("summary", "")
  26.    
  27.     if summary:
  28.         system_message = f"Summary of conversation earlier: {summary}"
  29.         messages = [SystemMessage(content=system_message)] + state["messages"]
  30.     else:
  31.         messages = state["messages"]
  32.     # 两种调用方式,参数支持{"input":messages}、messages
  33.     # response = chain.invoke({"input":messages})
  34.     response =await chain.ainvoke(messages)
  35.     # print(response)
  36.     # 用于验证输出一致,此处执行与generate_response位置输出时间基本一致
  37.     # async for msg in chain.astream(messages):
  38.     #     print(msg)
  39.     # 我们返回一个列表,因为它将被添加到现有列表中
  40.     return {"messages": [AIMessage(content=response)]}
  41. # 现在,我们定义用于确定是结束还是总结对话的逻辑
  42. async def should_continue(state: State) -> Literal["summarize_conversation", END]:
  43.     """Return the next node to execute."""
  44.     messages = state["messages"]
  45.     # 如果消息超过 6 条,则我们汇总对话
  46.     if len(messages) > 3:
  47.         return "summarize_conversation"
  48.     # 否则我们就可以结束
  49.     return END
  50. async def summarize_conversation(state: State):
  51.     # 首先,获取总结内容
  52.     summary = state.get("summary", "")
  53.     if summary:
  54.         # 如果 summary 已经存在,我们使用不同的系统提示符总结它
  55.         summary_message = (
  56.             f"This is summary of the conversation to date: {summary}\n\n"
  57.             "Extend the summary by taking into account the new messages above:"
  58.         )
  59.     else:
  60.         summary_message = "Create a summary of the conversation above:"
  61.     # 历史信息和总结信息
  62.     messages = state["messages"] + [HumanMessage(content=summary_message)]
  63.     # 获取总结信息
  64.     response = await chain.ainvoke(messages)
  65.     # 我们现在需要删除我们不想再显示的消息,将删除除最后两条消息之外的所有消息
  66.     delete_messages = [RemoveMessage(id=m.id) for m in state["messages"][:-2]]
  67.     return {"summary": response, "messages": delete_messages}
  68. # 定义新的graph
  69. workflow = StateGraph(State)
  70. # 定义 conversation 节点和 summarize 节点
  71. workflow.add_node("conversation", call_model)
  72. workflow.add_node(summarize_conversation)
  73. # 将入口点设置为对话
  74. workflow.add_edge(START, "conversation")
  75. # 现在,我们添加一条条件边
  76. workflow.add_conditional_edges(
  77.     # 首先,我们定义起始节点。我们使用 'conversation'。
  78.     # 这意味着这些是调用 'conversation' 节点后采用的边。
  79.     "conversation",
  80.     # 接下来,我们传入一个函数,该函数将确定接下来调用哪个节点。
  81.     should_continue,
  82. )
  83. # 我们现在添加一条从 'summarize_conversation' 到 END 的法线边。
  84. # 这意味着在调用 'summarize_conversation' 之后,我们结束。
  85. workflow.add_edge("summarize_conversation", END)
  86. # 最后,我们编译它!
  87. graphapp = workflow.compile(checkpointer=memory)
复制代码
5、测试流式传输

使用浏览器打开html文件,效果如下:

6、获取汗青记录

 请求接口
  1. @app.get("/chat_history/{user_id}")
  2. async def get_history_endpoint(user_id: str):
  3.     # 获取聊天历史记录
  4.     chat_history = get_chat_history(user_id)
  5.    
  6.     return {"history": chat_history}
复制代码
 获取逻辑方法,定义一个函数进行数据转换
  1. # chat.py
  2. # 转换函数, 将数据中的AIMessage,HumanMessage转换为字符串
  3. def message_to_dict(message):
  4.     if isinstance(message, HumanMessage):
  5.         return {
  6.             'content': message.content,
  7.             'type': 'human',
  8.             'additional_kwargs': message.additional_kwargs,
  9.             'response_metadata': message.response_metadata,
  10.             'id': message.id
  11.         }
  12.     elif isinstance(message, AIMessage):
  13.         return {
  14.             'content': message.content,
  15.             'type': 'ai',
  16.             'additional_kwargs': message.additional_kwargs,
  17.             'response_metadata': message.response_metadata,
  18.             'id': message.id,
  19.             'usage_metadata': message.usage_metadata
  20.         }
  21.     else:
  22.         raise ValueError(f"Unsupported message type: {type(message)}")
  23. # 获取聊天历史记录
  24. def get_chat_history(user_id: str):
  25.     # 通过config获取相关用户聊天信息
  26.     values = graphapp.get_state({"configurable": {"thread_id": user_id}}).values
  27.     try:
  28.         # 仅获取messages中的content、type数据
  29.         values['messages'] = [message_to_dict(message) for message in values['messages']]
  30.         values["messages"] = [{k: v for k, v in message.items() if k in ["content", "type"]} for message in values["messages"]]
  31.     except:
  32.         values["messages"] = []
  33.     finally:
  34.         return values
复制代码
浏览器测试
网址:http://172.25.23.143:8000/docs#

 

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

渣渣兔

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表