IT评测·应用市场-qidao123.com
标题:
【deepseek+LangChain】实现postgre数据库分析问答系统
[打印本页]
作者:
渣渣兔
时间:
2025-3-8 09:51
标题:
【deepseek+LangChain】实现postgre数据库分析问答系统
使命介绍
本地有一个postgre数据记载中国近20年人口明细数据和出生率、死亡率明细。
要求使用LangChain0.3和deepseek api毗连本地postgresql数据库,并执行数据分析,整个数据分析系统具备记忆功能,输入exit退出。
具体代码
实现ai大模子本地postgresql数据库查询分析的python代码实现如下:
import re
from langchain_openai import ChatOpenAI
from langchain_community.utilities import SQLDatabase
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from langchain.chains import create_sql_query_chain
from langchain_core.prompts import PromptTemplate # 新增关键导入
class DeepSeekDBAnalyzer:
def __init__(self):
# 初始化DeepSeek模型
self.llm = ChatOpenAI(
base_url="https://api.deepseek.com",
api_key="your_deepseek_api_key",
model="deepseek-chat",
temperature=0
)
# 连接PostgreSQL数据库
self.db = SQLDatabase.from_uri(
"postgresql+psycopg2://user:password@192.168.124.13:5432/my_db",
include_tables=['total_population_year', 'birth_rate_year'],
)
# 创建自定义提示模板(关键修复点)
custom_prompt = PromptTemplate(
input_variables=["table_info","input","top_k"], # 确保输入变量名与实际使用一致
template="""
你是一个专业的数据库分析专家,请根据数据库结构进行SQL查询。
数据库结构:
{table_info}
请根据用户问题生成精确的SQL查询并解释结果。
请不要包含任何解释,只输出SQL代码。
请使用标准PostgreSQL语法进行查询。
生成的sql语句不限制limit值为{top_k}。
用户问题:{input}
"""
)
# 创建SQL查询链
self.query_chain = create_sql_query_chain(
llm=self.llm,
db=self.db,
prompt=custom_prompt # 使用正确格式的PromptTemplate
)
# 初始化对话历史
self.messages = [
SystemMessage(content="""
数据库结构:
1. total_population_year 表字段:
- Year (int4)
- Total_Population (int4)
- Male_Population (int4)
- Female_Population (int4)
- Urban_Population (int4)
- Rural_Population (int4)
2. birth_rate_year 表字段:
- Year (int4)
- Birth_Rate (int4)
- Death_Rate (int4)
- Natural_Growth_Rate (int4)
请根据用户问题生成精确的SQL查询并解释结果
""")
]
def _extract_sql(self, query_response):
"""提取纯SQL语句"""
sql_match = re.search(r"```sql\n(.*?)\n```", query_response, re.DOTALL)
if sql_match:
return sql_match.group(1).strip()
return query_response.split(";")[0] + ";" # 兜底处理
def analyze_data(self, question):
try:
# 生成原始查询
raw_query = self.query_chain.invoke({"question": question})
# 清理SQL语句
clean_sql = self._extract_sql(raw_query)
print(f"[DEBUG] Generated SQL: \n{clean_sql}")
# 安全校验
if any(cmd in clean_sql.upper() for cmd in ["DROP", "DELETE", "UPDATE", "INSERT"]):
raise ValueError("检测到危险SQL操作")
# 执行查询
result = self.db.run(clean_sql)
# 构建分析提示
analysis_prompt = f"""
SQL执行结果:{result}
请用中文完成以下分析:
1. 数据趋势描述(数值变化)
2. 关键年份变化分析
3. 可能的影响因素推测
"""
# 维护对话历史
self.messages.extend([
HumanMessage(content=question),
AIMessage(content=f"执行查询:{clean_sql}"),
HumanMessage(content=analysis_prompt)
])
response = self.llm.invoke(self.messages)
self.messages.append(AIMessage(content=response.content))
return response.content
except Exception as e:
error_msg = f"分析失败:{str(e)}"
self.messages.append(AIMessage(content=error_msg))
return error_msg
def main():
analyzer = DeepSeekDBAnalyzer()
print("人口数据分析系统已启动(输入 exit 退出)")
while True:
try:
user_input = input("\n用户提问:").strip()
if user_input.lower() == "exit":
print("\n系统退出")
break
print("\n用户提问:%s\n" % user_input)
# 获取分析结果
response = analyzer.analyze_data(user_input)
print("\n分析结果:")
print(response)
except KeyboardInterrupt:
print("\n操作已取消")
break
if __name__ == "__main__":
main()
复制代码
执行结果
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 IT评测·应用市场-qidao123.com (https://dis.qidao123.com/)
Powered by Discuz! X3.4