ToB企服应用市场:ToB评测及商务社交产业平台

标题: Python开发中,SQLAlchemy 的同步操作和异步操作封装,以及常规CRUD的处理 [打印本页]

作者: 用户国营    时间: 2024-8-22 09:54
标题: Python开发中,SQLAlchemy 的同步操作和异步操作封装,以及常规CRUD的处理
在我们使用Python来和数据库打交道中,SQLAlchemy是一个非常不错的ORM工具,通过它我们可以很好的实现多种数据库的统一模子接入,而且它提供了非常多的特性,通过团结不同的数据库驱动,我们可以实现同步或者异步的处理封装。1、SQLAlchemy介绍

SQLAlchemy 是一个功能强盛且机动的 Python SQL 工具包和对象关系映射(ORM)库。它被广泛用于在 Python 项目中处理关系型数据库的场景,既提供了高级的 ORM 功能,又保留了对底层 SQL 语句的强盛控制力。SQLAlchemy 允许开发者通过 Python 代码与数据库进行交互,而无需直接编写 SQL 语句,同时也支持直接使用原生 SQL 进行复杂查询。下面是SQLAlchemy和我们常规数据库对象的对应关系说明。Engine    连接对象         驱动引擎Session   连接池           事件  由此开始查询Model     表                   类定义Column     列  Query     若干行         可以链式添加多个条件 在使用SQLAlchemy时,通常会将其与数据库对象对应起来。以下是SQLAlchemy和常规数据库对象的对应关系说明:1)数据库表 (Database Table)

  1. from sqlalchemy import Column, Integer, String, create_engine
  2. from sqlalchemy.ext.declarative import declarative_base
  3. Base = declarative_base()
  4. class User(Base):
  5.     __tablename__ = 'users'  # 数据库表名
  6.     id = Column(Integer, primary_key=True)
  7.     name = Column(String)
  8.     email = Column(String)
复制代码
2)数据库列 (Database Column)

  1. id = Column(Integer, primary_key=True)
  2. name = Column(String(50))
复制代码
3)数据库行 (Database Row)

  1. new_user = User(id=1, name='John Doe', email='john@example.com')
复制代码
4)主键 (Primary Key)

  1. id = Column(Integer, primary_key=True)
复制代码
5)外键 (Foreign Key)

  1. from sqlalchemy import ForeignKey
  2. from sqlalchemy.orm import relationship
  3. class Address(Base):
  4.     __tablename__ = 'addresses'
  5.     id = Column(Integer, primary_key=True)
  6.     user_id = Column(Integer, ForeignKey('users.id'))
  7.     user = relationship('User')
复制代码
6)关系 (Relationships)

  1. addresses = relationship("Address", back_populates="user")
复制代码
7)会话 (Session)

  1. from sqlalchemy.orm import sessionmaker
  2. Session = sessionmaker(bind=engine)
  3. session = Session()
  4. session.add(new_user)
  5. session.commit()
复制代码
通过以上对应关系,SQLAlchemy允许开发者以面向对象的方式与数据库交互,提供了一个Pythonic的接口来操作数据库。
 2、SQLAlchemy 的同步操作

 SQLAlchemy 提供了同步和异步两种操作方式,分别适用于不同的应用场景。以下是如何封装 SQLAlchemy 的同步和异步操作的方法说明:
在同步操作中,SQLAlchemy 使用传统的阻塞方式进行数据库操作。起首,定义一个底子的 Session 和 Engine 对象:
  1. from sqlalchemy import create_engine
  2. from sqlalchemy.orm import declarative_base, sessionmaker
  3. from typing import Generator
  4. from core.config import settings
  5. # 常规同步处理
  6. engine = create_engine(settings.DB_URI)
  7. SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
  8. def get_db() -> Generator:
  9.     """创建一个 SQLAlchemy 数据库会话-同步处理."""
  10.     try:
  11.         db = SessionLocal()
  12.         yield db
  13.     finally:
  14.         db.close()
