ToB企服应用市场:ToB评测及商务社交产业平台

标题: 基于Mybtais的多线程事务的手动提交 [打印本页]

作者: 自由的羽毛    时间: 2025-1-3 16:28
标题: 基于Mybtais的多线程事务的手动提交
解释


业务需求



我们的要将数据同步到两个表里面,这两个表是不相关的,因为我们传输的数据量过大以是接口超时了,为了优化进步接口效率+这两个表是不相关的表,以是我们可以上我们的多线程分别插入数据这样子来优化
 
题目所在


Spring的@Transactional注解只能控制当火线程的事务而控制不到异步线程的事务,那就会出现部门事务回滚情况。例如A表乐成插入但是B表插入失败了,这个部门事务回滚的情况是不可的,不符合我们的原子性。以是我们要用编程式事务手动提交来控制我们的事务,末了来实现一个同一回滚大概同一提交,不能出现一部门提交一部门回滚的题目

代码逻辑



因为我们是数据库操作又是基于Spring-mybatis,在异步线程中我们不能用Autowired来注入我们的Mapper对象,以是就有了两个要使用的类
一个是操作数据库的SqlsessionFactory类
一个是我们操作Spring事务的PlatformTransactionManager类

另有三个重要的类
一个是用来提交回滚事物的TransactionStatus类
一个是用来控制和阻塞线程的CountDownLatch类
一个是多线程下包管线程安全的AtomicBoolean类

我们在异步线程开启手动提交事务然后操作数据库插入数据,此时我们的事务还没有提交
我们的异步线程被countDownLatch.await()阻塞,等待全部的异步线程都实行了countDownLatch.countDown(),也就是异步线程都实行完,我们的异步线程才开始提交事务
我们会有个AtomicBoolean类型的status变量,我们异步线程的其中一个报错后我们都会try-catch把这个变量改成false

末了我们的事务提交逻辑是,假如这个status是false,我们就同一提交事务,假如这个satus是false,我们就同一回滚事务

代码

  1. package com.example.transational.Test;
  2. import jakarta.annotation.Resource;
  3. import org.apache.ibatis.session.SqlSession;
  4. import org.apache.ibatis.session.SqlSessionFactory;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.stereotype.Component;
  7. import org.springframework.transaction.PlatformTransactionManager;
  8. import org.springframework.transaction.TransactionStatus;
  9. import org.springframework.transaction.support.DefaultTransactionDefinition;
  10. import java.util.ArrayList;
  11. import java.util.Collections;
  12. import java.util.List;
  13. import java.util.concurrent.ArrayBlockingQueue;
  14. import java.util.concurrent.CountDownLatch;
  15. import java.util.concurrent.ThreadPoolExecutor;
  16. import java.util.concurrent.TimeUnit;
  17. import java.util.concurrent.atomic.AtomicBoolean;
  18. @Component
  19. public class Test {
  20.     //两个重要类,SqlsessionFactory和PlatformTransactionManager
  21.     @Resource
  22.     private SqlSessionFactory sqlSessionFactory;
  23.     @Autowired
  24.     private PlatformTransactionManager transactionManager;
  25.     public static final int CORE_POOL_SIZE = 5;
  26.     public static final int MAX_POOL_SIZE = 10;
  27.     public static final int QUEUE_CAPACITY = 100;
  28.     public static final Long KEEP_ALIVE_TIME = 1L;
  29.     public static final ThreadPoolExecutor kiraExecutor = new ThreadPoolExecutor(
  30.             CORE_POOL_SIZE,
  31.             MAX_POOL_SIZE,
  32.             KEEP_ALIVE_TIME,
  33.             TimeUnit.SECONDS,
  34.             new ArrayBlockingQueue<>(QUEUE_CAPACITY),
  35.             new ThreadPoolExecutor.AbortPolicy()
  36.     );
  37.     public void insert() {
  38.    
  39.         //状态变量,原子的Boolean类型防止多线程下出问题
  40.         AtomicBoolean status = new AtomicBoolean(true);
  41.         
  42.         //countDwonLatch,我们有两个异步任务,当两个都执行完后我们的主线程再往下执行
  43.         CountDownLatch countDownLatch = new CountDownLatch(2);
  44.         
  45.         //等两个事务都提交或回滚完我们再放行我们的主线程
  46.         CountDownLatch waitCountDownLatch=new CountDownLatch(2);
  47.         kiraExecutor.execute(() -> {
  48.             TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionDefinition());
  49.             try  {
  50.                 //我们要openSession设置为false,设置为手动提交
  51.                 SqlSession sqlSession = sqlSessionFactory.openSession(false);
  52.                 BookMapper bookMapper = sqlSession.getMapper(BookMapper.class);
  53.                 bookMapper.bookInsert(87979, 1);
  54. //                手动抛出异常进行测试
  55. //                throw new RuntimeException();
  56.             } catch (Exception e) {
  57.                 status.set(false);
  58.             } finally {
  59.                 countDownLatch.countDown();
  60.             }
  61.             try{
  62.                 countDownLatch.await();//等待所有的异步线程都执行完我们再提交
  63.                 if(status.get())
  64.                 {
  65.                     transactionManager.commit(transaction);
  66.                 }
  67.                 else
  68.                 {
  69.                     transactionManager.rollback(transaction);
  70.                 }
  71.             }catch (Exception e)
  72.             {
  73.             }
  74.             finally {
  75.                 waitCountDownLatch.countDown();
  76.             }
  77.         });
  78.         kiraExecutor.execute(() -> {
  79.             TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionDefinition());
  80.             try  {
  81.                 //我们要openSession设置为false,设为手动提交
  82.                 SqlSession sqlSession = sqlSessionFactory.openSession(false);
  83.                 BookMapper bookMapper = sqlSession.getMapper(BookMapper.class);
  84.                 bookMapper.bookInsert2(84614, 1);
  85.             } catch (Exception e) {
  86.                 status.set(false);
  87.             } finally {
  88.                 countDownLatch.countDown();
  89.             }
  90.             try{
  91.                 countDownLatch.await();//我们要所有的异步线程都执行完我们再提交
  92.                 if(status.get())
  93.                 {
  94.                     transactionManager.commit(transaction);
  95.                 }
  96.                 else
  97.                 {
  98.                     transactionManager.rollback(transaction);
  99.                 }
  100.             }catch (Exception e)
  101.             {
  102.             }
  103.             finally {
  104.                 waitCountDownLatch.countDown();
  105.             }
  106.         });
  107.         try{
  108.             waitCountDownLatch.await();//等待所有的异步线程都执行完我们的主线程才继续往下执行
  109.         }
  110.         catch (Exception e)
  111.         {
  112.         }
  113.     }
  114. }
