python 利用flask+sqlalchemy 实现简朴数据查询接口

打印 上一主题 下一主题

主题 1020|帖子 1020|积分 3060

数据库表结构和部门数据

  1. SET NAMES utf8mb4;
  2. SET FOREIGN_KEY_CHECKS = 0;
  3. -- ----------------------------
  4. -- Table structure for user
  5. -- ----------------------------
  6. DROP TABLE IF EXISTS `user`;
  7. CREATE TABLE `user`  (
  8.   `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  9.   `name` varchar(60) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '用户名',
  10.   `password` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '密码',
  11.   `avatar` varchar(60) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '头像',
  12.   `sex` smallint(6) NULL DEFAULT 0 COMMENT '性别: 0 未知 1 男 2 女',
  13.   `phone` varchar(30) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '手机号',
  14.   `is_deleted` smallint(6) NULL DEFAULT 0 COMMENT '是否删除: 0 未删除 1 已删除',
  15.   `create_time` datetime NULL DEFAULT NULL COMMENT '创建时间',
  16.   `update_time` datetime NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
  17.   PRIMARY KEY (`id`) USING BTREE
  18. ) ENGINE = InnoDB AUTO_INCREMENT = 31 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = DYNAMIC;
  19. -- ----------------------------
  20. -- Records of user
  21. -- ----------------------------
  22. INSERT INTO `user` VALUES (1, 'admin', '30780cc6f2e56945aaf9c9578c932e22', '1', 0, NULL, 0, '2023-03-29 02:34:58', '2023-03-29 02:34:58');
  23. INSERT INTO `user` VALUES (2, 'user', '30780cc6f2e56945aaf9c9578c932e22', '', 1, NULL, 0, '2023-03-29 02:34:58', '2023-03-29 02:34:58');
  24. INSERT INTO `user` VALUES (3, 'guest', '30780cc6f2e56945aaf9c9578c932e22', '2', 2, NULL, 0, '2023-03-29 02:34:58', '2023-03-29 02:34:58');
  25. INSERT INTO `user` VALUES (4, 'guest', '30780cc6f2e56945aaf9c9578c932e22', '2', 2, NULL, 0, '2023-03-29 02:34:58', '2023-03-29 02:34:58');
  26. INSERT INTO `user` VALUES (5, 'guest', '30780cc6f2e56945aaf9c9578c932e22', '2', 2, NULL, 0, '2023-03-29 02:34:58', '2023-03-29 02:34:58');
  27. SET FOREIGN_KEY_CHECKS = 1;
复制代码
python 代码

  1. import logging
  2. from datetime import datetime
  3. from typing import List, Optional
  4. import uvicorn
  5. from fastapi import FastAPI, HTTPException, Query
  6. from pydantic import BaseModel
  7. from pydantic import create_model
  8. from sqlalchemy import create_engine, Column, Integer, String, SmallInteger, DateTime, and_
  9. from sqlalchemy.ext.declarative import declarative_base
  10. from sqlalchemy.future import select
  11. from sqlalchemy.orm import sessionmaker
  12. # 配置 SQLAlchemy 日志,打印 SQL 语句
  13. logging.basicConfig()
  14. logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)
  15. # 修改为您的数据库连接信息
  16. DATABASE_URL = "mysql://root:wonderful2021@127.0.0.1:3306/demo"
  17. engine = create_engine(DATABASE_URL, echo=True)  # echo=True 打印 SQL 语句
  18. SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
  19. Base = declarative_base()
  20. class User(Base):
  21.     __tablename__ = "user"
  22.     id = Column(Integer, primary_key=True, index=True)
  23.     name = Column(String(60), nullable=False)
  24.     password = Column(String(64), nullable=False)
  25.     avatar = Column(String(60), nullable=True)
  26.     sex = Column(SmallInteger, default=0)
  27.     phone = Column(String(30), nullable=True)
  28.     is_deleted = Column(SmallInteger, default=0)
  29.     create_time = Column(DateTime, default=datetime.utcnow)
  30.     update_time = Column(DateTime, default=datetime.utcnow)
  31. class UserResponse(BaseModel):
  32.     id: int
  33.     name: str
  34.     avatar: Optional[str] = None
  35.     sex: int
  36.     phone: Optional[str] = None
  37.     is_deleted: int
  38.     create_time: datetime
  39.     update_time: datetime
  40.     class Config:
  41.         orm_mode = True
  42.         json_encoders = {
  43.             datetime: lambda v: v.strftime('%Y-%m-%d %H:%M:%S')  # 自定义日期时间格式
  44.         }
  45. app = FastAPI()
  46. # # 固定返回字段 字段与类型的映射
  47. field_type_mapping = {
  48.     'id': int,
  49.     'name': str,
  50.     'phone': str,
  51.     'avatar': str,
  52.     'create_time': str,  # 这里使用 str,因为我们会在返回时格式化
  53.     'update_time': str  # 同上
  54. }
  55. # 动态生成 Pydantic 模型
  56. DynamicUserResponse = create_model(
  57.     'DynamicUserResponse',
  58.     **{field: (field_type_mapping[field], ...) for field in field_type_mapping.keys()}
  59. )
  60. def get_selected_fields(model, fields):
  61.     """根据字段列表动态选择模型字段"""
  62.     return [getattr(model, field.strip()) for field in fields]
  63. def handle_none_values(data: dict) -> dict:
  64.     """处理返回值中的 None 值,转换为空字符串"""
  65.     return {k: (v if v is not None else "") for k, v in data.items()}
  66. def format_datetime(dt: Optional[datetime]) -> str:
  67.     """格式化日期时间,返回字符串"""
  68.     return dt.strftime('%Y-%m-%d %H:%M:%S') if dt else ""
  69. @app.get("/users/", response_model=List[DynamicUserResponse])
  70. async def read_users(
  71.         skip: int = Query(0, ge=0),
  72.         limit: int = Query(10, ge=1),
  73.         name: Optional[str] = None,
  74.         sex: Optional[int] = None,
  75.         phone: Optional[str] = None
  76. ):
  77.     with SessionLocal() as session:
  78.         filters = [User.is_deleted == 0]  # 默认条件:未删除
  79.         if name:
  80.             filters.append(User.name.like(f"%{name}%"))  # 模糊查询
  81.         if sex is not None:
  82.             filters.append(User.sex == sex)  # 精确查询
  83.         if phone:
  84.             filters.append(User.phone.like(f"%{phone}%"))  # 模糊查询
  85.         # 固定返回字段
  86.         selected_fields = get_selected_fields(User, field_type_mapping.keys())
  87.         stmt = select(*selected_fields).where(and_(*filters)).offset(skip).limit(limit)
  88.         result = session.execute(stmt)
  89.         users = result.all()
  90.         # 返回字典列表,处理 None 值并格式化日期时间
  91.         return [
  92.             handle_none_values({
  93.                 **dict(zip(field_type_mapping.keys(), user)),
  94.                 'create_time': format_datetime(user.create_time),
  95.                 'update_time': format_datetime(user.update_time)
  96.             }) for user in users
  97.         ]
  98. @app.get("/users/{user_id}", response_model=DynamicUserResponse)
  99. async def read_user(
  100.         user_id: int
  101. ):
  102.     with SessionLocal() as session:
  103.         # 固定返回字段
  104.         selected_fields = get_selected_fields(User, field_type_mapping.keys())
  105.         stmt = select(*selected_fields).where(User.id == user_id, User.is_deleted == 0)
  106.         result = session.execute(stmt)
  107.         user = result.first()
  108.         if user is None:
  109.             raise HTTPException(status_code=404, detail="User not found")
  110.         # 返回字典,处理 None 值并格式化日期时间
  111.         return handle_none_values({
  112.             **dict(zip(field_type_mapping.keys(), user)),
  113.             'create_time': format_datetime(user.create_time),
  114.             'update_time': format_datetime(user.update_time)
  115.         })
  116. @app.get("/usersallfiled/", response_model=List[UserResponse])
  117. async def read_users(
  118.         skip: int = Query(0, ge=0),
  119.         limit: int = Query(10, ge=1),
  120.         name: Optional[str] = None,
  121.         sex: Optional[int] = None,
  122.         phone: Optional[str] = None
  123. ):
  124.     with SessionLocal() as session:
  125.         filters = [User.is_deleted == 0]  # 默认条件:未删除
  126.         if name:
  127.             filters.append(User.name.like(f"%{name}%"))  # 模糊查询
  128.         if sex is not None:
  129.             filters.append(User.sex == sex)  # 精确查询
  130.         if phone:
  131.             filters.append(User.phone.like(f"%{phone}%"))  # 模糊查询
  132.         stmt = select(User).where(and_(*filters)).offset(skip).limit(limit)
  133.         result = session.execute(stmt)
  134.         users = result.scalars().all()
  135.         return users
  136. @app.get("/usersallfiled/{user_id}", response_model=UserResponse)
  137. async def read_user(
  138.         user_id: int
  139. ):
  140.     with SessionLocal() as session:
  141.         stmt = select(User).where(User.id == user_id, User.is_deleted == 0)
  142.         result = session.execute(stmt)
  143.         user = result.scalars().first()
  144.         if user is None:
  145.             raise HTTPException(status_code=404, detail="User not found")
  146.         return user
  147. if __name__ == "__main__":
  148.     uvicorn.run(app, host="127.0.0.1", port=8000)
复制代码
调用方式

  get请求http://127.0.0.1:8000/usersallfiled/?name=admin&skip=0&limit=10
返回效果[{"id":1,"name":"admin","avatar":"1","sex":0,"phone":null,"is_deleted":0,"create_time":"2023-03-29 02:34:58","update_time":"2023-03-29 02:34:58"}]

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

用户云卷云舒

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表