ToB企服应用市场:ToB评测及商务社交产业平台

标题: Python+Pymysql+PooledDB实现数据库连接池 [打印本页]

作者: 八卦阵    时间: 2024-8-18 22:25
标题: Python+Pymysql+PooledDB实现数据库连接池
1. 引言
在测试中,频繁地创建和销毁数据库连接会消耗大量的资源,并且可能导致数据库连接的泄漏大概性能下降。因此,利用连接池可以有用地管理数据库连接,提高步伐的性能和可靠性。
2. 实现MySQL连接池的类
我们将利用Python的pymysql库和dbutils库中的PooledDB来实现MySQL连接池。
  1. pip install pymysql
  2. pip install dbutils
复制代码
  1. import pymysql
  2. from dbutils.pooled_db import PooledDB
  3. import os
  4. from common_methods.file_operation import f_p
  5. class ConnectMysqlPool:
  6.     """
  7.     连接MySQL数据库的连接池类。
  8.     属性:
  9.     db_account (dict): 数据库账号信息,包括用户名和密码等。
  10.     db (str): 数据库名称。
  11.     pool (PooledDB): MySQL连接池对象。
  12.     方法:
  13.     __init__: 初始化连接池类实例。
  14.     _obtaining_data: 从配置文件中获取测试数据。
  15.     create_mysql_pool: 创建MySQL连接池。
  16.     get_conn: 从连接池中获取一个连接。
  17.     close: 关闭数据库连接和游标。
  18.     execute: 使用连接执行SQL语句。
  19.     """
  20.     def __init__(self, db, db_account):
  21.         """
  22.         初始化连接池类实例。
  23.         参数:
  24.         db (str): 测试库名称。
  25.         db_account (dict): 包含数据库账号信息的字典。
  26.         """
  27.         self._common_data = self._obtaining_data() # 获取测试数据
  28.         self.db_account = db_account # 数据库账号信息
  29.         self.db = db # 测试库
  30.         # 创建连接池
  31.         self.pool = self.create_mysql_pool()
  32.     # 从配置文件中获取测试数据
  33.     def _obtaining_data(self):
  34.         """
  35.         从指定的YAML文件中读取连接配置数据。
  36.         返回:
  37.         dict: 包含连接配置信息的字典。
  38.         """
  39.         file_path = os.path.join(
  40.             os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
  41.             "data_driven",
  42.             "mysql_data.yaml"
  43.         )
  44.         common_data = f_p.red_yaml_file(file_path)
  45.         return common_data
  46.     # 创建MySQL连接池
  47.     def create_mysql_pool(self):
  48.         """
  49.         根据配置信息创建MySQL连接池。
  50.         返回:
  51.         PooledDB: MySQL连接池对象。
  52.         """
  53.         pool = PooledDB(
  54.             **self._common_data['connect_pool_config'],
  55.             **self.db_account,
  56.             db=self.db,
  57.             creator=pymysql
  58.         )
  59.         return pool
  60.     # 从连接池中获取一个连接
  61.     def get_conn(self):
  62.         """
  63.         从连接池中获取一个数据库连接。
  64.         返回:
  65.         connection: 数据库连接对象。
  66.         """
  67.         return self.pool.connection()
  68.     # 关闭数据库连接和游标
  69.     def close(self, conn, cursor):
  70.         """
  71.         关闭数据库连接和游标。
  72.         参数:
  73.         conn (connection): 数据库连接对象。
  74.         cursor (cursor): 数据库游标对象。
  75.         """
  76.         try:
  77.             if cursor:
  78.                 cursor.close()
  79.             if conn:
  80.                 conn.close()
  81.         except Exception as e:
  82.             print(f"关闭连接或游标时发成错误:{e}")
  83.     # 使用连接执行sql
  84.     def execute(self, sql, params):
  85.         """
  86.         使用获取的连接执行SQL语句。
  87.         参数:
  88.         sql (str): SQL语句。
  89.         params (tuple): SQL参数。
  90.         返回:
  91.         list: 执行SQL语句后的结果集,若执行出错则返回None。
  92.         """
  93.         conn = self.get_conn()
  94.         cursor = conn.cursor()
  95.         try:
  96.             cursor.execute(sql, params)
  97.             conn.commit()
  98.             result = cursor.fetchall()
  99.             return result
  100.         except Exception as e:
  101.             print(f"表:{self.db},执行sql:{sql},报错:{e}")
  102.             conn.rollback()
  103.             return None
  104.         finally:
  105.             self.close(conn, cursor)
复制代码
  1. def red_yaml_file(self,file_path,name=None):
  2.     try:
  3.         with open(file_path,'r') as file:
  4.             data = yaml.safe_load(file)
  5.             return_data = data if name is None else data.get(name,{}) # 返回数据
  6.             return return_data
  7.     except Exception as e:
  8.         print(f"{file_path}读取数据失败:{e}")
  9.         return None
复制代码
  1. connect_pool_config:
  2.   charset: "utf8"
  3.   mincached: 2 # 初始化时,连接池中至少创建空闲的链接,0表示不创建
  4.   maxcached: 50 # 连接池允许最大的连接数, 0和None表示不限制连接数
  5.   maxshared: 0 # 连接池最多可共享的连接数量,0和None表示全部共享。PS:pymysql不支持事务
  6.   maxconnections: 10 # 连接池最大并发连接数量
  7.   blocking: True # 连接池中没有可用连接后,是否阻塞等待
  8.   setsession: [] # 开始会话前执行的命令列表
复制代码
3. 利用示例
以下是利用ConnectMysqlPool类的示例代码:
  1. # 创建连接池对象
  2. db_account = {
  3.     "host": "localhost",
  4.     "port": 3306,
  5.     "user": "root",
  6.     "password": "password",
  7.     "charset": "utf8mb4"
  8. }
  9. db_name = "test_db"
  10. cmp = ConnectMysqlPool(db_name,db_account)
  11. sql = "select * from table_name where id < %s"
  12. params = (100,)
  13. result = cmp.execute(sql,params) # 返回表中id小于100的数据
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4