使命介绍
本地有一个postgre数据记载中国近20年人口明细数据和出生率、死亡率明细。
要求使用LangChain0.3和deepseek api毗连本地postgresql数据库,并执行数据分析,整个数据分析系统具备记忆功能,输入exit退出。
具体代码
实现ai大模子本地postgresql数据库查询分析的python代码实现如下:
- import re
- from langchain_openai import ChatOpenAI
- from langchain_community.utilities import SQLDatabase
- from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
- from langchain.chains import create_sql_query_chain
- from langchain_core.prompts import PromptTemplate # 新增关键导入
- class DeepSeekDBAnalyzer:
- def __init__(self):
- # 初始化DeepSeek模型
- self.llm = ChatOpenAI(
- base_url="https://api.deepseek.com",
- api_key="your_deepseek_api_key",
- model="deepseek-chat",
- temperature=0
- )
-
- # 连接PostgreSQL数据库
- self.db = SQLDatabase.from_uri(
- "postgresql+psycopg2://user:password@192.168.124.13:5432/my_db",
- include_tables=['total_population_year', 'birth_rate_year'],
- )
-
- # 创建自定义提示模板(关键修复点)
- custom_prompt = PromptTemplate(
- input_variables=["table_info","input","top_k"], # 确保输入变量名与实际使用一致
- template="""
- 你是一个专业的数据库分析专家,请根据数据库结构进行SQL查询。
- 数据库结构:
- {table_info}
-
- 请根据用户问题生成精确的SQL查询并解释结果。
- 请不要包含任何解释,只输出SQL代码。
- 请使用标准PostgreSQL语法进行查询。
- 生成的sql语句不限制limit值为{top_k}。
-
- 用户问题:{input}
- """
- )
-
- # 创建SQL查询链
- self.query_chain = create_sql_query_chain(
- llm=self.llm,
- db=self.db,
- prompt=custom_prompt # 使用正确格式的PromptTemplate
- )
-
- # 初始化对话历史
- self.messages = [
- SystemMessage(content="""
- 数据库结构:
- 1. total_population_year 表字段:
- - Year (int4)
- - Total_Population (int4)
- - Male_Population (int4)
- - Female_Population (int4)
- - Urban_Population (int4)
- - Rural_Population (int4)
- 2. birth_rate_year 表字段:
- - Year (int4)
- - Birth_Rate (int4)
- - Death_Rate (int4)
- - Natural_Growth_Rate (int4)
- 请根据用户问题生成精确的SQL查询并解释结果
- """)
- ]
- def _extract_sql(self, query_response):
- """提取纯SQL语句"""
- sql_match = re.search(r"```sql\n(.*?)\n```", query_response, re.DOTALL)
- if sql_match:
- return sql_match.group(1).strip()
- return query_response.split(";")[0] + ";" # 兜底处理
- def analyze_data(self, question):
- try:
- # 生成原始查询
- raw_query = self.query_chain.invoke({"question": question})
-
- # 清理SQL语句
- clean_sql = self._extract_sql(raw_query)
- print(f"[DEBUG] Generated SQL: \n{clean_sql}")
-
- # 安全校验
- if any(cmd in clean_sql.upper() for cmd in ["DROP", "DELETE", "UPDATE", "INSERT"]):
- raise ValueError("检测到危险SQL操作")
-
- # 执行查询
- result = self.db.run(clean_sql)
-
- # 构建分析提示
- analysis_prompt = f"""
- SQL执行结果:{result}
-
- 请用中文完成以下分析:
- 1. 数据趋势描述(数值变化)
- 2. 关键年份变化分析
- 3. 可能的影响因素推测
- """
-
- # 维护对话历史
- self.messages.extend([
- HumanMessage(content=question),
- AIMessage(content=f"执行查询:{clean_sql}"),
- HumanMessage(content=analysis_prompt)
- ])
-
- response = self.llm.invoke(self.messages)
- self.messages.append(AIMessage(content=response.content))
-
- return response.content
-
- except Exception as e:
- error_msg = f"分析失败:{str(e)}"
- self.messages.append(AIMessage(content=error_msg))
- return error_msg
- def main():
- analyzer = DeepSeekDBAnalyzer()
- print("人口数据分析系统已启动(输入 exit 退出)")
-
- while True:
- try:
- user_input = input("\n用户提问:").strip()
-
- if user_input.lower() == "exit":
- print("\n系统退出")
- break
- print("\n用户提问:%s\n" % user_input)
-
- # 获取分析结果
- response = analyzer.analyze_data(user_input)
- print("\n分析结果:")
- print(response)
-
- except KeyboardInterrupt:
- print("\n操作已取消")
- break
- if __name__ == "__main__":
- main()
复制代码 执行结果
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |