FastAPI删除mongodb重复数据(数据洗濯)

打印 上一主题 下一主题

主题 993|帖子 993|积分 2979

在 FastAPI 中删除 MongoDB 重复数据,你需要结合使用 MongoDB 查询和 FastAPI 的路由功能。以下是一个通用的例子,演示怎样删除特定字段上的重复数据:
1. 定义数据模子:
  1. from pydantic import BaseModel, Field
  2. from bson import ObjectId
  3. from typing import Optional
  4. class PyObjectId(ObjectId):
  5.     @classmethod
  6.     def __get_validators__(cls):
  7.         yield cls.validate
  8.     @classmethod
  9.     def validate(cls, v):
  10.         if not ObjectId.is_valid(v):
  11.             raise ValueError("Invalid objectid")
  12.         return ObjectId(v)
  13.     @classmethod
  14.     def __modify_schema__(cls, field_schema):
  15.         field_schema.update(type="string")
  16. class ItemBase(BaseModel):
  17.     field_to_check: str # 需要检查重复的字段
  18. class Item(ItemBase):
  19.     id: Optional[PyObjectId] = Field(alias="_id")
  20.     class Config:
  21.         arbitrary_types_allowed = True
  22.         json_encoders = {ObjectId: str}
复制代码
2. 创建 MongoDB 连接:
  1. from motor.motor_asyncio import AsyncIOMotorClient
  2. MONGO_DETAILS = "mongodb://localhost:27017" # 替换为你的 MongoDB 连接字符串
  3. client = AsyncIOMotorClient(MONGO_DETAILS)
  4. database = client["your_database_name"] # 替换为你的数据库名称
  5. collection = database.get_collection("your_collection_name") # 替换为你的集合名称
复制代码
3. 实现删除逻辑:
  1. from fastapi import FastAPI, HTTPException
  2. app = FastAPI()
  3. @app.delete("/items/duplicates/", response_model=list[Item])
  4. async def delete_duplicate_items(field_name: str = "field_to_check"):
  5.     """
  6.     删除指定字段上的重复数据。
  7.     Args:
  8.         field_name (str, optional): 需要检查重复的字段名. Defaults to "field_to_check".
  9.     Returns:
  10.         list[Item]: 返回删除的重复文档列表.
  11.     """
  12.     # 使用聚合管道查找并删除重复项
  13.     pipeline = [
  14.         {"$match": {"version": 1}},  # 只处理 version 为 1 的文档
  15.         {"$group": {"_id": {"{}".format(field_name): "$"+field_name}, "count": {"$sum": 1}, "dups": {"$push": "$_id"}}},
  16.         {"$match": {"count": {"$gt": 1}}},
  17.         {"$unwind": "$dups"},
  18.         {"$skip": 1},
  19.         {"$project": {"_id": "$dups"}}
  20.     ]
  21.     duplicate_ids = [doc["_id"] async for doc in collection.aggregate(pipeline)]
  22.     if duplicate_ids:
  23.         deleted_items = []
  24.         for item_id in duplicate_ids:
  25.             result = await collection.find_one_and_delete({"_id": item_id})
  26.             if result:
  27.                 deleted_items.append(Item(**result))
  28.         return deleted_items
  29.     raise HTTPException(status_code=404, detail="没有找到重复数据")
复制代码
4. 运行 FastAPI 应用:
  1. uvicorn main:app --reload
复制代码
解释:


  • 数据模子: 使用 Pydantic 定义数据模子,确保数据同等性.
  • MongoDB 连接: 使用 motor 库异步连接到 MongoDB 数据库.
  • 聚合管道: 使用 MongoDB 的聚合管道查找重复数据:

    • $group: 按指定字段分组,盘算每个分组中文档数目.
    • $match: 筛选数目大于 1 的分组,即存在重复数据的组.
    • $unwind: 将 dups 数组睁开为多行.
    • $skip: 跳过每组的第一个文档,因为我们只删除重复的.
    • $project: 只生存 _id 字段.

  • 删除数据: 使用 find_one_and_delete 方法删除找到的重复文档.
  • 错误处置惩罚: 如果没有找到重复数据,抛出 404 错误.
注意:


  • 将代码中的占位符替换为你自己的数据库和聚集名称.
  • 可以根据需要修改聚合管道,以适应差别的重复数据查找需求.

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

伤心客

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表