FastAPI与SQLAlchemy数据库集成与CRUD操纵

打印 上一主题 下一主题

主题 1588|帖子 1588|积分 4764

title: FastAPI与SQLAlchemy数据库集成与CRUD操纵
date: 2025/04/16 09:50:57
updated: 2025/04/16 09:50:57
author: cmdragon
excerpt:
FastAPI与SQLAlchemy集成基础包括环境准备、数据库毗连设置和模型定义。CRUD操纵通过数据访问层封装和路由层实现,确保线程安全和事务管理。常见错误如422哀求验证错误通过Pydantic模型和中间件处理。Session生命周期管理依赖注入系统保证每个哀求独立会话。常见报错如数据库毗连失败和事务回滚通过查抄服务状态、验证毗连参数和非常处理解决。
categories:

  • 后端开辟
  • FastAPI
tags:

  • FastAPI
  • SQLAlchemy
  • 数据库集成
  • CRUD操纵
  • Session管理
  • 错误处理
  • MySQL
扫描二维码关注或者微信搜一搜:编程智域 前端至全栈互换与成长
探索数千个预构建的 AI 应用,开启你的下一个伟大创意
1. FastAPI 与 SQLAlchemy 同步数据库集成基础

1.1 环境准备与安装

首先创建虚拟环境并安装必要依赖:
  1. python -m venv venv
  2. source venv/bin/activate  # Linux/Mac
  3. venv\Scripts\activate.bat  # Windows
  4. pip install fastapi uvicorn sqlalchemy pymysql
复制代码
1.2 数据库毗连设置

在database.py中设置焦点数据库毗连:
  1. from sqlalchemy import create_engine
  2. from sqlalchemy.orm import sessionmaker
  3. SQLALCHEMY_DATABASE_URL = "mysql+pymysql://user:password@localhost/mydatabase"
  4. engine = create_engine(
  5.     SQLALCHEMY_DATABASE_URL,
  6.     pool_size=20,
  7.     max_overflow=0,
  8.     pool_pre_ping=True
  9. )
  10. SessionLocal = sessionmaker(
  11.     autocommit=False,
  12.     autoflush=False,
  13.     bind=engine,
  14.     expire_on_commit=False
  15. )
复制代码
1.3 模型定义与关系映射

在models.py中定义数据模型:
  1. from sqlalchemy import Column, Integer, String
  2. from database import Base
  3. class User(Base):
  4.     __tablename__ = "users"
  5.     id = Column(Integer, primary_key=True, index=True)
  6.     name = Column(String(50), nullable=False)
  7.     email = Column(String(100), unique=True)
  8.     age = Column(Integer, default=18)
  9.     def __repr__(self):
  10.         return f"<User(name='{self.name}', email='{self.email}')>"
复制代码
2. CRUD 操纵标准实现模式

2.1 数据访问层封装

创建repository.py实现CRUD操纵:
  1. from sqlalchemy.orm import Session
  2. from models import User
  3. class UserRepository:
  4.     @staticmethod
  5.     def create_user(db: Session, user_data: dict):
  6.         """ 创建用户 """
  7.         db_user = User(**user_data)
  8.         db.add(db_user)
  9.         db.commit()
  10.         db.refresh(db_user)
  11.         return db_user
  12.     @staticmethod
  13.     def get_user(db: Session, user_id: int):
  14.         """ 根据ID获取用户 """
  15.         return db.query(User).filter(User.id == user_id).first()
  16.     @staticmethod
  17.     def update_user(db: Session, user_id: int, update_data: dict):
  18.         """ 更新用户信息 """
  19.         db_user = db.query(User).filter(User.id == user_id).first()
  20.         if db_user:
  21.             for key, value in update_data.items():
  22.                 setattr(db_user, key, value)
  23.             db.commit()
  24.             db.refresh(db_user)
  25.         return db_user
  26.     @staticmethod
  27.     def delete_user(db: Session, user_id: int):
  28.         """ 删除用户 """
  29.         db_user = db.query(User).filter(User.id == user_id).first()
  30.         if db_user:
  31.             db.delete(db_user)
  32.             db.commit()
  33.             return True
  34.         return False
复制代码
2.2 路由层实现

在main.py中定义API端点:
  1. from fastapi import Depends, FastAPI, HTTPException
  2. from sqlalchemy.orm import Session
  3. from models import Base
  4. from database import engine, SessionLocal
  5. from repository import UserRepository
  6. from pydantic import BaseModel
  7. Base.metadata.create_all(bind=engine)
  8. app = FastAPI()
  9. # 依赖注入获取数据库会话
  10. def get_db():
  11.     db = SessionLocal()
  12.     try:
  13.         yield db
  14.     finally:
  15.         db.close()
  16. class UserCreate(BaseModel):
  17.     name: str
  18.     email: str
  19.     age: int = 18
  20. @app.post("/users/")
  21. def create_user(user: UserCreate, db: Session = Depends(get_db)):
  22.     return UserRepository.create_user(db, user.dict())
  23. @app.get("/users/{user_id}")
  24. def read_user(user_id: int, db: Session = Depends(get_db)):
  25.     db_user = UserRepository.get_user(db, user_id)
  26.     if db_user is None:
  27.         raise HTTPException(status_code=404, detail="User not found")
  28.     return db_user
复制代码
3. Session 生命周期管理

3.1 Session 线程安全策略

通过依赖注入系统保证每个哀求独立会话:
  1. def get_db():
  2.     db = SessionLocal()
  3.     try:
  4.         yield db
  5.     finally:
  6.         db.close()
