十念 发表于 2025-3-26 08:28:18

使用 LangChain 构建基于自然语言天生 SQL 并查询 MySQL 数据库的智能代理

在当今数据驱动的期间,能够将自然语言转化为数据库查询语句(如 SQL)并获取相应数据,对于非技术用户来说具有极大的代价。借助 LangChain 的强大功能,我们可以开辟一个智能代理(Agent),该代理能够明确自然语言查询,天生相应的 SQL 语句,并与 MySQL 数据库交互,提供所需的分析信息。此外,我们还可以基于 LangServer 为这一功能天生 API 接口,使其更易于集成和使用。
本文将带您一步步实现这一目的,包括环境设置、工具集成、Agent 构建及 API 接口的开辟。
实现代码:https://gitcode.com/sequoia00/LangChain-Agent_Mysql/overview
目录


[*]项目概述
[*]环境设置
[*]数据库连接
[*]定义 SQL 天生工具
[*]构建 LangChain Agent
[*]开辟 API 接口
[*]安全性与最佳实践
[*]总结
项目概述

我们将创建一个系统,该系统包括以下部分:

[*]自然语言处置惩罚(NLP):解析用户的自然语言查询并天生相应的 SQL 语句。
[*]数据库交互:执行天生的 SQL 语句,并从 MySQL 数据库中获取数据。
[*]API 接口:提供一个可通过 HTTP 请求调用的 API,使外部应用步伐能够使用该功能。
环境设置

首先,确保您的开辟环境中已安装以下工具和库:


[*]Python 3.7+
[*]MySQL 数据库
[*]OpenAI API Key(如果使用 OpenAI 的模型)
[*]必要的 Python 库:langchain, openai, mysql-connector-python, fastapi, uvicorn
安装必要的库

使用 pip 安装所需的 Python 库:
pip install langchain openai mysql-connector-python fastapi uvicorn
数据库连接

确保您有一个运行中的 MySQL 数据库,并具备相应的访问权限。我们将使用 mysql-connector-python 库与数据库进行交互。
创建数据库连接

以下是一个用于连接 MySQL 数据库的示例代码:
# db.py
import mysql.connector
from mysql.connector import Error

def connect_to_database(host, database, user, password):
    """
    连接到MySQL数据库
    """
    try:
      connection = mysql.connector.connect(
            host=host,
            database=database,
            user=user,
            password=password
      )
      if connection.is_connected():
            print("成功连接到数据库")
            return connection
    except Error as e:
      print(f"错误: {e}")
      return None

def execute_query(connection, query):
    """
    执行SQL查询并返回结果
    """
    cursor = connection.cursor()
    try:
      cursor.execute(query)
      result = cursor.fetchall()
      columns = cursor.column_names
      return columns, result
    except Error as e:
      print(f"查询错误: {e}")
      return None, None
    finally:
      cursor.close()
定义 SQL 天生工具

我们必要一个工具,将自然语言查询转化为 SQL。这里我们使用 LangChain 的工具系统,团结 OpenAI 的 GPT 模型来天生 SQL 语句。
设置 OpenAI

首先,确保您已得到 OpenAI 的 API 密钥,并将其设置为环境变量:
export OPENAI_API_KEY='your-openai-api-key'
创建 SQL 天生工具

# sql_tool.py
from langchain import ChatOpenAI, LLMChain
from langchain.prompts import PromptTemplate

class SQLGenerator:
    def __init__(self, openai_api_key, db_schema):
      """
      初始化 SQL 生成器
      """
      self.db_schema = db_schema
      self.llm = ChatOpenAI(
            model_name="gpt-4o-mini"
            openai_api_key=openai_api_key,
            temperature=0
      )
      self.prompt = PromptTemplate(
            input_variables=["user_query", "db_schema"],
            template="""
            你是一个SQL生成器,基于用户的自然语言查询生成对应的SQL语句。请确保生成的SQL语句是针对以下数据库模式的:

            {db_schema}

            用户查询: {user_query}

            生成的SQL语句:
            """
      )
      self.chain = LLMChain(llm=self.llm, prompt=self.prompt)
   
    def generate_sql(self, user_query):
      """
      根据用户查询生成SQL语句
      """
      sql = self.chain.invoke({
            "user_query": user_query,
            "db_schema": self.db_schema
      })
      # 清理生成的 SQL 语句,去掉多余的 markdown 或反引号
      sql = sql["text"].replace("```sql\n", "").replace("```", "").strip()# 清理多余标记
      return sql
      return sql.strip()
获取数据库模式

