SqlAlchemy使用教程(七) 异步访问数据库

打印 上一主题 下一主题

主题 958|帖子 958|积分 2874


   

  • SqlAlchemy使用教程(一) 原理与情况搭建
  • SqlAlchemy使用教程(二) 入门示例及编程步骤
  • SqlAlchemy使用教程(三) CoreAPI访问与操作数据库详解
  • SqlAlchemy使用教程(四) MetaData 与 SQL Express Language 的使用
  • SqlAlchemy使用教程(五) ORM API 编程入门
  • SqlAlchemy使用教程(六) – ORM 表间关系的界说与CRUD操作
  注:本章要求熟悉Python异步编程的基础知识
1、SqlAlchemy 异步编程基础

1.1 异步访问数据的优点



  • 当数据库访问较频仍时,异步编程通常可以得到2-5倍性能提升结果。
  • 可以配合FastAPI 等异步框架,用异步访问使用SqlAlchemy,充分发挥异步框架的优势 。
SqlAlchmy 1.4 提供了Core层的异步接口, 2.0提供了 异步ORM接口
1.2 编程情况准备

(1) 安装异步依赖库
sqlalchemy 的异步接口基于 greenlet,在setuptools配置中为可选安装,
安装异步
pip install sqlalchemy[asyncio]
或者自已在setup.py 中查找greenlet版本号,手工
pip install greenlet==xx.yy.zz
(2)安装数据库的异步驱动。
pip install aiosqlite
以下是常见数据库的异步驱动库
sqlite3 :


  • aiosqlite
mysql:


  • aiomysql。
  • asyncmy: 这是1个支持 MySQL/MariaDB 的高性能异步库
PostgreSQL:


  • aiopg,
  • asyncpg:
  • asyncpgsa: 是asyncpg库的封装,实用于Sqlalchemy.
2、Core Async API

Core API层的异步编程步骤


  • 起首通过 create_async_engine() 创建1个异步AsyncEngine对象,
  • AsyncEngine.connect() 天生AsyncConnection对象,用start()方法启动. 或者通过上下文使用AsyncEngine.begin()创建的AsynConection对象。
  • 通过AsyncConnection对象的execute()方法实行SQL表达式. (SQL表达式可以参考本教程第3章先容)。
  • Async还提供了AsyncConnection.run_sync() 用于实行一些内部的同步方法,如 MetaData.create_all()。
通常须要AsyncConnection对象传入协程使命函数,每个协程必须使用不同的AsyncConnection 对象。
下面我们通过实现来查看1个完备过程
  1. import asyncio
  2. from sqlalchemy import Column, MetaData, select, String, Table
  3. from sqlalchemy.ext.asyncio import create_async_engine
  4. meta = MetaData()
  5. Person = Table(
  6.     'person',
  7.     meta,
  8.     Column('name', String(50), primary_key=True),
  9.     Column('profile', String(50), nullable=True),
  10. )
  11. async def insert_data(conn, data):
  12.         """ Insert one row """
  13.     if not isinstance(data, dict):
  14.         return False
  15.     try:
  16.         await conn.start()
  17.         await conn.execute(Person.insert(), data)
  18.         await conn.commit()
  19.     except Exception as e:
  20.         print("Error while inserting data to db : ", e)
  21.         await conn.rollback()
  22.         return False
  23.     finally:
  24.         await conn.close()
  25.     return True
  26. async def get_by_name(conn, name):
  27.         """Query data by name"""
  28.     query = select(Person).where(Person.c.name == name)
  29.     result = await conn.execute(query)
  30.     return result.first()
  31. async def main():
  32.         """ main function for async coroutines """
  33.     engine = create_async_engine("sqlite+aiosqlite:///:memory:")
  34.     # 执行DDL语句创建表
  35.     async with engine.begin() as conn:
  36.         await conn.run_sync(meta.create_all)
  37.     # 创建两个异步任务,分别插入两条数据
  38.     data = [
  39.         {"name": "Zhang Fei", 'profile': 'some profile 1'},
  40.         {"name": "Li Dian", 'profile': 'some profile 2'}
  41.     ]
  42.     task_1 = asyncio.create_task(insert_data(engine.connect(), data[0]))
  43.     task_2 = asyncio.create_task(insert_data(engine.connect(), data[1]))
  44.     result = await asyncio.gather(task_1, task_2)
  45.     print(result)
  46.     # 查询数据
  47.     async with engine.connect() as conn:
  48.         res = await get_by_name(conn, "Zhang Fei")
  49.         print(res)
  50.     await engine.dispose()
  51. asyncio.run(main())
复制代码
异步流式查询
使用AsyncConnection.stream() 实行SQL, 返回AsyncResult对象。
  1. async with engine.connect() as conn:
  2.     async_result = await conn.stream(select(t1))
  3.     async for row in async_result:
  4.         print("row: %s" % (row,))
复制代码
3、异步ORM 编程API

3.1 异步ORM API先容

ORM的异步接口紧张由AsyncSession 类来提供。
AsyncSession对象由async_sessionmaker() 工厂方法来创建。
注意:1个AsyncSession 实例只能用在1个coroutine 内。 每个协程要使用不同的AsyncSession对象。
  1. async_session = async_sessionmaker(async_engine, expire_on_commit=False)