复制代码
3.2 事务管理最佳实践
  1. def transfer_funds(db: Session, from_id: int, to_id: int, amount: float):
  2.     try:
  3.         from_user = UserRepository.get_user(db, from_id)
  4.         to_user = UserRepository.get_user(db, to_id)
  5.         if from_user.balance < amount:
  6.             raise ValueError("Insufficient funds")
  7.         from_user.balance -= amount
  8.         to_user.balance += amount
  9.         db.commit()
  10.     except Exception as e:
  11.         db.rollback()
  12.         raise e
复制代码
4. 常见错误处理

4.1 422 哀求验证错误

示例场景
  1. {
  2.   "detail": [
  3.     {
  4.       "loc": [
  5.         "body",
  6.         "age"
  7.       ],
  8.       "msg": "value is not a valid integer",
  9.       "type": "type_error.integer"
  10.     }
  11.   ]
  12. }
复制代码
解决方案

  • 查抄哀求体是否匹配Pydantic模型定义
  • 使用OpenAPI文档进行测试
  • 添加中间件捕获验证错误:
  1. from fastapi.exceptions import RequestValidationError
  2. @app.exception_handler(RequestValidationError)
  3. async def validation_exception_handler(request, exc):
  4.     return JSONResponse(
  5.         status_code=400,
  6.         content={"detail": exc.errors(), "body": exc.body},
  7.     )
复制代码
课后Quiz

问题1:以下哪种方式可以有效防止SQL注入攻击?
A) 使用字符串拼接SQL语句
B) 使用ORM的查询参数化功能
C) 在数据库毗连字符串添加特别参数
D) 禁用所有输入验证
答案:B) 正确。SQLAlchemy等ORM框架会自动进行参数化查询,将用户输入作为参数传递而不是直接拼接到SQL语句中。
问题2:为什么需要在finally块中关闭数据库会话?
A) 为了提升查询性能
B) 确保会话在任何情况下都会被正确关闭
C) 防止其他哀求使用该会话
D) 满意数据库毗连池的要求
答案:B) 正确。无论是否发生非常,finally块中的代码都会执行,保证会话资源的正确开释。
常见报错解决方案

**报错1
**:sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (2003, "Can't connect to MySQL server on 'localhost'")
缘故原由分析

  • 数据库服务未启动
  • 毗连参数(用户名/密码/端口)错误
  • 网络防火墙阻止毗连
解决方案

  • 查抄MySQL服务状态
  • 验证毗连字符串参数
  • 使用telnet测试端口连通性
  • 添加毗连超时参数:
  1. create_engine(connect_args={"connect_timeout": 10})
复制代码
**报错2
**:sqlalchemy.exc.InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush.
缘故原由分析

  • 数据库操纵违背约束(如唯一性约束)
  • 事务未正确处理非常
解决方案

  • 查抄数据完整性约束
  • 在事务代码块中添加try/except
  • 执行显式回滚操纵
  • 使用session.expire_all()重置会话状态
余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈互换与成长,阅读完整的文章:FastAPI与SQLAlchemy数据库集成与CRUD操纵 | cmdragon's Blog
往期文章归档:

<ul>FastAPI与SQLAlchemy同步数据库集成 | cmdragon's Blog
SQLAlchemy 焦点概念与同步引擎设置详解 | cmdragon's Blog
FastAPI依赖注入性能优化策略 | cmdragon's Blog
FastAPI安全认证中的依赖组合 | cmdragon's Blog
FastAPI依赖注入系统及调试本领 | cmdragon's Blog
FastAPI依赖覆盖与测试环境模拟 | cmdragon's Blog
FastAPI中的依赖注入与数据库事务管理 | cmdragon's Blog
FastAPI依赖注入实践:工厂模式与实例复用的优化策略 | cmdragon's Blog
FastAPI依赖注入:链式调用与多级参数传递 | cmdragon's Blog
FastAPI依赖注入:从基础概念到应用 | cmdragon's Blog
FastAPI中实现动态条件必填字段的实践 | cmdragon's Blog
FastAPI中Pydantic异步分布式唯一性校验 | cmdragon's Blog
把握FastAPI与Pydantic的跨字段验证本领 | cmdragon's Blog
FastAPI中的Pydantic密码验证机制与实现 | cmdragon's Blog
深入把握FastAPI与OpenAPI规范的高级适配本领 | cmdragon's Blog
Pydantic字段元数据指南:从基础到企业级文档增强 | cmdragon's Blog
Pydantic Schema天生指南:自定义JSON Schema | cmdragon's Blog
Pydantic递归模型深度校验36计:从无限嵌套到亿级数据的优化法则 | cmdragon's Blog
Pydantic异步校验器深:构建高并发验证系统 | cmdragon's Blog
Pydantic根校验器:构建跨字段验证系统 | cmdragon's Blog
Pydantic设置继承抽象基类模式 | cmdragon's Blog
Pydantic多态模型:用鉴别器构建类型安全的API接口 | cmdragon's Blog
FastAPI性能优化指南:参数解析与惰性加载 | cmdragon's Blog
FastAPI依赖注入:参数共享与逻辑复用 | cmdragon's Blog
FastAPI安全防护指南:构建坚不可摧的参数处理体系 | cmdragon's Blog
FastAPI复杂查询终极指南:告别if-else的现代化过滤架构 | cmdragon's Blog
FastAPI 焦点机制:分页参数的实现与最佳实践 | cmdragon's Blog
<a href="https://blog.cmdragon.cn/posts/615a966b68d9/" target="_blank" rel="noopener nofollow">FastAPI 错误处理与自定义错误消息完全指南:构建健壮的 API 应用
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

渣渣兔

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表