- SqlAlchemy使用教程(一) 原理与情况搭建
- SqlAlchemy使用教程(二) 入门示例及编程步骤
- SqlAlchemy使用教程(三) CoreAPI访问与操作数据库详解
- SqlAlchemy使用教程(四) MetaData 与 SQL Express Language 的使用
- SqlAlchemy使用教程(五) ORM API 编程入门
- SqlAlchemy使用教程(六) – ORM 表间关系的界说与CRUD操作
注:本章要求熟悉Python异步编程的基础知识
1、SqlAlchemy 异步编程基础
1.1 异步访问数据的优点
- 当数据库访问较频仍时,异步编程通常可以得到2-5倍性能提升结果。
- 可以配合FastAPI 等异步框架,用异步访问使用SqlAlchemy,充分发挥异步框架的优势 。
SqlAlchmy 1.4 提供了Core层的异步接口, 2.0提供了 异步ORM接口
1.2 编程情况准备
(1) 安装异步依赖库
sqlalchemy 的异步接口基于 greenlet,在setuptools配置中为可选安装,
安装异步
pip install sqlalchemy[asyncio]
或者自已在setup.py 中查找greenlet版本号,手工
pip install greenlet==xx.yy.zz
(2)安装数据库的异步驱动。
pip install aiosqlite
以下是常见数据库的异步驱动库
sqlite3 :
mysql:
- aiomysql。
- asyncmy: 这是1个支持 MySQL/MariaDB 的高性能异步库
PostgreSQL:
- aiopg,
- asyncpg:
- asyncpgsa: 是asyncpg库的封装,实用于Sqlalchemy.
2、Core Async API
Core API层的异步编程步骤
- 起首通过 create_async_engine() 创建1个异步AsyncEngine对象,
- AsyncEngine.connect() 天生AsyncConnection对象,用start()方法启动. 或者通过上下文使用AsyncEngine.begin()创建的AsynConection对象。
- 通过AsyncConnection对象的execute()方法实行SQL表达式. (SQL表达式可以参考本教程第3章先容)。
- Async还提供了AsyncConnection.run_sync() 用于实行一些内部的同步方法,如 MetaData.create_all()。
通常须要AsyncConnection对象传入协程使命函数,每个协程必须使用不同的AsyncConnection 对象。
下面我们通过实现来查看1个完备过程
- import asyncio
- from sqlalchemy import Column, MetaData, select, String, Table
- from sqlalchemy.ext.asyncio import create_async_engine
- meta = MetaData()
- Person = Table(
- 'person',
- meta,
- Column('name', String(50), primary_key=True),
- Column('profile', String(50), nullable=True),
- )
- async def insert_data(conn, data):
- """ Insert one row """
- if not isinstance(data, dict):
- return False
- try:
- await conn.start()
- await conn.execute(Person.insert(), data)
- await conn.commit()
- except Exception as e:
- print("Error while inserting data to db : ", e)
- await conn.rollback()
- return False
- finally:
- await conn.close()
- return True
- async def get_by_name(conn, name):
- """Query data by name"""
- query = select(Person).where(Person.c.name == name)
- result = await conn.execute(query)
- return result.first()
- async def main():
- """ main function for async coroutines """
- engine = create_async_engine("sqlite+aiosqlite:///:memory:")
- # 执行DDL语句创建表
- async with engine.begin() as conn:
- await conn.run_sync(meta.create_all)
- # 创建两个异步任务,分别插入两条数据
- data = [
- {"name": "Zhang Fei", 'profile': 'some profile 1'},
- {"name": "Li Dian", 'profile': 'some profile 2'}
- ]
- task_1 = asyncio.create_task(insert_data(engine.connect(), data[0]))
- task_2 = asyncio.create_task(insert_data(engine.connect(), data[1]))
- result = await asyncio.gather(task_1, task_2)
- print(result)
- # 查询数据
- async with engine.connect() as conn:
- res = await get_by_name(conn, "Zhang Fei")
- print(res)
- await engine.dispose()
- asyncio.run(main())
复制代码 异步流式查询
使用AsyncConnection.stream() 实行SQL, 返回AsyncResult对象。
- async with engine.connect() as conn:
- async_result = await conn.stream(select(t1))
- async for row in async_result:
- print("row: %s" % (row,))
复制代码 3、异步ORM 编程API
3.1 异步ORM API先容
ORM的异步接口紧张由AsyncSession 类来提供。
AsyncSession对象由async_sessionmaker() 工厂方法来创建。
注意:1个AsyncSession 实例只能用在1个coroutine 内。 每个协程要使用不同的AsyncSession对象。
- async_session = async_sessionmaker(async_engine, expire_on_commit=False)
复制代码 手动关闭AsyncSession对象,
AsyncSession.close()
异步实行SQL操作
AsyncSession.execute()
AsyncSession.scalars()
对于具有relation 关系的表的操作,同步模式下存在由lazy load 带来的Implicit IO,异步模式下不支持lazy load.
3.2 完备示例
- import asyncio
- from typing import List
- from sqlalchemy import ForeignKey, select, String, Integer
- from sqlalchemy.ext.asyncio import (create_async_engine,
- async_sessionmaker,
- AsyncSession,
- AsyncAttrs
- )
- from sqlalchemy.orm import (DeclarativeBase,
- Mapped,
- mapped_column,
- relationship,
- selectinload)
- class Base(DeclarativeBase):
- pass
- class User(Base):
- __tablename__ = "user"
- id: Mapped[int] = mapped_column(primary_key=True)
- name: Mapped[str] = mapped_column(String(30))
- age: Mapped[int] = mapped_column(Integer())
- company_id: Mapped[int] = mapped_column(ForeignKey("company.id"))
- company: Mapped["Company"] = relationship(back_populates="users")
- class Company(Base):
- __tablename__ = "company"
- id: Mapped[int] = mapped_column(primary_key=True)
- name: Mapped[str] = mapped_column(String(30))
- users: Mapped[List[User]] = relationship()
- async def insert_data(async_session: async_sessionmaker[AsyncSession]) -> None:
- async with async_session() as session:
- async with session.begin():
- session.add_all([
- Company(name="Baidu", users=[]),
- Company(name="Alibaba", users=[]),
- User(name='Tom', age=21, company_id=1),
- User(name='Jerry', age=22, company_id=2),
- User(name='Jack', age=23, company_id=1),
- ])
- async def main() -> None:
- engine = create_async_engine("sqlite+aiosqlite:///:memory:", echo=False)
- async_session = async_sessionmaker(engine, expire_on_commit=False)
- # 创建表
- async with engine.begin() as conn:
- await conn.run_sync(Base.metadata.create_all)
- # 使用异步协程插入数据
- await asyncio.gather(insert_data(async_session))
- # await insert_data(async_session) # 或者直接执行
- # 查询User表数据,联合查询Company表数据
- async with async_session() as session:
- stmt = select(User, Company).join(User.company).order_by(User.name)
- result = await session.execute(stmt)
- for row in result.scalars():
- print(row.id, row.name, row.age, row.company.name)
- # 查询 Company 数据,反向查询User表数据
- print("查询 Company 数据,反向查询User表数据")
- async with async_session() as session:
- stmt = select(Company).options(selectinload(Company.users))
- result = await session.execute(stmt)
- for row in result.scalars():
- print(row.id, row.name)
- for user in row.users:
- print('\t', user.id, user.name, user.age)
- await engine.dispose()
- asyncio.run(main())
复制代码 实行上述代码,输出为:
- output
- 3 Jack 23 Baidu
- 2 Jerry 22 Alibaba
- 1 Tom 21 Baidu
- 查询 Company 数据,反向查询User表数据
- 1 Baidu
- 1 Tom 21
- 3 Jack 23
- 2 Alibaba
- 2 Jerry 22
复制代码 阐明:
- User表界说有外键字段,与company是1对1对多关系,查询User表时,假如希望同时得到 Company表数据,应使用连合查询。
- 查询Company表中,反向查询 User表数据,须处理懒加载问题,参考下一节.
3.3 关系查询中懒加载问题
假如A, B之间存在1对多关系, B中的外键指向A,SqlAlchemy在查询 A表的数据后,假如设置了反向查询字段,默认 SqlAlchemy会对关联表隐式地发送查询请求。由于这个I/O是同步的,因此 AsyncSession是不支持此操作。会Block此操作。
有两种解决办法:
方法一:引入AsyncAttrs Mixin混入类
- from sqlalchemy.ext.asyncio import AsyncAttrs
- class Base(AsyncAttrs, DeclarativeBase): # Base引入AsyncAttrs
- pass
- # 表A与B之间存在外键关系。
- class A(Base):
- __tablename__ = "a"
- # ... rest of mapping ...
- bs: Mapped[List[B]] = relationship() # 反射查询关系
- class B(Base):
- __tablename__ = "b"
- a_id: Mapped[int] = mapped_column(ForeignKey("a.id"))
- # ... rest of mapping ...
复制代码 A的 bs属性查询时是lazy load,将被做为 AsyncAttrs来处理,阻止其发磅IO
连合查询时,要手工用异步方式实行查询操作
- a1 = (await session.scalars(select(A))).one()
- for b1 in await a1.awaitable_attrs.bs:
- print(b1)
复制代码 方式2: 用异步eager load加载关系表数据
假如不使用AsyncAttrs 方式,可用 eager load 来解决:
最常用eager load方法为selectinload() ,其与select()形成链式调用
- stmt = select(A).options(selectinload(A.bs))
- result = wait session.scalars(stmt)
- for r in result:
- print(r.id, r.data, r.bs)
复制代码 注意:
- 当A构建新对象时,对bs总是赋个空值, 如 A(bs=[], data="a2")
3.4 运行同步方法
如同上一节提到,Core API 的 AsyncConnection对象提供了run_sync()方法运行同步方法,同样ORM API中,AsyncSession对象也提供了run_sync() 实行同步方法。
- await session.run_sync(fetch_and_update_objects) # fetch_and_update_objects() 是1个同步方法
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |