异步IO与Tortoise-ORM的数据库

打印 上一主题 下一主题

主题 1786|帖子 1786|积分 5358

title: 异步IO与Tortoise-ORM的数据库
date: 2025/04/29 13:21:47
updated: 2025/04/29 13:21:47
author: cmdragon
excerpt:
异步IO与同步IO的核心区别在于阻塞与非阻塞模式。Tortoise-ORM通过协议层、连接池层和ORM层实现异步数据库操纵,支持高效的并发处理。用户管理系统搭建中,Tortoise-ORM与FastAPI结合,实现了用户创建和查询功能,并通过Pydantic进行数据校验。异步ORM适用于高并发场景,参数化查询可防止SQL注入。最佳实践包括连接池设置、查询优化和事件管理,确保系统性能和数据一致性。
categories:

  • 后端开发
  • FastAPI
tags:

  • 异步IO
  • Tortoise-ORM
  • 数据库操纵
  • FastAPI
  • 异步编程
  • 连接池
  • 事件管理
扫描二维码
关注或者微信搜一搜:编程智域 前端至全栈交换与发展
探索数千个预构建的 AI 应用,开启你的下一个伟大创意https://tools.cmdragon.cn/
第一章:异步IO与Tortoise-ORM原理剖析

1.1 同步与异步的本质区别

想象你在快餐店点餐:

  • 同步模式:收银员接单后站在炸薯条机前等待,直到薯条炸好才接待下一位顾客
  • 异步模式:收银员接单后立即将订单交给后厨,转身接待下一位顾客,后厨预备好餐点会主动通知收银员
盘算机范畴的异步IO正是采用这种"非阻塞"模式:
  1. # 同步操作(线程阻塞)
  2. def sync_query():
  3.     result = db.execute("SELECT * FROM users")  # 线程在此等待
  4.     process(result)
  5. # 异步操作(事件驱动)
  6. async def async_query():
  7.     result = await db.execute("SELECT * FROM users")  # 释放控制权
  8.     process(result)
复制代码
1.2 Tortoise-ORM的异步实现

Tortoise-ORM通过三层架构实现异步操纵:
层级职责关键技能协议层数据库通讯协议解析asyncpg/aiomysql连接池层管理异步数据库连接asyncio.QueueORM层模型映射与查询构建Python元类编程典型查询流程解析:
  1. async def get_users():
  2.     # 以下三个步骤交替执行,全程无阻塞
  3.     users = await User.filter(age__gt=18)  # 1.生成SQL语句
  4.     # 2.从连接池获取连接
  5.     # 3.等待数据库响应
  6.     return users
复制代码
1.3 实战:用户管理系统搭建

情况预备
  1. pip install fastapi uvicorn tortoise-orm aiosqlite pydantic
复制代码
项目布局
  1. project/
  2. ├── config.py
  3. ├── models.py
  4. ├── schemas.py
  5. └── main.py
复制代码
模型定义(models.py)
  1. from tortoise.models import Model
  2. from tortoise import fields
  3. class User(Model):
  4.     id = fields.IntField(pk=True)
  5.     username = fields.CharField(max_length=50, unique=True)
  6.     hashed_password = fields.CharField(max_length=128)
  7.     email = fields.CharField(max_length=100)
  8.     created_at = fields.DatetimeField(auto_now_add=True)
  9.     class Meta:
  10.         table = "users"
复制代码
数据校验(schemas.py)
  1. from pydantic import BaseModel, EmailStr
  2. class UserCreate(BaseModel):
  3.     username: str
  4.     password: str
  5.     email: EmailStr
  6.     class Config:
  7.         schema_extra = {
  8.             "example": {
  9.                 "username": "fastapi_user",
  10.                 "password": "strongpassword123",
  11.                 "email": "user@example.com"
  12.             }
  13.         }
