使用 LangChain 构建基于自然语言天生 SQL 并查询 MySQL 数据库的智能代理
在当今数据驱动的期间,能够将自然语言转化为数据库查询语句(如 SQL)并获取相应数据,对于非技术用户来说具有极大的代价。借助 LangChain 的强大功能,我们可以开辟一个智能代理(Agent),该代理能够明确自然语言查询,天生相应的 SQL 语句,并与 MySQL 数据库交互,提供所需的分析信息。此外,我们还可以基于 LangServer 为这一功能天生 API 接口,使其更易于集成和使用。本文将带您一步步实现这一目的,包括环境设置、工具集成、Agent 构建及 API 接口的开辟。
实现代码:https://gitcode.com/sequoia00/LangChain-Agent_Mysql/overview
目录
[*]项目概述
[*]环境设置
[*]数据库连接
[*]定义 SQL 天生工具
[*]构建 LangChain Agent
[*]开辟 API 接口
[*]安全性与最佳实践
[*]总结
项目概述
我们将创建一个系统,该系统包括以下部分:
[*]自然语言处置惩罚(NLP):解析用户的自然语言查询并天生相应的 SQL 语句。
[*]数据库交互:执行天生的 SQL 语句,并从 MySQL 数据库中获取数据。
[*]API 接口:提供一个可通过 HTTP 请求调用的 API,使外部应用步伐能够使用该功能。
环境设置
首先,确保您的开辟环境中已安装以下工具和库:
[*]Python 3.7+
[*]MySQL 数据库
[*]OpenAI API Key(如果使用 OpenAI 的模型)
[*]必要的 Python 库:langchain, openai, mysql-connector-python, fastapi, uvicorn
安装必要的库
使用 pip 安装所需的 Python 库:
pip install langchain openai mysql-connector-python fastapi uvicorn
数据库连接
确保您有一个运行中的 MySQL 数据库,并具备相应的访问权限。我们将使用 mysql-connector-python 库与数据库进行交互。
创建数据库连接
以下是一个用于连接 MySQL 数据库的示例代码:
# db.py
import mysql.connector
from mysql.connector import Error
def connect_to_database(host, database, user, password):
"""
连接到MySQL数据库
"""
try:
connection = mysql.connector.connect(
host=host,
database=database,
user=user,
password=password
)
if connection.is_connected():
print("成功连接到数据库")
return connection
except Error as e:
print(f"错误: {e}")
return None
def execute_query(connection, query):
"""
执行SQL查询并返回结果
"""
cursor = connection.cursor()
try:
cursor.execute(query)
result = cursor.fetchall()
columns = cursor.column_names
return columns, result
except Error as e:
print(f"查询错误: {e}")
return None, None
finally:
cursor.close()
定义 SQL 天生工具
我们必要一个工具,将自然语言查询转化为 SQL。这里我们使用 LangChain 的工具系统,团结 OpenAI 的 GPT 模型来天生 SQL 语句。
设置 OpenAI
首先,确保您已得到 OpenAI 的 API 密钥,并将其设置为环境变量:
export OPENAI_API_KEY='your-openai-api-key'
创建 SQL 天生工具
# sql_tool.py
from langchain import ChatOpenAI, LLMChain
from langchain.prompts import PromptTemplate
class SQLGenerator:
def __init__(self, openai_api_key, db_schema):
"""
初始化 SQL 生成器
"""
self.db_schema = db_schema
self.llm = ChatOpenAI(
model_name="gpt-4o-mini"
openai_api_key=openai_api_key,
temperature=0
)
self.prompt = PromptTemplate(
input_variables=["user_query", "db_schema"],
template="""
你是一个SQL生成器,基于用户的自然语言查询生成对应的SQL语句。请确保生成的SQL语句是针对以下数据库模式的:
{db_schema}
用户查询: {user_query}
生成的SQL语句:
"""
)
self.chain = LLMChain(llm=self.llm, prompt=self.prompt)
def generate_sql(self, user_query):
"""
根据用户查询生成SQL语句
"""
sql = self.chain.invoke({
"user_query": user_query,
"db_schema": self.db_schema
})
# 清理生成的 SQL 语句,去掉多余的 markdown 或反引号
sql = sql["text"].replace("```sql\n", "").replace("```", "").strip()# 清理多余标记
return sql
return sql.strip()
获取数据库模式
为了让 LLM 能够明确数据库布局,您必要提供数据库模式。可以通过查询 MySQL 数据库的 information_schema 获取全部表和列的信息。
# schema.py
import mysql.connector
from mysql.connector import Error
def get_db_schema(connection):
"""
获取数据库的模式,包括所有表和列
"""
cursor = connection.cursor()
try:
cursor.execute("SELECT TABLE_NAME, COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = DATABASE()")
rows = cursor.fetchall()
schema = {}
for table, column, data_type in rows:
if table not in schema:
schema = []
schema.append(f"{column} {data_type}")
schema_str = ""
for table, columns in schema.items():
schema_str += f"表 `{table}` ("
schema_str += ", ".join(columns)
schema_str += ")\
"
return schema_str
except Error as e:
print(f"获取模式错误: {e}")
return ""
finally:
cursor.close()
构建 LangChain Agent
LangChain 提供了一个灵活的框架,可以将多个工具集成到一个智能代理(Agent)中,这里也可以用OpenAI Swarm替换,调用起来会更加方便。我们的代理将包括以下组件:
[*]SQL 天生工具:将自然语言查询转化为 SQL。
[*]数据库查询工具:执行 SQL 语句并返回结果。
定义 Agent 的工具
# agent.py
from langchain.agents import Tool, create_openai_functions_agent, AgentExecutor
from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate
from langchain.agents import AgentOutputParser
import re
from pydantic import BaseModel
from typing import Any, Dict
from sql_tool import SQLGenerator
from db import execute_query
from schema import get_db_schema
# from security import is_safe_sql
from tabulate import tabulate
class ExecuteSQLArgs(BaseModel):
query: str
def execute_sql_tool(args: ExecuteSQLArgs, sql_generator: SQLGenerator, db_connection) -> str:
query = args.query
sql = sql_generator.generate_sql(query)
print(f"生成的 SQL 语句:\n{sql}")# 调试输出
# if not sql or not is_safe_sql(sql):
# return "生成的SQL语句为空或包含不允许的操作。"
# 执行 SQL 查询
columns, result = execute_query(db_connection, sql)
if columns and result:
# 将结果格式化为表格字符串
table = tabulate(result, headers=columns, tablefmt="grid")
return f"查询结果:\n{table}"
return "查询失败或无结果。"
def create_agent(openai_api_key, db_connection, user_input):
# 获取数据库模式
db_schema = get_db_schema(db_connection)
# 初始化 SQL 生成工具
sql_generator = SQLGenerator(openai_api_key, db_schema)
# 定义执行 SQL 的工具
def execute_sql_function(user_input):
sql = sql_generator.generate_sql(user_input)
columns, result = execute_query(db_connection, sql)
if columns and result:
# 将结果格式化为表格字符串
from tabulate import tabulate
table = tabulate(result, headers=columns, tablefmt="grid")
return f"查询结果:\n{table}"
return "查询失败或无结果。"
execute_sql_tool_instance = Tool(
name="execute_sql",
func=execute_sql_function,
description="执行给定的SQL查询,并返回结果。用户提供的查询将被转化为SQL语句。",
args_schema=ExecuteSQLArgs
)
tools =
# 定义 PromptTemplate
prompt_template = PromptTemplate(
input_variables=["input"],
template="""
你是一个智能的数据分析助手。你将根据用户的自然语言查询生成相应的SQL语句,并执行查询以获取所需的信息。
使用以下工具来完成任务:
{tools}
用户查询: {input}
请根据用户的需求,选择合适的工具并返回结果。如果需要返回数据库查询的结果,将以表格的形式展现。
{agent_scratchpad}
"""
)
# 创建 OpenAI LLM
llm = ChatOpenAI(model_name="gpt-4o-mini", openai_api_key=openai_api_key, temperature=0)
# 创建代理
agent = create_openai_functions_agent(
llm=llm,
tools=tools,
prompt=prompt_template,
# verbose=True
)
agent_executor = AgentExecutor(agent=agent, tools=tools)
response = agent_executor.invoke({"input": user_input,"agent_scratchpad":[],"tools":tools})
return response["output"]
完整的 Agent 构建流程
# main.py
import os
from db import connect_to_database
from agent import create_agent
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
# 定义 API 请求体
class QueryRequest(BaseModel):
query: str
# 定义 API 响应体
class QueryResponse(BaseModel):
response: str
# 初始化数据库连接
db_connection = connect_to_database(
host='0.0.0.0',
database='db',
user='user',
password='password'
)
if not db_connection:
raise Exception("无法连接到数据库")
# 创建Agent
# openai_api_key = os.getenv('OPENAI_API_KEY')
# if not openai_api_key:
# raise Exception("请设置 OPENAI_API_KEY 环境变量")
openai_api_key="请输入自己的key"
# 初始化FastAPI应用
app = FastAPI()
@app.post("/query", response_model=QueryResponse)
def query_database(request: QueryRequest):
user_query = request.query
print("user_query:"+user_query)
try:
result =create_agent(openai_api_key, db_connection,user_query)
print(result)
return QueryResponse(response=result)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
开辟 API 接口
为了使我们的智能代理能够通过 HTTP 请求进行访问,我们使用 FastAPI 创建一个简单的 API 接口。
启动 API 服务器
保存上述 main.py 文件后,在终端中运行以下命令启动 API 服务器:
uvicorn main:app --host 0.0.0.0 --port 8000
这将启动一个在 http://0.0.0.0:8000 运行的 API 服务器。您可以通过 http://0.0.0.0:8000/docs 访问自动天生的文档,并测试 API。
使用 API
发送一个 POST 请求到 /query 端点,包含用户的自然语言查询。例如:
请求 URL: http://localhost:8000/query
请求方法: POST
请求体:
{
"query": "查找node有几种type,每种type有多少数据。"
}
相应:
根据查询,以下是不同类型的节点及其数量的数据:
| type | count |
|--------|---------|
| 1 | 4481 |
| 2 | 477 |
| 3 | 106 |
调用的价格也很便宜,不过这个主要和返回查询数据的多少有关,由于返回的数据也必要再次输入大模型,也管帐算token量。
安全性与最佳实践
在构建能够天生和执行 SQL 语句的系统时,必须特殊注意安全性,尤其是防止 SQL 注入攻击。以下是一些最佳实践:
[*]验证 SQL 语句:在执行任何天生的 SQL 语句之前,尽量验证和过滤潜伏的伤害语句。
[*]最小权限原则:数据库用户应仅具有执行必要操作的权限,制止授予过高的权限。
[*]日志记录:记录全部天生和执行的 SQL 语句,以便审计和排查问题。
[*]使用参数化查询:尽大概使用参数化查询来防止 SQL 注入,虽然在本例中通过工具执行 SQL 语句必要特殊小心。
增强 SQL 安全性的示例
以下是一个简单的 SQL 验证函数,可以在执行前对 SQL 语句进行检查:
# security.py
import re
def is_safe_sql(sql):
"""
简单的SQL安全检查函数
"""
# 禁止删除、更新等危险操作
forbidden = ["DROP", "DELETE", "UPDATE", "INSERT", "ALTER"]
for word in forbidden:
if re.search(rf"\\b{word}\\b", sql, re.IGNORECASE):
return False
return True
在 execute_sql 函数中调用这个安全检查:
# 修改 agent.py 中的 execute_sql 函数
from security.py import is_safe_sql
def execute_sql(user_input):
sql = sql_generator.generate_sql(user_input)
if not is_safe_sql(sql):
return "生成的SQL语句包含不允许的操作。"
columns, result = execute_query(db_connection, sql)
if columns and result:
from tabulate import tabulate
table = tabulate(result, headers=columns, tablefmt="grid")
return f"查询结果:\
{table}"
return "查询失败或无结果。"
总结
通过本文的引导,您已经学会了如何使用 LangChain 构建一个智能代理,该代理能够将自然语言查询转化为 SQL 语句,并查询 MySQL 数据库以提供相应的数据分析信息。进一步地,我们还通过 FastAPI 为这一功能提供了一个易于访问的 API 接口。
这一系统的潜伏应用场景非常广泛,例如商业智能、客户服务自动化以及数据分析报告天生等。随着自然语言处置惩罚技术的不断进步,信赖这一类系统将在更多范畴得到应用和扩展。
在实际部署时,请务必思量安全性,确保系统能够抵御潜伏的攻击并掩护敏感数据。同时,根据您的具体需求,可以进一步优化和扩展系统功能,如支持更多数据库类型、增强自然语言明确能力以及提供更丰富的分析工具。
盼望本篇博客对您在构建类似系统时有所帮助。如果您有任何疑问或必要进一步的引导,接待在评论区交换!
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]