ToB企服应用市场:ToB评测及商务社交产业平台

标题: 在 SQLAlchemy 中对数据异步处理的时候,得到关联聚集的处理方式 [打印本页]

作者: 十念    时间: 2024-9-5 09:56
标题: 在 SQLAlchemy 中对数据异步处理的时候,得到关联聚集的处理方式
我们在界说SQLAlchemy对象模型的关系的时候,用到了relationship 来标识关系,此中 lazy 的参数有多种差别的加载策略,本篇随笔介绍它们之间的关系,以及在异步处理中的一些代码案例。
1、在 SQLAlchemy 中界说关系

在 SQLAlchemy 中,relationship() 函数用于界说表之间的关系(如 one-to-many、many-to-one、many-to-many 等)。它支持许多参数来控制如何加载和处理关联的数据。以下是一些常用的 relationship() 参数及其说明:
1. lazy

  在SQLAlchemy中,lazy是一个界说ORM关系如何加载的参数,主要用于控制关联关系(如one-to-many、many-to-one等)在访问时的加载方式。
1)lazy='select' (默认)
2) lazy='selectin'
3) lazy='joined'
4)lazy='immediate'
5)lazy='subquery'
6)lazy='dynamic'
 
2. backref

  1. class Parent(Base):
  2.     __tablename__ = 'parent'
  3.     id = Column(Integer, primary_key=True)
  4.     children = relationship("Child", backref="parent")
  5. class Child(Base):
  6.     __tablename__ = 'child'
  7.     id = Column(Integer, primary_key=True)
  8.     parent_id = Column(Integer, ForeignKey('parent.id'))
复制代码
3. back_populates

  1. class Parent(Base):
  2.     __tablename__ = 'parent'
  3.     id = Column(Integer, primary_key=True)
  4.     children = relationship("Child", back_populates="parent")
  5. class Child(Base):
  6.     __tablename__ = 'child'
  7.     id = Column(Integer, primary_key=True)
  8.     parent_id = Column(Integer, ForeignKey('parent.id'))
  9.     parent = relationship("Parent", back_populates="children")
复制代码
4. cascade

  1. children = relationship("Child", cascade="all, delete-orphan")
复制代码
5. uselist

  1. parent = relationship("Parent", uselist=False)  # one-to-one 关系
复制代码
6. order_by

  1. children = relationship("Child", order_by="Child.name")
复制代码
7. foreign_keys

  1. parent = relationship("Parent", foreign_keys="[Child.parent_id]")
复制代码
8. primaryjoin

  1. parent = relationship("Parent", primaryjoin="Parent.id == Child.parent_id")
复制代码
9. secondary

  1. class Association(Base):
  2.     __tablename__ = 'association'
  3.     parent_id = Column(Integer, ForeignKey('parent.id'))
  4.     child_id = Column(Integer, ForeignKey('child.id'))
  5. children = relationship("Child", secondary="association")
复制代码
10. secondaryjoin

  1. children = relationship("Child", secondary="association",
  2.                         secondaryjoin="Child.id == Association.child_id")
复制代码
11. viewonly

  1. children = relationship("Child", viewonly=True)
复制代码
12. passive_deletes

  1. children = relationship("Child", passive_deletes=True)
复制代码
这些参数可以根据具体的业务需求和场景进行调整,以优化查询和数据管理策略。
 
 2、用户角色表的关系分析

在现实业务中,机构和用户是多对多的关系的,我们以机构表界说来进行分析它们的关系信息。
如机构表的模型界说大抵如下。
  1. class Ou(Base):
  2.     """机构(部门)信息-表模型"""
  3.     __tablename__ = "t_acl_ou"
  4.     id = Column(Integer, primary_key=True, comment="主键", autoincrement=True)
  5.     pid = Column(Integer, ForeignKey("t_acl_ou.id"), comment="父级机构ID", default="-1")
  6.     handno = Column(String, comment="机构编码")
  7.     name = Column(String, comment="机构名称")
  8.     # 定义 parent 关系
  9.     parent = relationship(
  10.         "Ou", remote_side=[id], back_populates="children", lazy="immediate"
  11.     )
  12.     # 定义 children 关系
  13.     children = relationship("Ou", back_populates="parent", lazy="immediate")
  14.     # 定义 users 关系
  15.     users = relationship(
  16.         "User", secondary="t_acl_ou_user", back_populates="ous", lazy="select"
  17.     )
复制代码
我们可以看到此中加载的多对多关系是接纳lazy=select的方式的。
当你使用 await session.get(Ou, ou_id) 来获取一个 Ou 对象后,访问其关系属性(如 ou.users)时,可能会遇到异步相关的问题。原因是,SQLAlchemy 的异步会话需要使用 selectinload 或其他异步加载选项来确保在异步环境中正确地加载关联数据。
在默认的 lazy='select' 关系中,加载关系对象会触发一个同步查询,而这与异步会话不兼容,导致错误。为相识决这个问题,你需要确保关系的加载是通过异步的方式进行的。
解决方法:

1. 使用 selectinload 进行预加载
在查询时,显式地通过 selectinload 来加载关联的 users 关系:
  1. from sqlalchemy.orm import selectinload
  2. ou = await session.get(Ou, ou_id, options=[selectinload(Ou.users)])
  3. # 现在你可以访问 ou.users,关系对象已经被异步加载
  4. print(ou.users)
复制代码
 
2. 使用 lazy='selectin' 或其他异步兼容的加载策略
你还可以在界说模型的关联关系时,将 lazy='selectin' 设置为默认的加载方式,这样当访问关联属性时,SQLAlchemy 会自动使用异步兼容的加载机制:
  1. class Ou(Base):
  2.     __tablename__ = 'ou'
  3.     id = Column(Integer, primary_key=True)
  4.     users = relationship("User", lazy='selectin')  # 使用 selectin 异步加载
  5. ou = await session.get(Ou, ou_id)
  6. print(ou.users)  # 关联对象可以正常异步访问
复制代码
总结:

因此,如果机构和用户的关系信息,我们可以通过selectload关系实现加载,也可以考虑使用中间表的关系进行获取,如下代码所示:获取指定用户的关联的机构列表.
  1.     async def get_ous_by_user(self, db: AsyncSession, user_id: str) -> list[int]:
  2.         """获取指定用户的关联的机构列表"""
  3.         # 方式一,子查询方式
  4.         stmt = select(User).options(selectinload(User.ous)).where(User.id == user_id)
  5.         result = await db.execute(stmt)
  6.         user = result.scalars().first()
  7.         ous = user.ous if user else []
  8.         # 方式二,关联表方式
  9.         # stmt = (
  10.         #     select(Ou)
  11.         #     .join(user_ou, User.id == user_ou.c.user_id)
  12.         #     .where(user_ou.c.user_id == user_id)
  13.         # )
  14.         # result = await db.execute(stmt)
  15.         # ous = result.scalars().all()
  16.         ouids = [ou.id for ou in ous]
  17.         return ouids
复制代码
上面两种方式是等效的,一个是通过orm关系进行获取关系聚集,一个是通过中间表的关系检索主表数据聚集。
通过中间表,我们也可以很方便的添加角色的关系,如下面是为角色添加用户,也就是在中间表进行处理即可。
  1.     async def add_user(self, db: AsyncSession, role_id: int, user_id: int) -> bool:
  2.         """添加角色-用户关联"""
  3.         stmt = select(user_role).where(
  4.             and_(
  5.                 user_role.c.role_id == role_id,
  6.                 user_role.c.user_id == user_id,
  7.             )
  8.         )
  9.         if not (await db.execute(stmt)).scalars().first():
  10.             await db.execute(
  11.                 user_role.insert().values(role_id=role_id, user_id=user_id)
  12.             )
  13.             await db.commit()
  14.             return True
  15.         return False
复制代码
当然。如果我们不用这种中间表的处理方式,也是可以使用常规多对多关系进行添加处理,不过需要对数据进行多一些检索,也许性能会差一些。
  1.     async def add_user(self, db: AsyncSession, ou_id: int, user_id: int) -> bool:
  2.         """给机构添加用户"""
  3.         # 可以使用下面方式,也可以使用中间表方式处理
  4.         # 先判断用户是否存在
  5.         user = await db.get(User, user_id)
  6.         if not user:
  7.             return False
  8.         # 再判断机构是否存在
  9.         result = await db.execute(
  10.             select(Ou).options(selectinload(Ou.users)).filter_by(id=ou_id)
  11.         )
  12.         # await db.get(Ou, ou_id) #这种方式不能获得users,因为配置为selectin
  13.         # await db.get(Ou, ou_id, options=[selectinload(Ou.users)])  # 这种方式可以获得users
  14.         ou = result.scalars().first()
  15.         if not ou:
  16.             return False
  17.         # 再判断用户是否已经存在于机构中
  18.         if user in ou.users:
  19.             return False
  20.         # 加入机构
  21.         ou.users.append(user)
  22.         await db.commit()
  23.         return True
复制代码
 

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4