复制代码
手动关闭AsyncSession对象,
AsyncSession.close()
异步实行SQL操作
AsyncSession.execute()
AsyncSession.scalars()
对于具有relation 关系的表的操作,同步模式下存在由lazy load 带来的Implicit IO,异步模式下不支持lazy load.
3.2 完备示例

  1. import asyncio
  2. from typing import List
  3. from sqlalchemy import ForeignKey, select, String, Integer
  4. from sqlalchemy.ext.asyncio import (create_async_engine,
  5.                                     async_sessionmaker,
  6.                                     AsyncSession,
  7.                                     AsyncAttrs
  8.                                     )
  9. from sqlalchemy.orm import (DeclarativeBase,
  10.                             Mapped,
  11.                             mapped_column,
  12.                             relationship,
  13.                             selectinload)
  14. class Base(DeclarativeBase):
  15.     pass
  16. class User(Base):
  17.     __tablename__ = "user"
  18.     id: Mapped[int] = mapped_column(primary_key=True)
  19.     name: Mapped[str] = mapped_column(String(30))
  20.     age: Mapped[int] = mapped_column(Integer())
  21.     company_id: Mapped[int] = mapped_column(ForeignKey("company.id"))
  22.     company: Mapped["Company"] = relationship(back_populates="users")
  23. class Company(Base):
  24.     __tablename__ = "company"
  25.     id: Mapped[int] = mapped_column(primary_key=True)
  26.     name: Mapped[str] = mapped_column(String(30))
  27.     users: Mapped[List[User]] = relationship()
  28. async def insert_data(async_session: async_sessionmaker[AsyncSession]) -> None:
  29.     async with async_session() as session:
  30.         async with session.begin():
  31.             session.add_all([
  32.                 Company(name="Baidu", users=[]),
  33.                 Company(name="Alibaba", users=[]),
  34.                 User(name='Tom', age=21, company_id=1),
  35.                 User(name='Jerry', age=22, company_id=2),
  36.                 User(name='Jack', age=23, company_id=1),
  37.             ])
  38. async def main() -> None:
  39.     engine = create_async_engine("sqlite+aiosqlite:///:memory:", echo=False)
  40.     async_session = async_sessionmaker(engine, expire_on_commit=False)
  41.     # 创建表
  42.     async with engine.begin() as conn:
  43.         await conn.run_sync(Base.metadata.create_all)
  44.     # 使用异步协程插入数据
  45.     await asyncio.gather(insert_data(async_session))
  46.     # await insert_data(async_session)  # 或者直接执行
  47.     # 查询User表数据,联合查询Company表数据
  48.     async with async_session() as session:
  49.         stmt = select(User, Company).join(User.company).order_by(User.name)
  50.         result = await session.execute(stmt)
  51.         for row in result.scalars():
  52.             print(row.id, row.name, row.age, row.company.name)
  53.     # 查询 Company 数据,反向查询User表数据
  54.     print("查询 Company 数据,反向查询User表数据")
  55.     async with async_session() as session:
  56.         stmt = select(Company).options(selectinload(Company.users))
  57.         result = await session.execute(stmt)
  58.         for row in result.scalars():
  59.             print(row.id, row.name)
  60.             for user in row.users:
  61.                 print('\t', user.id, user.name, user.age)
  62.     await engine.dispose()
  63. asyncio.run(main())
复制代码
实行上述代码,输出为:
  1. output
  2. 3 Jack 23 Baidu
  3. 2 Jerry 22 Alibaba
  4. 1 Tom 21 Baidu
  5. 查询 Company 数据,反向查询User表数据
  6. 1 Baidu
  7.          1 Tom 21
  8.          3 Jack 23
  9. 2 Alibaba
  10.          2 Jerry 22
复制代码
阐明:


  • User表界说有外键字段,与company是1对1对多关系,查询User表时,假如希望同时得到 Company表数据,应使用连合查询。
  • 查询Company表中,反向查询 User表数据,须处理懒加载问题,参考下一节.
3.3 关系查询中懒加载问题

假如A, B之间存在1对多关系, B中的外键指向A,SqlAlchemy在查询 A表的数据后,假如设置了反向查询字段,默认 SqlAlchemy会对关联表隐式地发送查询请求。由于这个I/O是同步的,因此 AsyncSession是不支持此操作。会Block此操作。
有两种解决办法:
方法一:引入AsyncAttrs Mixin混入类
  1. from sqlalchemy.ext.asyncio import AsyncAttrs
  2. class Base(AsyncAttrs, DeclarativeBase):   # Base引入AsyncAttrs
  3.     pass
  4. # 表A与B之间存在外键关系。
  5. class A(Base):
  6.     __tablename__ = "a"
  7.     # ... rest of mapping ...
  8.     bs: Mapped[List[B]] = relationship()  # 反射查询关系
  9. class B(Base):
  10.     __tablename__ = "b"
  11.     a_id: Mapped[int] = mapped_column(ForeignKey("a.id"))
  12.     # ... rest of mapping ...
复制代码
A的 bs属性查询时是lazy load,将被做为 AsyncAttrs来处理,阻止其发磅IO
连合查询时,要手工用异步方式实行查询操作
  1. a1 = (await session.scalars(select(A))).one()
  2. for b1 in await a1.awaitable_attrs.bs:
  3.     print(b1)
复制代码
方式2: 用异步eager load加载关系表数据
假如不使用AsyncAttrs 方式,可用 eager load 来解决:
最常用eager load方法为selectinload() ,其与select()形成链式调用
  1. stmt = select(A).options(selectinload(A.bs))
  2. result = wait session.scalars(stmt)
  3. for r in result:
  4.     print(r.id, r.data, r.bs)
复制代码
注意:


  • 当A构建新对象时,对bs总是赋个空值, 如  A(bs=[], data="a2")
3.4 运行同步方法

如同上一节提到,Core API 的 AsyncConnection对象提供了run_sync()方法运行同步方法,同样ORM API中,AsyncSession对象也提供了run_sync() 实行同步方法。
  1. await session.run_sync(fetch_and_update_objects)    # fetch_and_update_objects() 是1个同步方法
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

用户国营

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表