Python+Pymysql+PooledDB实现数据库连接池
1. 引言在测试中,频繁地创建和销毁数据库连接会消耗大量的资源,并且可能导致数据库连接的泄漏大概性能下降。因此,利用连接池可以有用地管理数据库连接,提高步伐的性能和可靠性。
2. 实现MySQL连接池的类
我们将利用Python的pymysql库和dbutils库中的PooledDB来实现MySQL连接池。
[*]终端输入下令安装两个库
pip install pymysql
pip install dbutils
[*]以下是实现连接池的主要类ConnectMysqlPool的详细先容:
import pymysql
from dbutils.pooled_db import PooledDB
import os
from common_methods.file_operation import f_p
class ConnectMysqlPool:
"""
连接MySQL数据库的连接池类。
属性:
db_account (dict): 数据库账号信息,包括用户名和密码等。
db (str): 数据库名称。
pool (PooledDB): MySQL连接池对象。
方法:
__init__: 初始化连接池类实例。
_obtaining_data: 从配置文件中获取测试数据。
create_mysql_pool: 创建MySQL连接池。
get_conn: 从连接池中获取一个连接。
close: 关闭数据库连接和游标。
execute: 使用连接执行SQL语句。
"""
def __init__(self, db, db_account):
"""
初始化连接池类实例。
参数:
db (str): 测试库名称。
db_account (dict): 包含数据库账号信息的字典。
"""
self._common_data = self._obtaining_data() # 获取测试数据
self.db_account = db_account # 数据库账号信息
self.db = db # 测试库
# 创建连接池
self.pool = self.create_mysql_pool()
# 从配置文件中获取测试数据
def _obtaining_data(self):
"""
从指定的YAML文件中读取连接配置数据。
返回:
dict: 包含连接配置信息的字典。
"""
file_path = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
"data_driven",
"mysql_data.yaml"
)
common_data = f_p.red_yaml_file(file_path)
return common_data
# 创建MySQL连接池
def create_mysql_pool(self):
"""
根据配置信息创建MySQL连接池。
返回:
PooledDB: MySQL连接池对象。
"""
pool = PooledDB(
**self._common_data['connect_pool_config'],
**self.db_account,
db=self.db,
creator=pymysql
)
return pool
# 从连接池中获取一个连接
def get_conn(self):
"""
从连接池中获取一个数据库连接。
返回:
connection: 数据库连接对象。
"""
return self.pool.connection()
# 关闭数据库连接和游标
def close(self, conn, cursor):
"""
关闭数据库连接和游标。
参数:
conn (connection): 数据库连接对象。
cursor (cursor): 数据库游标对象。
"""
try:
if cursor:
cursor.close()
if conn:
conn.close()
except Exception as e:
print(f"关闭连接或游标时发成错误:{e}")
# 使用连接执行sql
def execute(self, sql, params):
"""
使用获取的连接执行SQL语句。
参数:
sql (str): SQL语句。
params (tuple): SQL参数。
返回:
list: 执行SQL语句后的结果集,若执行出错则返回None。
"""
conn = self.get_conn()
cursor = conn.cursor()
try:
cursor.execute(sql, params)
conn.commit()
result = cursor.fetchall()
return result
except Exception as e:
print(f"表:{self.db},执行sql:{sql},报错:{e}")
conn.rollback()
return None
finally:
self.close(conn, cursor)
[*]red_yaml_file:读取yaml文件方法,代码在下方
def red_yaml_file(self,file_path,name=None):
try:
with open(file_path,'r') as file:
data = yaml.safe_load(file)
return_data = data if name is None else data.get(name,{}) # 返回数据
return return_data
except Exception as e:
print(f"{file_path}读取数据失败:{e}")
return None
[*]mysql_data.yaml:yaml文件写入的连接池设置数据
connect_pool_config:
charset: "utf8"
mincached: 2 # 初始化时,连接池中至少创建空闲的链接,0表示不创建
maxcached: 50 # 连接池允许最大的连接数, 0和None表示不限制连接数
maxshared: 0 # 连接池最多可共享的连接数量,0和None表示全部共享。PS:pymysql不支持事务
maxconnections: 10 # 连接池最大并发连接数量
blocking: True # 连接池中没有可用连接后,是否阻塞等待
setsession: [] # 开始会话前执行的命令列表
3. 利用示例
以下是利用ConnectMysqlPool类的示例代码:
# 创建连接池对象
db_account = {
"host": "localhost",
"port": 3306,
"user": "root",
"password": "password",
"charset": "utf8mb4"
}
db_name = "test_db"
cmp = ConnectMysqlPool(db_name,db_account)
sql = "select * from table_name where id < %s"
params = (100,)
result = cmp.execute(sql,params) # 返回表中id小于100的数据
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]