【项目实践07】【多线程下变乱的同等性】

打印 上一主题 下一主题

主题 871|帖子 871|积分 2613

一、前言

本系列用来记载一些在实际项目中的小东西,并记载在过程中想到一些小东西,由于是随条记载,所以内容不会过于详细。

本篇从开篇到完成至少搁置了半年,因此之前的一些想法记不太清楚了。总之:本篇的方案未经过实际验证,审慎使用。
本篇的灵感泉源自 Spring在多线程环境下如何确保变乱同等性,正如其文章内容所属,想实现 多线程下变乱的同等性 有多重方案选择,包括最底子的 JDBC 编程、或者 分布式变乱思想等,不必局限于此。
二、项目背景

某个项目中需要处理用户上传的 Excel 文件,对每条记载单独处理且处理过程比力耗时,因此使用了线程池来加快处理速度。简朴Demo 如下:
  1.     @Override
  2.     public void testTransaction() {
  3.         // 模拟构造大量数据
  4.         List<Integer> indexs = initData(100);
  5.         // 子线程插入数据
  6.         indexs.parallelStream()
  7.                 .forEach(index ->
  8.                         transactionTemplate.executeWithoutResult(
  9.                                 transactionStatus ->
  10.                                         saveIndexDemo(index.toString())));
  11.     }
  12.         /**
  13.      * 模拟大量数据
  14.      *
  15.      * @return
  16.      */
  17.     private List<Integer> initData(int size) {
  18.         List<Integer> indexs = Lists.newArrayList();
  19.         for (int i = 0; i < size; i++) {
  20.             indexs.add(i);
  21.         }
  22.         return indexs;
  23.     }
  24.     /**
  25.      * 子线程保存记录
  26.      *
  27.      * @param index
  28.      */
  29.     @SneakyThrows
  30.     private void saveIndexDemo(String index) {
  31.         log.info("{} 保存数据 {}", Thread.currentThread().getId(), index);
  32.         demoMapper.insert(getDemo(index));
  33.         // TODO : 模拟业务耗时
  34.         Thread.sleep(10000);
  35.     }
  36.     /**
  37.      * 获取实例
  38.      *
  39.      * @param userId
  40.      * @return
  41.      */
  42.     private Demo getDemo(String userId) {
  43.         final Demo demo = new Demo();
  44.         demo.setUserId(userId);
  45.         return demo;
  46.     }
复制代码
本来相安无事,但是有一天晚上几个用户上传了一个2w行的Excel,这里就在开启线程处理,而天天记载的比力耗时,因此导致每个线程都占用着一个DB连接不释放。最终导致 DB 连接数不够用,其他业务异常。

三、实现方案

实际改造方案:限定线程开启的数量来控制并发度。改造比力简朴,这里直接列出,重点在下面的思路延伸。
  1.     /**
  2.      * 线程池
  3.      */
  4.     private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(5);
  5.     @Transactional(rollbackFor = Exception.class)
  6.     @Override
  7.     public void testTransaction() {
  8.         // 模拟构造大量数据
  9.         List<Integer> indexs = initData(100);
  10.         final CompletableFuture[] futures =
  11.                 indexs.stream()
  12.                         .map(index -> CompletableFuture.runAsync(() ->
  13.                                 transactionTemplate.executeWithoutResult(
  14.                                         transactionStatus ->
  15.                                                 saveIndexDemo(index.toString())), EXECUTOR_SERVICE))
  16.                         .toArray(CompletableFuture[]::new);
  17.         CompletableFuture.allOf(futures).join();
  18.     }
复制代码

四、思路延伸

上面的逻辑实际是有问题的:每个 saveIndexDemo 都处于不同的变乱中,也就是说其中某个 saveIndexDemo 假如异常了其他 saveIndexDemo 并不会回滚。因此这里的问题就演变成了:多线程下如何保证变乱的同等性。
1. 方案一

