八卦阵 发表于 2024-8-18 22:25:31

Python+Pymysql+PooledDB实现数据库连接池

1. 引言
在测试中,频繁地创建和销毁数据库连接会消耗大量的资源,并且可能导致数据库连接的泄漏大概性能下降。因此,利用连接池可以有用地管理数据库连接,提高步伐的性能和可靠性。
2. 实现MySQL连接池的类
我们将利用Python的pymysql库和dbutils库中的PooledDB来实现MySQL连接池。

[*]终端输入下令安装两个库
pip install pymysql
pip install dbutils

[*]以下是实现连接池的主要类ConnectMysqlPool的详细先容:
import pymysql
from dbutils.pooled_db import PooledDB
import os
from common_methods.file_operation import f_p

class ConnectMysqlPool:
    """
    连接MySQL数据库的连接池类。

    属性:
    db_account (dict): 数据库账号信息,包括用户名和密码等。
    db (str): 数据库名称。
    pool (PooledDB): MySQL连接池对象。

    方法:
    __init__: 初始化连接池类实例。
    _obtaining_data: 从配置文件中获取测试数据。
    create_mysql_pool: 创建MySQL连接池。
    get_conn: 从连接池中获取一个连接。
    close: 关闭数据库连接和游标。
    execute: 使用连接执行SQL语句。
    """
    def __init__(self, db, db_account):
      """
      初始化连接池类实例。

      参数:
      db (str): 测试库名称。
      db_account (dict): 包含数据库账号信息的字典。
      """
      self._common_data = self._obtaining_data() # 获取测试数据
      self.db_account = db_account # 数据库账号信息
      self.db = db # 测试库

      # 创建连接池
      self.pool = self.create_mysql_pool()

    # 从配置文件中获取测试数据
    def _obtaining_data(self):
      """
      从指定的YAML文件中读取连接配置数据。

      返回:
      dict: 包含连接配置信息的字典。
      """
      file_path = os.path.join(
            os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
            "data_driven",
            "mysql_data.yaml"
      )
      common_data = f_p.red_yaml_file(file_path)
      return common_data

    # 创建MySQL连接池
    def create_mysql_pool(self):
      """
      根据配置信息创建MySQL连接池。

      返回:
      PooledDB: MySQL连接池对象。
      """
      pool = PooledDB(
            **self._common_data['connect_pool_config'],
            **self.db_account,
            db=self.db,
            creator=pymysql
      )
      return pool

    # 从连接池中获取一个连接
    def get_conn(self):
      """
      从连接池中获取一个数据库连接。

      返回:
      connection: 数据库连接对象。
      """
      return self.pool.connection()

    # 关闭数据库连接和游标
    def close(self, conn, cursor):
      """
      关闭数据库连接和游标。

      参数:
      conn (connection): 数据库连接对象。
      cursor (cursor): 数据库游标对象。
      """
      try:
            if cursor:
                cursor.close()
            if conn:
                conn.close()
      except Exception as e:
            print(f"关闭连接或游标时发成错误:{e}")

    # 使用连接执行sql
    def execute(self, sql, params):
      """
      使用获取的连接执行SQL语句。

      参数:
      sql (str): SQL语句。
      params (tuple): SQL参数。

      返回:
      list: 执行SQL语句后的结果集,若执行出错则返回None。
      """
      conn = self.get_conn()
      cursor = conn.cursor()

      try:
            cursor.execute(sql, params)
            conn.commit()
            result = cursor.fetchall()
            return result
      except Exception as e:
            print(f"表:{self.db},执行sql:{sql},报错:{e}")
            conn.rollback()
            return None
      finally:
            self.close(conn, cursor)

[*]red_yaml_file:读取yaml文件方法,代码在下方
def red_yaml_file(self,file_path,name=None):

    try:
      with open(file_path,'r') as file:
            data = yaml.safe_load(file)
            return_data = data if name is None else data.get(name,{}) # 返回数据
            return return_data
    except Exception as e:
      print(f"{file_path}读取数据失败:{e}")
      return None

[*]mysql_data.yaml:yaml文件写入的连接池设置数据
connect_pool_config:
charset: "utf8"
mincached: 2 # 初始化时,连接池中至少创建空闲的链接,0表示不创建
maxcached: 50 # 连接池允许最大的连接数, 0和None表示不限制连接数
maxshared: 0 # 连接池最多可共享的连接数量,0和None表示全部共享。PS:pymysql不支持事务
maxconnections: 10 # 连接池最大并发连接数量
blocking: True # 连接池中没有可用连接后,是否阻塞等待
setsession: [] # 开始会话前执行的命令列表
3. 利用示例
以下是利用ConnectMysqlPool类的示例代码:
# 创建连接池对象
db_account = {
    "host": "localhost",
    "port": 3306,
    "user": "root",
    "password": "password",
    "charset": "utf8mb4"
}
db_name = "test_db"

cmp = ConnectMysqlPool(db_name,db_account)

sql = "select * from table_name where id < %s"
params = (100,)
result = cmp.execute(sql,params) # 返回表中id小于100的数据

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Python+Pymysql+PooledDB实现数据库连接池