分布式事务
一、分布式事务基础
什么是事务?
事务指的就是一个操作单元,在这个操作单元中的所有操作最终要保持一致的行为,要么所有操作都成功,要么所有的操作都被撤销。简单地说,事务提供一种“要么什么都不做,要么做全套”机制
本地事物
本地事物其实可以认为是数据库提供的事务机制。说到数据库事务就不得不说,数据库事务中的四
大特性:
- A:原子性(Atomicity),一个事务中的所有操作,要么全部完成,要么全-部不完成
- C:一致性(Consistency),在一个事务执行之前和执行之后数据库都必须处于一致性状态
- I:隔离性(Isolation),在并发环境中,当不同的事务同时操作相同的数据时,事务之间互不影响
- D:持久性(Durability),指的是只要事务成功结束,它对数据库所做的更新就必须永久的保存下来
数据库事务在实现时会将一次事务涉及的所有操作全部纳入到一个不可分割的执行单元,该执行单元中的所有操作要么都成功,要么都失败,只要其中任一操作执行失败,都将导致整个事务的回滚
分布式事务
分布式事务指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。
简单的说,就是一次大的操作由不同的小操作组成,这些小的操作分布在不同的服务器上,且属于不同的应用,分布式事务需要保证这些小操作要么全部成功,要么全部失败。
本质上来说,分布式事务就是为了保证不同数据库的数据一致性。
分布式事务的场景
一个服务需要调用多个数据库实例完成数据的增删改操作

多个服务需要调用一个数据库实例完成数据的增删改操作

多个服务需要调用一个数据库实例完成数据的增删改操作

二、分布式事务解决方案
全局事务
全局事务基于DTP模型实现。DTP是由X/Open组织提出的一种分布式事务模型——X/Open Distributed Transaction Processing Reference Model。它规定了要实现分布式事务,需要三种⻆色:
- AP: Application 应用系统 (微服务)
- TM: Transaction Manager 事务管理器 (全局事务管理)
- RM: Resource Manager 资源管理器 (数据库)
整个事务分成两个阶段:
- 阶段一: 表决阶段,所有参与者都将本事务执行预提交,并将能否成功的信息反馈发给协调者。
- 阶段二: 执行阶段,协调者根据所有参与者的反馈,通知所有参与者,步调一致地执行提交或者回滚。

优点
缺点
- 单点问题: 事务协调者宕机
- 同步阻塞: 延迟了提交时间,加⻓了资源阻塞时间
- 数据不一致: 提交第二阶段,依然存在commit结果未知的情况,有可能导致数据不一致
可靠消息服务
基于可靠消息服务的方案是通过消息中间件保证上、下游应用数据操作的一致性。假设有A和B两个系统,分别可以处理任务A和任务B。此时存在一个业务流程,需要将任务A和任务B在同一个事务中处理。就可以使用消息中间件来实现这种分布式事务。
RocketMQ事务消息流程图

1)事务消息发送及提交
(1) 发送消息(half消息)
(2) 服务端响应消息写入结果
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可⻅,本地逻辑不执行)
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生产消息索引,消息对消费者可⻅)
2) 事务补偿
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
(2) Producer收到回查消息,检查回查消息对于的本地事务的状态
(3) 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用户解决消息Commit或者Rollback发生超时或者失效的情况
3) 事务消息状态
事务消息共有三种状态,提交状态,回查状态,中间状态:
- TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息
- TransactionStatus.RollbackTransaction: 回滚事务,它代表消息将被删除,不允许被消费
- TransactionStatus.Unknown: 中间状态,它代表需要消息队列来确认状态
消息生产者实现
发送代码如下:
[code]Message message =MessageBuilder.withPayload(vo).setHeader("orderNo",orderNo).build();TransactionSendResult sendResult =rocketMQTemplate.sendMessageInTransaction("tx_group", "tx_topic", message,orderNo);String sendStatus = sendResult.getSendStatus().name();String localTXState = sendResult.getLocalTransactionState().name();og.info(">>>> send status={},localTransactionState={} |