ToB企服应用市场:ToB评测及商务社交产业平台

标题: 支付宝一面:多线程事务怎么回滚?说用 @Transactional 可以回去等通知了! [打印本页]

作者: 张国伟    时间: 2023-9-29 22:26
标题: 支付宝一面:多线程事务怎么回滚?说用 @Transactional 可以回去等通知了!
背景介绍

1,最近有一个大数据量插入的操作入库的业务场景,需要先做一些其他修改操作,然后在执行插入操作,由于插入数据可能会很多,用到多线程去拆分数据并行处理来提高响应时间,如果有一个线程执行失败,则全部回滚。
2,在spring中可以使用@Transactional注解去控制事务,使出现异常时会进行回滚,在多线程中,这个注解则不会生效,如果主线程需要先执行一些修改数据库的操作,当子线程在进行处理出现异常时,主线程修改的数据则不会回滚,导致数据错误。
3,下面用一个简单示例演示多线程事务。
公用的类和方法
  1. /**
  2. * 平均拆分list方法.
  3. * @param source
  4. * @param n
  5. * @param <T>
  6. * @return
  7. */
  8. public static <T> List<List<T>> averageAssign(List<T> source,int n){
  9.     List<List<T>> result=new ArrayList<List<T>>();
  10.     int remaider=source.size()%n;
  11.     int number=source.size()/n;
  12.     int offset=0;//偏移量
  13.     for(int i=0;i<n;i++){
  14.         List<T> value=null;
  15.         if(remaider>0){
  16.             value=source.subList(i*number+offset, (i+1)*number+offset+1);
  17.             remaider--;
  18.             offset++;
  19.         }else{
  20.             value=source.subList(i*number+offset, (i+1)*number+offset);
  21.         }
  22.         result.add(value);
  23.     }
  24.     return result;
  25. }
  26. /**  线程池配置
  27. * @version V1.0
  28. */
  29. public class ExecutorConfig {
  30.     private static int maxPoolSize = Runtime.getRuntime().availableProcessors();
  31.     private volatile static ExecutorService executorService;
  32.     public static ExecutorService getThreadPool() {
  33.         if (executorService == null){
  34.             synchronized (ExecutorConfig.class){
  35.                 if (executorService == null){
  36.                     executorService =  newThreadPool();
  37.                 }
  38.             }
  39.         }
  40.         return executorService;
  41.     }
  42.     private static  ExecutorService newThreadPool(){
  43.         int queueSize = 500;
  44.         int corePool = Math.min(5, maxPoolSize);
  45.         return new ThreadPoolExecutor(corePool, maxPoolSize, 10000L, TimeUnit.MILLISECONDS,
  46.             new LinkedBlockingQueue<>(queueSize),new ThreadPoolExecutor.AbortPolicy());
  47.     }
  48.     private ExecutorConfig(){}
  49. }
  50. /** 获取sqlSession
  51. * @author 86182
  52. * @version V1.0
  53. */
  54. @Component
  55. public class SqlContext {
  56.     @Resource
  57.     private SqlSessionTemplate sqlSessionTemplate;
  58.     public SqlSession getSqlSession(){
  59.         SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();
  60.         return sqlSessionFactory.openSession();
  61.     }
  62. }
复制代码
推荐一个 Spring Boot 基础教程及实战示例:https://github.com/javastacks/spring-boot-best-practice
示例事务不成功操作

[code]  /** * 测试多线程事务. * @param employeeDOList */@Override@Transactionalpublic void saveThread(List employeeDOList) {    try {        //先做删除操作,如果子线程出现异常,此操作不会回滚        this.getBaseMapper().delete(null);        //获取线程池        ExecutorService service = ExecutorConfig.getThreadPool();        //拆分数据,拆分5份        List lists=averageAssign(employeeDOList, 5);        //执行的线程        Thread []threadArray = new Thread[lists.size()];        //监控子线程执行完毕,再执行主线程,要不然会导致主线程关闭,子线程也会随着关闭        CountDownLatch countDownLatch = new CountDownLatch(lists.size());        AtomicBoolean atomicBoolean = new AtomicBoolean(true);        for (int i =0;i {                try {                 //最后一个线程抛出异常                    if (!atomicBoolean.get()){                        throw new ServiceException("001","出现异常");                    }                    //批量添加,mybatisPlus中自带的batch方法                    this.saveBatch(list);                }finally {                    countDownLatch.countDown();                }            });        }        for (int i = 0; i




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4