为了让 LLM 能够明确数据库布局,您必要提供数据库模式。可以通过查询 MySQL 数据库的 information_schema 获取全部表和列的信息。
# schema.py
import mysql.connector
from mysql.connector import Error

def get_db_schema(connection):
    """
    获取数据库的模式,包括所有表和列
    """
    cursor = connection.cursor()
    try:
      cursor.execute("SELECT TABLE_NAME, COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = DATABASE()")
      rows = cursor.fetchall()
      schema = {}
      for table, column, data_type in rows:
            if table not in schema:
                schema = []
            schema.append(f"{column} {data_type}")
      schema_str = ""
      for table, columns in schema.items():
            schema_str += f"表 `{table}` ("
            schema_str += ", ".join(columns)
            schema_str += ")\
"
      return schema_str
    except Error as e:
      print(f"获取模式错误: {e}")
      return ""
    finally:
      cursor.close()
构建 LangChain Agent

LangChain 提供了一个灵活的框架,可以将多个工具集成到一个智能代理(Agent)中,这里也可以用OpenAI Swarm替换,调用起来会更加方便。我们的代理将包括以下组件:

[*]SQL 天生工具:将自然语言查询转化为 SQL。
[*]数据库查询工具:执行 SQL 语句并返回结果。
定义 Agent 的工具

# agent.py
from langchain.agents import Tool, create_openai_functions_agent, AgentExecutor
from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate
from langchain.agents import AgentOutputParser
import re
from pydantic import BaseModel
from typing import Any, Dict
from sql_tool import SQLGenerator
from db import execute_query
from schema import get_db_schema
# from security import is_safe_sql
from tabulate import tabulate

class ExecuteSQLArgs(BaseModel):
    query: str

def execute_sql_tool(args: ExecuteSQLArgs, sql_generator: SQLGenerator, db_connection) -> str:
    query = args.query
    sql = sql_generator.generate_sql(query)
    print(f"生成的 SQL 语句:\n{sql}")# 调试输出
    # if not sql or not is_safe_sql(sql):
    #   return "生成的SQL语句为空或包含不允许的操作。"

    # 执行 SQL 查询
    columns, result = execute_query(db_connection, sql)
    if columns and result:
      # 将结果格式化为表格字符串
      table = tabulate(result, headers=columns, tablefmt="grid")
      return f"查询结果:\n{table}"
    return "查询失败或无结果。"

def create_agent(openai_api_key, db_connection, user_input):
    # 获取数据库模式
    db_schema = get_db_schema(db_connection)

    # 初始化 SQL 生成工具
    sql_generator = SQLGenerator(openai_api_key, db_schema)

    # 定义执行 SQL 的工具
    def execute_sql_function(user_input):
      sql = sql_generator.generate_sql(user_input)
      columns, result = execute_query(db_connection, sql)
      if columns and result:
            # 将结果格式化为表格字符串
            from tabulate import tabulate
            table = tabulate(result, headers=columns, tablefmt="grid")
            return f"查询结果:\n{table}"
      return "查询失败或无结果。"

    execute_sql_tool_instance = Tool(
      name="execute_sql",
      func=execute_sql_function,
      description="执行给定的SQL查询,并返回结果。用户提供的查询将被转化为SQL语句。",
      args_schema=ExecuteSQLArgs
    )

    tools =

    # 定义 PromptTemplate
    prompt_template = PromptTemplate(
      input_variables=["input"],
      template="""
    你是一个智能的数据分析助手。你将根据用户的自然语言查询生成相应的SQL语句,并执行查询以获取所需的信息。

    使用以下工具来完成任务:
    {tools}

    用户查询: {input}

    请根据用户的需求,选择合适的工具并返回结果。如果需要返回数据库查询的结果,将以表格的形式展现。
    {agent_scratchpad}
    """
    )

    # 创建 OpenAI LLM
    llm = ChatOpenAI(model_name="gpt-4o-mini", openai_api_key=openai_api_key, temperature=0)

    # 创建代理
    agent = create_openai_functions_agent(
      llm=llm,
      tools=tools,
      prompt=prompt_template,
      # verbose=True
    )

    agent_executor = AgentExecutor(agent=agent, tools=tools)
    response = agent_executor.invoke({"input": user_input,"agent_scratchpad":[],"tools":tools})

    return response["output"]
完整的 Agent 构建流程

# main.py
import os
from db import connect_to_database
from agent import create_agent
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

# 定义 API 请求体
class QueryRequest(BaseModel):
    query: str

# 定义 API 响应体
class QueryResponse(BaseModel):
    response: str

# 初始化数据库连接
db_connection = connect_to_database(
    host='0.0.0.0',
    database='db',
    user='user',
    password='password'
)

