环境:openEuler、Python 3.11.6、Azure OpenAI、langchain 0.3.3、langgraph 0.2.38
背景:基于FastAPI、langchain实现一个QA系统,要求实现历史记录以及存储特征信息
时间:20241022
说明:在历史记录的存储中,增加自定义的存储内容,后期还要以流的形式实现
官方文档地址:Add summary of the conversation history
源码地址:暂无
1、环境搭建(略)
2、功能案例
1、实现记录和总结(langgraph)
概要:使用langgraph实现对话系统,该系统具备聊天历史记录和总结功能
官方文档地址:Add summary of the conversation history
实现具有总结和历史记录功能的chatbot
- from typing import Literal
- from langchain_core.messages import SystemMessage, RemoveMessage
- from langgraph.checkpoint.memory import MemorySaver
- from langgraph.graph import MessagesState, StateGraph, START, END
- from langchain_core.messages import HumanMessage
- from langchain_openai import AzureChatOpenAI
- # 加载环境中的api_key等信息
- from dotenv import load_dotenv
- load_dotenv(".env")
- # 创建memory,用于记录state信息
- memory = MemorySaver()
- # 在state中新增summary参数
- class State(MessagesState):
- summary: str
- # 使用Azure作为chain
- model = AzureChatOpenAI(deployment_name="aicontent-validation")
- # 定义访问模型的逻辑,这是node
- def call_model(state: State):
- # summary是否存在,影响下一次的回答内容
- summary = state.get("summary", "")
- if summary:
- system_message = f"Summary of conversation earlier: {summary}"
- messages = [SystemMessage(content=system_message)] + state["messages"]
- else:
- messages = state["messages"]
- response = model.invoke(messages)
- # 将内容以列表的形式返回,会添加到state的列表中
- return {"messages": [response]}
- # 确认结束还是执行summarize_conversation
- def should_continue(state: State) -> Literal["summarize_conversation", END]:
- """Return the next node to execute."""
- messages = state["messages"]
- # 如果对话内容超过5条,则进行总结(转向执行summarize_conversation)
- if len(messages) > 5:
- return "summarize_conversation"
- # 这个graph结束了
- return END
def summarize_conversation(state: State):
    """Summarize the conversation so far and prune older messages.

    Args:
        state: Graph state holding ``messages`` and, optionally, a previous
            ``summary``.

    Returns:
        A state update with the new ``summary`` text and, under ``messages``,
        one ``RemoveMessage`` for every message except the last two.
    """
    # Fall back to an empty string when no summary has been stored yet.
    summary = state.get("summary", "")
    if summary:
        # A summary already exists: ask the model to extend it instead of
        # starting from scratch.
        summary_message = (
            f"This is summary of the conversation to date: {summary}\n\n"
            "Extend the summary by taking into account the new messages above:"
        )
    else:
        summary_message = "Create a summary of the conversation above:"
    messages = state["messages"] + [HumanMessage(content=summary_message)]
    # The model's reply is the new summary.
    response = model.invoke(messages)
    # These are the messages to DELETE, not the ones kept: everything except
    # the two most recent messages is marked for removal.
    delete_messages = [RemoveMessage(id=m.id) for m in state["messages"][:-2]]
    return {"summary": response.content, "messages": delete_messages}
- # 定义一个新的graph
- workflow = StateGraph(State)
- # 定义节点,会话和总结两个node
- workflow.add_node("conversation", call_model)
- workflow.add_node(summarize_conversation)
- # 定义边,起始节点为conversation,即call_model函数
- workflow.add_edge(START, "conversation")
- # 添加条件边
- workflow.add_conditional_edges(
- # 使用conversation,作为起始的node,意味着所有的边都在conversion节点后被访问
- "conversation",
- # 通过该function,来确认到底执行哪个节点
- should_continue,
- )
- # summarize_conversation到END是单向边
- workflow.add_edge("summarize_conversation", END)
- # 编译,添加参数checkpointer,指定memory
- app = workflow.compile(checkpointer=memory)
- # 获取信息
def print_update(update):
    """Pretty-print one event produced by ``app.stream(..., stream_mode="updates")``.

    Args:
        update: Mapping of node name to the partial state that node returned.

    Each message in a node's update is shown via its ``pretty_print()``
    method; the summary text is printed afterwards when the update has one.
    """
    for node_update in update.values():
        # Skip nodes that returned nothing instead of failing on them.
        if not node_update:
            continue
        # An update may carry only a summary, so "messages" is optional.
        for message in node_update.get("messages", []):
            message.pretty_print()
        if "summary" in node_update:
            print(node_update["summary"])
- # 定义配置文件,用于后期查询历史
- config = {"configurable": {"thread_id": "4"}}
调用graph实现对话,当消息超过5条时进行总结
- # 下面三个是问题以及输出
- input_message = HumanMessage(content="hi! I'm bob")
- input_message.pretty_print()
- for event in app.stream({"messages": [input_message]}, config, stream_mode="updates"):
- print_update(event)
- input_message = HumanMessage(content="what's my name?")
- input_message.pretty_print()
- for event in app.stream({"messages": [input_message]}, config, stream_mode="updates"):
- print_update(event)
- input_message = HumanMessage(content="i like the celtics!")
- input_message.pretty_print()
- for event in app.stream({"messages": [input_message]}, config, stream_mode="updates"):
- print_update(event)
复制代码 输出:
- [root@Laptop-latitude-7300 chatbot_fastapi]# python test.py
- ================================ Human Message =================================
- hi! I'm bob
- ================================== Ai Message ==================================
- Hello Bob! How can I assist you today?
- ================================ Human Message =================================
- what's my name?
- ================================== Ai Message ==================================
- Your name is Bob.
- ================================ Human Message =================================
- i like the celtics!
- ================================== Ai Message ==================================
- That's great! The Boston Celtics are indeed a fantastic basketball team. They have a rich history and a large fanbase. Are you excited about the upcoming games?
- ================================ Remove Message ================================
- ================================ Remove Message ================================
- ================================ Remove Message ================================
- ================================ Remove Message ================================
- In this conversation, the user introduces himself as Bob. He also expresses his liking for the Boston Celtics, a professional basketball team. The assistant acknowledges his interest and asks if he is excited about the upcoming games.
复制代码 其中pretty_print、print_update是graph格式化的输出,示例如下:
input_message.pretty_print()实现了如下输出
- ================================ Human Message =================================
- i like the celtics!
复制代码 print_update(event)输出如下:
- ================================== Ai Message ==================================
- Your name is Bob.
复制代码 或
- ================================ Remove Message ================================
复制代码 2、FastAPI、langgraph实现chatbot(非流式)
基于上述代码,实现非流式输出的问答系统
安装相应的模块
- pip install fastapi python-dotenv langchain_core langchain langgraph uvicorn langchain_openai
复制代码 通过fastapi实现接口
- # main.py
- from dotenv import load_dotenv
- load_dotenv(".env")
- from fastapi import FastAPI, Request, WebSocket
- from pydantic import BaseModel
- from json import loads
- from chat import generate_response, get_chat_history
- app = FastAPI()
- # 定义请求和响应模型
- class ChatRequest(BaseModel):
- user_id: str
- input: str
- class ChatResponse(BaseModel):
- response: str
- class ChatHistoryResponse(BaseModel):
- history: list
- # 问答接口
- @app.get("/chat")
- async def websocket_ping(query: str, user_id: str):
- response = await generate_response(query, user_id)
- return {"response": response}
- # 获取历史记录接口
- @app.get("/chat_history/{user_id}")
- async def get_history_endpoint(user_id: str):
- # 获取聊天历史记录
- chat_history = get_chat_history(user_id)
-
- return {"history": chat_history}
- if __name__ == "__main__":
- import uvicorn
- uvicorn.run(app, host="0.0.0.0", port=8000)
通过调用generate_response函数,返回响应
生成响应以及获取历史记录的函数
- from langchain_core.messages import HumanMessage, AIMessage
- from langgraph_test import graphapp
- from langchain_core.output_parsers import StrOutputParser
- from langgraph.checkpoint.serde import jsonplus
- from ignore_socket import ignore_msgpack_default
- jsonplus._msgpack_default = ignore_msgpack_default
- # 转换函数, 将数据中的AIMessage,HumanMessage转换为字符串
- def message_to_dict(message):
- if isinstance(message, HumanMessage):
- return {
- 'content': message.content,
- 'type': 'human',
- 'additional_kwargs': message.additional_kwargs,
- 'response_metadata': message.response_metadata,
- 'id': message.id
- }
- elif isinstance(message, AIMessage):
- return {
- 'content': message.content,
- 'type': 'ai',
- 'additional_kwargs': message.additional_kwargs,
- 'response_metadata': message.response_metadata,
- 'id': message.id,
- 'usage_metadata': message.usage_metadata
- }
- else:
- raise ValueError(f"Unsupported message type: {type(message)}")
- # 定义生成对话的函数
- async def generate_response(input_text: str, user_id: str):
- # 定义用户配置文件
- config = {"configurable": {"thread_id": user_id}}
- # 实例化用户输入的消息
- input_message = HumanMessage(content=input_text)
- # 处理消息
- event = await graphapp.ainvoke({"messages":input_message}, config=config, stream_mode="values")
- # 返回信息内容
- return event.get("messages")[-1].content
- # 获取聊天历史记录
- def get_chat_history(user_id: str):
- # 通过config获取相关用户聊天信息
- values = graphapp.get_state({"configurable": {"thread_id": user_id}}).values
- try:
- # 仅获取messages中的content、type数据
- values['messages'] = [message_to_dict(message) for message in values['messages']]
- values["messages"] = [{k: v for k, v in message.items() if k in ["content", "type"]} for message in values["messages"]]
- except:
- values["messages"] = []
- finally:
- return values
复制代码 使用langgraph实现对话功能
- from langchain.prompts import PromptTemplate, ChatPromptTemplate
- from langchain_core.output_parsers import StrOutputParser
- from typing import Literal
- from langchain_core.messages import HumanMessage
- from langchain_openai import AzureChatOpenAI
- from langchain_core.messages import SystemMessage, RemoveMessage
- from langgraph.checkpoint.memory import MemorySaver
- from langgraph.graph import StateGraph, START, END
- from langgraph.graph.message import add_messages
- from typing_extensions import TypedDict, Annotated
- from langchain_core.messages import AnyMessage
-
- memory = MemorySaver()
- # 我们将使用此模型进行对话和总结
- llm = AzureChatOpenAI(deployment_name="aicontent-validation")
- # 定义 Prompt 模板(仅用于输出字符串)
- prompt_template = PromptTemplate(
- input_variables=["query"],
- template="You are a helpful assistant. User: {query}. Assistant:"
- )
- # 创建 LangChain 的处理链
- chain = prompt_template | llm | StrOutputParser()
- # 我们将添加一个 'summary' 属性
- class State(TypedDict):
- messages: Annotated[list[AnyMessage], add_messages]
- summary: str
- # 定义调用模型的逻辑
async def call_model(state: State):
    """Call the model with the current messages and return its reply.

    Args:
        state: Graph state holding ``messages`` and, optionally, ``summary``.

    Returns:
        A state update whose ``messages`` list contains the model reply as an
        ``AIMessage``; the list is appended to the existing messages.
    """
    # Imported locally because this module's import header only brings in
    # HumanMessage, SystemMessage and RemoveMessage.
    from langchain_core.messages import AIMessage

    # When a summary exists, prepend it as a system message.
    summary = state.get("summary", "")
    if summary:
        system_message = f"Summary of conversation earlier: {summary}"
        messages = [SystemMessage(content=system_message)] + state["messages"]
    else:
        messages = state["messages"]
    # `chain` ends in StrOutputParser, so the result is a plain string.
    response = await chain.ainvoke(messages)
    # Wrap the text in an AIMessage. A bare string placed in "messages" is
    # coerced to a HumanMessage by add_messages, which would record the
    # assistant's reply as type "human" in the stored history.
    return {"messages": [AIMessage(content=response)]}
