点赞背后的技能大冒险:分布式事务与SAGA模式

打印 上一主题 下一主题

主题 2058|帖子 2058|积分 6174

title: 点赞背后的技能大冒险:分布式事务与SAGA模式
date: 2025/05/07 00:12:40
updated: 2025/05/07 00:12:40
author: cmdragon
excerpt:
在微服务架构中,点赞操纵涉及多个服务的数据更新,传统数据库事务在分布式系统中失效,需接纳SAGA事务模式。SAGA将事务分解为多个本地事务,通过补偿机制包管最终一致性。每个操纵需定义对应的补偿操纵,补偿操纵需幂等,并记录事务状态和实现超机遇制。代码实现包罗基础模型定义、事务上下文管理器和核心业务逻辑,测试验证正常和异常流程。生产环境中建议添加事务日志、实现定时补偿使命和服务降级策略。
categories:

  • 后端开发
  • FastAPI
tags:

  • 分布式事务
  • SAGA模式
  • 微服务架构
  • 补偿机制
  • Python实现
  • 事务管理
  • 数据库操纵
扫描二维码
关注或者微信搜一搜:编程智域 前端至全栈交流与成长
探索数千个预构建的 AI 应用,开启你的下一个伟大创意https://tools.cmdragon.cn/
1. 分布式事务的挑衅与办理方案

在微服务架构中,点赞这类看似简单的操纵可能涉及多个服务的数据更新。假设我们有两个微服务:

  • 文章服务(存储文章信息和点赞数)
  • 用户服务(记录用户点赞举动)
当用户点赞时,必要同时更新:

  • 文章服务的点赞计数器
  • 用户服务的点赞记录
传统数据库事务在分布式系统中失效,我们必要接纳SAGA事务模式。这种模式将事务分解为多个本地事务,通过补偿机制包管最终一致性。
2. SAGA事务模式原理

2.1 执行流程示例

正常流程:
  1. [文章服务+1] -> [用户服务创建记录]
复制代码
异常处理:
  1. [文章服务+1] -> [用户服务失败] -> [文章服务-1补偿]
复制代码
2.2 补偿机制要点


  • 每个操纵必须定义对应的补偿操纵
  • 补偿操纵必要幂等(重复执行效果一致)
  • 必须记录事务状态
  • 必要实现事务超机遇制
3. 实现代码详解

3.1 基础模型定义
  1. # 文章服务模型
  2. class Article(Tortoise.Model):
  3.     id = fields.IntField(pk=True)
  4.     title = fields.CharField(max_length=255)
  5.     likes = fields.IntField(default=0)
  6. # 用户服务模型
  7. class UserLikeRecord(Tortoise.Model):
  8.     id = fields.UUIDField(pk=True)
  9.     user_id = fields.BigIntField()
  10.     article_id = fields.BigIntField()
  11.     created_at = fields.DatetimeField(auto_now_add=True)
  12. # Pydantic响应模型
  13. class LikeResponse(BaseModel):
  14.     article_id: int
  15.     current_likes: int
  16.     user_record_id: UUID
复制代码
3.2 事务上下文管理器
  1. class SagaTransaction:
  2.     def __init__(self):
  3.         self.compensation_actions = []
  4.     async def __aenter__(self):
  5.         return self
  6.     async def __aexit__(self, exc_type, exc, traceback):
  7.         if exc_type is not None:
  8.             await self.compensate()
  9.     def add_compensation(self, coro_func, *args):
  10.         self.compensation_actions.append((coro_func, args))
  11.     async def compensate(self):
  12.         for coro_func, args in reversed(self.compensation_actions):
  13.             try:
  14.                 await coro_func(*args)
  15.             except Exception as e:
  16.                 logging.error(f"Compensation failed: {str(e)}")
复制代码
3.3 核心业务实现
  1. @app.post("/articles/{article_id}/like", response_model=LikeResponse)
  2. async def like_article(
  3.         article_id: int,
  4.         user_id: int = Header(..., alias="X-User-ID")
  5. ):
  6.     async with SagaTransaction() as saga:
  7.         # 第一步:更新文章点赞数
  8.         article = await Article.get(id=article_id)
  9.         original_likes = article.likes
  10.         article.likes += 1
  11.         await article.save()
  12.         # 记录补偿操作(回滚点赞数)
  13.         saga.add_compensation(
  14.             self.compensate_article_likes,
  15.             article_id,
  16.             original_likes
  17.         )
  18.         # 第二步:创建用户点赞记录
  19.         try:
  20.             record = await UserLikeRecord.create(
  21.                 user_id=user_id,
  22.                 article_id=article_id
  23.             )
  24.         except Exception as e:
  25.             # 自动触发补偿流程
  26.             raise HTTPException(500, "Like record creation failed")
  27.         # 记录补偿操作(删除记录)
  28.         saga.add_compensation(
  29.             self.compensate_user_record,
  30.             record.id
  31.         )
  32.         return LikeResponse(
  33.             article_id=article_id,
  34.             current_likes=article.likes,
  35.             user_record_id=record.id
  36.         )
  37. # 补偿方法示例
  38. async def compensate_article_likes(article_id: int, original_count: int):
  39.     article = await Article.get(id=article_id)
  40.     article.likes = original_count
  41.     await article.save()
  42. async def compensate_user_record(record_id: UUID):
  43.     await UserLikeRecord.filter(id=record_id).delete()
复制代码
4. 测试与验证

4.1 正常流程测试
  1. async def test_successful_like():
  2.     async with AsyncClient(app=app, base_url="http://test") as ac:
  3.         response = await ac.post(
  4.             "/articles/1/like",
  5.             headers={"X-User-ID": "123"}
  6.         )
  7.         assert response.status_code == 200
  8.         data = response.json()
  9.         assert data["current_likes"] == 1
复制代码
4.2 异常流程测试
  1. async def test_failed_transaction():
  2.     with patch("UserLikeRecord.create", side_effect=Exception("DB Error")):
  3.         response = await ac.post(
  4.             "/articles/1/like",
  5.             headers={"X-User-ID": "123"}
  6.         )
  7.         assert response.status_code == 500
  8.         # 验证补偿是否执行
  9.         article = await Article.get(id=1)
  10.         assert article.likes == 0
复制代码
5. 课后Quiz

Q1:为什么补偿操纵必要设计为幂等?
A. 提高系统性能
B. 防止重复补偿导致数据错误
C. 减少数据库连接数
D. 满意HTTP协议规范
正确答案:B
分析:网络重试可能导致补偿操纵被多次触发,幂等设计确保多次执行效果一致,制止数据不一致。
Q2:以下哪些情况必要触发补偿机制?(多选)
A. 用户服务数据库连接超时
B. 文章不存在返回404错误
C. 用户重复点赞
D. 数据库主从同步延迟
正确答案:A
分析:404属于业务校验错误应在事务开始前检查,重复点赞属于业务逻辑错误,主从同步属于基础架构题目。只有跨服务操纵失败必要补偿。
6. 常见报错与办理方案

报错1:TransactionManagementError - 事务已关闭
  1. TransactionManagementError: Transaction already closed
复制代码
原因:异步上下文管理器中过早关闭数据库连接
办理方案

  • 检查事务作用域范围
  • 确保全部数据库操纵在同一个事务上下文中
  • 更新Tortoise-ORM到最新版本
报错2:HTTP 422 Unprocessable Entity
  1. {
  2.   "detail": "Field required"
  3. }
复制代码
原因:请求体缺少必要字段或类型不匹配
办理方案

  • 检查请求头是否包含X-User-ID
  • 验证URL参数类型是否正确
  • 利用Swagger文档测试接口格式
报错3:TimeoutError - 数据库操纵超时
  1. TimeoutError: Connection pool exhausted
复制代码
原因:数据库连接池不足或查询未优化
办理方案

  • 增加连接池大小配置:
  1. TORTOISE_CONFIG["connections"]["default"]["pool_size"] = 20
复制代码

  • 为高频查询字段添加索引
  • 利用select_related优化关联查询
7. 生产环境建议


  • 添加事务日志
  1. class TransactionLog(Tortoise.Model):
  2.     transaction_id = fields.UUIDField()
  3.     service_name = fields.CharField(max_length=50)
  4.     action_type = fields.CharField(max_length=20)  # main/compensation
  5.     status = fields.CharField(max_length=10)  # pending/done/failed
  6.     created_at = fields.DatetimeField(auto_now_add=True)
复制代码

  • 实现定时补偿使命
  1. async def check_hanging_transactions():
  2.     # 查找超过5分钟未完成的事务
  3.     pending = await TransactionLog.filter(
  4.         status="pending",
  5.         created_at__lt=datetime.now() - timedelta(minutes=5)
  6.     )
  7.     for transaction in pending:
  8.         # 执行补偿逻辑
  9.         await retry_compensation(transaction)
复制代码

  • 服务降级策略


  • 当连续补偿失败超过阈值时,触发人工干预警报
  • 提供强制完成事务的管理员接口
  • 实现事务状态查询接口供前端展示
(完备示例代码需配合PostgreSQL数据库运行,安装依赖:fastapi uvicorn tortoise-orm httpx python-multipart)
余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长,阅读完备的文章:点赞背后的技能大冒险:分布式事务与SAGA模式 | cmdragon's Blog
往期文章归档:


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

杀鸡焉用牛刀

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表