if not db_connection:
    raise Exception("无法连接到数据库")

# 创建Agent
# openai_api_key = os.getenv('OPENAI_API_KEY')
# if not openai_api_key:
#   raise Exception("请设置 OPENAI_API_KEY 环境变量")
openai_api_key="请输入自己的key"



# 初始化FastAPI应用
app = FastAPI()

@app.post("/query", response_model=QueryResponse)
def query_database(request: QueryRequest):
    user_query = request.query
    print("user_query:"+user_query)
    try:
      result =create_agent(openai_api_key, db_connection,user_query)
      print(result)
      return QueryResponse(response=result)
    except Exception as e:
      raise HTTPException(status_code=500, detail=str(e))
开辟 API 接口

为了使我们的智能代理能够通过 HTTP 请求进行访问,我们使用 FastAPI 创建一个简单的 API 接口。
启动 API 服务器

保存上述 main.py 文件后,在终端中运行以下命令启动 API 服务器:
uvicorn main:app --host 0.0.0.0 --port 8000
这将启动一个在 http://0.0.0.0:8000 运行的 API 服务器。您可以通过 http://0.0.0.0:8000/docs 访问自动天生的文档,并测试 API。
使用 API

发送一个 POST 请求到 /query 端点,包含用户的自然语言查询。例如:
请求 URL: http://localhost:8000/query
请求方法: POST
请求体:
{
    "query": "查找node有几种type,每种type有多少数据。"
}
相应:
根据查询,以下是不同类型的节点及其数量的数据:

|   type |   count |
|--------|---------|
|      1 |    4481 |
|      2 |   477 |
|      3 |   106 |
调用的价格也很便宜,不过这个主要和返回查询数据的多少有关,由于返回的数据也必要再次输入大模型,也管帐算token量。
安全性与最佳实践

在构建能够天生和执行 SQL 语句的系统时,必须特殊注意安全性,尤其是防止 SQL 注入攻击。以下是一些最佳实践:

[*]验证 SQL 语句:在执行任何天生的 SQL 语句之前,尽量验证和过滤潜伏的伤害语句。
[*]最小权限原则:数据库用户应仅具有执行必要操作的权限,制止授予过高的权限。
[*]日志记录:记录全部天生和执行的 SQL 语句,以便审计和排查问题。
[*]使用参数化查询:尽大概使用参数化查询来防止 SQL 注入,虽然在本例中通过工具执行 SQL 语句必要特殊小心。
增强 SQL 安全性的示例

以下是一个简单的 SQL 验证函数,可以在执行前对 SQL 语句进行检查:
# security.py
import re

def is_safe_sql(sql):
    """
    简单的SQL安全检查函数
    """
    # 禁止删除、更新等危险操作
    forbidden = ["DROP", "DELETE", "UPDATE", "INSERT", "ALTER"]
    for word in forbidden:
      if re.search(rf"\\b{word}\\b", sql, re.IGNORECASE):
            return False
    return True
在 execute_sql 函数中调用这个安全检查:
# 修改 agent.py 中的 execute_sql 函数
from security.py import is_safe_sql

def execute_sql(user_input):
    sql = sql_generator.generate_sql(user_input)
   
    if not is_safe_sql(sql):
      return "生成的SQL语句包含不允许的操作。"
   
    columns, result = execute_query(db_connection, sql)
    if columns and result:
      from tabulate import tabulate
      table = tabulate(result, headers=columns, tablefmt="grid")
      return f"查询结果:\
{table}"
    return "查询失败或无结果。"
总结

通过本文的引导,您已经学会了如何使用 LangChain 构建一个智能代理,该代理能够将自然语言查询转化为 SQL 语句,并查询 MySQL 数据库以提供相应的数据分析信息。进一步地,我们还通过 FastAPI 为这一功能提供了一个易于访问的 API 接口。
这一系统的潜伏应用场景非常广泛,例如商业智能、客户服务自动化以及数据分析报告天生等。随着自然语言处置惩罚技术的不断进步,信赖这一类系统将在更多范畴得到应用和扩展。
在实际部署时,请务必思量安全性,确保系统能够抵御潜伏的攻击并掩护敏感数据。同时,根据您的具体需求,可以进一步优化和扩展系统功能,如支持更多数据库类型、增强自然语言明确能力以及提供更丰富的分析工具。
盼望本篇博客对您在构建类似系统时有所帮助。如果您有任何疑问或必要进一步的引导,接待在评论区交换!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 使用 LangChain 构建基于自然语言天生 SQL 并查询 MySQL 数据库的智能代理