- # 现在,我们定义用于确定是结束还是总结对话的逻辑
async def should_continue(state: State) -> Literal["summarize_conversation", END]:
    """Return the next node to execute."""
    messages = state["messages"]
    # Summarize once the thread holds more than 1 message.
    # NOTE(review): the earlier comment said "more than 6"; the value 1 looks
    # like a setting chosen to trigger summaries quickly while testing --
    # confirm the intended threshold before real use.
    if len(messages) > 1:
        return "summarize_conversation"
    # Otherwise the graph run ends here.
    return END
- async def summarize_conversation(state: State):
- # 首先,获取总结内容
- summary = state.get("summary", "")
- if summary:
- # 如果 summary 已经存在,我们使用不同的系统提示符总结它
- summary_message = (
- f"This is summary of the conversation to date: {summary}\n\n"
- "Extend the summary by taking into account the new messages above:"
- )
- else:
- summary_message = "Create a summary of the conversation above:"
- # 历史信息和总结信息
- messages = state["messages"] + [HumanMessage(content=summary_message)]
- # 获取总结信息
- response = await chain.ainvoke(messages)
- # 我们现在需要删除我们不想再显示的消息,将删除除最后两条消息之外的所有消息
- delete_messages = [RemoveMessage(id=m.id) for m in state["messages"][:-2]]
- return {"summary": response, "messages": delete_messages}
- # 定义新的graph
- workflow = StateGraph(State)
- # 定义 conversation 节点和 summarize 节点
- workflow.add_node("conversation", call_model)
- workflow.add_node(summarize_conversation)
- # 将入口点设置为对话
- workflow.add_edge(START, "conversation")
- # 现在,我们添加一条条件边
- workflow.add_conditional_edges(
- # 首先,我们定义起始节点。我们使用 'conversation'。
- # 这意味着这些是调用 'conversation' 节点后采用的边。
- "conversation",
- # 接下来,我们传入一个函数,该函数将确定接下来调用哪个节点。
- should_continue,
- )
- # 我们现在添加一条从 'summarize_conversation' 到 END 的法线边。
- # 这意味着在调用 'summarize_conversation' 之后,我们结束。
- workflow.add_edge("summarize_conversation", END)
- # 最后,我们编译它!
- graphapp = workflow.compile(checkpointer=memory)
复制代码 测试功能
启动服务:uvicorn main:app --reload --host 0.0.0.0 --port 8000
浏览器访问:http://172.26.20.199:8000/docs