子线程业务执行结束后统一挂起,等待所有子线程执行结束后判断是否提交变乱 (2PC、3PC 的简化版)。如下:这里使用 CountDownLatch 来壅闭所有子线程,当所有子线程执行结束后根据 atomicBoolean 的值判断是否有变乱执行异常,假如异常,则将所有变乱回滚。
  1.       @SneakyThrows
  2.     @Override
  3.     public String testTransaction01() {
  4.         demoMapper.delete(null);
  5.         // 初始化数据
  6.         List<Integer> indexs = initData(10);
  7.         // 等待子线程执行结束
  8.         CountDownLatch countDownLatch = new CountDownLatch(indexs.size());
  9.         // 业务执行标志
  10.         AtomicBoolean atomicBoolean = new AtomicBoolean(true);
  11.         CountDownLatch endFlag = new CountDownLatch(indexs.size());
  12.         // 模拟每个业务单独开一个线程来执行,并且每个业务内部都开启了一个事务
  13.         for (Integer index : indexs) {
  14.             // 如果线程池线程数量 < 任务数量 会导致任务卡死:线程执行后 countDownLatch.await(); 等待剩余任务执行完成,而线程池中线程被占满,导致线程无法释放就无法再执行剩余任务
  15.             EXECUTOR_SERVICE.execute(() ->
  16.                     transactionTemplate.executeWithoutResult(transactionStatus -> {
  17.                         runTranscation(index, countDownLatch, atomicBoolean, endFlag);
  18.                     }));
  19.         }
  20.         // 等待所有线程执行结束后接口再返回
  21.         endFlag.await();
  22.         return "success";
  23.     }
  24.     /**
  25.      * 执行事务
  26.      *
  27.      * @param index
  28.      * @param countDownLatch
  29.      * @param atomicBoolean
  30.      * @param endFlag
  31.      */
  32.     private void runTranscation(Integer index, CountDownLatch countDownLatch, AtomicBoolean atomicBoolean, CountDownLatch endFlag) {
  33.         try {
  34.             // 业务执行结束
  35.             saveIndexDemo(index.toString());
  36.             countDownLatch.countDown();
  37.             // 等待其他线程业务执行结束
  38.             countDownLatch.await();
  39.         } catch (Exception e) {
  40.             // 如果业务执行异常,设置标志位为 false
  41.             countDownLatch.countDown();
  42.             atomicBoolean.set(false);
  43.             // 如果某个线程执行异常,抛出异常回滚
  44.             throw new RuntimeException("测试异常, 事务回滚");
  45.         } finally {
  46.             endFlag.countDown();
  47.         }
  48.     }
复制代码
上面代码虽然实现了所谓的功能,但是照旧存在一些漏洞的,包括但不限于:

  • 线程池线程数量必须大于等于任务数量,否则会导致任务卡死:线程执行后 countDownLatch.await(); 等待剩余任务执行完成,而线程池中线程被占满,导致线程无法释放就无法再执行剩余任务。
  • 主线程 countDownLatch.await() 这一步执行结束时,子线程中的变乱大概还没提交,不过可以使用其他标志位判断是否全部执行结束,这里不再演示。
  • 假如子线程其中一个由于网络或锁等问题迟迟不能执行结束,会导致其他子线程的变乱一并无法提交,数据库连接池的连接会不停被占用。
  • 由于子线程是同时开启线程而且互相称待的,所以一定要注意死锁情况。如下:

    • 线程A 执行业务逻辑时执行 SQL select * from demo where id = 1 for update, 此SQL会锁住 id = 1 的记载。
    • 线程B 执行业务逻辑时执行SQL update demo set user_id = 'demo' where id = '1',此时会发现 id = 1 的记载被线程A锁住,因此等待线程A执行结束。
    • 线程A执行完业务逻辑后执行到 countDownLatch.await(); 后等待其他线程业务逻辑执行结束。而此时线程B由于等待线程A持有的记载A 的锁迟迟无法结束业务逻辑,也就无法执行到 countDownLatch.countDown(); 的逻辑,从而形成死锁的局面。

综上,该方案不推荐使用。
2. 方案二

