FastAPI安全防护指南:构建坚不可摧的参数处理体系

打印 上一主题 下一主题

主题 640|帖子 640|积分 1920



扫描二维码关注或者微信搜一搜:编程智域 前端至全栈交流与成长
探索数千个预构建的 AI 应用,开启你的下一个巨大创意
第一章:输入验证体系

1.1 范例安全革命
  1. from pydantic import BaseModel, PaymentCardNumber
  2. from pydantic.types import SecretStr
  3. class UserRequest(BaseModel):
  4.     username: str = Field(min_length=4, regex="^[a-zA-Z0-9_]+$")
  5.     credit_card: PaymentCardNumber
  6.     password: SecretStr
  7.     ip_address: IPv4Address
  8. # 自动完成:
  9. # 1. 信用卡格式验证
  10. # 2. 密码内存加密
  11. # 3. IP地址合法性检测
复制代码
1.2 深度校验计谋
  1. from pydantic import validator, root_validator
  2. class OrderRequest(BaseModel):
  3.     items: list[int]
  4.     total_price: float
  5.     @validator('items', each_item=True)
  6.     def check_item_ids(cls, v):
  7.         if v <= 0:
  8.             raise ValueError("非法商品ID")
  9.         return v
  10.     @root_validator
  11.     def check_price_match(cls, values):
  12.         items = values.get('items')
  13.         price = values.get('total_price')
  14.         # 查询数据库验证价格一致性
  15.         real_price = calc_real_price(items)
  16.         if abs(price - real_price) > 1e-6:
  17.             raise ValueError("价格不匹配")
  18.         return values
复制代码
第二章:注入攻击防护

2.1 SQL注入防护矩阵
  1. # 危险示例(绝对禁止)
  2. @app.get("/items")
  3. async def get_items(name: str):
  4.     # 直接拼接SQL语句
  5.     query = f"SELECT * FROM items WHERE name = '{name}'"
  6.     return await database.fetch_all(query)
  7. # 安全方案
  8. from sqlalchemy import text
  9. @app.get("/items")
  10. async def safe_get_items(name: str):
  11.     # 参数化查询
  12.     query = text("SELECT * FROM items WHERE name = :name")
  13.     return await database.fetch_all(query, {"name": name})
复制代码
2.2 NoSQL注入防护
  1. from bson import json_util
  2. from fastapi.encoders import jsonable_encoder
  3. class QuerySanitizer:
  4.     @classmethod
  5.     def sanitize(cls, query: dict):
  6.         safe_query = {}
  7.         for k, v in jsonable_encoder(query).items():
  8.             if isinstance(v, str):
  9.                 safe_query[k] = {"$eq": v}
  10.             else:
  11.                 safe_query[k] = v
  12.         return json_util.dumps(safe_query)
  13. # 使用示例
  14. raw_query = {"name": {"$ne": "admin"}}
  15. safe_query = QuerySanitizer.sanitize(raw_query)  # 转换为安全查询
复制代码
第三章:敏感数据处理

3.1 数据掩藏中间件
  1. from fastapi import Request
  2. from fastapi.middleware import Middleware
  3. class DataMaskingMiddleware:
  4.     def __init__(self, app):
  5.         self.app = app
  6.         self.sensitive_keys = {'password', 'token', 'credit_card'}
  7.     async def __call__(self, request: Request, call_next):
  8.         response = await call_next(request)
  9.         body = await response.body()
  10.         # 对敏感字段进行遮蔽
  11.         masked_body = self.mask_sensitive_data(json.loads(body))
  12.         return JSONResponse(
  13.             content=masked_body,
  14.             status_code=response.status_code,
  15.             headers=dict(response.headers)
  16.         )
  17.     def mask_sensitive_data(self, data):
  18.         if isinstance(data, dict):
  19.             return {k: self._mask_value(k, v) for k, v in data.items()}
  20.         return data
  21.     def _mask_value(self, key, value):
  22.         if key in self.sensitive_keys:
  23.             return "***MASKED***"
  24.         return value
复制代码
3.2 密码学存储方案
  1. from cryptography.fernet import Fernet
  2. from passlib.context import CryptContext
  3. pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
  4. fernet = Fernet(config.SECRET_KEY)
  5. class PasswordManager:
  6.     @staticmethod
  7.     def hash_password(plain: str) -> str:
  8.         return pwd_context.hash(plain)
  9.     @staticmethod
  10.     def encrypt_data(data: str) -> bytes:
  11.         return fernet.encrypt(data.encode())
  12.     @staticmethod
  13.     def decrypt_data(cipher: bytes) -> str:
  14.         return fernet.decrypt(cipher).decode()
  15. # 使用示例
  16. hashed_pwd = PasswordManager.hash_password("user123")
  17. encrypted_data = PasswordManager.encrypt_data("sensitive_info")
