ToB企服应用市场:ToB评测及商务社交产业平台

标题: 在Python中使用sqlalchemy来操作数据库的几个小总结 [打印本页]

作者: 十念    时间: 2024-8-1 18:21
标题: 在Python中使用sqlalchemy来操作数据库的几个小总结
 在探索使用 FastAPI, SQLAlchemy, Pydantic,Redis, JWT 构建的项目标时候,其中数据库访问采用SQLAlchemy,并采用异步方式。数据库操作和控制器操作,采用基类继承的方式淘汰重复代码,进步代码复用性。在这个过程中设计接口和测试的时候,对一些题目举行跟踪解决,并记载供参考。
1、SQLAlchemy事务处置惩罚

在异步环境中,批量更新操作需要使用异步方法来执行查询和提交事务。
  1. async def update_range(self, obj_in_list: List[DtoType], db: AsyncSession) -> bool:
  2.     """批量更新对象"""
  3.     try:
  4.         async with db.begin():  # 使用事务块确保批量操作的一致性
  5.             for obj_in in obj_in_list:
  6.                 # 查询对象
  7.                 query = select(self.model).filter(self.model.id == obj_in.id)
  8.                 result = await db.execute(query)
  9.                 db_obj = result.scalars().first()
  10.                
  11.                 if db_obj:
  12.                     # 获取更新数据
  13.                     update_data = obj_in.model_dump(skip_defaults=True)
  14.                     
  15.                     # 更新对象字段
  16.                     for field, value in update_data.items():
  17.                         setattr(db_obj, field, value)
  18.         
  19.         return True
  20.     except SQLAlchemyError as e:
  21.         print(e)
  22.         # 异常处理时,事务会自动回滚
  23.         return False
复制代码
在这个改进后的代码中:
这种方式确保了在异步环境中批量更新操作的正确性和一致性。
在使用 async with db.begin() 举行事务管理时,事务会自动提交。假如在事务块内执行的所有操作都成功,事务会在退出时自动提交;假如出现异常,事务会自动回滚。
因此,手动调用 await db.commit() 是不须要的,由于事务块会处置惩罚这些操作。假如你不使用事务块,并盼望手动控制事务的提交,可以如下修改:
  1. async def update_range(self, obj_in_list: List[DtoType], db: AsyncSession) -> bool:
  2.     """批量更新对象"""
  3.     try:
  4.         for obj_in in obj_in_list:
  5.             query = select(self.model).filter(self.model.id == obj_in.id)
  6.             result = await db.execute(query)
  7.             db_obj = result.scalars().first()
  8.             
  9.             if db_obj:
  10.                 update_data = obj_in.model_dump(skip_defaults=True)
  11.                
  12.                 for field, value in update_data.items():
  13.                     setattr(db_obj, field, value)
  14.         
  15.         await db.commit()  # 手动提交事务
  16.         return True
  17.     except SQLAlchemyError as e:
  18.         print(e)
  19.         await db.rollback()  # 确保在出错时回滚事务
  20.         return False
复制代码
在这个手动提交事务的例子中:
根据需求选择合适的方法举行事务管理。事务块方式通常是更安全和简便的选择。
 在异步环境中,create_update 方法需要对数据库举行异步查询、更新或创建操作。
  1. async def create_update(
  2.     self, obj_in: DtoType, id: PrimaryKeyType, db: AsyncSession
  3. ) -> bool:
  4.     """创建或更新对象"""
  5.     try:
  6.         # 查询对象
  7.         query = select(self.model).filter(self.model.id == id)
  8.         result = await db.execute(query)
  9.         db_obj = result.scalars().first()
  10.         
  11.         if db_obj:
  12.             # 更新对象
  13.             return await self.update(obj_in, db)
  14.         else:
  15.             # 创建对象
  16.             return await self.create(obj_in, db)
  17.     except SQLAlchemyError as e:
  18.         print(e)
  19.         # 确保在出错时回滚事务
  20.         await db.rollback()
  21.         return False
复制代码
在这个代码中:
在异步环境中,批量插入对象通常需要使用异步方法来执行数据库操作。由于 bulk_insert_mappings 在 SQLAlchemy 的异步版本中可能不直接支持,你可以使用 add_all 方法来批量添加对象。
  1. async def save_import(self, data: List[DtoType], db: AsyncSession) -> bool:
  2.     """批量导入对象"""
  3.     try:
  4.         # 将 DTO 转换为模型实例
  5.         db_objs = [self.model(**obj_in.model_dump()) for obj_in in data]
  6.         
  7.         # 批量添加对象
  8.         db.add_all(db_objs)
  9.         
  10.         # 提交事务
  11.         await db.commit()
  12.         
  13.         return True
  14.     except SQLAlchemyError as e:
  15.         print(e)
  16.         await db.rollback()  # 确保在出错时回滚事务
  17.         return False
复制代码
代码说明:
这种方式确保了在异步环境中正确地举行批量导入操作,并处置惩罚可能出现的异常。
 
2、在 SQLAlchemy 中select(...).where(...) 和 select(...).filter(...)的差异

在 SQLAlchemy 中,select(...).where(...) 和 select(...).filter(...) 都用于构造查询条件,但它们有一些细微的差异和适用场景。
1. where(...)

2. filter(...)

主要差异
使用 where 的示例(SQLAlchemy Core):
  1. from sqlalchemy.future import select
  2. from sqlalchemy.ext.asyncio import AsyncSession
  3. async def get(self, id: int, db: AsyncSession) -> Optional[ModelType]:
  4.     query = select(self.model).where(self.model.id == id)
  5.     result = await db.execute(query)
  6.     return result.scalars().first()
复制代码
使用 filter 的示例(SQLAlchemy ORM):
  1. from sqlalchemy.orm import sessionmaker
  2. async def get(self, id: int, db: AsyncSession) -> Optional[ModelType]:
  3.     query = select(self.model).filter(self.model.id == id)
  4.     result = await db.execute(query)
  5.     return result.scalars().first()
复制代码
总结

在 SQLAlchemy 2.0 及更高版本中,select 的 where 和 filter 的用法变得越来越一致,你可以根据自己的风俗和需求选择其中一种。在实际开发中,选择哪一种方法通常取决于你的代码上下文和个人偏好。
 
3、model_dump(exclude_unset=True) 和model_dump(skip_defaults=True)有什么差异

model_dump(exclude_unset=True) 和 model_dump(skip_defaults=True) 是用于处置惩罚模型实例的序列化方法,它们的用途和行为略有不同。这两个方法通常用于将模型实例转换为字典,以便举行进一步的处置惩罚或传输。
model_dump(exclude_unset=True)

exclude_unset=True 是一个选项,通常用于序列化方法中,表示在转换模型实例为字典时,排除那些未设置的字段。
  1. # 假设模型有字段 'name' 和 'age',且 'age' 使用了默认值
  2. model_instance = MyModel(name='Alice', age=25)
  3. # 如果 age 的默认值是 0, exclude_unset=True 将只包含 'name'
  4. serialized_data = model_instance.model_dump(exclude_unset=True)
复制代码
model_dump(skip_defaults=True)

skip_defaults=True 是另一个选项,表示在转换模型实例为字典时,排除那些使用了默认值的字段。
  1. # 假设模型有字段 'name' 和 'age',且 'age' 使用了默认值
  2. model_instance = MyModel(name='Alice', age=25)
  3. # 如果 age 的默认值是 0, skip_defaults=True 将只包含 'name'
  4. serialized_data = model_instance.model_dump(skip_defaults=True)
复制代码
主要区别

 
4、使用**kwargs 参数,在接口中实现数据软删除的处置惩罚

例如我们在删除接口中,假如传递了 kwargs 参数,则举行软删除(更新记载),否则举行硬删除(删除记载)。
  1.     async def delete_byid(self, id: PrimaryKeyType, db: AsyncSession, **kwargs) -> bool:
  2.         """根据主键删除一个对象
  3.         :param kwargs: for soft deletion only
  4.         """
  5.         if not kwargs:
  6.             result = await db.execute(sa_delete(self.model).where(self.model.id == id))
  7.         else:
  8.             result = await db.execute(
  9.                 sa_update(self.model).where(self.model.id == id)<strong>.values(**</strong><strong>kwargs)</strong>
  10.             )
  11.         await db.commit()
  12.         return result.rowcount > 0
