【deepseek+LangChain】实现postgre数据库分析问答系统

打印 上一主题 下一主题

主题 947|帖子 947|积分 2841

使命介绍

本地有一个postgre数据记载中国近20年人口明细数据和出生率、死亡率明细。
要求使用LangChain0.3和deepseek api毗连本地postgresql数据库,并执行数据分析,整个数据分析系统具备记忆功能,输入exit退出。
具体代码

实现ai大模子本地postgresql数据库查询分析的python代码实现如下:
  1. import re
  2. from langchain_openai import ChatOpenAI
  3. from langchain_community.utilities import SQLDatabase
  4. from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
  5. from langchain.chains import create_sql_query_chain
  6. from langchain_core.prompts import PromptTemplate  # 新增关键导入
  7. class DeepSeekDBAnalyzer:
  8.     def __init__(self):
  9.         # 初始化DeepSeek模型
  10.         self.llm = ChatOpenAI(
  11.             base_url="https://api.deepseek.com",
  12.             api_key="your_deepseek_api_key",
  13.             model="deepseek-chat",
  14.             temperature=0
  15.         )
  16.         
  17.         # 连接PostgreSQL数据库
  18.         self.db = SQLDatabase.from_uri(
  19.             "postgresql+psycopg2://user:password@192.168.124.13:5432/my_db",
  20.             include_tables=['total_population_year', 'birth_rate_year'],
  21.         )
  22.         
  23.         # 创建自定义提示模板(关键修复点)
  24.         custom_prompt = PromptTemplate(
  25.             input_variables=["table_info","input","top_k"],  # 确保输入变量名与实际使用一致
  26.             template="""
  27.             你是一个专业的数据库分析专家,请根据数据库结构进行SQL查询。
  28.             数据库结构:
  29.             {table_info}
  30.             
  31.             请根据用户问题生成精确的SQL查询并解释结果。
  32.             请不要包含任何解释,只输出SQL代码。
  33.             请使用标准PostgreSQL语法进行查询。
  34.             生成的sql语句不限制limit值为{top_k}。
  35.             
  36.             用户问题:{input}
  37.             """
  38.         )
  39.         
  40.         # 创建SQL查询链
  41.         self.query_chain = create_sql_query_chain(
  42.             llm=self.llm,
  43.             db=self.db,
  44.             prompt=custom_prompt  # 使用正确格式的PromptTemplate
  45.         )
  46.         
  47.         # 初始化对话历史
  48.         self.messages = [
  49.             SystemMessage(content="""
  50.                 数据库结构:
  51.                 1. total_population_year 表字段:
  52.                    - Year (int4)
  53.                    - Total_Population (int4)
  54.                    - Male_Population (int4)
  55.                    - Female_Population (int4)
  56.                    - Urban_Population (int4)
  57.                    - Rural_Population (int4)
  58.                 2. birth_rate_year 表字段:
  59.                    - Year (int4)
  60.                    - Birth_Rate (int4)
  61.                    - Death_Rate (int4)
  62.                    - Natural_Growth_Rate (int4)
  63.                 请根据用户问题生成精确的SQL查询并解释结果
  64.             """)
  65.         ]
  66.     def _extract_sql(self, query_response):
  67.         """提取纯SQL语句"""
  68.         sql_match = re.search(r"```sql\n(.*?)\n```", query_response, re.DOTALL)
  69.         if sql_match:
  70.             return sql_match.group(1).strip()
  71.         return query_response.split(";")[0] + ";"  # 兜底处理
  72.     def analyze_data(self, question):
  73.         try:
  74.             # 生成原始查询
  75.             raw_query = self.query_chain.invoke({"question": question})
  76.             
  77.             # 清理SQL语句
  78.             clean_sql = self._extract_sql(raw_query)
  79.             print(f"[DEBUG] Generated SQL: \n{clean_sql}")
  80.             
  81.             # 安全校验
  82.             if any(cmd in clean_sql.upper() for cmd in ["DROP", "DELETE", "UPDATE", "INSERT"]):
  83.                 raise ValueError("检测到危险SQL操作")
  84.             
  85.             # 执行查询
  86.             result = self.db.run(clean_sql)
  87.             
  88.             # 构建分析提示
  89.             analysis_prompt = f"""
  90.             SQL执行结果:{result}
  91.             
  92.             请用中文完成以下分析:
  93.             1. 数据趋势描述(数值变化)
  94.             2. 关键年份变化分析
  95.             3. 可能的影响因素推测
  96.             """
  97.             
  98.             # 维护对话历史
  99.             self.messages.extend([
  100.                 HumanMessage(content=question),
  101.                 AIMessage(content=f"执行查询:{clean_sql}"),
  102.                 HumanMessage(content=analysis_prompt)
  103.             ])
  104.             
  105.             response = self.llm.invoke(self.messages)
  106.             self.messages.append(AIMessage(content=response.content))
  107.             
  108.             return response.content
  109.             
  110.         except Exception as e:
  111.             error_msg = f"分析失败:{str(e)}"
  112.             self.messages.append(AIMessage(content=error_msg))
  113.             return error_msg
  114. def main():
  115.     analyzer = DeepSeekDBAnalyzer()
  116.     print("人口数据分析系统已启动(输入 exit 退出)")
  117.    
  118.     while True:
  119.         try:
  120.             user_input = input("\n用户提问:").strip()
  121.             
  122.             if user_input.lower() == "exit":
  123.                 print("\n系统退出")
  124.                 break
  125.             print("\n用户提问:%s\n" % user_input)
  126.                
  127.             # 获取分析结果
  128.             response = analyzer.analyze_data(user_input)
  129.             print("\n分析结果:")
  130.             print(response)
  131.             
  132.         except KeyboardInterrupt:
  133.             print("\n操作已取消")
  134.             break
  135. if __name__ == "__main__":
  136.     main()
复制代码
执行结果



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

渣渣兔

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表