分布式事务 | 使用DTM 的Saga 模式

打印 上一主题 下一主题

主题 508|帖子 508|积分 1524

分布式事务系列文章
分布式事务 | 使用DTM 的Saga 模式
分布式事务 | 使用 dotnetcore/CAP 的本地消息表模式
分布式事务 | 基于MassTransit的StateMachine实现Saga编排式分布式事务
分布式事务 | 基于MassTransit Courier实现Saga 编排式分布式事务
DTM 简介

前面章节提及的MassTransit、dotnetcore/CAP都提供了分布式事务的处理能力,但也仅局限于Saga和本地消息表模式的实现。那有没有一个独立的分布式事务解决方案,涵盖多种分布式事务处理模式,如Saga、TCC、XA模式等。有,目前业界主要有两种开源方案,其一是阿里开源的Seata,另一个就是DTM。其中Seata仅支持Java、Go和Python语言,因此不在.NET 的选择范围。DTM则通过提供简单易用的HTTP和gRPC接口,屏蔽了语言的无关性,因此支持任何开发语言接入,目前提供了Go、Python、NodeJs、Ruby、Java和C#等语言的SDK。
DTM,全称Distributed Transaction Manager,是一个分布式事务管理器,解决跨数据库、跨服务、跨语言更新数据的一致性问题。它提供了Saga、TCC、 XA和二阶段消息模式以满足不同应用场景的需求,同时其首创的子事务屏障技术可以有效解决幂等、悬挂和空补偿等异常问题。
DTM 事务处理过程及架构

那DTM是如何处理分布式事务的呢?以一个经典的跨行转账业务为例来看下事务处理过程。对于跨行转账业务而言,很显然是跨库跨服务的应用场景,不能简单通过本地事务解决,可以使用Saga模式,以下是基于DTM提供的Saga事务模式成功转账的的时序图:

从以上时序图可以看出,DTM整个全局事务分为如下几步:

  • 用户定义好全局事务所有的事务分支(全局事务的组成部分称为事务分支),然后提交给DTM,DTM持久化全局事务信息后,立即返回
  • DTM取出第一个事务分支,这里是TransOut,调用该服务并成功返回
  • DTM取出第二个事务分支,这里是TransIn,调用该服务并成功返回
  • DTM已完成所有的事务分支,将全局事务的状态修改为已完成
基于以上这个时序图的基础上,再来看下DTM的架构:

整个DTM架构中,一共有三个角色,分别承担了不同的职责:

  • RM-资源管理器:RM是一个应用服务,通常连接到独立的数据库,负责处理全局事务中的本地事务,执行相关数据的修改、提交、回滚、补偿等操作。例如在前面的这个Saga事务时序图中,步骤2、3中被调用的TransIn和TransOut方法所在的服务都是RM。
  • AP-应用程序:AP是一个应用服务,负责全局事务的编排,他会注册全局事务,注册子事务,调用RM接口。例如在前面的这个SAGA事务中,发起步骤1的是AP,它编排了一个包含TransOut、TransIn的全局事务,然后提交给TM
  • TM-事务管理器:TM就是DTM服务,负责全局事务的管理,作为一个独立的服务而存在。每个全局事务都注册到TM,每个事务分支也注册到TM。TM会协调所有的RM来执行不同的事务分支,并根据执行结果决定是否提交或回滚事务。例如在前面的Saga事务时序图中,TM在步骤2、3中调用了各个RM,在步骤4中,完成这个全局事务。
总体而言,AP-应用程序充当全局事务编排器的角色通过DTM提供的开箱即用的SDK进行全局事务和子事务的注册。TM-事务管理器接收到注册的全局事务和子事务后,负责调用RM-资源管理器来执行对应的事务分支,TM-事务管理器根据事务分支的执行结果决定是否提及或回滚事务。
快速上手

百闻不如一见,接下来就来实际上手体验下如何基于DTM来实际应用Saga进行分布式跨行转账事务的处理。
创建示例项目

接下来就来创建一个示例项目:

  • 使用dotnet new webapi -n DtmDemo.Webapi创建示例项目。
  • 添加Nuget包:Dtmcli 和Pomelo.EntityFrameworkCore.MySql。
  • 添加DTM配置项:
  1. {
  2.   "dtm": {
  3.     "DtmUrl": "http://localhost:36789",
  4.     "DtmTimeout": 10000,
  5.     "BranchTimeout": 10000,
  6.     "DBType": "mysql",
  7.     "BarrierTableName": "dtm_barrier.barrier",
  8.   }
  9. }
复制代码

  • 定义银行账户BankAccount实体类:
  1. namespace DtmDemo.WebApi.Models
  2. {
  3.     public class BankAccount
  4.     {
  5.         public int Id { get; set; }
  6.         public decimal Balance { get; set; }
  7.     }
  8. }
复制代码

  • 定义DtmDemoWebApiContext数据库上下文:
  1. using Microsoft.EntityFrameworkCore;
  2. namespace DtmDemo.WebApi.Data
  3. {
  4.     public class DtmDemoWebApiContext : DbContext
  5.     {
  6.         public DtmDemoWebApiContext (DbContextOptions<DtmDemoWebApiContext> options)
  7.             : base(options)
  8.         {
  9.         }
  10.         public DbSet<DtmDemo.WebApi.Models.BankAccount> BankAccount { get; set; } = default!;
  11.     }
  12. }
复制代码

  • 注册DbContext 和DTM服务:
  1. using Microsoft.EntityFrameworkCore;
  2. using DtmDemo.WebApi.Data;
  3. using Dtmcli;
  4. var builder = WebApplication.CreateBuilder(args);
  5. var connectionStr = builder.Configuration.GetConnectionString("DtmDemoWebApiContext");
  6. // 注册DbContext
  7. builder.Services.AddDbContext<DtmDemoWebApiContext>(options =>
  8. {
  9.     options.UseMySql(connectionStr, ServerVersion.AutoDetect(connectionStr));
  10. });
  11. // 注册DTM
  12. builder.Services.AddDtmcli(builder.Configuration, "dtm");
复制代码

  • 执行dotnet ef migrations add 'Initial'  创建迁移。
  • 为便于初始化演示数据,定义BankAccountController如下,其中PostBankAccount接口添加了await _context.Database.MigrateAsync();用于自动应用迁移。
  1. using Microsoft.AspNetCore.Mvc;
  2. using Microsoft.EntityFrameworkCore;
  3. using DtmDemo.WebApi.Data;
  4. using DtmDemo.WebApi.Models;
  5. using Dtmcli;
  6. namespace DtmDemo.WebApi.Controllers
  7. {
  8.     [Route("api/[controller]")]
  9.     [ApiController]
  10.     public class BankAccountsController : ControllerBase
  11.     {
  12.         private readonly DtmDemoWebApiContext _context;
  13.         public BankAccountsController(DtmDemoWebApiContext context)
  14.         {
  15.             _context = context;
  16.         }
  17.                 [HttpGet]
  18.         public async Task<ActionResult<IEnumerable<BankAccount>>> GetBankAccount()
  19.         {
  20.             return await _context.BankAccount.ToListAsync();
  21.         }
  22.         [HttpPost]
  23.         public async Task<ActionResult<BankAccount>> PostBankAccount(BankAccount bankAccount)
  24.         {
  25.             await _context.Database.MigrateAsync();
  26.             _context.BankAccount.Add(bankAccount);
  27.             await _context.SaveChangesAsync();
  28.             return Ok(bankAccount);
  29.         }
  30. }
复制代码
应用Saga模式

接下来定义SagaDemoController来使用DTM的Saga模式来模拟跨行转账分布式事务:
  1. using Microsoft.AspNetCore.Mvc;
  2. using Microsoft.EntityFrameworkCore;
  3. using DtmDemo.WebApi.Data;
  4. using DtmDemo.WebApi.Models;
  5. using Dtmcli;
  6. using DtmCommon;
  7. namespace DtmDemo.WebApi.Controllers
  8. {
  9.     [Route("api/[controller]")]
  10.     [ApiController]
  11.     public class SagaDemoController : ControllerBase
  12.     {
  13.         private readonly DtmDemoWebApiContext _context;
  14.         private readonly IConfiguration _configuration;
  15.         private readonly IDtmClient _dtmClient;
  16.         private readonly IDtmTransFactory _transFactory;
  17.         private readonly IBranchBarrierFactory _barrierFactory;
  18.         private readonly ILogger<BankAccountsController> _logger;
  19.         public SagaDemoController(DtmDemoWebApiContext context, IConfiguration configuration, IDtmClient dtmClient, IDtmTransFactory transFactory, ILogger<BankAccountsController> logger, IBranchBarrierFactory barrierFactory)
  20.         {
  21.             this._context = context;
  22.             this._configuration = configuration;
  23.             this._dtmClient = dtmClient;
  24.             this._transFactory = transFactory;
  25.             this._logger = logger;
  26.             this._barrierFactory = barrierFactory;
  27.         }
  28. }
复制代码
对于跨行转账业务,使用DTM的Saga模式,首先要进行事务拆分,可以拆分为以下4个子事务,并分别实现:
转出子事务(TransferOut)
  1.     [HttpPost("TransferOut")]
  2.     public async Task<IActionResult> TransferOut([FromBody] TransferRequest request)
  3.     {
  4.         var msg = $"用户{request.UserId}转出{request.Amount}元";
  5.         _logger.LogInformation($"转出子事务-启动:{msg}");
  6.         // 1. 创建子事务屏障
  7.         var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
  8.         try
  9.         {
  10.             using (var conn = _context.Database.GetDbConnection())
  11.             {
  12.                 // 2. 在子事务屏障内执行事务操作
  13.                 await branchBarrier.Call(conn, async (tx) =>
  14.                 {
  15.                     _logger.LogInformation($"转出子事务-执行:{msg}");
  16.                     await _context.Database.UseTransactionAsync(tx);
  17.                     var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
  18.                     if (bankAccount == null || bankAccount.Balance < request.Amount)
  19.                         throw new InvalidDataException("账户不存在或余额不足!");
  20.                     bankAccount.Balance -= request.Amount;
  21.                     await _context.SaveChangesAsync();
  22.                 });
  23.             }
  24.         }
  25.         catch (InvalidDataException ex)
  26.         {
  27.             _logger.LogInformation($"转出子事务-失败:{ex.Message}");
  28.             // 3. 按照接口协议,返回409,以表示子事务失败
  29.             return new StatusCodeResult(StatusCodes.Status409Conflict);
  30.         }
  31.         _logger.LogInformation($"转出子事务-成功:{msg}");
  32.         return Ok();
  33.     }
复制代码
以上代码中有几点需要额外注意:

  • 使用Saga模式,必须开启子事务屏障:_barrierFactory.CreateBranchBarrier(Request.Query),其中Request.Query中的参数由DTM 生成,类似:?branch_id=01&gid=XTzKHgxemLyL8EXtMTLvzK&op=action&trans_type=saga,主要包含四个参数:

    • gid:全局事务Id
    • trans_type:事务类型,是saga、msg、xa或者是tcc。
    • branch_id:子事务的Id
    • op:当前操作,对于Saga事务模式,要么为action(正向操作),要么为compensate(补偿操作)。

  • 必须在子事务屏障内执行事务操作:branchBarrier.Call(conn, async (tx) =>{}
  • 对于Saga正向操作而言,业务上的失败与异常是需要做严格区分的,例如前面的余额不足,是业务上的失败,必须回滚。而对于网络抖动等其他外界原因导致的事务失败,属于业务异常,则需要重试。因此若因业务失败(这里是账户不存在或余额不足)而导致子事务失败,则必须通过抛异常的方式并返回**409**状态码以告知DTM 子事务失败。
  • 以上通过抛出异常的方式中断子事务执行并在外围捕获特定异常返回409状态码。在外围捕获异常时切忌放大异常捕获,比如直接catch(Exception),如此会捕获由于网络等其他原因导致的异常,而导致DTM 不再自动处理该异常,比如业务异常时的自动重试。
转出补偿子事务(TransferOut_Compensate)

转出补偿,就是回滚转出操作,进行账户余额归还,实现如下:
  1.     [HttpPost("TransferOut_Compensate")]
  2.     public async Task<IActionResult> TransferOut_Compensate([FromBody] TransferRequest request)
  3.     {
  4.         var msg = $"用户{request.UserId}回滚转出{request.Amount}元";
  5.         _logger.LogInformation($"转出补偿子事务-启动:{msg}");
  6.         // 1. 创建子事务屏障
  7.         var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
  8.         using (var conn = _context.Database.GetDbConnection())
  9.         {
  10.             // 在子事务屏障内执行事务操作
  11.             await branchBarrier.Call(conn, async (tx) =>
  12.             {
  13.                 _logger.LogInformation($"转出补偿子事务-执行:{msg}");
  14.                 await _context.Database.UseTransactionAsync(tx);
  15.                 var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
  16.                 if (bankAccount == null)
  17.                     return; //对于补偿操作,可直接返回,中断后续操作
  18.                 bankAccount.Balance += request.Amount;
  19.                 await _context.SaveChangesAsync();
  20.             });
  21.         }
  22.         _logger.LogInformation($"转出补偿子事务-成功!");
  23.         // 2. 因补偿操作必须成功,所以必须返回200。
  24.         return Ok();
  25.     }
复制代码
由于DTM设计为总是执行补偿,也就是说即使正向操作子事务失败时,DTM 仍旧会执行补偿逻辑。但子事务屏障会在执行时判断正向操作的执行状态,当子事务失败时,并不会执行补偿逻辑。
另外DTM的补偿操作,是要求最终成功的,只要还没成功,就会不断进行重试,直到成功因此在补偿子事务中,即使补偿子事务中出现业务失败时,也必须返回**200**因此当出现bankAccount==null时可以直接 return。
转入子事务(TransferIn)

转入子事务和转出子事务的实现基本类似,都是开启子事务屏障后,在branchBarrier.Call(conn, async tx => {}中实现事务逻辑,并通过抛异常的方式并最终返回409状态码来显式告知DTM 子事务执行失败。
  1.     [HttpPost("TransferIn")]
  2.     public async Task<IActionResult> TransferIn([FromBody] TransferRequest request)
  3.     {
  4.         var msg = $"用户{request.UserId}转入{request.Amount}元";
  5.         _logger.LogInformation($"转入子事务-启动:{msg}");
  6.         var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
  7.         try
  8.         {
  9.             using (var conn = _context.Database.GetDbConnection())
  10.             {
  11.                 await branchBarrier.Call(conn, async (tx) =>
  12.                 {
  13.                     _logger.LogInformation($"转入子事务-执行:{msg}");
  14.                     await _context.Database.UseTransactionAsync(tx);
  15.                     var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
  16.                     if (bankAccount == null)
  17.                         throw new InvalidDataException("账户不存在!");
  18.                     bankAccount.Balance += request.Amount;
  19.                     await _context.SaveChangesAsync();
  20.                 });
  21.             }
  22.         }
  23.         catch (InvalidDataException ex)
  24.         {
  25.             _logger.LogInformation($"转入子事务-失败:{ex.Message}");
  26.             return new StatusCodeResult(StatusCodes.Status409Conflict);
  27.         }
  28.         _logger.LogInformation($"转入子事务-成功:{msg}");
  29.         return Ok();
  30.     }
复制代码
转入补偿子事务(TransferIn_Compensate)

转入补偿子事务和转出补偿子事务的实现也基本类似,都是开启子事务屏障后,在branchBarrier.Call(conn, async tx => {}中实现事务逻辑,并最终返回200状态码来告知DTM 补偿子事务执行成功。
  1.     [HttpPost("TransferIn_Compensate")]
  2.     public async Task<IActionResult> TransferIn_Compensate([FromBody] TransferRequest request)
  3.     {
  4.         var msg = "用户{request.UserId}回滚转入{request.Amount}元";
  5.         _logger.LogInformation($"转入补偿子事务-启动:{msg}");
  6.         var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
  7.         using (var conn = _context.Database.GetDbConnection())
  8.         {
  9.             await branchBarrier.Call(conn, async (tx) =>
  10.             {
  11.                 _logger.LogInformation($"转入补偿子事务-执行:{msg}");
  12.                 await _context.Database.UseTransactionAsync(tx);
  13.                 var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
  14.                 if (bankAccount == null) return;
  15.                 bankAccount.Balance -= request.Amount;
  16.                 await _context.SaveChangesAsync();
  17.             });
  18.         }
  19.         _logger.LogInformation($"转入补偿子事务-成功!");
  20.         return Ok();
  21.     }
复制代码
编排Saga事务

拆分完子事务,最后就可以进行Saga事务编排了,其代码如下所示:
  1.     [HttpPost("Transfer")]
  2.     public async Task<IActionResult> Transfer(int fromUserId, int toUserId, decimal amount,
  3.         CancellationToken cancellationToken)
  4.     {
  5.         try
  6.         {
  7.             _logger.LogInformation($"转账事务-启动:用户{fromUserId}转账{amount}元到用户{toUserId}");
  8.             //1. 生成全局事务ID
  9.             var gid = await _dtmClient.GenGid(cancellationToken);
  10.             var bizUrl = _configuration.GetValue<string>("TransferBaseURL");
  11.             //2. 创建Saga
  12.             var saga = _transFactory.NewSaga(gid);
  13.             //3. 添加子事务
  14.                 saga.Add(bizUrl + "/TransferOut", bizUrl + "/TransferOut_Compensate",
  15.                     new TransferRequest(fromUserId, amount))
  16.                 .Add(bizUrl + "/TransferIn", bizUrl + "/TransferIn_Compensate",
  17.                     new TransferRequest(toUserId, amount))
  18.                 .EnableWaitResult(); // 4. 按需启用是否等待事务执行结果
  19.             //5. 提交Saga事务
  20.             await saga.Submit(cancellationToken);
  21.         }
  22.         catch (DtmException ex) // 6. 如果开启了`EnableWaitResult()`,则可通过捕获异常的方式,捕获事务失败的结果。
  23.         {
  24.             _logger.LogError($"转账事务-失败:用户{fromUserId}转账{amount}元到用户{toUserId}失败!");
  25.             return new BadRequestObjectResult($"转账失败:{ex.Message}");
  26.         }
  27.         _logger.LogError($"转账事务-完成:用户{fromUserId}转账{amount}元到用户{toUserId}成功!");
  28.         return Ok($"转账事务-完成:用户{fromUserId}转账{amount}元到用户{toUserId}成功!");
  29.     }
复制代码
主要步骤如下:

  • 生成全局事务Id:var gid =await _dtmClient.GenGid(cancellationToken);
  • 创建Saga全局事务:_transFactory.NewSaga(gid);
  • 添加子事务:saga.Add(string action, string compensate, object postData);包含正向和反向子事务。
  • 如果依赖事务执行结果,可通过EnableWaitResult()开启事务结果等待。
  • 提交Saga全局事务:saga.Submit(cancellationToken);
  • 若开启了事务结果等待,可以通过try...catch..来捕获DtmExcepiton异常来获取事务执行异常信息。
运行项目

既然DTM作为一个独立的服务存在,其负责通过HTTP或gRPC协议发起子事务的调用,因此首先需要启动一个DTM实例,又由于本项目依赖MySQL,因此我们采用Docker Compose的方式来启动项目。在Visual Studio中通过右键项目->Add->Docker Support->Linux 即可添加Dockerfile如下所示:
  1. FROM mcr.microsoft.com/dotnet/aspnet:6.0 AS base
  2. WORKDIR /app
  3. EXPOSE 80
  4. EXPOSE 443
  5. FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build
  6. WORKDIR /src
  7. COPY ["DtmDemo.WebApi/DtmDemo.WebApi.csproj", "DtmDemo.WebApi/"]
  8. RUN dotnet restore "DtmDemo.WebApi/DtmDemo.WebApi.csproj"
  9. COPY . .
  10. WORKDIR "/src/DtmDemo.WebApi"
  11. RUN dotnet build "DtmDemo.WebApi.csproj" -c Release -o /app/build
  12. FROM build AS publish
  13. RUN dotnet publish "DtmDemo.WebApi.csproj" -c Release -o /app/publish
  14. FROM base AS final
  15. WORKDIR /app
  16. COPY --from=publish /app/publish .
  17. ENTRYPOINT ["dotnet", "DtmDemo.WebApi.dll"]
复制代码
在Visual Studio中通过右键项目->Add Container Orchestrator Support->Docker Compose即可添加docker-compose.yml,由于整个项目依赖mysql和DTM,修改docker-compose.yml如下所示,其中定义了三个服务:db,dtm和dtmdemo.webapi。
  1. version: '3.4'
  2. services:
  3.   db:
  4.     image: 'mysql:5.7'
  5.     container_name: dtm-mysql
  6.     environment:
  7.       MYSQL_ROOT_PASSWORD: 123456 # 指定MySQL初始密码
  8.     volumes:
  9.       - ./docker/mysql/scripts:/docker-entrypoint-initdb.d  # 挂载用于初始化数据库的脚本
  10.     ports:
  11.       - '3306:3306'
  12.   dtm:
  13.     depends_on: ["db"]
  14.     image: 'yedf/dtm:latest'
  15.     container_name: dtm-svc
  16.     environment:
  17.       IS_DOCKER: '1'
  18.       STORE_DRIVER: mysql  # 指定使用MySQL持久化DTM事务数据
  19.       STORE_HOST: db   # 指定MySQL服务名,这里是db
  20.       STORE_USER: root
  21.       STORE_PASSWORD: '123456'
  22.       STORE_PORT: 3306
  23.       STORE_DB: "dtm" # 指定DTM 数据库名
  24.     ports:
  25.       - '36789:36789' # DTM HTTP 端口
  26.       - '36790:36790' # DTM gRPC 端口
  27.   dtmdemo.webapi:
  28.     depends_on: ["dtm", "db"]
  29.     image: ${DOCKER_REGISTRY-}dtmdemowebapi
  30.     environment:
  31.       ASPNETCORE_ENVIRONMENT: docker # 设定启动环境为docker
  32.     container_name: dtm-webapi-demo
  33.     build:
  34.       context: .
  35.       dockerfile: DtmDemo.WebApi/Dockerfile
  36.     ports:
  37.       - '31293:80'   # 映射Demo:80端口到本地31293端口
  38.       - '31294:443'         # 映射Demo:443端口到本地31294端口
复制代码
其中dtmdemo.webapi服务通过ASPNETCORE_ENVIRONMENT: docker 指定启动环境为docker,因此需要在项目下添加appsettings.docker.json以配置应用参数:
  1. {
  2.   "ConnectionStrings": {
  3.     "DtmDemoWebApiContext": "Server=db;port=3306;database=dtm_barrier;user id=root;password=123456;AllowLoadLocalInfile=true"
  4.   },
  5.   "TransferBaseURL": "http://dtmdemo.webapi/api/SagaDemo",
  6.   "dtm": {
  7.     "DtmUrl": "http://dtm:36789",
  8.     "DtmTimeout": 10000,
  9.     "BranchTimeout": 10000,
  10.     "DBType": "mysql",
  11.     "BarrierTableName": "dtm_barrier.barrier"
  12.   }
  13. }
复制代码
另外db服务中通过volumes: ["./docker/mysql/scripts:/docker-entrypoint-initdb.d"]来挂载初始化脚本,以创建DTM依赖的MySQL 存储数据库dtm和示例项目使用子事务屏障需要的barrier数据表。脚本如下:
  1. CREATE DATABASE IF NOT EXISTS dtm
  2. /*!40100 DEFAULT CHARACTER SET utf8mb4 */
  3. ;
  4. drop table IF EXISTS dtm.trans_global;
  5. CREATE TABLE if not EXISTS dtm.trans_global (
  6.   `id` bigint(22) NOT NULL AUTO_INCREMENT,
  7.   `gid` varchar(128) NOT NULL COMMENT 'global transaction id',
  8.   `trans_type` varchar(45) not null COMMENT 'transaction type: saga | xa | tcc | msg',
  9.   `status` varchar(12) NOT NULL COMMENT 'tranaction status: prepared | submitted | aborting | finished | rollbacked',
  10.   `query_prepared` varchar(1024) NOT NULL COMMENT 'url to check for msg|workflow',
  11.   `protocol` varchar(45) not null comment 'protocol: http | grpc | json-rpc',
  12.   `create_time` datetime DEFAULT NULL,
  13.   `update_time` datetime DEFAULT NULL,
  14.   `finish_time` datetime DEFAULT NULL,
  15.   `rollback_time` datetime DEFAULT NULL,
  16.   `options` varchar(1024) DEFAULT 'options for transaction like: TimeoutToFail, RequestTimeout',
  17.   `custom_data` varchar(1024) DEFAULT '' COMMENT 'custom data for transaction',
  18.   `next_cron_interval` int(11) default null comment 'next cron interval. for use of cron job',
  19.   `next_cron_time` datetime default null comment 'next time to process this trans. for use of cron job',
  20.   `owner` varchar(128) not null default '' comment 'who is locking this trans',
  21.   `ext_data` TEXT comment 'result for this trans. currently used in workflow pattern',
  22.   `result` varchar(1024) DEFAULT '' COMMENT 'rollback reason for transaction',
  23.   `rollback_reason` varchar(1024) DEFAULT '' COMMENT 'rollback reason for transaction',
  24.   PRIMARY KEY (`id`),
  25.   UNIQUE KEY `gid` (`gid`),
  26.   key `owner`(`owner`),
  27.   key `status_next_cron_time` (`status`, `next_cron_time`) comment 'cron job will use this index to query trans'
  28. ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
  29. drop table IF EXISTS dtm.trans_branch_op;
  30. CREATE TABLE IF NOT EXISTS dtm.trans_branch_op (
  31.   `id` bigint(22) NOT NULL AUTO_INCREMENT,
  32.   `gid` varchar(128) NOT NULL COMMENT 'global transaction id',
  33.   `url` varchar(1024) NOT NULL COMMENT 'the url of this op',
  34.   `data` TEXT COMMENT 'request body, depreceated',
  35.   `bin_data` BLOB COMMENT 'request body',
  36.   `branch_id` VARCHAR(128) NOT NULL COMMENT 'transaction branch ID',
  37.   `op` varchar(45) NOT NULL COMMENT 'transaction operation type like: action | compensate | try | confirm | cancel',
  38.   `status` varchar(45) NOT NULL COMMENT 'transaction op status: prepared | succeed | failed',
  39.   `finish_time` datetime DEFAULT NULL,
  40.   `rollback_time` datetime DEFAULT NULL,
  41.   `create_time` datetime DEFAULT NULL,
  42.   `update_time` datetime DEFAULT NULL,
  43.   PRIMARY KEY (`id`),
  44.   UNIQUE KEY `gid_uniq` (`gid`, `branch_id`, `op`)
  45. ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
  46. drop table IF EXISTS dtm.kv;
  47. CREATE TABLE IF NOT EXISTS dtm.kv (
  48.   `id` bigint(22) NOT NULL AUTO_INCREMENT,
  49.   `cat` varchar(45) NOT NULL COMMENT 'the category of this data',
  50.   `k` varchar(128) NOT NULL,
  51.   `v` TEXT,
  52.   `version` bigint(22) default 1 COMMENT 'version of the value',
  53.   create_time datetime default NULL,
  54.   update_time datetime DEFAULT NULL,
  55.   PRIMARY KEY (`id`),
  56.   UNIQUE key `uniq_k`(`cat`, `k`)
  57. ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
复制代码
  1. create database if not exists dtm_barrier
  2. /*!40100 DEFAULT CHARACTER SET utf8mb4 */
  3. ;
  4. drop table if exists dtm_barrier.barrier;
  5. create table if not exists dtm_barrier.barrier(
  6.   id bigint(22) PRIMARY KEY AUTO_INCREMENT,
  7.   trans_type varchar(45) default '',
  8.   gid varchar(128) default '',
  9.   branch_id varchar(128) default '',
  10.   op varchar(45) default '',
  11.   barrier_id varchar(45) default '',
  12.   reason varchar(45) default '' comment 'the branch type who insert this record',
  13.   create_time datetime DEFAULT now(),
  14.   update_time datetime DEFAULT now(),
  15.   key(create_time),
  16.   key(update_time),
  17.   UNIQUE key(gid, branch_id, op, barrier_id)
  18. );
复制代码
准备完毕,即可通过点击Visual Studio工具栏的Docker Compose的启动按钮,启动后可以在Containers窗口看到启动了dtm-mysql、dtm-svc和dtm-webapi-demo三个容器,并在浏览器中打开了 http://localhost:31293/swagger/index.html Swagger 网页。该种方式启动项目是支持断点调试项目,如下图所示:

通过BankAccouts控制器的POST接口,初始化用户1和用户2各100元。再通过SagaDemo控制器的/api/Transfer接口,进行Saga事务测试。

  • 用户1转账10元到用户2
由于用户1和用户2已存在,且用户1余额足够, 因此该笔转账合法因此会成功,其执行路径为:转出(成功)->转入(成功)-> 事务完成,执行日志如下图所示:


  • 用户3转账10元到用户1
由于用户3不存在,因此执行路径为:转出(失败)->转出补偿(成功)->事务完成。从下图的执行日志可以看出,转出子事务失败,还是会调用对应的转出补偿操作,但子事务屏障会过进行过滤,因此实际上并不会执行真正的转出补偿逻辑,其中红线框住的部分就是证明。


  • 用户1转账10元到用户3
由于用户3不存在,因此执行路径为:转出(成功)->转入(失败)->转入补偿(成功)->转出补偿(成功)->事务完成。从下图的执行日志可以看出,转入子事务失败,还是会调用对应的转入补偿操作,但子事务屏障会过进行过滤,因此实际上并不会执行真正的转入补偿逻辑,其中红线框住的部分就是证明。

子事务屏障

在以上的示例中,重复提及子事务屏障,那子事务屏障具体是什么,这里有必要重点说明下。以上面用户1转账10元到用户3为例,整个事务流转过程中,即转出(成功)->转入(失败)->转入补偿(成功)->转出补偿(成功)->事务完成。
在提交事务之后,首先是全局事务的落库,主要由DTM 服务负责,主要包括两张表:trans_global和trans_branch_op,DTM 依此进行子事务分支的协调。其中trans_global会插入一条全局事务记录,用于记录全局事务的状态信息,如下图1所示。trans_branch_op表为trans_global的子表,记录四条子事务分支数据,如下图2所示:


具体的服务再接收到来自Dtm的子事务分支调用时,每次都会往子事务屏障表barrier中插入一条数据,如下图所示。业务服务就是依赖此表来完成子事务的控制。

而子事务屏障的核心就是子事务屏障表唯一键的设计,以gid、branch_id、op和barrier_id为唯一索引,利用唯一索引,“以改代查”来避免竞态条件。在跨行转账的Saga示例中,子事务分支的执行步骤如下所示:

  • 开启本地事务
  • 对于当前操作op(action|compensate),使用inster ignore into barrier(trans_type, gid, branch_id, op, barrier_id, reason)向子事务屏障表插入一条数据,有几种情况:

    • 插入成功且影响条数大于0,则继续向下执行。
    • 插入成功但影响条数等于0,说明触发唯一键约束,此时会进行空补偿、悬挂和重复请求判断,若是则直接返回,跳过后续子事务分支逻辑的执行。

  • 第2步插入成功,则可以继续执行子事务分支逻辑,执行业务数据表操作,结果分两种请求

    • 子事务成功,子事务屏障表操作和业务数据表操作由于共享同一个本地事务,提交本地事务,因此可实现强一致性,当前子事务分支完成。
    • 子事务失败,回滚本地事务

每个子事务分支通过以上步骤,即可实现下图的效果:

小结

本文主要介绍了DTM的Saga模式的应用,基于DTM 首创的子事务屏障技术,使得开发者基于DTM 提供的SDK能够轻松开发出更可靠的分布式应用,彻底将开发人员从网络异常的处理中解放出来,再也不用担心空补偿、防悬挂、幂等等分布式问题。如果要进行分布式事务框架的选型,DTM 将是不二之选。
关注我的公众号『微服务知多少』,我们微信不见不散。
阅罢此文,如果您觉得本文不错并有所收获,请【打赏】或【推荐】,也可【评论】留下您的问题或建议与我交流。你的支持是我不断创作和分享的不竭动力!
            作者:『圣杰』        出处:http://www.cnblogs.com/sheng-jie/        本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文链接,否则保留追究法律责任的权利。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

宝塔山

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表