复制代码
实例代码如下所示。
  1. # 示例模型
  2. from sqlalchemy.ext.declarative import declarative_base
  3. from sqlalchemy import Column, Integer, String, Boolean
  4. Base = declarative_base()
  5. class Customer(Base):
  6.     __tablename__ = 'customer'
  7.     id = Column(Integer, primary_key=True)
  8.     name = Column(String)
  9.     is_deleted = Column(Boolean, default=False)
  10. # 示例使用
  11. async def main():
  12.     async with AsyncSession(engine) as session:
  13.         controller = BaseController(Customer)
  14.         
  15.         # 硬删除
  16.         result = await controller.delete_byid(1, session)
  17.         print(f"Hard delete successful: {result}")
  18.         
  19.         # 软删除
  20.         result = await controller.delete_byid(2, session, is_deleted=True)
  21.         print(f"Soft delete successful: {result}")
  22. # 确保运行主程序
  23. import asyncio
  24. if __name__ == "__main__":
  25.     asyncio.run(main())
复制代码
留意事项
  1. # 示例硬删除调用
  2. await controller.delete_byid(1, session)
  3. # 示例软删除调用
  4. await controller.delete_byid(2, session, is_deleted=True)
复制代码
假如我们的is_deleted 字段是Int类型的,如下所示,那么处置惩罚有所不同
  1. class Customer(Base):
  2.     __tablename__ = "t_customer"
  3.     id = Column(String, primary_key=True, comment="主键")
  4.     name = Column(String, comment="姓名")
  5.     age = Column(Integer, comment="年龄")
  6.     creator = Column(String, comment="创建人")
  7.     createtime = Column(DateTime, comment="创建时间")
  8.     is_deleted = Column(Integer, comment="是否删除")
复制代码
操作代码
  1.         # 硬删除
  2.         result = await controller.delete_byid("1", session)
  3.         print(f"Hard delete successful: {result}")
  4.         
  5.         # 软删除
  6.         result = await controller.delete_byid("2", session, <strong>is_deleted=1</strong>)
  7.         print(f"Soft delete successful: {result}")
复制代码
留意事项
通过确保正确传递参数而且模型包含正确的字段,你应该能够正确执行软删除和硬删除操作。
 
5、Python处置惩罚接口的时候,Iterable 和List有什么差异

在 Python 中,Iterable 和 List 是两个不同的概念,它们有各自的特点和用途:
Iterable

Iterable 是一个更广泛的概念,指的是任何可以返回一个迭代器的对象。迭代器是一个实现了 __iter__() 方法的对象,能够逐个返回元素。险些所有的容器类型(如列表、元组、字典、集合等)都是可迭代的。要检查一个对象是否是可迭代的,可以使用 collections.abc.Iterable 来举行检查。
特点
  1. from collections.abc import Iterable
  2. print(isinstance([1, 2, 3], Iterable))  # True
  3. print(isinstance((1, 2, 3), Iterable))  # True
  4. print(isinstance({1, 2, 3}, Iterable))  # True
  5. print(isinstance({'a': 1}, Iterable))   # True
  6. print(isinstance((x for x in range(3)), Iterable))  # True
复制代码
List

List 是 Python 中的一种具体的容器类型,表示一个有序的元素集合,可以包含重复的元素。它是最常用的可变序列类型之一,支持索引访问、切片操作以及其他多种方法来操作列表中的元素。
特点
  1. my_list = [1, 2, 3]
  2. print(my_list)  # [1, 2, 3]
  3. my_list.append(4)  # [1, 2, 3, 4]
  4. my_list[0] = 10  # [10, 2, 3, 4]
复制代码
总结一下:
Iterable 是一个抽象概念,而 List 是一个具体的实现。你可以在 List 之上使用许多操作和方法来处置惩罚数据,而 Iterable 主要关注的是是否可以举行迭代。
因此吸收结合的处置惩罚,我们可以使用Iterable接口更加通用一些。
  1.     async def create_range(
  2.         self, obj_in_list: Iterable[DtoType], db: AsyncSession
  3.     ) -> bool:
  4.         """批量创建对象"""
  5.         try:
  6.             # 将 DTO 转换为模型实例
  7.             db_objs = [self.model(**obj_in.model_dump()) for obj_in in obj_in_list]
  8.             # 批量添加到数据库
  9.             db.add_all(db_objs)
  10.             await db.commit()
  11.             return True
  12.         except SQLAlchemyError as e:
  13.             print(e)
  14.             await db.rollback()  # 确保在出错时回滚事务
  15.             return False
复制代码
以上就是在Python中使用sqlalchemy来操作数据库的时候,对一些小题目标总结,供各人参考。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4