复制代码
核心逻辑(main.py)
  1. from fastapi import FastAPI, Depends, HTTPException
  2. from tortoise.contrib.fastapi import register_tortoise
  3. from models import User
  4. from schemas import UserCreate
  5. app = FastAPI()
  6. # 初始化数据库
  7. register_tortoise(
  8.     app,
  9.     db_url="sqlite://db.sqlite3",
  10.     modules={"models": ["models"]},
  11.     generate_schemas=True,
  12.     add_exception_handlers=True,
  13. )
  14. @app.post("/users/", status_code=201)
  15. async def create_user(user_data: UserCreate):
  16.     # 密码哈希处理(实际项目应使用passlib)
  17.     hashed_password = f"hashed_{user_data.password}"
  18.     try:
  19.         user = await User.create(
  20.             username=user_data.username,
  21.             hashed_password=hashed_password,
  22.             email=user_data.email
  23.         )
  24.     except Exception as e:
  25.         raise HTTPException(
  26.             status_code=400,
  27.             detail="Username already exists"
  28.         )
  29.     return {
  30.         "id": user.id,
  31.         "username": user.username,
  32.         "email": user.email
  33.     }
  34. @app.get("/users/{user_id}")
  35. async def get_user(user_id: int):
  36.     user = await User.get_or_none(id=user_id)
  37.     if not user:
  38.         raise HTTPException(status_code=404, detail="User not found")
  39.     return {
  40.         "id": user.id,
  41.         "username": user.username,
  42.         "email": user.email,
  43.         "created_at": user.created_at.isoformat()
  44.     }
复制代码
课后Quiz

题目1:以下哪种场景最适合使用异步ORM?
A) 单用户的桌面应用程序
B) 需要处理数千并发哀求的API服务
C) 执行复杂事件的财务系统
D) 数据堆栈的批量数据处理
答案:B
解析:异步ORM在高并发IO密集型场景下能明显提升吞吐量,而ACD场景更多需要的是事件完整性或盘算能力。
题目2:如何避免在ORM查询时发生SQL注入?
A) 直接拼接字符串
B) 使用ORM的参数化查询
C) 手动过滤特殊字符
D) 限制查询字段长度
答案:B
解析:Tortoise-ORM的查询方法会主动进行参数化处理,有效防止SQL注入,这是最安全的做法。
常见报错解决方案

错误1:422 Validation Error
原因分析:哀求体不符合Pydantic模型要求
解决方法:

  • 检查哀求头Content-Type是否为application/json
  • 使用Swagger文档测试接口
  • 查看返回信息中的错误字段提示
错误2:RuntimeError: Event loop is closed
原因分析:异步代码在错误的位置执行
解决方法:

  • 确保所有异步操纵都在async函数内
  • 使用asyncio.run()正确启动事件循环
  • 检查数据库连接是否正确关闭
错误3:OperationalError: Connection refused
原因分析:数据库连接设置错误
解决方法:

  • 检查db_url格式:dialect://user:password@host:port/database
  • 确认数据库服务是否正常运行
  • 验证网络防火墙设置
最佳实践发起


  • 连接池设置:根据数据库最大连接数设置maxsize
  1. register_tortoise(
  2.     app,
  3.     db_url="postgres://user:pass@localhost:5432/mydb",
  4.     modules={"models": ["models"]},
  5.     generate_schemas=True,
  6.     add_exception_handlers=True,
  7.     connection_params={
  8.         "maxsize": 20  # 控制连接池大小
  9.     }
  10. )
复制代码

  • 查询优化:使用select_related预加载关联数据
  1. # 获取用户及其所有文章
  2. async def get_user_with_posts(user_id: int):
  3.     user = await User.get(id=user_id).prefetch_related('posts')
  4.     return user
复制代码

  • 事件管理:确保数据一致性
  1. async def transfer_funds(from_id, to_id, amount):
  2.     async with in_transaction() as conn:
  3.         from_user = await User.get(id=from_id).for_update()
  4.         to_user = await User.get(id=to_id).for_update()
  5.         if from_user.balance < amount:
  6.             raise ValueError("Insufficient balance")
  7.         from_user.balance -= amount
  8.         to_user.balance += amount
  9.         await from_user.save(using_db=conn)
  10.         await to_user.save(using_db=conn)
复制代码
余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交换与发展,阅读完整的文章:
往期文章归档:


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

万有斥力

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表