复制代码
前面说了,使用SQLAlchemy可以实现不同数据库的统一模子的处理,我们可以对应创建不同数据库的连接(engine),如下是常规几种关系型数据库的连接处理。
  1. # mysql 数据库引擎
  2. engine = create_engine(
  3.     "mysql+pymysql://root:123456@127.0.0.1:3306/WinFramework",
  4.     pool_recycle=3600,
  5.     # echo=True,
  6. )
  7. # Sqlite 数据库引擎
  8. engine = create_engine("sqlite:///testdir//test.db")
  9. # PostgreSQL 数据库引擎
  10. engine = create_engine(
  11.     "postgresql+psycopg2://postgres:123456@localhost:5432/winframework",
  12.      # echo=True,
  13. )
  14. # SQLServer 数据库引擎
  15. engine = create_engine(
  16.      "mssql+pymssql://sa:123456@localhost/WinFramework?tds_version=7.0",
  17.      # echo=True,
  18. )
复制代码
我们可以根据数据库的CRUD操作方式,封装一些操作,如下所示。
  1. class CRUDOperations:
  2.     def __init__(self, model):
  3.         self.model = model
  4.     def create(self, db, obj_in):
  5.         db_obj = self.model(**obj_in.dict())
  6.         db.add(db_obj)
  7.         db.commit()
  8.         db.refresh(db_obj)
  9.         return db_obj
  10.     def get(self, db, id):
  11.         return db.query(self.model).filter(self.model.id == id).first()
  12.     def update(self, db, db_obj, obj_in):
  13.         obj_data = obj_in.dict(exclude_unset=True)
  14.         for field in obj_data:
  15.             setattr(db_obj, field, obj_data[field])
  16.         db.commit()
  17.         db.refresh(db_obj)
  18.         return db_obj
  19.     def remove(self, db, id):
  20.         obj = db.query(self.model).get(id)
  21.         db.delete(obj)
  22.         db.commit()
  23.         return obj
复制代码
使用时,构建数据访问类进行操作,如下测试代码所示。
  1. crud_user =<strong> CRUDOperations</strong>(<strong>User</strong>)
  2. # Create
  3. with get_db() as db:
  4.     user = crud_user.create(db, user_data)
  5. # Read
  6. with get_db() as db:
  7.     user = crud_user.get(db, user_id)
  8. # Update
  9. with get_db() as db:
  10.     user = crud_user.update(db, user, user_data)
  11. # Delete
  12. with get_db() as db:
  13.     crud_user.remove(db, user_id)
复制代码
 
3、SQLAlchemy 的异步操作封装

对于异步操作,SQLAlchemy 使用 AsyncSession 来管理异步事件。
起首,定义一个异步的 Session 和 Engine 对象:
  1. from sqlalchemy import create_engine, URL
  2. from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
  3. from typing import AsyncGenerator
  4. def create_engine_and_session(url: str | URL):
  5.     try:
  6.         # 数据库引擎
  7.         engine = create_async_engine(url, pool_pre_ping=True)
  8.     except Exception as e:
  9.         print("❌ 数据库链接失败 {}", e)
  10.         sys.exit()
  11.     else:
  12.         db_session = async_sessionmaker(
  13.             bind=engine, autoflush=False, expire_on_commit=False
  14.         )
  15.         return engine, db_session
  16. # 异步处理
  17. async_engine, async_session = create_engine_and_session(settings.DB_URI_ASYNC)
  18. async def get_db() -> AsyncGenerator[AsyncSession, None]:
  19.     """创建一个 SQLAlchemy 数据库会话-异步处理."""
  20.     async with async_session() as session:
  21.         yield session
复制代码
和同步的处理雷同,不过是换了一个对象来实现,并且函数使用了async await的组合来实现异步操作。
为了实现我的SQLSugar开发框架雷同的封装模式,我们参考SQLSugar开发框架中基类CRUD的定义方式来实现多种接口的封装处理。

 参照上面的实现方式,我们来看看Python中使用泛型的处理封装类的代码。
  1. ModelType = TypeVar("ModelType", bound=Base)
  2. PrimaryKeyType = TypeVar("PrimaryKeyType", int, str, float)  # 限定主键的类型
  3. PageDtoType = TypeVar("PageDtoType", bound=BaseModel)
  4. DtoType = TypeVar("DtoType", bound=BaseModel)
  5. class BaseCrud(Generic[ModelType, PrimaryKeyType, PageDtoType, DtoType]):
  6.     """
  7.     基础CRUD操作类
  8.     """
  9.     def __init__(self, model: Type[ModelType]):
  10.         """
  11.         数据库访问操作的基类对象(CRUD).
  12.         **Parameters**
  13.         * `model`: A SQLAlchemy model class
  14.         """
  15.         self.model = model