打开chat接口的折叠,点击try it out,在输入框输入信息,点击下面的执行,稍微等待,如下:

再次执行输入:what is my name,用来测试是否具备记忆功能

测试总结功能:使用上述定义的123

在summary中存在总结(摘要),成功
3、FastAPI、langgraph实现chatbot(流式)
基于上述代码,实现基于FastAPI、langchain、langgraph的流式传输,
FastAPI官方文档:WebSocket - FastAPI 中文
主要借鉴官方文档,在此基础上实现了流式的传输
安装相应的模块
- pip install fastapi python-dotenv langchain_core langchain langgraph uvicorn websockets langchain_openai
1、设计html文件,实现websocket请求,基于FastAPI文档魔改
<!DOCTYPE html>
<!-- websocket_test.html -->
<html lang="en">

<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>pc</title>
</head>

<body>
    <div id="rowAiChatbot">
    </div>

    <script>
        // Open the WebSocket and send one question as soon as it connects.
        var ws = new WebSocket('ws://localhost:8000/chat');
        ws.onopen = function () {
            console.log('ws onopen');
            ws.send('{"query":"my name is jack", "user_id":"123"}');
        };
        // Each streamed chunk from the server arrives as one message.
        ws.onmessage = function (e) {
            console.log('ws onmessage');
            console.log('from server: ' , e.data);
        };
    </script>
