1. 引言
在测试中,频繁地创建和销毁数据库连接会消耗大量的资源,并且可能导致数据库连接的泄漏大概性能下降。因此,利用连接池可以有用地管理数据库连接,提高步伐的性能和可靠性。
2. 实现MySQL连接池的类
我们将利用Python的pymysql库和dbutils库中的PooledDB来实现MySQL连接池。
- pip install pymysql
- pip install dbutils
复制代码
- 以下是实现连接池的主要类ConnectMysqlPool的详细先容:
- import pymysql
- from dbutils.pooled_db import PooledDB
- import os
- from common_methods.file_operation import f_p
- class ConnectMysqlPool:
- """
- 连接MySQL数据库的连接池类。
- 属性:
- db_account (dict): 数据库账号信息,包括用户名和密码等。
- db (str): 数据库名称。
- pool (PooledDB): MySQL连接池对象。
- 方法:
- __init__: 初始化连接池类实例。
- _obtaining_data: 从配置文件中获取测试数据。
- create_mysql_pool: 创建MySQL连接池。
- get_conn: 从连接池中获取一个连接。
- close: 关闭数据库连接和游标。
- execute: 使用连接执行SQL语句。
- """
- def __init__(self, db, db_account):
- """
- 初始化连接池类实例。
- 参数:
- db (str): 测试库名称。
- db_account (dict): 包含数据库账号信息的字典。
- """
- self._common_data = self._obtaining_data() # 获取测试数据
- self.db_account = db_account # 数据库账号信息
- self.db = db # 测试库
- # 创建连接池
- self.pool = self.create_mysql_pool()
- # 从配置文件中获取测试数据
- def _obtaining_data(self):
- """
- 从指定的YAML文件中读取连接配置数据。
- 返回:
- dict: 包含连接配置信息的字典。
- """
- file_path = os.path.join(
- os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
- "data_driven",
- "mysql_data.yaml"
- )
- common_data = f_p.red_yaml_file(file_path)
- return common_data
- # 创建MySQL连接池
- def create_mysql_pool(self):
- """
- 根据配置信息创建MySQL连接池。
- 返回:
- PooledDB: MySQL连接池对象。
- """
- pool = PooledDB(
- **self._common_data['connect_pool_config'],
- **self.db_account,
- db=self.db,
- creator=pymysql
- )
- return pool
- # 从连接池中获取一个连接
- def get_conn(self):
- """
- 从连接池中获取一个数据库连接。
- 返回:
- connection: 数据库连接对象。
- """
- return self.pool.connection()
- # 关闭数据库连接和游标
- def close(self, conn, cursor):
- """
- 关闭数据库连接和游标。
- 参数:
- conn (connection): 数据库连接对象。
- cursor (cursor): 数据库游标对象。
- """
- try:
- if cursor:
- cursor.close()
- if conn:
- conn.close()
- except Exception as e:
- print(f"关闭连接或游标时发成错误:{e}")
- # 使用连接执行sql
- def execute(self, sql, params):
- """
- 使用获取的连接执行SQL语句。
- 参数:
- sql (str): SQL语句。
- params (tuple): SQL参数。
- 返回:
- list: 执行SQL语句后的结果集,若执行出错则返回None。
- """
- conn = self.get_conn()
- cursor = conn.cursor()
- try:
- cursor.execute(sql, params)
- conn.commit()
- result = cursor.fetchall()
- return result
- except Exception as e:
- print(f"表:{self.db},执行sql:{sql},报错:{e}")
- conn.rollback()
- return None
- finally:
- self.close(conn, cursor)
复制代码
- red_yaml_file:读取yaml文件方法,代码在下方
- def red_yaml_file(self,file_path,name=None):
- try:
- with open(file_path,'r') as file:
- data = yaml.safe_load(file)
- return_data = data if name is None else data.get(name,{}) # 返回数据
- return return_data
- except Exception as e:
- print(f"{file_path}读取数据失败:{e}")
- return None
复制代码
- mysql_data.yaml:yaml文件写入的连接池设置数据
- connect_pool_config:
- charset: "utf8"
- mincached: 2 # 初始化时,连接池中至少创建空闲的链接,0表示不创建
- maxcached: 50 # 连接池允许最大的连接数, 0和None表示不限制连接数
- maxshared: 0 # 连接池最多可共享的连接数量,0和None表示全部共享。PS:pymysql不支持事务
- maxconnections: 10 # 连接池最大并发连接数量
- blocking: True # 连接池中没有可用连接后,是否阻塞等待
- setsession: [] # 开始会话前执行的命令列表
复制代码 3. 利用示例
以下是利用ConnectMysqlPool类的示例代码:
- # 创建连接池对象
- db_account = {
- "host": "localhost",
- "port": 3306,
- "user": "root",
- "password": "password",
- "charset": "utf8mb4"
- }
- db_name = "test_db"
- cmp = ConnectMysqlPool(db_name,db_account)
- sql = "select * from table_name where id < %s"
- params = (100,)
- result = cmp.execute(sql,params) # 返回表中id小于100的数据
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |