渣渣兔 发表于 2025-3-8 09:51:03

【deepseek+LangChain】实现postgre数据库分析问答系统

使命介绍

本地有一个postgre数据记载中国近20年人口明细数据和出生率、死亡率明细。
要求使用LangChain0.3和deepseek api毗连本地postgresql数据库,并执行数据分析,整个数据分析系统具备记忆功能,输入exit退出。
具体代码

实现ai大模子本地postgresql数据库查询分析的python代码实现如下:
import re
from langchain_openai import ChatOpenAI
from langchain_community.utilities import SQLDatabase
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from langchain.chains import create_sql_query_chain
from langchain_core.prompts import PromptTemplate# 新增关键导入

class DeepSeekDBAnalyzer:
    def __init__(self):
      # 初始化DeepSeek模型
      self.llm = ChatOpenAI(
            base_url="https://api.deepseek.com",
            api_key="your_deepseek_api_key",
            model="deepseek-chat",
            temperature=0
      )
      
      # 连接PostgreSQL数据库
      self.db = SQLDatabase.from_uri(
            "postgresql+psycopg2://user:password@192.168.124.13:5432/my_db",
            include_tables=['total_population_year', 'birth_rate_year'],
      )
      
      # 创建自定义提示模板(关键修复点)
      custom_prompt = PromptTemplate(
            input_variables=["table_info","input","top_k"],# 确保输入变量名与实际使用一致
            template="""
            你是一个专业的数据库分析专家,请根据数据库结构进行SQL查询。
            数据库结构:
            {table_info}
            
            请根据用户问题生成精确的SQL查询并解释结果。
            请不要包含任何解释,只输出SQL代码。
            请使用标准PostgreSQL语法进行查询。
            生成的sql语句不限制limit值为{top_k}。
            
            用户问题:{input}
            """
      )
      
      # 创建SQL查询链
      self.query_chain = create_sql_query_chain(
            llm=self.llm,
            db=self.db,
            prompt=custom_prompt# 使用正确格式的PromptTemplate
      )
      
      # 初始化对话历史
      self.messages = [
            SystemMessage(content="""
                数据库结构:
                1. total_population_year 表字段:
                   - Year (int4)
                   - Total_Population (int4)
                   - Male_Population (int4)
                   - Female_Population (int4)
                   - Urban_Population (int4)
                   - Rural_Population (int4)
                2. birth_rate_year 表字段:
                   - Year (int4)
                   - Birth_Rate (int4)
                   - Death_Rate (int4)
                   - Natural_Growth_Rate (int4)
                请根据用户问题生成精确的SQL查询并解释结果
            """)
      ]

    def _extract_sql(self, query_response):
      """提取纯SQL语句"""
      sql_match = re.search(r"```sql\n(.*?)\n```", query_response, re.DOTALL)
      if sql_match:
            return sql_match.group(1).strip()
      return query_response.split(";") + ";"# 兜底处理

    def analyze_data(self, question):
      try:
            # 生成原始查询
            raw_query = self.query_chain.invoke({"question": question})
            
            # 清理SQL语句
            clean_sql = self._extract_sql(raw_query)
            print(f" Generated SQL: \n{clean_sql}")
            
            # 安全校验
            if any(cmd in clean_sql.upper() for cmd in ["DROP", "DELETE", "UPDATE", "INSERT"]):
                raise ValueError("检测到危险SQL操作")
            
            # 执行查询
            result = self.db.run(clean_sql)
            
            # 构建分析提示
            analysis_prompt = f"""
            SQL执行结果:{result}
            
            请用中文完成以下分析:
            1. 数据趋势描述(数值变化)
            2. 关键年份变化分析
            3. 可能的影响因素推测
            """
            
            # 维护对话历史
            self.messages.extend([
                HumanMessage(content=question),
                AIMessage(content=f"执行查询:{clean_sql}"),
                HumanMessage(content=analysis_prompt)
            ])
            
            response = self.llm.invoke(self.messages)
            self.messages.append(AIMessage(content=response.content))
            
            return response.content
            
      except Exception as e:
            error_msg = f"分析失败:{str(e)}"
            self.messages.append(AIMessage(content=error_msg))
            return error_msg

def main():
    analyzer = DeepSeekDBAnalyzer()
    print("人口数据分析系统已启动(输入 exit 退出)")
   
    while True:
      try:
            user_input = input("\n用户提问:").strip()
            
            if user_input.lower() == "exit":
                print("\n系统退出")
                break

            print("\n用户提问:%s\n" % user_input)
               
            # 获取分析结果
            response = analyzer.analyze_data(user_input)
            print("\n分析结果:")
            print(response)
            
      except KeyboardInterrupt:
            print("\n操作已取消")
            break

if __name__ == "__main__":
    main()
执行结果

https://i-blog.csdnimg.cn/direct/a91c397aa4cd47b59e5b33964ce5d783.png

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 【deepseek+LangChain】实现postgre数据库分析问答系统