</body>
</html>
2、设计FastAPI主程序,实现接收参数并执行
- # main.py
- from dotenv import load_dotenv
- load_dotenv(".env")
- from fastapi import FastAPI, WebSocket
- from json import loads
- from chat import generate_response, get_chat_history
- app = FastAPI()
@app.websocket("/chat")
async def websocket_ping(websocket: WebSocket):
    """Serve the chat WebSocket.

    Each text frame is parsed as JSON; its ``query`` and ``user_id`` fields
    are passed to ``generate_response`` together with the socket.
    """
    # Imported locally because the module's import line only brings in
    # FastAPI and WebSocket.
    from fastapi import WebSocketDisconnect

    await websocket.accept()

    try:
        while True:
            # Wait for the next message from the client.
            data = await websocket.receive_text()
            jsondata = loads(data)
            # Generate the response for this message.
            await generate_response(jsondata.get("query"), jsondata.get("user_id"), websocket)
    except WebSocketDisconnect:
        # The client closed the connection: leave the receive loop quietly.
        return
复制代码 3、实现调用graph,实现流式输出
借鉴langgraph文档:LangGraph:Stream events from the final node
- # chat.py
- from langchain_core.messages import HumanMessage, AIMessage
- from langgraph_test import graphapp
- # 定义生成对话的函数
- async def generate_response(input_text: str, user_id: str, websocket: object):
- # 定义用户配置文件
- config = {"configurable": {"thread_id": user_id}}
- # 实例化用户输入的消息
- input_message = HumanMessage(content=input_text)
- # 异步流处理消息
- async for msg, metadata in graphapp.astream({"messages":input_message}, config, stream_mode="messages"):
- # 仅消息存在且节点为conversation的才返回给前端
- if msg.content and metadata.get("langgraph_node") == 'conversation' and metadata.get("ls_provider"):
-
- # print(msg.content, end=" ")
- # 使用websocket实现流式传输到前端
- await websocket.send_text(msg.content)
复制代码 4、langgraph实现总结、记录
- from langchain.prompts import PromptTemplate
- from langchain_core.output_parsers import StrOutputParser
- from typing import Literal
- from langchain_core.messages import HumanMessage, AIMessage
- from langchain_openai import AzureChatOpenAI
- from langchain_core.messages import SystemMessage, RemoveMessage
- from langgraph.checkpoint.memory import MemorySaver
- from langgraph.graph import MessagesState, StateGraph, START, END
- memory = MemorySaver()
- # 我们将使用此模型进行对话和总结
- llm = AzureChatOpenAI(deployment_name="aicontent-validation")
- # 定义 Prompt 模板(仅用于输出字符串)
- prompt_template = PromptTemplate(
- input_variables=["query"],
- template="You are a helpful assistant. User: {query}. Assistant:"
- )
- # 创建 LangChain 的处理链
- chain = prompt_template | llm | StrOutputParser()
- # 我们将添加一个 'summary' 属性(除了MessagesState已经有 'messages' 键之外)
- class State(MessagesState):
- summary: str
- # 定义调用模型的逻辑
- async def call_model(state: State):
- # 如果存在摘要,我们会将其作为系统消息添加
- summary = state.get("summary", "")
-
- if summary:
- system_message = f"Summary of conversation earlier: {summary}"
- messages = [SystemMessage(content=system_message)] + state["messages"]
- else:
- messages = state["messages"]
- # 两种调用方式,参数支持{"input":messages}、messages
- # response = chain.invoke({"input":messages})
- response =await chain.ainvoke(messages)
- # print(response)
- # 用于验证输出一致,此处执行与generate_response位置输出时间基本一致
- # async for msg in chain.astream(messages):
- # print(msg)
- # 我们返回一个列表,因为它将被添加到现有列表中
- return {"messages": [AIMessage(content=response)]}
- # 现在,我们定义用于确定是结束还是总结对话的逻辑
async def should_continue(state: State) -> Literal["summarize_conversation", END]:
    """Return the next node to execute."""
    messages = state["messages"]
    # Summarize once the thread holds more than 3 messages.
    # NOTE(review): the earlier comment said "more than 6"; the value 3 looks
    # like a setting chosen to trigger summaries quickly while testing --
    # confirm the intended threshold before real use.
    if len(messages) > 3:
        return "summarize_conversation"
    # Otherwise the graph run ends here.
    return END