复制代码
第四章:高级安全计谋

4.1 请求签名验证
  1. import hmac
  2. from hashlib import sha256
  3. class SignatureValidator:
  4.     @classmethod
  5.     def generate_signature(cls, data: dict, secret: str) -> str:
  6.         sorted_str = "&".join(f"{k}={v}" for k, v in sorted(data.items()))
  7.         return hmac.new(secret.encode(), sorted_str.encode(), sha256).hexdigest()
  8.     @classmethod
  9.     def validate_signature(cls, data: dict, signature: str, secret: str) -> bool:
  10.         actual = cls.generate_signature(data, secret)
  11.         return hmac.compare_digest(actual, signature)
  12. # 在依赖项中进行验证
  13. async def verify_request(
  14.         request: Request,
  15.         body: dict = Body(...),
  16.         signature: str = Header(...)
  17. ):
  18.     secret = config.API_SECRET
  19.     if not SignatureValidator.validate_signature(body, signature, secret):
  20.         raise HTTPException(403, "非法请求")
  21.     return body
复制代码
4.2 速率限定防御
  1. from fastapi import Depends
  2. from fastapi_limiter import FastAPILimiter
  3. from fastapi_limiter.depends import RateLimiter
  4. @app.on_event("startup")
  5. async def startup():
  6.     await FastAPILimiter.init(config.REDIS_URL)
  7. @app.get("/sensitive", dependencies=[Depends(RateLimiter(times=5, seconds=60))])
  8. async def sensitive_operation():
  9.     return {"detail": "敏感操作成功"}
复制代码
第五章:错误处理与日志

5.1 安全错误标准化
  1. from fastapi import HTTPException
  2. class SecurityException(HTTPException):
  3.     def __init__(self, detail: str):
  4.         super().__init__(
  5.             status_code=403,
  6.             detail=detail,
  7.             headers={"WWW-Authenticate": "Bearer"},
  8.         )
  9. @app.exception_handler(SecurityException)
  10. async def security_exception_handler(request, exc):
  11.     return JSONResponse(
  12.         status_code=exc.status_code,
  13.         content={"detail": exc.detail},
  14.         headers=exc.headers
  15.     )
复制代码
5.2 安全日志审计
  1. import logging
  2. from logging.handlers import SysLogHandler
  3. security_logger = logging.getLogger("api.security")
  4. security_logger.setLevel(logging.INFO)
  5. handler = SysLogHandler(address=('logs.papertrailapp.com', 12345))
  6. security_logger.addHandler(handler)
  7. class SecurityLogger:
  8.     @staticmethod
  9.     def log_suspicious(request: Request):
  10.         log_data = {
  11.             "ip": request.client.host,
  12.             "path": request.url.path,
  13.             "method": request.method,
  14.             "user_agent": request.headers.get("user-agent")
  15.         }
  16.         security_logger.warning("可疑请求: %s", json.dumps(log_data))
复制代码
课后Quiz

Q1:哪种方式能有效防止SQL注入?
A) 利用ORM的参数化查询
B) 拼接用户输入到SQL语句
C) 用正则过滤特别字符
D) 限定数据库权限
Q2:敏感信息掩藏的正确时机是?

  • 数据库存储时
  • 日志记录时
  • API响应时
  • 全部正确
Q3:请求签名验证的主要作用是?

  • 提升性能
  • 防止请求篡改
  • 压缩数据体积
  • 验证请求来源合法性
错误代码速查表

错误码场景解决方案422参数校验失败查抄字段范例与格式约束403签名验证失败查抄请求签名天生算法429请求频率超限低落操作频率或联系管理员500密钥配置错误查抄加密密钥加载逻辑扩展阅读


  • 《OWASP API Security TOP 10》 - API安全威胁权威指南
  • 《密码学工程实践》 - 安全存储与传输的现代方案
  • 《云原生安全架构》 - 分布式系统安全设计模式
安全箴言:真正的安全防御是分层递进的体系,而非单一技术点的堆砌。建议每月进行安全审计,每季度开展渗透测试,让安全防护与时俱进。记住:安全无小事,防御无止境。
余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长,阅读完备的文章:FastAPI安全防护指南:构建坚不可摧的参数处理体系 | cmdragon's Blog
往期文章归档:

<ul>FastAPI复杂查询终极指南:告别if-else的现代化过滤架构 | cmdragon's Blog
FastAPI 核心机制:分页参数的实现与最佳实践 | cmdragon's Blog
<a href="https://blog.cmdragon.cn/posts/615a966b68d9/" target="_blank" rel="noopener nofollow">FastAPI 错误处理与自界说错误消息完全指南:构建健壮的 API 应用
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

李优秀

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表