用户云卷云舒 发表于 2025-3-9 06:23:49

python 利用flask+sqlalchemy 实现简朴数据查询接口

数据库表结构和部门数据

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for user
-- ----------------------------
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user`(
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`name` varchar(60) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '用户名',
`password` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '密码',
`avatar` varchar(60) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '头像',
`sex` smallint(6) NULL DEFAULT 0 COMMENT '性别: 0 未知 1 男 2 女',
`phone` varchar(30) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '手机号',
`is_deleted` smallint(6) NULL DEFAULT 0 COMMENT '是否删除: 0 未删除 1 已删除',
`create_time` datetime NULL DEFAULT NULL COMMENT '创建时间',
`update_time` datetime NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 31 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = DYNAMIC;

-- ----------------------------
-- Records of user
-- ----------------------------
INSERT INTO `user` VALUES (1, 'admin', '30780cc6f2e56945aaf9c9578c932e22', '1', 0, NULL, 0, '2023-03-29 02:34:58', '2023-03-29 02:34:58');
INSERT INTO `user` VALUES (2, 'user', '30780cc6f2e56945aaf9c9578c932e22', '', 1, NULL, 0, '2023-03-29 02:34:58', '2023-03-29 02:34:58');
INSERT INTO `user` VALUES (3, 'guest', '30780cc6f2e56945aaf9c9578c932e22', '2', 2, NULL, 0, '2023-03-29 02:34:58', '2023-03-29 02:34:58');
INSERT INTO `user` VALUES (4, 'guest', '30780cc6f2e56945aaf9c9578c932e22', '2', 2, NULL, 0, '2023-03-29 02:34:58', '2023-03-29 02:34:58');
INSERT INTO `user` VALUES (5, 'guest', '30780cc6f2e56945aaf9c9578c932e22', '2', 2, NULL, 0, '2023-03-29 02:34:58', '2023-03-29 02:34:58');

SET FOREIGN_KEY_CHECKS = 1;python 代码

import logging
from datetime import datetime
from typing import List, Optional

import uvicorn
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
from pydantic import create_model
from sqlalchemy import create_engine, Column, Integer, String, SmallInteger, DateTime, and_
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.future import select
from sqlalchemy.orm import sessionmaker

# 配置 SQLAlchemy 日志,打印 SQL 语句
logging.basicConfig()
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)

# 修改为您的数据库连接信息
DATABASE_URL = "mysql://root:wonderful2021@127.0.0.1:3306/demo"

engine = create_engine(DATABASE_URL, echo=True)# echo=True 打印 SQL 语句
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()


class User(Base):
    __tablename__ = "user"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(60), nullable=False)
    password = Column(String(64), nullable=False)
    avatar = Column(String(60), nullable=True)
    sex = Column(SmallInteger, default=0)
    phone = Column(String(30), nullable=True)
    is_deleted = Column(SmallInteger, default=0)
    create_time = Column(DateTime, default=datetime.utcnow)
    update_time = Column(DateTime, default=datetime.utcnow)


class UserResponse(BaseModel):
    id: int
    name: str
    avatar: Optional = None
    sex: int
    phone: Optional = None
    is_deleted: int
    create_time: datetime
    update_time: datetime

    class Config:
      orm_mode = True
      json_encoders = {
            datetime: lambda v: v.strftime('%Y-%m-%d %H:%M:%S')# 自定义日期时间格式
      }


app = FastAPI()

# # 固定返回字段 字段与类型的映射
field_type_mapping = {
    'id': int,
    'name': str,
    'phone': str,
    'avatar': str,
    'create_time': str,# 这里使用 str,因为我们会在返回时格式化
    'update_time': str# 同上
}

# 动态生成 Pydantic 模型
DynamicUserResponse = create_model(
    'DynamicUserResponse',
    **{field: (field_type_mapping, ...) for field in field_type_mapping.keys()}
)


def get_selected_fields(model, fields):
    """根据字段列表动态选择模型字段"""
    return


def handle_none_values(data: dict) -> dict:
    """处理返回值中的 None 值,转换为空字符串"""
    return {k: (v if v is not None else "") for k, v in data.items()}


def format_datetime(dt: Optional) -> str:
    """格式化日期时间,返回字符串"""
    return dt.strftime('%Y-%m-%d %H:%M:%S') if dt else ""


@app.get("/users/", response_model=List)
async def read_users(
      skip: int = Query(0, ge=0),
      limit: int = Query(10, ge=1),
      name: Optional = None,
      sex: Optional = None,
      phone: Optional = None
):
    with SessionLocal() as session:
      filters = # 默认条件:未删除

      if name:
            filters.append(User.name.like(f"%{name}%"))# 模糊查询
      if sex is not None:
            filters.append(User.sex == sex)# 精确查询
      if phone:
            filters.append(User.phone.like(f"%{phone}%"))# 模糊查询

      # 固定返回字段
      selected_fields = get_selected_fields(User, field_type_mapping.keys())
      stmt = select(*selected_fields).where(and_(*filters)).offset(skip).limit(limit)

      result = session.execute(stmt)
      users = result.all()

      # 返回字典列表,处理 None 值并格式化日期时间
      return [
            handle_none_values({
                **dict(zip(field_type_mapping.keys(), user)),
                'create_time': format_datetime(user.create_time),
                'update_time': format_datetime(user.update_time)
            }) for user in users
      ]


@app.get("/users/{user_id}", response_model=DynamicUserResponse)
async def read_user(
      user_id: int
):
    with SessionLocal() as session:
      # 固定返回字段
      selected_fields = get_selected_fields(User, field_type_mapping.keys())
      stmt = select(*selected_fields).where(User.id == user_id, User.is_deleted == 0)

      result = session.execute(stmt)
      user = result.first()

      if user is None:
            raise HTTPException(status_code=404, detail="User not found")

      # 返回字典,处理 None 值并格式化日期时间
      return handle_none_values({
            **dict(zip(field_type_mapping.keys(), user)),
            'create_time': format_datetime(user.create_time),
            'update_time': format_datetime(user.update_time)
      })


@app.get("/usersallfiled/", response_model=List)
async def read_users(
      skip: int = Query(0, ge=0),
      limit: int = Query(10, ge=1),
      name: Optional = None,
      sex: Optional = None,
      phone: Optional = None
):
    with SessionLocal() as session:
      filters = # 默认条件:未删除

      if name:
            filters.append(User.name.like(f"%{name}%"))# 模糊查询
      if sex is not None:
            filters.append(User.sex == sex)# 精确查询
      if phone:
            filters.append(User.phone.like(f"%{phone}%"))# 模糊查询

      stmt = select(User).where(and_(*filters)).offset(skip).limit(limit)

      result = session.execute(stmt)
      users = result.scalars().all()

      return users


@app.get("/usersallfiled/{user_id}", response_model=UserResponse)
async def read_user(
      user_id: int
):
    with SessionLocal() as session:
      stmt = select(User).where(User.id == user_id, User.is_deleted == 0)

      result = session.execute(stmt)
      user = result.scalars().first()

      if user is None:
            raise HTTPException(status_code=404, detail="User not found")

      return user


if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=8000)调用方式

get请求http://127.0.0.1:8000/usersallfiled/?name=admin&skip=0&limit=10
返回效果[{"id":1,"name":"admin","avatar":"1","sex":0,"phone":null,"is_deleted":0,"create_time":"2023-03-29 02:34:58","update_time":"2023-03-29 02:34:58"}]

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: python 利用flask+sqlalchemy 实现简朴数据查询接口