- async def summarize_conversation(state: State):
- # 首先,获取总结内容
- summary = state.get("summary", "")
- if summary:
- # 如果 summary 已经存在,我们使用不同的系统提示符总结它
- summary_message = (
- f"This is summary of the conversation to date: {summary}\n\n"
- "Extend the summary by taking into account the new messages above:"
- )
- else:
- summary_message = "Create a summary of the conversation above:"
- # 历史信息和总结信息
- messages = state["messages"] + [HumanMessage(content=summary_message)]
- # 获取总结信息
- response = await chain.ainvoke(messages)
- # 我们现在需要删除我们不想再显示的消息,将删除除最后两条消息之外的所有消息
- delete_messages = [RemoveMessage(id=m.id) for m in state["messages"][:-2]]
- return {"summary": response, "messages": delete_messages}
- # 定义新的graph
- workflow = StateGraph(State)
- # 定义 conversation 节点和 summarize 节点
- workflow.add_node("conversation", call_model)
- workflow.add_node(summarize_conversation)
- # 将入口点设置为对话
- workflow.add_edge(START, "conversation")
- # 现在,我们添加一条条件边
- workflow.add_conditional_edges(
- # 首先,我们定义起始节点。我们使用 'conversation'。
- # 这意味着这些是调用 'conversation' 节点后采用的边。
- "conversation",
- # 接下来,我们传入一个函数,该函数将确定接下来调用哪个节点。
- should_continue,
- )
- # 我们现在添加一条从 'summarize_conversation' 到 END 的法线边。
- # 这意味着在调用 'summarize_conversation' 之后,我们结束。
- workflow.add_edge("summarize_conversation", END)
- # 最后,我们编译它!
- graphapp = workflow.compile(checkpointer=memory)
复制代码 5、测试流式传输
使用浏览器打开html文件,效果如下:
6、获取历史记录
请求接口
- @app.get("/chat_history/{user_id}")
- async def get_history_endpoint(user_id: str):
- # 获取聊天历史记录
- chat_history = get_chat_history(user_id)
-
- return {"history": chat_history}
复制代码 获取逻辑方法,定义一个函数进行数据转换
- # chat.py
- # 转换函数, 将数据中的AIMessage,HumanMessage转换为字符串
- def message_to_dict(message):
- if isinstance(message, HumanMessage):
- return {
- 'content': message.content,
- 'type': 'human',
- 'additional_kwargs': message.additional_kwargs,
- 'response_metadata': message.response_metadata,
- 'id': message.id
- }
- elif isinstance(message, AIMessage):
- return {
- 'content': message.content,
- 'type': 'ai',
- 'additional_kwargs': message.additional_kwargs,
- 'response_metadata': message.response_metadata,
- 'id': message.id,
- 'usage_metadata': message.usage_metadata
- }
- else:
- raise ValueError(f"Unsupported message type: {type(message)}")
- # 获取聊天历史记录
def get_chat_history(user_id: str):
    """Return the stored state for a user's thread with simplified messages.

    Args:
        user_id: Used as the ``thread_id`` when looking up the graph state.

    Returns:
        A dict of the thread's state values in which ``messages`` is a list
        of ``{"content": ..., "type": ...}`` dicts. ``messages`` is an empty
        list when the thread has no messages or contains a message type that
        ``message_to_dict`` does not support.
    """
    config = {"configurable": {"thread_id": user_id}}
    # Work on a shallow copy so the snapshot returned by the graph is not
    # modified in place.
    values = dict(graphapp.get_state(config).values)
    try:
        # Keep only the content and type of each message.
        values["messages"] = [
            {"content": item["content"], "type": item["type"]}
            for item in map(message_to_dict, values.get("messages", []))
        ]
    except ValueError:
        # message_to_dict raises ValueError for unsupported message types;
        # keep the original fallback of an empty history in that case.
        values["messages"] = []
    return values
复制代码 浏览器测试
网址:http://172.25.23.143:8000/docs#

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |