Spring 多线程的事务处理

打印 上一主题 下一主题

主题 875|帖子 875|积分 2625

问题起因

Spring 的 JDBC 相关的依赖库已经提供了对 JDBC 类事务处理的统一解决方案,在正常情况下,我们只需要在需要添加事务的业务处理方法上加上 @Transactional 注解即可开启声明式的事务处理。这种方式在单线程的处理模式下都是可行的,这是因为 Spring 在对 @Transactional 注解的切面处理上通过一些 ThreaLocal 变量来绑定了事务的相关信息,因此在单线程的模式下能够很好地工作。
然而,由于与数据库的交互属于 IO 密集型的操作,在某些情况下,与数据库的交互次数可能会成为性能的瓶颈。在这种情况下,一种可行的优化方式便是通过多线程的方式来并行化与数据库的交互,因为在大部分的情况下,数据的查询之间属于相互独立的任务,因此使用多线程的方式来解决这一类性能问题在概念上来讲是可行的
如果通过多线程的方式来优化,随之而来的一个问题就是对于事务的处理,可能客户端希望在一个任务出现异常时就回滚整个事务,就像使用 @Transactional 注解的效果一样。但是很遗憾,在这种情况下,我们不能直接使用 @Transactional 来开启事务,因为多线程的处理导致预先绑定的事务信息无法被找到,因此需要寻找其它的解决方案
Spring 事务源码分析

本文将仅分析声明式的事务处理方式,同时不会分析有关切面的具体加载逻辑,因为我们的目标是通过分析源码来找到在多线程下事务的可行解决方案
@Transaction 对应的切面处理是在 org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction 方法中处理的,由于大部分情况下我们使用的都是声明式的阻塞式事务处理形式,因此我们也只关心这部分的处理逻辑:
  1. protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
  2.                                          final InvocationCallback invocation) throws Throwable {
  3.    
  4.     /*
  5.             获取事务的相关属性,如需要回滚的异常,事务的传播方式等
  6.     */
  7.     TransactionAttributeSource tas = getTransactionAttributeSource();
  8.     final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
  9.    
  10.     /*
  11.             根据当前所处的环境,决定具体要采用何种事务管理对象类型,对于 JDBC 事务来说,
  12.             该类型为 DataSourceTransactionManager
  13.     */
  14.     final TransactionManager tm = determineTransactionManager(txAttr);
  15.    
  16.     // 省略响应式事务的处理逻辑。。。。。。。
  17.     PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
  18.     final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
  19.     if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
  20.         // 检查是否需要开启事务,这里是我们重点分析的一个地方
  21.         TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
  22.         Object retVal;
  23.         try {
  24.             // 实际业务方法的处理
  25.             retVal = invocation.proceedWithInvocation();
  26.         }
  27.         catch (Throwable ex) {
  28.             // 事务的回滚处理
  29.             completeTransactionAfterThrowing(txInfo, ex);
  30.             throw ex;
  31.         }
  32.         finally {
  33.             cleanupTransactionInfo(txInfo);
  34.         }
  35.         // 省略部分代码。。。。。
  36.         
  37.         // 提交事务
  38.         commitTransactionAfterReturning(txInfo);
  39.         return retVal;
  40.     }
  41.     // 省略编程式的事务处理逻辑 。。。。。
  42. }
复制代码
接下来我们具体分析一下对于创建事务的有关处理:
  1. protected TransactionInfo
  2.     createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
  3.                                  @Nullable TransactionAttribute txAttr,
  4.                                  final String joinpointIdentification) {
  5.    
  6.     TransactionStatus status = null;
  7.     if (txAttr != null) {
  8.         if (tm != null) {
  9.             // 实际创建事务的处理
  10.             status = tm.getTransaction(txAttr);
  11.         }
  12.         else {
  13.             // 省略部分代码
  14.         }
  15.     }
  16.     // 针对事务的准备工作,目的是为了将事务信息绑定到当前的线程
  17.     return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
  18. }
复制代码
通过上面的分析,我们可以知道当前事务传里对象为 DataSourceTransactionManager,我们继续查看它对创建事务的处理过程:
  1. public final TransactionStatus
  2.     getTransaction(@Nullable TransactionDefinition definition)
  3.     throws TransactionException {
  4.     // Use defaults if no transaction definition given.
  5.     TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
  6.     // 获取与当前线程绑定的事务信息
  7.     Object transaction = doGetTransaction();
  8.     boolean debugEnabled = logger.isDebugEnabled();
  9.     /*
  10.             检查当前执行过程中是否已经存在事务,如果存在,那么需要通过对 TransactionDefinition 的 propagationBehavior
  11.             来对当前的执行做出合适的事务处理形式
  12.     */
  13.     if (isExistingTransaction(transaction)) {
  14.         // Existing transaction found -> check propagation behavior to find out how to behave.
  15.         return handleExistingTransaction(def, transaction, debugEnabled);
  16.     }
  17.     // Check definition settings for new transaction.
  18.     if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
  19.         throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
  20.     }
  21.     // No existing transaction found -> check propagation behavior to find out how to proceed.
  22.     if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
  23.         throw new IllegalTransactionStateException(
  24.             "No existing transaction found for transaction marked with propagation 'mandatory'");
  25.     }
  26.     else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
  27.              def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
  28.              def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
  29.         SuspendedResourcesHolder suspendedResources = suspend(null);
  30.         if (debugEnabled) {
  31.             logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
  32.         }
  33.         try {
  34.             /*
  35.                     由于没有检测到事务,因此需要开启一个事务来支持事务的有关操作
  36.             */
  37.             return startTransaction(def, transaction, debugEnabled, suspendedResources);
  38.         }
  39.         catch (RuntimeException | Error ex) {
  40.             resume(null, suspendedResources);
  41.             throw ex;
  42.         }
  43.     }
  44.     else {
  45.         boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
  46.         return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
  47.     }
  48. }