复制代码

手动回滚场景存在的小题目

主键连续断开题目

主键连续断开了
因为我们是自增主键,但是我们这样子手动回滚的时间我们mysql不知道,以是我们的主键还是+了,以是另一个数据插入进来的时间,他的主键会断开不会连续
例如30,然后我们插入失败两次回滚了两次,第三次乐成了,他的主键不是31而是33



commit的时间报错却不能同一回滚的题目

还记得我们之前的提交逻辑吗
我们是多个异步线程都没错,也就是我们的status变量为true,我们的多个线程再提交事务

但是有个题目?我们要是commit()的时间错误了怎么办?我们这时间没有通知机制让其他事务都回滚,因为我们阻塞结束后我们其实是并发来提交我们的事务的。

那我换个思绪串行提交不就好了?例如List<TransactionStatus>,然后我们在主线程同一for循环提交事务???
你觉得这样子就对了???那我for循环串行提交突然某一个事务因为commmit()堕落了那该怎么办?我们还是会导致部门事务回滚。
因为我们的事务commit()完之后就不能回滚了
以是说现在就是,假如我们的commit()不出现报错,那我们的业务逻辑是完全对的
假如我们的commit()出现报错,那么这个业务逻辑就有题目了
例如我们10个事务我们for循环提交了5个,但是第6个事务我们commit()堕落了咋办?我们commit()完后是不能回滚的对吧?

以是这种commit()报错导致的极端错误我们是无法解决的,什么时间会commit()堕落呢?那我也不知道哈哈哈,我想到一个场景就是我们有10次commit(),前面5次commit()乐成,第六次commit()报错因为Mysql宕机了,以是commit()报错
这就就类似于宕机错误,例如我们10个事务我们提交了5个然后java程序宕机了,后面5个事务也没提交,这种情况也会导致部门事务回滚。
我们并不能包管这10个事务提交的原子性
以是不思量commit()和java程序宕机,我们的这个逻辑就是原子性的,但是这两种极端情况我们处理不了
 

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4