复制代码
这样,我们就可以通过泛型定义不同的类型,以及相关的处理类的信息。
该基类函数中,异步定义get_all的返回所有的数据接口如下所示。
  1.     async def get_all(
  2.         self, sorting: Optional[str], db: AsyncSession
  3.     ) -> List[ModelType] | None:
  4.         """根据ID字符串列表获取对象列表
  5.         :param sorting: 格式:name asc 或 name asc,age desc
  6.         """
  7.         query = select(self.model)
  8.         if sorting:
  9.             query = self.apply_sorting(query, sorting)
  10.         result = await db.execute(query)
  11.         items = result.scalars().all()
  12.         return items
复制代码
而对应获得单个对象的操作函数,如下所示。
  1.     async def get(self, id: PrimaryKeyType, db: AsyncSession) -> Optional[ModelType]:
  2.         """根据主键获取一个对象"""
  3.         query = select(self.model).filter(self.model.id == id)
  4.         result = await db.execute(query)
  5.         item = result.scalars().first()
  6.         return item
复制代码
而创建对象的操作函数,如下所示。
  1.     async def create(self, obj_in: DtoType, db: AsyncSession, **kwargs) -> bool:
  2.         """创建对象,使用 kwargs 时可以扩展创建对象时的字段。
  3.         :param obj_in: 对象输入数据
  4.         :param kwargs: 扩展字段,如格式: is_deleted=0, is_active=1
  5.         """
  6.         try:
  7.             if kwargs:
  8.                 instance = self.model(**obj_in.model_dump(), **kwargs)
  9.             else:
  10.                 instance = self.model(**obj_in.model_dump())  # type: ignore
  11.             db.add(instance)
  12.             await db.commit()
  13.             return True
  14.         except SQLAlchemyError as e:
  15.             print(e)
  16.             await db.rollback()
  17.             return False
复制代码
这个异步函数 create 旨在通过 SQLAlchemy 在数据库中创建一个对象,同时允许通过 kwargs 参数动态扩展创建对象时的字段。



通过上面的封装,我们可以测试调用的处理例子
  1. from crud.customer import customer as customer_crud
  2. from models.customer import Customer
  3. from pydantic import BaseModel
  4. from schemas.customer import CustomerDto, CustomerPageDto
  5. async def test_list_customer():
  6.     async with get_db() as db:
  7.         print("get_list")
  8.         totalCount, items = await customer_crud.get_list(
  9.             CustomerPageDto(skipCount=0, maxResultCount=10, name="test"),
  10.             db,
  11.         )
  12.         print(totalCount, items)
  13.         for customer in customers:
  14.             print(customer.name, customer.age)
  15.         print("get_by_name")
  16.         name = "test"
  17.         customer = await customer_crud.get_by_name(
  18.             name,
  19.             db,
  20.         )
  21.         if customer:
  22.             print(customer.name, customer.age)
  23.         else:
  24.             print(f"{name} not found")
  25.         print("soft delete")
  26.         result = await customer_crud.delete_byid(customer.id, db, is_deleted=1)
  27.         print("操作结果:", result)
  28.         print("soft delete_byids")
  29.         result = await customer_crud.delete_byids(
  30.             ["11122", "2C5F8672-2AA7-4B14-85AD-DF56F5BF7F1F"], db, is_deleted=1
  31.         )
  32.         print(f"Soft delete successful: {result}")
  33.         print("update_by_column")
  34.         result = await customer_crud.update_by_column(
  35.             "id", customer.id, {"age": 30}, db
  36.         )
  37.         print("操作结果:", result)
  38.         await db.close()
复制代码
同步和异步处理的差异:
通过封装数据库操作,可以让代码更具复用性和可维护性,支持不同类型的操作场景。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4