本方案基于Spring在多线程环境下如何确保变乱同等性 中的内容,做了部门改造,详细思路在该文中有过介绍,本篇不过多赘述细节。
以下面Demo为例,实现功能为 :初始化 15 条数据,每条数据使用一个单独的线程,每个线程开辟一个单独的变乱来将数据插入到数据库中。当其中一个线程的变乱出现异常时,所有变乱统一回滚。
  1.     @Transactional(rollbackFor = Exception.class)
  2.     @Override
  3.     public void testTransaction02() {
  4.         // 初始化 15 条数据
  5.         List<Integer> indexs = initData(15);
  6.         // 并发执行,每一个线程执行事务,事务的内容是将 Index 插入到 demo 表中
  7.         CompletableFuture<TransactionUtil.TransactionResult>[] futures =
  8.                 indexs.stream()
  9.                         .map(index ->
  10.                                 CompletableFuture.supplyAsync(() ->
  11.                                         transactionUtil.executeWithoutResult(
  12.                                                 transactionStatus ->
  13.                                                         saveIndexDemo(index.toString()))))
  14.                         .toArray(CompletableFuture[]::new);
  15.         // 等待所有 CompletableFuture 执行结束
  16.         CompletableFuture.allOf(futures).join();
  17.         // 获取子线程执行结果
  18.         List<TransactionUtil.TransactionResult> transactionResults =
  19.                 Arrays.stream(futures)
  20.                         .map(CompletableFuture::join)
  21.                         .collect(Collectors.toList());
  22.         // 批量处理所有子事物
  23.         transactionUtil.batchTrim(transactionResults);
  24. }
复制代码
上面代码的关键实现是下面的工具类,如下:
  1. /**
  2. * @Desc : 从 org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction 中参考下整个流程
  3. */
  4. @Slf4j
  5. @Component
  6. public class TransactionUtil {
  7.     @Resource
  8.     private PlatformTransactionManager transactionManager;
  9.     @Resource
  10.     private TransactionDefinition transactionDefinition;
  11.     /**
  12.      * 基于 TransactionOperations#executeWithoutResult(java.util.function.Consumer) 改造
  13.      *
  14.      * @param function
  15.      */
  16.     public <T> TransactionResult<T> execute(Function<TransactionStatus, T> function) {
  17.         TransactionStatus status = beginNewTransaction();
  18.         TransactionResult<T> transactionResult =
  19.                 new TransactionResult<>(status, TransactionResource.copyTransactionResource());
  20.         try {
  21.             transactionResult.setData(function.apply(status));
  22.         } catch (Throwable ex) {
  23.             log.error("Transaction failed", ex);
  24.             transactionResult.setEx(ex);
  25.         } finally {
  26.             clearTransactionResource();
  27.         }
  28.         return transactionResult;
  29.     }
  30.     /**
  31.      * 基于 TransactionOperations#executeWithoutResult(java.util.function.Consumer) 改造
  32.      * 需要注意:在本篇代码中,执行该方法的是线程池中的某一个线程,而不是主线程
  33.      *
  34.      * @param consumer
  35.      */
  36.     public TransactionResult<Void> executeWithoutResult(Consumer<TransactionStatus> consumer) {
  37.         // 为当前线程开启一个新的事务
  38.         TransactionStatus status = beginNewTransaction();
  39.         // 封装成 TransactionResult 对象
  40.         TransactionResult<Void> transactionResult =
  41.                 new TransactionResult<>(status, TransactionResource.copyTransactionResource());
  42.         try {
  43.             //执行具体业务逻辑
  44.             consumer.accept(status);
  45.         } catch (Throwable ex) {
  46.             log.error("Transaction failed", ex);
  47.             transactionResult.setEx(ex);
  48.         } finally {
  49.             // 清除 beginNewTransaction(); 方法创建事务时绑定当前线程的一些事务资源,防止线程池的线程复用时出现问题
  50.             clearTransactionResource();
  51.         }
  52.         return transactionResult;
  53.     }
  54.     /**
  55.      * 开启事务资源
  56.      *
  57.      * @return
  58.      */
  59.     private TransactionStatus beginNewTransaction() {
  60.         // 开启一个新事务---此时autocommit已经被设置为了false,并且当前没有事务,这里创建的是一个新事务
  61.         return transactionManager.getTransaction(transactionDefinition);
  62.     }
  63.     /**
  64.      * 清除事务资源
  65.      * beginNewTransaction 方法在创建新事物的同时还会给当前线程绑定一些事务属性,而由于线程池的线程是复用的,所以当下次再使用这个线程创新的事务时可能会出现问题:
  66.      * 如 : 当第一次使用该线程创建事务时会给该线程绑定一个 DB 连接,而当事务提交后这个连接就会关闭,
  67.      * 当下一次复用该线程执行事务时会判断当前线程已经绑定了 DB 连接就不会重新创建连接,而这个连接实际上已经关闭了,会出现 java.sql.SQLException: Connection is closed 问题
  68.      */
  69.     private void clearTransactionResource() {
  70.         Maps.newHashMap(TransactionSynchronizationManager.getResourceMap()).keySet()
  71.                 .forEach(TransactionSynchronizationManager::unbindResource);
  72.         if (TransactionSynchronizationManager.isActualTransactionActive()) {
  73.             TransactionSynchronizationManager.clearSynchronization();
  74.         }
  75.     }
  76.     /**
  77.      * 批量处理事务
  78.      * 需要注意:在本篇代码中,执行该方法的是主线程,线程池中每个线程都会开启一个事务并将事务属性返回交由主线程来统一处理。
  79.      *
  80.      * @param transactionResults
  81.      * @return 是否提交
  82.      */
  83.     public boolean batchTrim(Collection<TransactionResult> transactionResults) {
  84.         // 判断是否出现异常
  85.         boolean rollback = transactionResults.stream()
  86.                 .anyMatch(transactionResult -> Objects.nonNull(transactionResult.getEx()));
  87.         for (TransactionResult<?> transactionResult : transactionResults) {
  88.             // 处理事务,提交或回滚
  89.             if (rollback) {
  90.                 transactionResult.rollback();
  91.             } else {
  92.                 transactionResult.commit();
  93.             }
  94.         }
  95.         return !rollback;
  96.     }
  97.     /**
  98.      * 保存当前事务资源,用于线程间的事务资源COPY操作
  99.      */
  100.     @Builder
  101.     public static class TransactionResource {
  102.         /**
  103.          * 当前线程事务资源
  104.          */
  105.         private static final ThreadLocal<TransactionResource> RESOURCE_THREAD_LOCAL = new ThreadLocal<>();
  106.         /**
  107.          * 事务结束后默认会移除集合中的DataSource作为key关联的资源记录
  108.          */
  109.         private Map<Object, Object> resources = new HashMap<>();
  110.         /**
  111.          * 下面五个属性会在事务结束后被自动清理,无需我们手动清理
  112.          */
  113.         private Set<TransactionSynchronization> synchronizations = new HashSet<>();
  114.         /**
  115.          * 当前事务名称
  116.          */
  117.         private String currentTransactionName;
  118.         /**
  119.          * 当前事务只读属性
  120.          */
  121.         private Boolean currentTransactionReadOnly;
  122.         /**
  123.          * 当前事务隔离级别
  124.          */
  125.         private Integer currentTransactionIsolationLevel;
  126.         /**
  127.          * 当前事务是否处于活动状态
  128.          */
  129.         private Boolean actualTransactionActive;
  130.         /**
  131.          * 当前事务的属性拷贝
  132.          *
  133.          * @return
  134.          */
  135.         public static TransactionResource copyTransactionResource() {
  136.             // 如果当前线程存在活跃事务则拷贝,否则返回一个基础 TransactionResource
  137.             if (TransactionSynchronizationManager.isActualTransactionActive()) {
  138.                 return TransactionResource.builder()
  139.                         .resources(Maps.newHashMap(TransactionSynchronizationManager.getResourceMap()))
  140.                         .synchronizations(Sets.newHashSet(TransactionSynchronizationManager.getSynchronizations()))
  141.                         .currentTransactionName(TransactionSynchronizationManager.getCurrentTransactionName())
  142.                         .currentTransactionReadOnly(TransactionSynchronizationManager.isCurrentTransactionReadOnly())
  143.                         .currentTransactionIsolationLevel(TransactionSynchronizationManager.getCurrentTransactionIsolationLevel())
  144.                         .actualTransactionActive(TransactionSynchronizationManager.isActualTransactionActive())
  145.                         .build();
  146.             } else {
  147.                 return TransactionResource.builder()
  148.                         .actualTransactionActive(TransactionSynchronizationManager.isActualTransactionActive())
  149.                         .build();
  150.             }
  151.         }
  152.         /**
  153.          * 给当前线程注入事务属性
  154.          */
  155.         public void autowiredTransactionResource() {
  156.             if (actualTransactionActive) {
  157.                 TransactionSynchronizationManager.initSynchronization();
  158.                 synchronizations.forEach(TransactionSynchronizationManager::registerSynchronization);
  159.                 resources.forEach(TransactionSynchronizationManager::bindResource);
  160.                 TransactionSynchronizationManager.setActualTransactionActive(actualTransactionActive);
  161.                 TransactionSynchronizationManager.setCurrentTransactionName(currentTransactionName);
  162.                 TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(currentTransactionIsolationLevel);
  163.                 TransactionSynchronizationManager.setCurrentTransactionReadOnly(currentTransactionReadOnly);
  164.             }
  165.         }
  166.         /**
  167.          * 移除事务属性
  168.          *
  169.          * @param all : 是否全部清除,默认只移除非 DataSource资源。这里解释下该参数的作用:
  170.          *            事务提交后会自动清除事务相关的资源信息,所以不需要调用TransactionSynchronizationManager.clearSynchronization();同时事务信息也会自动清除 DataSource 相关的属性,所以也不需要清除。
  171.          *            而对于上面代码来说,如果是子线程事务因为会执行提交或者回滚操作,会自动清除 DataSource 相关的属性,所以传 false; 而对主线程事务来说,事务提交并不是由我们控制,所以需要传 true,清空本身事务信息,为绑定子线程事务信息做准备
  172.          */
  173.         public void removeTransactionResource(boolean all) {
  174.             if (actualTransactionActive) {
  175.                 // 事务结束后默认会移除集合中的DataSource作为key关联的资源记录,DataSource如果重复移除,unbindResource时会因为不存在此key关联的事务资源而报错
  176.                 resources.keySet().forEach(key -> {
  177.                     if (all || !(key instanceof DataSource)) {
  178.                         TransactionSynchronizationManager.unbindResource(key);
  179.                     }
  180.                 });
  181.             }
  182.         }
  183.     }
  184.     /**
  185.      * 执行结果
  186.      *
  187.      * @param <T>
  188.      */
  189.     public class TransactionResult<T> {
  190.         /**
  191.          * 事务状态
  192.          */
  193.         private TransactionStatus status;
  194.         /**
  195.          * 线程资源
  196.          */
  197.         private TransactionResource resource;
  198.         /**
  199.          * 异常信息
  200.          */
  201.         @Getter
  202.         @Setter
  203.         private Throwable ex;
  204.         /**
  205.          * 执行结果
  206.          */
  207.         @Getter
  208.         @Setter
  209.         private T data;
  210.         public TransactionResult(TransactionStatus status, TransactionResource resource) {
  211.             this.status = status;
  212.             this.resource = resource;
  213.         }
  214.         /**
  215.          * 事务提交
  216.          * 需要注意:在本篇代码中,执行该方法的是主线程
  217.          */
  218.         public void commit() {
  219.             // 如果主线程开启了事务,则保存主线程事务信息:因为主线程可能同步存在事务,所以需要先备份主线程的事务信息
  220.             TransactionResource currentResource = TransactionResource.copyTransactionResource();
  221.             try {
  222.                 // 移除当前线程(主线程)事务资源, 防止下面绑定其他事务资源时冲突,这里调用入参传 true ,清除主线程事务的全部资源信息
  223.                 currentResource.removeTransactionResource(true);
  224.                 if (TransactionSynchronizationManager.isSynchronizationActive()) {
  225.                     // 清除活跃状态,否则注入子事务时会判断当前线程已经有活跃事务
  226.                     TransactionSynchronizationManager.clearSynchronization();
  227.                 }
  228.                 // 注入子线程事务信息 : resource 中保存的事子线程的事务信息,将子线程事务信息注入到这里后再提交子线程事务
  229.                 resource.autowiredTransactionResource();
  230.                 // 提交子线程 : 事务提交后会自动清除事务相关的资源信息,所以不需要调用TransactionSynchronizationManager.clearSynchronization();,同时事务信息也会自动清除 DataSource 相关的属性,所以也不需要清除
  231.                 transactionManager.commit(status);
  232.                 // 清除子线程事务信息, 防止下面恢复主线程事务时属性冲突, 这里调用入参传 false ,因为上面调用了事务提交,所以会自动清除 DataSource 相关的属性,也就不需要我们自己清除了
  233.                 resource.removeTransactionResource(false);
  234.             } catch (Exception ex) {
  235.                 log.info("commit error", ex);
  236.                 // 清除子线程事务信息, 防止下面恢复主线程事务时属性冲突
  237.                 resource.removeTransactionResource(false);
  238.             } finally {
  239.                 log.info("commit");
  240.                 // 恢复主线程事务信息
  241.                 currentResource.autowiredTransactionResource();
  242.             }
  243.         }
  244.         /**
  245.          * 事务回滚
  246.          */
  247.         public void rollback() {
  248.             // 如果主线程开启了事务,则保存主线程事务信息:因为主线程可能同步存在事务,所以需要先备份主线程的事务信息
  249.             TransactionResource currentResource = TransactionResource.copyTransactionResource();
  250.             try {
  251.                 // 移除当前线程(主线程)事务资源, 防止下面绑定其他事务资源时冲突,这里调用入参传 true ,清除主线程事务的全部资源信息
  252.                 currentResource.removeTransactionResource(true);
  253.                 if (TransactionSynchronizationManager.isSynchronizationActive()) {
  254.                     // 清除活跃状态,否则注入子事务时会判断当前线程已经有活跃事务
  255.                     TransactionSynchronizationManager.clearSynchronization();
  256.                 }
  257.                 // 注入子线程事务信息 : resource 中保存的事子线程的事务信息,将子线程事务信息注入到这里后再提交子线程事务
  258.                 resource.autowiredTransactionResource();
  259.                 // 提交子线程 : 事务提交后会自动清除事务相关的资源信息,所以不需要调用TransactionSynchronizationManager.clearSynchronization();,同时事务信息也会自动清除 DataSource 相关的属性,所以也不需要清除
  260.                 rollbackOnException(status, ex);
  261.                 // 清除子线程事务信息, 防止下面恢复主线程事务时属性冲突, 这里调用入参传 false ,因为上面调用了事务提交,所以会自动清除 DataSource 相关的属性,也就不需要我们自己清除了
  262.                 resource.removeTransactionResource(false);
  263.             } catch (Exception ex) {
  264.                 log.info("commit error", ex);
  265.                 // 清除子线程事务信息, 防止下面恢复主线程事务时属性冲突
  266.                 resource.removeTransactionResource(false);
  267.             } finally {
  268.                 log.info("commit");
  269.                 // 恢复主线程事务信息
  270.                 currentResource.autowiredTransactionResource();
  271.             }
  272.         }
  273.         /**
  274.          * 回滚
  275.          *
  276.          * @param status
  277.          * @param ex
  278.          * @throws TransactionException
  279.          */
  280.         private void rollbackOnException(TransactionStatus status, Throwable ex) throws TransactionException {
  281.             Assert.state(transactionManager != null, "No PlatformTransactionManager set");
  282.             log.debug("Initiating transaction rollback on application exception", ex);
  283.             try {
  284.                 transactionManager.rollback(status);
  285.             } catch (TransactionSystemException ex2) {
  286.                 log.error("Application exception overridden by rollback exception", ex);
  287.                 ex2.initApplicationException(ex);
  288.                 throw ex2;
  289.             } catch (RuntimeException | Error ex2) {
  290.                 log.error("Application exception overridden by rollback exception", ex);
  291.                 throw ex2;
  292.             }
  293.         }
  294.     }
  295. }
复制代码
上面的方案存在问题:

  • 在编写代码时注意多个线程变乱在执行过程中的死锁问题。
  • 最终变乱的提交方法 TransactionUtil#batchTrim 并不是原子的,当多个变乱中某个变乱提交失败了,则无法对之前提交成功的变乱回滚。
3. 方案三

基于方案二,又发散出了头脑三:所有子线程与主线程公用同一个变乱信息,最后在主线程执行结束后将这个变乱提交。
以下面Demo为例,实现功能为 :初始化 15 条数据,每条数据使用一个单独的线程,每个线程都会使用主线程的变乱来将数据插入到数据库中。当其中一个线程或主线程出现异常时,变乱统一回滚。
  1.     @Transactional
  2.     @Override
  3.     public void testTransaction03() {
  4.         // 初始化 15 条数据
  5.         List<Integer> indexs = initData(15);
  6.         // 拷贝主线程事务资源
  7.         SingleTransactionUtil.TransactionResource transactionResource =
  8.                 SingleTransactionUtil.TransactionResource.copyTransactionResource();
  9.         // 并发执行,每一个线程执行事务,事务的内容是将 Index 插入到 demo 表中
  10.         CompletableFuture<SingleTransactionUtil.TransactionResult>[] futures =
  11.                 indexs.stream()
  12.                         .map(index ->
  13.                                 CompletableFuture.supplyAsync(() ->
  14.                                         singleTransactionUtil.executeWithoutResult(transactionResource,
  15.                                                 () -> saveIndexDemo(index.toString()))))
  16.                         .toArray(CompletableFuture[]::new);
  17.         // 等待所有 CompletableFuture 执行结束
  18.         CompletableFuture.allOf(futures).join();
  19.         final boolean anyMatch = Arrays.stream(futures)
  20.                 .anyMatch(f -> f.join().getEx() != null);
  21.         if (anyMatch){
  22.             throw new RuntimeException("测试异常, 事务回滚");
  23.         }
  24.         dataDemoMapper.insert(new DataDemo("1"));
  25.         dataDemoMapper.insert(new DataDemo("2"));
  26.     }
复制代码
上面代码的关键实现是下面的工具类,如下:
  1. /**
  2. * @Desc : 从 org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction 中参考下整个流程
  3. */
  4. @Slf4j
  5. @Component
  6. public class SingleTransactionUtil {
  7.     @Resource
  8.     private PlatformTransactionManager transactionManager;
  9.     @Resource
  10.     private TransactionDefinition transactionDefinition;
  11.     /**
  12.      * 基于 TransactionOperations#executeWithoutResult(java.util.function.Consumer) 改造
  13.      * 执行
  14.      *
  15.      * @param supplier
  16.      */
  17.     public <T> TransactionResult<T> execute(TransactionResource transactionResource,  Supplier<T> supplier) {
  18.         transactionResource.autowiredTransactionResource();
  19.         TransactionResult<T> transactionResult = new TransactionResult<>();
  20.         try {
  21.             transactionResult.setData(supplier.get());
  22.         } catch (Throwable ex) {
  23.             log.error("Transaction failed", ex);
  24.             transactionResult.setEx(ex);
  25.         } finally {
  26.             clearTransactionResource();
  27.         }
  28.         return transactionResult;
  29.     }
  30.     /**
  31.      * 基于 TransactionOperations#executeWithoutResult(java.util.function.Consumer) 改造
  32.      * 执行
  33.      *
  34.      * @param runnable
  35.      */
  36.     public TransactionResult<Void> executeWithoutResult(TransactionResource transactionResource, Runnable runnable) {
  37.         transactionResource.autowiredTransactionResource();
  38.         TransactionResult<Void> transactionResult = new TransactionResult<>();
  39.         try {
  40.             runnable.run();
  41.         } catch (Throwable ex) {
  42.             log.error("Transaction failed", ex);
  43.             transactionResult.setEx(ex);
  44.         } finally {
  45.             clearTransactionResource();
  46.         }
  47.         return transactionResult;
  48.     }
  49.    
  50.     /**
  51.      * 清除事务资源
  52.      * beginNewTransaction 方法在创建新事物的同时还会给当前线程绑定一些事务属性,而由于线程池的线程是复用的,所以当下次再使用这个线程创新的事务时可能会出现问题:
  53.      * 如 : 当第一次使用该线程创建事务时会给该线程绑定一个 DB 连接,而当事务提交后这个连接就会关闭,
  54.      * 当下一次复用该线程执行事务时会判断当前线程已经绑定了 DB 连接就不会重新创建连接,而这个连接实际上已经关闭了,会出现 java.sql.SQLException: Connection is closed 问题
  55.      */
  56.     private void clearTransactionResource() {
  57.         Maps.newHashMap(TransactionSynchronizationManager.getResourceMap()).keySet()
  58.                 .forEach(TransactionSynchronizationManager::unbindResource);
  59.         if (TransactionSynchronizationManager.isActualTransactionActive()) {
  60.             TransactionSynchronizationManager.clearSynchronization();
  61.         }
  62.     }
  63.     @Slf4j
  64.     public static class TransactionResult<T> {
  65.         /**
  66.          * 异常信息
  67.          */
  68.         @Getter
  69.         @Setter
  70.         private Throwable ex;
  71.         /**
  72.          * 执行结果
  73.          */
  74.         @Getter
  75.         @Setter
  76.         private T data;
  77.     }
  78.     /**
  79.      * 保存当前事务资源,用于线程间的事务资源COPY操作
  80.      */
  81.     @Builder
  82.     public static class TransactionResource {
  83.         /**
  84.          * 当前线程事务资源
  85.          */
  86.         private static final ThreadLocal<TransactionResource> RESOURCE_THREAD_LOCAL = new ThreadLocal<>();
  87.         /**
  88.          * 事务结束后默认会移除集合中的DataSource作为key关联的资源记录
  89.          */
  90.         private Map<Object, Object> resources = new HashMap<>();
  91.         /**
  92.          * 下面五个属性会在事务结束后被自动清理,无需我们手动清理
  93.          */
  94.         private Set<TransactionSynchronization> synchronizations = new HashSet<>();
  95.         /**
  96.          * 当前事务名称
  97.          */
  98.         private String currentTransactionName;
  99.         /**
  100.          * 当前事务只读属性
  101.          */
  102.         private Boolean currentTransactionReadOnly;
  103.         /**
  104.          * 当前事务隔离级别
  105.          */
  106.         private Integer currentTransactionIsolationLevel;
  107.         /**
  108.          * 当前事务是否处于活动状态
  109.          */
  110.         private Boolean actualTransactionActive;
  111.         /**
  112.          * 当前事务的属性拷贝
  113.          *
  114.          * @return
  115.          */
  116.         public static TransactionResource copyTransactionResource() {
  117.             // 如果当前线程存在活跃事务则拷贝,否则返回一个基础 TransactionResource
  118.             if (TransactionSynchronizationManager.isActualTransactionActive()) {
  119.                 return TransactionResource.builder()
  120.                         .resources(Maps.newHashMap(TransactionSynchronizationManager.getResourceMap()))
  121.                         .synchronizations(Sets.newHashSet(TransactionSynchronizationManager.getSynchronizations()))
  122.                         .currentTransactionName(TransactionSynchronizationManager.getCurrentTransactionName())
  123.                         .currentTransactionReadOnly(TransactionSynchronizationManager.isCurrentTransactionReadOnly())
  124.                         .currentTransactionIsolationLevel(TransactionSynchronizationManager.getCurrentTransactionIsolationLevel())
  125.                         .actualTransactionActive(TransactionSynchronizationManager.isActualTransactionActive())
  126.                         .build();
  127.             } else {
  128.                 return TransactionResource.builder()
  129.                         .actualTransactionActive(TransactionSynchronizationManager.isActualTransactionActive())
  130.                         .build();
  131.             }
  132.         }
  133.         /**
  134.          * 给当前线程注入事务属性
  135.          */
  136.         public void autowiredTransactionResource() {
  137.             if (actualTransactionActive) {
  138.                 TransactionSynchronizationManager.initSynchronization();
  139.                 synchronizations.forEach(TransactionSynchronizationManager::registerSynchronization);
  140.                 resources.forEach(TransactionSynchronizationManager::bindResource);
  141.                 TransactionSynchronizationManager.setActualTransactionActive(actualTransactionActive);
  142.                 TransactionSynchronizationManager.setCurrentTransactionName(currentTransactionName);
  143.                 TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(currentTransactionIsolationLevel);
  144.                 TransactionSynchronizationManager.setCurrentTransactionReadOnly(currentTransactionReadOnly);
  145.             }
  146.         }
  147.         /**
  148.          * 移除事务属性
  149.          *
  150.          * @param all : 是否全部清除,默认只移除非 DataSource资源。这里解释下该参数的作用:
  151.          *            事务提交后会自动清除事务相关的资源信息,所以不需要调用TransactionSynchronizationManager.clearSynchronization();同时事务信息也会自动清除 DataSource 相关的属性,所以也不需要清除。
  152.          *            而对于上面代码来说,如果是子线程事务因为会执行提交或者回滚操作,会自动清除 DataSource 相关的属性,所以传 false; 而对主线程事务来说,事务提交并不是由我们控制,所以需要传 true,清空本身事务信息,为绑定子线程事务信息做准备
  153.          */
  154.         public void removeTransactionResource(boolean all) {
  155.             if (actualTransactionActive) {
  156.                 // 事务结束后默认会移除集合中的DataSource作为key关联的资源记录,DataSource如果重复移除,unbindResource时会因为不存在此key关联的事务资源而报错
  157.                 resources.keySet().forEach(key -> {
  158.                     if (all || !(key instanceof DataSource)) {
  159.                         TransactionSynchronizationManager.unbindResource(key);
  160.                     }
  161.                 });
  162.             }
  163.         }
  164.     }
  165. }
复制代码
五、参考内容

Spring在多线程环境下如何确保变乱同等性
https://mp.weixin.qq.com/s/gyEKbnzD3gIqfatL94DXsg

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

知者何南

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表