扫描二维码关注或者微信搜一搜:编程智域 前端至全栈交流与成长
探索数千个预构建的 AI 应用,开启你的下一个巨大创意
第一章:输入验证体系
1.1 范例安全革命
- from pydantic import BaseModel, PaymentCardNumber
- from pydantic.types import SecretStr
- class UserRequest(BaseModel):
- username: str = Field(min_length=4, regex="^[a-zA-Z0-9_]+$")
- credit_card: PaymentCardNumber
- password: SecretStr
- ip_address: IPv4Address
- # 自动完成:
- # 1. 信用卡格式验证
- # 2. 密码内存加密
- # 3. IP地址合法性检测
复制代码 1.2 深度校验计谋
- from pydantic import validator, root_validator
- class OrderRequest(BaseModel):
- items: list[int]
- total_price: float
- @validator('items', each_item=True)
- def check_item_ids(cls, v):
- if v <= 0:
- raise ValueError("非法商品ID")
- return v
- @root_validator
- def check_price_match(cls, values):
- items = values.get('items')
- price = values.get('total_price')
- # 查询数据库验证价格一致性
- real_price = calc_real_price(items)
- if abs(price - real_price) > 1e-6:
- raise ValueError("价格不匹配")
- return values
复制代码 第二章:注入攻击防护
2.1 SQL注入防护矩阵
- # 危险示例(绝对禁止)
- @app.get("/items")
- async def get_items(name: str):
- # 直接拼接SQL语句
- query = f"SELECT * FROM items WHERE name = '{name}'"
- return await database.fetch_all(query)
- # 安全方案
- from sqlalchemy import text
- @app.get("/items")
- async def safe_get_items(name: str):
- # 参数化查询
- query = text("SELECT * FROM items WHERE name = :name")
- return await database.fetch_all(query, {"name": name})
复制代码 2.2 NoSQL注入防护
- from bson import json_util
- from fastapi.encoders import jsonable_encoder
- class QuerySanitizer:
- @classmethod
- def sanitize(cls, query: dict):
- safe_query = {}
- for k, v in jsonable_encoder(query).items():
- if isinstance(v, str):
- safe_query[k] = {"$eq": v}
- else:
- safe_query[k] = v
- return json_util.dumps(safe_query)
- # 使用示例
- raw_query = {"name": {"$ne": "admin"}}
- safe_query = QuerySanitizer.sanitize(raw_query) # 转换为安全查询
复制代码 第三章:敏感数据处理
3.1 数据掩藏中间件
- from fastapi import Request
- from fastapi.middleware import Middleware
- class DataMaskingMiddleware:
- def __init__(self, app):
- self.app = app
- self.sensitive_keys = {'password', 'token', 'credit_card'}
- async def __call__(self, request: Request, call_next):
- response = await call_next(request)
- body = await response.body()
- # 对敏感字段进行遮蔽
- masked_body = self.mask_sensitive_data(json.loads(body))
- return JSONResponse(
- content=masked_body,
- status_code=response.status_code,
- headers=dict(response.headers)
- )
- def mask_sensitive_data(self, data):
- if isinstance(data, dict):
- return {k: self._mask_value(k, v) for k, v in data.items()}
- return data
- def _mask_value(self, key, value):
- if key in self.sensitive_keys:
- return "***MASKED***"
- return value
复制代码 3.2 密码学存储方案
- from cryptography.fernet import Fernet
- from passlib.context import CryptContext
- pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
- fernet = Fernet(config.SECRET_KEY)
- class PasswordManager:
- @staticmethod
- def hash_password(plain: str) -> str:
- return pwd_context.hash(plain)
- @staticmethod
- def encrypt_data(data: str) -> bytes:
- return fernet.encrypt(data.encode())
- @staticmethod
- def decrypt_data(cipher: bytes) -> str:
- return fernet.decrypt(cipher).decode()
- # 使用示例
- hashed_pwd = PasswordManager.hash_password("user123")
- encrypted_data = PasswordManager.encrypt_data("sensitive_info")
复制代码 第四章:高级安全计谋
4.1 请求签名验证
- import hmac
- from hashlib import sha256
- class SignatureValidator:
- @classmethod
- def generate_signature(cls, data: dict, secret: str) -> str:
- sorted_str = "&".join(f"{k}={v}" for k, v in sorted(data.items()))
- return hmac.new(secret.encode(), sorted_str.encode(), sha256).hexdigest()
- @classmethod
- def validate_signature(cls, data: dict, signature: str, secret: str) -> bool:
- actual = cls.generate_signature(data, secret)
- return hmac.compare_digest(actual, signature)
- # 在依赖项中进行验证
- async def verify_request(
- request: Request,
- body: dict = Body(...),
- signature: str = Header(...)
- ):
- secret = config.API_SECRET
- if not SignatureValidator.validate_signature(body, signature, secret):
- raise HTTPException(403, "非法请求")
- return body
复制代码 4.2 速率限定防御
- from fastapi import Depends
- from fastapi_limiter import FastAPILimiter
- from fastapi_limiter.depends import RateLimiter
- @app.on_event("startup")
- async def startup():
- await FastAPILimiter.init(config.REDIS_URL)
- @app.get("/sensitive", dependencies=[Depends(RateLimiter(times=5, seconds=60))])
- async def sensitive_operation():
- return {"detail": "敏感操作成功"}
复制代码 第五章:错误处理与日志
5.1 安全错误标准化
- from fastapi import HTTPException
- class SecurityException(HTTPException):
- def __init__(self, detail: str):
- super().__init__(
- status_code=403,
- detail=detail,
- headers={"WWW-Authenticate": "Bearer"},
- )
- @app.exception_handler(SecurityException)
- async def security_exception_handler(request, exc):
- return JSONResponse(
- status_code=exc.status_code,
- content={"detail": exc.detail},
- headers=exc.headers
- )
复制代码 5.2 安全日志审计
- import logging
- from logging.handlers import SysLogHandler
- security_logger = logging.getLogger("api.security")
- security_logger.setLevel(logging.INFO)
- handler = SysLogHandler(address=('logs.papertrailapp.com', 12345))
- security_logger.addHandler(handler)
- class SecurityLogger:
- @staticmethod
- def log_suspicious(request: Request):
- log_data = {
- "ip": request.client.host,
- "path": request.url.path,
- "method": request.method,
- "user_agent": request.headers.get("user-agent")
- }
- security_logger.warning("可疑请求: %s", json.dumps(log_data))
复制代码 课后Quiz
Q1:哪种方式能有效防止SQL注入?
A) 利用ORM的参数化查询
B) 拼接用户输入到SQL语句
C) 用正则过滤特别字符
D) 限定数据库权限
Q2:敏感信息掩藏的正确时机是?
Q3:请求签名验证的主要作用是?
- 提升性能
- 防止请求篡改
- 压缩数据体积
- 验证请求来源合法性
错误代码速查表
错误码场景解决方案422参数校验失败查抄字段范例与格式约束403签名验证失败查抄请求签名天生算法429请求频率超限低落操作频率或联系管理员500密钥配置错误查抄加密密钥加载逻辑扩展阅读
- 《OWASP API Security TOP 10》 - API安全威胁权威指南
- 《密码学工程实践》 - 安全存储与传输的现代方案
- 《云原生安全架构》 - 分布式系统安全设计模式
安全箴言:真正的安全防御是分层递进的体系,而非单一技术点的堆砌。建议每月进行安全审计,每季度开展渗透测试,让安全防护与时俱进。记住:安全无小事,防御无止境。
余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长,阅读完备的文章:FastAPI安全防护指南:构建坚不可摧的参数处理体系 | cmdragon's Blog
往期文章归档:
<ul>FastAPI复杂查询终极指南:告别if-else的现代化过滤架构 | cmdragon's Blog
FastAPI 核心机制:分页参数的实现与最佳实践 | cmdragon's Blog
<a href="https://blog.cmdragon.cn/posts/615a966b68d9/" target="_blank" rel="noopener nofollow">FastAPI 错误处理与自界说错误消息完全指南:构建健壮的 API 应用
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |