分布式事务解决方案

打印 上一主题 下一主题

主题 662|帖子 662|积分 1986

数据不会无缘无故丢失,也不会莫名其妙增加
一、概述

1、曾几何时,知了在一家小公司做项目的时候,都是一个服务打天下,所以涉及到数据一致性的问题,都是直接用本地事务处理。

2、随着时间的推移,用户量增大了,发现一个Java服务扛不住了,于是技术大佬决定对于系统进行升级。根据系统的业务对于单体的一个服务进行拆分,然后对于开发人员也进行划分,一个开发人员只开发和维护一个或几个服务中的问题,大家各司其职,分工合作。

3、当然服务拆分不是一蹴而就的,这是一个耗时耗力的庞大工程,大多数系统都是进行多轮拆分,而后慢慢形成一个稳定的系统。遵守一个核心思想:先按总体业务进行一轮拆分,后面再根据拆分后的服务模块,进行一个细致的拆分。
4、随着服务拆分之后,用户量是抗住了,但是发现数据都在不同的服务中存取,这就引出了一个新的问题:跨服务器,如何保证数据的一致性?当然,跨服务的分布式系统中不仅仅这个问题,还有其他的一些列问题,如:服务可用性、服务容错性、服务间调用的网络问题等等,这里只讨论数据一致性问题。
5、说到数据一致性,大致分为三种:强一致性、弱一致性、最终一致性。

  • 强一致性:数据一旦写入,在任一时刻都能读取到最新的值。
  • 弱一致性:当写入一个数据的时候,其他地方去读这些数据,可能查到的数据不是最新的
  • 最终一致性:它是弱一致性的一个变种,不追求系统任意时刻数据要达到一致,但是在一定时间后,数据最终要达到一致。
从这三种一致型的模型上来说,我们可以看到,弱一致性和最终一致性一般来说是异步冗余的,而强一致性是同步冗余的,异步处理带来了更好的性能,但也需要处理数据的补偿。同步意味着简单,但也必然会降低系统的性能。
二、理论

上述说的数据一致性问题,其实也就是在说分布式事务的问题,现在有一些解决方案,相信大家多多少少都看到过,这里带大家回顾下。
2.1、二阶段提交

2PC是一种强一致性设计方案,通过引入一个事务协调器来协调各个本地事务(也称为事务参与者)的提交和回滚。
2PC主要分为2个阶段:
1、第一阶段:事务协调器会向每个事务参与者发起一个开启事务的命令,每个事务参与者执行准备操作,然后再向事务协调器回复是否准备完成。但是不会提交本地事务,但是这个阶段资源是需要被锁住的。
2、第二阶段:事务协调器收到每个事务参与者的回复后,统计每个参与者的回复,如果每个参与者都回复“可以提交”,那么事务协调器会发送提交命令,参与者正式提交本地事务,释放所有资源,结束全局事务。但是有一个参与者回复“拒绝提交”,那么事务协调器发送回滚命令,所有参与者都回滚本地事务,待全部回滚完成,释放资源,取消全局事务。
事务提交流程

事务回滚流程

当然2PC存在的问题这里也提一下,一个是同步阻塞,这个会消耗性能。另一个是协调器故障问题,一旦协调器发生故障,那么所有的参与者处理资源锁定状态,那么所有参与者都会被阻塞。
2.2、三阶段提交

3PC主要是在2PC的基础上做了改进,主要为了解决2PC的阻塞问题。它主要是将2PC的第一阶段分为2个步骤,先准备,再锁定资源,并且引入了超时机制(这也意味着会造成数据不一致)。3PC的三个阶段包括:CanCommit、PreCommit 和 DoCommit
具体细节就不展开赘述了,就一个核心观点:在CanCommit的时候并不锁定资源,除非所有参与者都同意了,才开始锁资源
2.3、TCC柔性事务

相比较前面的2PC和3PC,TCC和那哥俩的本质区别就是它是业务层面的分布式事务,而2PC和3PC是数据库层面的。TCC是三个单词的缩写:Try、Confirm、Cancel,也分为这三个流程。
Try:尝试,即尝试预留资源,锁定资源
Confirm:确认,即执行预留的资源,如果执行失败会重试
Cancel:取消,撤销预留的资源,如果执行失败会重试

从上图可知,TCC对于业务的侵入是很大的,而且紧紧的耦合在一起。TCC相比较2PC和3PC,试用范围更广,可实现跨库,跨不同系统去实现分布式事务。缺点是要在业务代码中去开发大量的逻辑实现这三个步骤,需要和代码耦合在一起,提高开发成本。
事务日志:在TCC模式中,事务发起者和事务参与者都会去记录事务日志(事务状态、信息等)。这个事务日志是整个分布式事务出现意外情况(宕机、重启、网络中断等),实现提交和回滚的关键。
幂等性:在TCC第二阶段,confirm或者cancel的时候,这两个操作都需要保证幂等性。一旦由于网络等原因导致执行失败,就会发起不断重试。
防悬挂:由于网络的不可靠性,有异常情况的时候,try请求可能比cancel请求更晚到达。cancel可能会执行空回滚,但是try请求被执行的时候也不会预留资源。
2.4、Seata

关于seata这里就不多提了,用的最多的是AT模式,上回知了逐步分析过,配置完后只需要在事务发起的方法上添加@GlobalTransactional注解就可以开启全局事务,对于业务无侵入,低耦合。感兴趣的话请参考之前讨论Seata的内容。
三、应用场景

知了之前在一家公司遇到过这样的业务场景;用户通过页面投保,提交一笔订单过来,这个订单通过上游服务,处理保单相关的业务逻辑,最后流入下游服务,处理业绩、人员晋升、分润处理等等业务。对于这个场景,两边处理的业务逻辑不在同一个服务中,接入的是不同的数据库。涉及到数据一致性问题,需要用到分布式事务。
对于上面介绍的几种方案,只是讨论了理论和思路,下面我来总结下这个业务场景中运用的一种实现方案。采用了本地消息表+MQ异步消息的方案实现了事务最终一致性,也符合当时的业务场景,相对强一致性,实现的性能较高。下面是该方案的思路图


  • 真实业务处理的状态可能会有多种,因此需要明确哪种状态需要定时任务补偿
  • 假如某条单据一直无法处理结束,定时任务也不能无限制下发,所以本地消息表需要增加轮次的概念,重试多少次后告警,人工介入处理
  • 因为MQ和定时任务的存在,难免会出现重复请求,因此下游要做好幂等防重,否则会出现重复数据,导致数据不一致
对于落地实现,话不多说,直接上代码。先定义两张表tb_order和tb_notice_message,分别存订单信息和本地事务信息
  1. CREATE TABLE `tb_order` (
  2.   `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键id',
  3.   `user_id` int(11) NOT NULL COMMENT '下单人id',
  4.   `order_no` varchar(255) CHARACTER SET latin1 NOT NULL COMMENT '订单编号',
  5.   `insurance_amount` decimal(16,2) NOT NULL COMMENT '保额',
  6.   `order_amount` decimal(16,2) DEFAULT NULL COMMENT '保费',
  7.   `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  8.   `update_time` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  9.   `is_delete` tinyint(4) DEFAULT '0' COMMENT '删除标识:0-不删除;1-删除',
  10.   PRIMARY KEY (`id`)
  11. ) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4;
复制代码
  1. CREATE TABLE `tb_notice_message` (
  2.   `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键id',
  3.   `type` tinyint(4) NOT NULL COMMENT '业务类型:1-下单',
  4.   `status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '状态:1-待处理,2-已处理,3-预警',
  5.   `data` varchar(255) NOT NULL COMMENT '信息',
  6.   `retry_count` tinyint(4) DEFAULT '0' COMMENT '重试次数',
  7.   `create_time` datetime NOT NULL COMMENT '创建时间',
  8.   `update_time` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  9.   `is_delete` tinyint(4) NOT NULL DEFAULT '0' COMMENT '删除标识:0-不删除;1-删除',
  10.   PRIMARY KEY (`id`)
  11. ) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4;
复制代码
处理订单service,这里可以用到我们之前说过的装饰器模式,去装饰这个service。把保存本地事务,发送mq消息,交给装饰器类去做,而service只需要关心业务逻辑即可,也符合开闭原则
  1. /**
  2. * @author 往事如风
  3. * @version 1.0
  4. * @date 2022/12/13 10:58
  5. * @description
  6. */
  7. @Service
  8. @Slf4j
  9. @AllArgsConstructor
  10. public class OrderService implements BaseHandler<Object, Order> {
  11.     private final OrderMapper orderMapper;
  12.     /**
  13.      * 订单处理方法:只处理订单关联逻辑
  14.      * @param o
  15.      * @return
  16.      */
  17.     @Override
  18.     public Order handle(Object o) {
  19.         // 订单信息
  20.         Order order = Order.builder()
  21.                 .orderNo("2345678")
  22.                 .createTime(LocalDateTime.now())
  23.                 .userId(1)
  24.                 .insuranceAmount(new BigDecimal(2000000))
  25.                 .orderAmount(new BigDecimal(5000))
  26.                 .build();
  27.         orderMapper.insert(order);
  28.         return order;
  29.     }
  30. }
复制代码
新增OrderService的装饰类OrderServiceDecorate,负责对订单逻辑的扩展,这里是添加本地事务消息,以及发送MQ信息,扩展方法添加了Transactional注解,确保订单逻辑和本地事务消息的数据在同一个事务中进行,确保原子性。其中事务消息标记处理中,待下游服务处理完业务逻辑,再更新处理完成。
  1. /**
  2. * @author 往事如风
  3. * @version 1.0
  4. * @date 2022/12/14 18:48
  5. * @description
  6. */
  7. @Slf4j
  8. @AllArgsConstructor
  9. @Decorate(scene = SceneConstants.ORDER, type = DecorateConstants.CREATE_ORDER)
  10. public class OrderServiceDecorate extends AbstractHandler {
  11.     private final NoticeMessageMapper noticeMessageMapper;
  12.     private final RabbitTemplate rabbitTemplate;
  13.     /**
  14.      * 装饰方法:对订单处理逻辑进行扩展
  15.      * @param o
  16.      * @return
  17.      */
  18.     @Override
  19.     @Transactional
  20.     public Object handle(Object o) {
  21.         // 调用service方法,实现保单逻辑
  22.         Order order = (Order) service.handle(o);
  23.         // 扩展:1、保存事务消息,2、发送MQ消息
  24.         // 本地事务消息
  25.         String data = "{"orderNo":"2345678", "userId":1, "insuranceAmount":2000000, "orderAmount":5000}";
  26.         NoticeMessage noticeMessage = NoticeMessage.builder()
  27.                 .retryCount(0)
  28.                 .data(data)
  29.                 .status(1)
  30.                 .type(1)
  31.                 .createTime(LocalDateTime.now())
  32.                 .build();
  33.         noticeMessageMapper.insert(noticeMessage);
  34.         // 发送mq消息
  35.         log.info("发送mq消息....");
  36.         rabbitTemplate.convertAndSend("trans", "trans.queue.key", JSONUtil.toJsonStr(noticeMessage));
  37.         return null;
  38.     }
  39. }
复制代码
关于这个装饰者模式,之前有讲到过,可以看下之前发布的内容。
下游服务监听消息,处理完自己的业务逻辑后(如:业绩、分润、晋升等),需要发送MQ,上游服务监听消息,更新本地事务状态为已处理。这需要注意的是下游服务需要做幂等处理,防止异常情况下,上游服务数据的重试。
  1. /**
  2. * @author 往事如风
  3. * @version 1.0
  4. * @date 2022/12/13 18:07
  5. * @description
  6. */
  7. @Component
  8. @Slf4j
  9. @RabbitListener(queues = "trans.queue")
  10. public class FenRunListener {
  11.     @Autowired
  12.     private RabbitTemplate rabbitTemplate;
  13.     @RabbitHandler
  14.     public void orderHandler(String msg) {
  15.         log.info("监听到订单消息:{}", msg);
  16.         // 需要注意幂等,幂等逻辑
  17.         log.info("下游服务业务逻辑。。。。。");
  18.         JSONObject json = JSONUtil.parseObj(msg);
  19.         rabbitTemplate.convertAndSend("trans", "trans.update.order.queue.key", json.getInt("id"));
  20.     }
  21. }
复制代码
这里插个题外话,关于幂等的处理,我这里大致有两种思路
1、比如根据订单号查一下记录是否存在,存在就直接返回成功。
2、redis存一个唯一的请求号,处理完再删除,不存在请求号的直接返回成功,可以写个AOP去处理,与业务隔离。
言归正传,上游服务消息监听,下游发送MQ消息,更新本地事务消息为已处理,分布式事务流程结束。
  1. /**
  2. * @author 往事如风
  3. * @version 1.0
  4. * @date 2022/12/13 18:29
  5. * @description
  6. */
  7. @Component
  8. @Slf4j
  9. @RabbitListener(queues = "trans.update.order.queue")
  10. public class OrderListener {
  11.     @Autowired
  12.     private NoticeMessageMapper noticeMessageMapper;
  13.     @RabbitHandler
  14.     public void updateOrder(Integer msgId) {
  15.         log.info("监听消息,更新本地事务消息,消息id:{}", msgId);
  16.         NoticeMessage msg = NoticeMessage.builder().status(2).id(msgId).updateTime(LocalDateTime.now()).build();
  17.         noticeMessageMapper.updateById(msg);
  18.     }
  19. }
复制代码
存在异常情况时,会通过定时任务,轮询的往MQ中发送消息,尽最大努力去让下游服务达到数据一致,当然重试也要设置上限;若达到上限以后还一直是失败,那不得不考虑是下游服务自身存在问题了(有可能就是代码逻辑存在问题)。
  1. /**
  2. * @author 往事如风
  3. * @version 1.0
  4. * @date 2022/12/14 10:25
  5. * @description
  6. */
  7. @Configuration
  8. @EnableScheduling
  9. @AllArgsConstructor
  10. @Slf4j
  11. public class RetryOrderJob {
  12.     private final RabbitTemplate rabbitTemplate;
  13.     private final NoticeMessageMapper noticeMessageMapper;
  14.     /**
  15.      * 最大自动重试次数
  16.      */
  17.     private final Integer MAX_RETRY_COUNT = 5;
  18.     @Scheduled(cron = "0/20 * * * * ? ")
  19.     public void retry() {
  20.         log.info("定时任务,重试异常订单");
  21.         LambdaQueryWrapper<NoticeMessage> wrapper = Wrappers.lambdaQuery(NoticeMessage.class);
  22.         wrapper.eq(NoticeMessage::getStatus, 1);
  23.         List<NoticeMessage> noticeMessages = noticeMessageMapper.selectList(wrapper);
  24.         for (NoticeMessage noticeMessage : noticeMessages) {
  25.             // 重新发送mq消息
  26.             rabbitTemplate.convertAndSend("trans", "trans.queue.key", JSONUtil.toJsonStr(noticeMessage));
  27.             // 重试次数+1
  28.             noticeMessage.setRetryCount(noticeMessage.getRetryCount() + 1);
  29.             noticeMessageMapper.updateById(noticeMessage);
  30.             // 判断重试次数,等于最长限制次数,直接更新为报警状态
  31.             if (MAX_RETRY_COUNT.equals(noticeMessage.getRetryCount())) {
  32.                 noticeMessage.setStatus(3);
  33.                 noticeMessageMapper.updateById(noticeMessage);
  34.                 // 发送告警,通知对应人员
  35.                 // 告警逻辑(短信、邮件、企微群,等等)....
  36.             }
  37.         }
  38.     }
  39. }
复制代码
其实这里有个问题,一个上游服务对应多个下游服务的时候。这个时候往往不能存一条本地消息记录。

  • 这里可以在消息表多加个字段next_server_count,表示一个订单发起方,需要调用的下游服务数量。上游服务监听的时候,每次会与下游的回调都减去1,直到数值是0的时候,再更新状态是已处理。但是要控制并发,这个字段是被多个下游服务共享的。
  • 还有一种处理方案是为每个下游服务,都记录一条事务消息,用type字段去区分,标记类型。实现上游和下游对于事务消息的一对一关系。
  • 最后,达到最大重试次数以后,可以将消息加入到一个告警列表,这个告警列表可以展示在管理后台或其他监控系统中,展示一些必要的信息,去供公司内部人员去人工介入,处理这种异常的数据,使得数据达到最终一致性。
四、总结

其实分布式事务没有一个完美的处理方案,只能说是尽量去满足业务需求,满足数据一致。如果程序不能处理了,最后由人工去兜底,做数据的补偿方案。
五、参考源码
  1. 编程文档:
  2. https://gitee.com/cicadasmile/butte-java-note
  3. 应用仓库:
  4. https://gitee.com/cicadasmile/butte-flyer-parent
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

农妇山泉一亩田

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表