复制代码
继续分析获取事务的代码,可以看到实际上最终是通过 TransactionSynchronizationManager 来检查绑定的事务信息的
  1. // 获取当前绑定的事务
  2. protected Object doGetTransaction() {
  3.     DataSourceTransactionObject txObject = new DataSourceTransactionObject();
  4.     txObject.setSavepointAllowed(isNestedTransactionAllowed());
  5.     /*
  6.             获取与当前线程绑定的事务信息
  7.     */
  8.     ConnectionHolder conHolder =
  9.         (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
  10.     txObject.setConnectionHolder(conHolder, false);
  11.     return txObject;
  12. }
  13. /*
  14.         检查当前线程持有的事务是否是有效的,如果在 doGetTransaction 方法中 TransactionSynchronizationManager
  15.         中能够检测到对应的 ConnectionHolder 并且是活跃的,则说明确实在方法执行前就已经存了事务
  16. */
  17. protected boolean isExistingTransaction(Object transaction) {
  18.     DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
  19.     return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
  20. }
  21. // JdbcTransactionObjectSupport
  22. public boolean hasConnectionHolder() {
  23.     return (this.connectionHolder != null);
  24. }
复制代码
如果当前已经存在了事务,需要针对事务传播行为对当前的事务做出对应的行为:
  1. private TransactionStatus handleExistingTransaction(
  2.     TransactionDefinition definition,
  3.     Object transaction,
  4.     boolean debugEnabled)
  5.     throws TransactionException {
  6.     /*
  7.             由于默认的是传播行为是 PROPAGATION_REQUIRED,即默认将当前的处理加入到当前已经存在的事务中,
  8.             我们主要关心这部分内容,因此省略了其它传播行为的处理
  9.     */
  10.     boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  11.     return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
  12. }
复制代码
继续查看对于当前事务的处理:
  1. // 简单地标记一下当前的事务状态
  2. protected final DefaultTransactionStatus prepareTransactionStatus(
  3.     TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
  4.     boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
  5.    
  6.     DefaultTransactionStatus status = newTransactionStatus(
  7.         definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
  8.     prepareSynchronization(status, definition);
  9.     return status;
  10. }
  11. // TransactionSynchronizationManager 线程绑定对象的部分处理,不是特别重要
  12. protected void prepareSynchronization(DefaultTransactionStatus status,
  13.                                       TransactionDefinition definition) {
  14.     if (status.isNewSynchronization()) {
  15.         TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
  16.         TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
  17.             definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
  18.             definition.getIsolationLevel() : null);
  19.         TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
  20.         TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
  21.         TransactionSynchronizationManager.initSynchronization();
  22.     }
  23. }
复制代码
接下来我们继续查看对于启动事务时的有关处理:
  1. // 开启一个新的事务
  2. private TransactionStatus startTransaction(TransactionDefinition definition,
  3.                                            Object transaction,
  4.                                            boolean debugEnabled,
  5.                                            @Nullable SuspendedResourcesHolder suspendedResources) {
  6.     boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  7.     DefaultTransactionStatus status = newTransactionStatus(
  8.         definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
  9.     doBegin(transaction, definition);
  10.     prepareSynchronization(status, definition);
  11.     return status;
  12. }
  13. // 相当于开启事务的 BEGIN 语句
  14. protected void doBegin(Object transaction, TransactionDefinition definition) {
  15.     /*
  16.             由上文的 doGetTransaction 方法可知当前的 transaction 一定为 DataSourceTransactionObject 类型
  17.     */
  18.     DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
  19.     Connection con = null;
  20.     try {
  21.         /*
  22.                 由于 TransactionSynchronizationManager.getResource(obtainDataSource()) 可能为 null,
  23.                 因此需要对这种情况进行处理,避免为 null
  24.         */
  25.         if (!txObject.hasConnectionHolder() ||
  26.             txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
  27.             Connection newCon = obtainDataSource().getConnection();
  28.             if (logger.isDebugEnabled()) {
  29.                 logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
  30.             }
  31.             txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
  32.         }
  33.         // 省略一些不是特别重要的数据库连接和准备的相关代码
  34.         /*
  35.                 到这里就比较关键了,结合上文 doGetTransaction 方法可知,这两者之间是对应的,只要 TransactionSynchronizationManager 绑定的资源对象一致,那么就能够检测到当前执行的事务信息
  36.         */
  37.         if (txObject.isNewConnectionHolder()) {
  38.             TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
  39.         }
  40.     }
  41.     catch (Throwable ex) {
  42.         if (txObject.isNewConnectionHolder()) {
  43.             DataSourceUtils.releaseConnection(con, obtainDataSource());
  44.             txObject.setConnectionHolder(null, false);
  45.         }
  46.         throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
  47.     }
  48. }
复制代码
通过对 doBegin 和 doGetTransaction 方法的分析,我们可以知道,只要 TransactionSynchronizationManager#getResource(Object) 能够拿到同一个线程绑定的资源对象,那么我们就可以将这两个操作合并到一个事务中,并且能够直接复用 Spring 现有的事务处理逻辑!
具体解决方案

通过对 Spring 事务处理的源码分析,我们可以知道,如果希望当前线程执行上下文能够检测到事务的存在,我们只要通过 TransactionSynchronizationManager 绑定一致的事务资源对象即可。为了方便,我们可以直接将线程执行的任务封装到一个新的任务类中,在这个任务类中绑定相关的事务资源即可,对应的任务类如下:
  1. import org.slf4j.Logger;
  2. import org.slf4j.LoggerFactory;
  3. import org.springframework.jdbc.datasource.DataSourceTransactionManager;
  4. import org.springframework.transaction.TransactionDefinition;
  5. import org.springframework.transaction.TransactionStatus;
  6. import org.springframework.transaction.support.DefaultTransactionDefinition;
  7. import org.springframework.transaction.support.TransactionSynchronizationManager;
  8. import java.util.concurrent.Callable;
  9. import java.util.concurrent.ExecutionException;
  10. /**
  11. * 用于 {@link DataSourceTransactionManager} 事务管理对应的任务线程,这个类的存在是为了使得
  12. * Spring 中的事务管理能够在多线程的环境下依旧能够有效地工作.
  13. * 对于要要执行的任务,可以将其封装成此任务对象,该任务对象在执行时将会绑定与 {@link  DataSourceTransactionManager#getResourceFactory()}
  14. * 对应的 {@link TransactionSynchronizationManager#getResourceMap()} 中关联的事务对象,以使得要执行的任务包含在已有的事务中
  15. * (至少能保证存在一种可行的方式能够得到父线程的所处事务上下文),从而使得当前待执行的任务能够被现有统领事务进行管理
  16. *
  17. * @see DataSourceTransactionManager
  18. * @see TransactionSynchronizationManager
  19. *@author lxh
  20. */
  21. public class DataSourceTransactionTask
  22.     implements Callable<TransactionStatus> {
  23.     private final static Logger log = LoggerFactory.getLogger(DataSourceTransactionTask.class);
  24.     /*
  25.         与 TransactionSynchronizationManager.resources 关联的事务属性对象的 Value 值,
  26.         在当前上下文中,为了保存与原有事务的完整性,这里的 resource 存储的是 DataSourceTransactionObject
  27.      */
  28.     private final Object resource;
  29.     // 当前 Spring 平台的事务管理对象
  30.     private final DataSourceTransactionManager txManager;
  31.     // 实际需要运行的任务
  32.     private final Runnable runnable;
  33.     // 与事务有关的描述信息
  34.     private final TransactionDefinition definition;
  35.     public DataSourceTransactionTask(Object resource,
  36.                                      DataSourceTransactionManager txManager,
  37.                                      Runnable runnable,
  38.                                      TransactionDefinition definition) {
  39.         this.resource = resource;
  40.         this.txManager = txManager;
  41.         this.runnable = runnable;
  42.         this.definition = definition;
  43.     }
  44.     @Override
  45.     public TransactionStatus call() {
  46.         // 通过源码分析可知,对于 JDBC 事务的处理,key 为对应的 DataSource 对象
  47.         Object key = txManager.getResourceFactory();
  48.         /*
  49.                 resource 是在启动这个线程之前就已经被主线程开启的事务对象,
  50.                 我们可以知道它实际上就是 DataSourceTransactionObject,我们将他绑定到
  51.                 当前线程,即可使得当前线程能够感知到这个事务的存在
  52.         */
  53.         TransactionSynchronizationManager.bindResource(key, resource);
  54.         TransactionStatus status = txManager.getTransaction(definition);
  55.         try {
  56.             runnable.run();
  57.         } catch (Throwable t) {
  58.             log.debug("任务执行出现异常", t);
  59.             status.setRollbackOnly(); // 出现了异常,需要将整个事务进行回滚
  60.         } finally {
  61.             // 移除与当前线程执行的关联关系,避免任务执行过程中的资源混乱
  62.             TransactionSynchronizationManager.unbindResource(key);
  63.         }
  64.         return status;
  65.     }
  66. }
复制代码
更进一步,我们可以使用线程池来进一步封装,从而避免自己手动创建线程或者其它的线程管理容器:
  1. import org.slf4j.Logger;
  2. import org.slf4j.LoggerFactory;
  3. import org.springframework.jdbc.datasource.DataSourceTransactionManager;
  4. import org.springframework.transaction.TransactionDefinition;
  5. import org.springframework.transaction.TransactionStatus;
  6. import org.springframework.transaction.support.DefaultTransactionDefinition;
  7. import org.springframework.transaction.support.TransactionSynchronizationManager;
  8. import reactor.core.publisher.Flux;
  9. import reactor.core.publisher.Mono;
  10. import reactor.core.scheduler.Scheduler;
  11. import reactor.core.scheduler.Schedulers;
  12. import reactor.util.function.Tuples;
  13. import java.util.ArrayList;
  14. import java.util.List;
  15. import java.util.concurrent.*;
  16. /**
  17. * 用于需要执行并行任务的事务管理线程池,由于现有的 Spring 声明式事务在不同的线程中不能很好地工作,
  18. * 而在某些情况下,可能需要考虑使用多个处理器的优势来提高方法的执行性能,因此定义了此类来提供一种类似
  19. * 线程池的方式来执行可以并行执行的任务,并对将这些任务整合到 Spring 的事务管理中 <br />
  20. * 具体的使用方式:
  21. * <ol>
  22. *     <li>
  23. *         如果你希望自己配置线程池的相关属性,可以手动创建一个 {@link ThreadPoolExecutor} 来作为构造参数通过
  24. *         {@link DataSourceTransactionExecutor#DataSourceTransactionExecutor(ThreadPoolExecutor,
  25. *         DataSourceTransactionManager, TransactionDefinition)} 进行构建,在构造时会复制这个 {@link ThreadPoolExecutor}
  26. *         的相关属性来重新创建一个 {@link ThreadPoolExecutor} 进行任务的处理,因此不会影响到现有线程池的工作. 同时,值得注意的是,
  27. *         对于传入的 {@link ThreadPoolExecutor} 参数中的 {@link ThreadPoolExecutor#getQueue()} 工作队列类型,必须保证提供一个
  28. *         无参的构造函数来使得工作队列对象能够被重新创建,否则将会抛出异常 <br />
  29. *         此外,除了任务执行者的参数外,至少还需要指定 {@link DataSourceTransactionManager} 用于任务执行时线程事务的统一管理,使得每个任务
  30. *         执行时能够被 Spring 事务进行管理,由于 Spring 提供了对于事务的不同处理方式,因此也可以自定义传入 {@link TransactionDefinition}
  31. *         来定义这些行为
  32. *     </li>
  33. *     <li>
  34. *         对于需要执行的任务,只要将其作为 {@link Runnable} 参数通过 {@link #addTask(Runnable)} 的形式加入到当前的任务列表中,
  35. *         在这个过程中实际的任务不会被执行.
  36. *         当确定已经将任务加入完成后,通过调用 {@link #execute()} 方法来执行这些任务,这些任务的执行会被构造时传入的
  37. *         {@link DataSourceTransactionManager} 进行统一的事务管理,同时,任务执行完毕之后,当前的任务执行者将会被关闭,
  38. *         并不能再继续添加任务
  39. *     </li>
  40. *     以下面的例子为例,我们可以执行如下的几个业务处理:
  41. *     <pre>
  42. *         DataSourceTransactionExecutor executor = new DataSourceTransactionExecutor(txManager);
  43. *         executor.addTask(this::service1);
  44. *         executor.addTask(this::service2);
  45. *         executor.addTask(this::service3);
  46. *         executor.execute();
  47. *     </pre>
  48. * </ol>
  49. *
  50. * @see DataSourceTransactionTask
  51. * @see DataSourceTransactionManager
  52. *@author lxh
  53. */
  54. public class DataSourceTransactionExecutor {
  55.     private final static Logger log = LoggerFactory.getLogger(DataSourceTransactionExecutor.class);
  56.     private final List<Callable<TransactionStatus>> callableList = new ArrayList<>();
  57.     private final DataSourceTransactionManager txManager;
  58.     private final TransactionStatus txStatus;
  59.     private final Object txResource;
  60.     private final ThreadPoolExecutor executor;
  61.     public DataSourceTransactionExecutor(int coreSize,
  62.                                          int maxSize,
  63.                                          int keepTime,
  64.                                          TimeUnit timeUnit,
  65.                                          BlockingQueue<Runnable> workQueue,
  66.                                          ThreadFactory threadFactory,
  67.                                          RejectedExecutionHandler rejectHandler,
  68.                                          DataSourceTransactionManager txManager,
  69.                                          TransactionDefinition definition) {
  70.         this.txManager = txManager;
  71.         this.txStatus = txManager.getTransaction(definition);
  72.         this.txResource = TransactionSynchronizationManager.getResource(txManager.getResourceFactory());
  73.         executor = new ThreadPoolExecutor(coreSize, maxSize, keepTime,
  74.                                           timeUnit, workQueue, threadFactory, rejectHandler);
  75.     }
  76.     public DataSourceTransactionExecutor(DataSourceTransactionManager txManager,
  77.                                          TransactionDefinition definition) {
  78.         this(Runtime.getRuntime().availableProcessors() * 2,
  79.              Runtime.getRuntime().availableProcessors() * 2,
  80.              60,
  81.              TimeUnit.SECONDS,
  82.              new LinkedBlockingDeque<>(),
  83.              Thread::new,
  84.              new ThreadPoolExecutor.AbortPolicy(),
  85.              txManager,
  86.              definition
  87.             );
  88.     }
  89.     @SuppressWarnings("unchecked")
  90.     public DataSourceTransactionExecutor(ThreadPoolExecutor executor,
  91.                                          DataSourceTransactionManager txManager,
  92.                                          TransactionDefinition definition) {
  93.         // 复制一个线程池对象,避免一些线程问题
  94.         this.executor = new ThreadPoolExecutor(
  95.             executor.getCorePoolSize(),
  96.             executor.getMaximumPoolSize(),
  97.             executor.getKeepAliveTime(TimeUnit.SECONDS),
  98.             TimeUnit.SECONDS,
  99.             ReflectTool.createInstance(executor.getQueue().getClass()),
  100.             executor.getThreadFactory(),
  101.             ReflectTool.createInstance(executor.getRejectedExecutionHandler().getClass())
  102.         );
  103.         this.txManager = txManager;
  104.         this.txStatus = txManager.getTransaction(definition);
  105.         this.txResource = TransactionSynchronizationManager.getResource(txManager.getResourceFactory());
  106.     }
  107.     public DataSourceTransactionExecutor(DataSourceTransactionManager txManager) {
  108.         this(txManager, new DefaultTransactionDefinition());
  109.     }
  110.     public void addTask(Runnable task) {
  111.         callableList.add(DataSourceTransactionTask.Builder.aTask()
  112.                          .runnable(task).txManager(txManager)
  113.                          .resource(txResource).definition(new DefaultTransactionDefinition())
  114.                          .build()
  115.                         );
  116.     }
  117.     public void addTask(Runnable task, TransactionDefinition def) {
  118.         callableList.add(DataSourceTransactionTask.Builder.aTask()
  119.                          .runnable(task).txManager(txManager)
  120.                          .resource(txResource).definition(def)
  121.                          .build()
  122.                         );
  123.     }
  124.     public void execute() throws InterruptedException {
  125.         List<Future<TransactionStatus>> futures = new ArrayList<>();
  126.         for (Callable<TransactionStatus> callable : callableList) {
  127.             futures.add(executor.submit(callable));
  128.         }
  129.         executor.shutdown();
  130.         List<TransactionStatus> statusList = new ArrayList<>();
  131.         for (Future<TransactionStatus> future : futures) {
  132.             try {
  133.                 statusList.add(future.get());
  134.             } catch (ExecutionException e) {
  135.                 log.error("任务执行出现异常", e);
  136.                 statusList.add(null);
  137.             }
  138.         }
  139.         Object[] statusArgs = new Object[statusList.size()];
  140.         statusList.toArray(statusArgs);
  141.         mergeTaskResult(statusArgs); // 合并每个任务的事务信息
  142.     }
  143.     /**
  144.      * 以 Reactor 异步的方式执行这些任务,需要注意的是,当使用这个方法时,由于
  145.      * Reactor 的异步特性,如果业务方法使用了 @Transactional 注解修饰,Spring 的事务处理会发生在实际处理
  146.      * 事务之前,可能会导致数据库连接被释放,从而无法绑定对应的事务对象,使用时需要注意这一点
  147.      */
  148.     public void asyncExecute() {
  149.         List<Mono<TransactionStatus>> monoList = new ArrayList<>();
  150.         Scheduler scheduler = Schedulers.fromExecutor(this.executor);
  151.         for (Callable<TransactionStatus> callable : callableList) {
  152.             monoList.add(Mono.fromCallable(callable)
  153.                          .subscribeOn(scheduler));
  154.         }
  155.         Flux.zip(monoList, Tuples::fromArray)
  156.             .single()
  157.             .flatMap(tuple2 -> Mono.fromRunnable(() -> {
  158.                 TransactionSynchronizationManager.bindResource(txManager.getResourceFactory(), txResource);
  159.                 mergeTaskResult(tuple2.toArray());
  160.             }))
  161.             .subscribeOn(scheduler)
  162.             .doOnSubscribe(any -> log.info("开始执行事务的合并操作"))
  163.             .doFinally(any -> {
  164.                 log.debug("合并事务处理执行完成");
  165.                 scheduler.dispose();
  166.                 executor.shutdown();
  167.             })
  168.             .subscribe();
  169.     }
  170.     private void mergeTaskResult(Object... statusList) {
  171.         boolean exFlag = false;
  172.         for (Object obj : statusList) {
  173.             if (obj == null) {
  174.                 exFlag = true;
  175.                 continue;
  176.             }
  177.             // 在当前上下文中一定是 TransactionStatus 类型的对象
  178.             TransactionStatus status = (TransactionStatus) obj;
  179.             if (status.isRollbackOnly()) exFlag = true;
  180.         }
  181.         if (exFlag) {
  182.             log.debug("由于任务执行时出现异常,因此会将整个业务进操作进行回滚");
  183.             txManager.rollback(txStatus);
  184.             /*
  185.                     这里抛出异常的原因是因为相关的业务方法可能被 @Transactional 修饰过,
  186.                     从而导致提交只能回滚的事务而导致的提交异常,具体使用时可以考虑替换掉这个异常类型
  187.             */
  188.             throw new RuntimeException("需要回滚的异常");
  189.         } else {
  190.             txManager.commit(txStatus);
  191.         }
  192.     }
  193. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

忿忿的泥巴坨

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表