自由的羽毛 发表于 2025-1-3 16:28:16

基于Mybtais的多线程事务的手动提交

解释


业务需求


我们的要将数据同步到两个表里面,这两个表是不相关的,因为我们传输的数据量过大以是接口超时了,为了优化进步接口效率+这两个表是不相关的表,以是我们可以上我们的多线程分别插入数据这样子来优化
 
题目所在


Spring的@Transactional注解只能控制当火线程的事务而控制不到异步线程的事务,那就会出现部门事务回滚情况。例如A表乐成插入但是B表插入失败了,这个部门事务回滚的情况是不可的,不符合我们的原子性。以是我们要用编程式事务手动提交来控制我们的事务,末了来实现一个同一回滚大概同一提交,不能出现一部门提交一部门回滚的题目

代码逻辑


因为我们是数据库操作又是基于Spring-mybatis,在异步线程中我们不能用Autowired来注入我们的Mapper对象,以是就有了两个要使用的类
一个是操作数据库的SqlsessionFactory类
一个是我们操作Spring事务的PlatformTransactionManager类

另有三个重要的类
一个是用来提交回滚事物的TransactionStatus类
一个是用来控制和阻塞线程的CountDownLatch类
一个是多线程下包管线程安全的AtomicBoolean类

我们在异步线程开启手动提交事务然后操作数据库插入数据,此时我们的事务还没有提交
我们的异步线程被countDownLatch.await()阻塞,等待全部的异步线程都实行了countDownLatch.countDown(),也就是异步线程都实行完,我们的异步线程才开始提交事务
我们会有个AtomicBoolean类型的status变量,我们异步线程的其中一个报错后我们都会try-catch把这个变量改成false

末了我们的事务提交逻辑是,假如这个status是false,我们就同一提交事务,假如这个satus是false,我们就同一回滚事务
代码

package com.example.transational.Test;

import jakarta.annotation.Resource;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

@Component
public class Test {



    //两个重要类,SqlsessionFactory和PlatformTransactionManager
    @Resource
    private SqlSessionFactory sqlSessionFactory;

    @Autowired
    private PlatformTransactionManager transactionManager;


    public static final int CORE_POOL_SIZE = 5;
    public static final int MAX_POOL_SIZE = 10;
    public static final int QUEUE_CAPACITY = 100;
    public static final Long KEEP_ALIVE_TIME = 1L;

    public static final ThreadPoolExecutor kiraExecutor = new ThreadPoolExecutor(
            CORE_POOL_SIZE,
            MAX_POOL_SIZE,
            KEEP_ALIVE_TIME,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(QUEUE_CAPACITY),
            new ThreadPoolExecutor.AbortPolicy()
    );



    public void insert() {
   
      //状态变量,原子的Boolean类型防止多线程下出问题
      AtomicBoolean status = new AtomicBoolean(true);
      
      //countDwonLatch,我们有两个异步任务,当两个都执行完后我们的主线程再往下执行
      CountDownLatch countDownLatch = new CountDownLatch(2);
      
      //等两个事务都提交或回滚完我们再放行我们的主线程
      CountDownLatch waitCountDownLatch=new CountDownLatch(2);

      kiraExecutor.execute(() -> {

            TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionDefinition());
            try{
                //我们要openSession设置为false,设置为手动提交
                SqlSession sqlSession = sqlSessionFactory.openSession(false);

                BookMapper bookMapper = sqlSession.getMapper(BookMapper.class);
                bookMapper.bookInsert(87979, 1);


//                手动抛出异常进行测试
//                throw new RuntimeException();
            } catch (Exception e) {
                status.set(false);
            } finally {
                countDownLatch.countDown();

            }

            try{
                countDownLatch.await();//等待所有的异步线程都执行完我们再提交
                if(status.get())
                {
                  transactionManager.commit(transaction);
                }
                else
                {
                  transactionManager.rollback(transaction);
                }

            }catch (Exception e)
            {

            }
            finally {
                waitCountDownLatch.countDown();
            }
      });


      kiraExecutor.execute(() -> {

            TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionDefinition());
            try{

                //我们要openSession设置为false,设为手动提交
                SqlSession sqlSession = sqlSessionFactory.openSession(false);


                BookMapper bookMapper = sqlSession.getMapper(BookMapper.class);
                bookMapper.bookInsert2(84614, 1);

            } catch (Exception e) {
                status.set(false);
            } finally {
                countDownLatch.countDown();
            }


            try{
                countDownLatch.await();//我们要所有的异步线程都执行完我们再提交
                if(status.get())
                {
                  transactionManager.commit(transaction);
                }
                else
                {
                  transactionManager.rollback(transaction);
                }

            }catch (Exception e)
            {

            }
            finally {
                waitCountDownLatch.countDown();
            }


      });


      try{
            waitCountDownLatch.await();//等待所有的异步线程都执行完我们的主线程才继续往下执行
      }
      catch (Exception e)
      {

      }



    }




} 手动回滚场景存在的小题目

主键连续断开题目

主键连续断开了
因为我们是自增主键,但是我们这样子手动回滚的时间我们mysql不知道,以是我们的主键还是+了,以是另一个数据插入进来的时间,他的主键会断开不会连续
例如30,然后我们插入失败两次回滚了两次,第三次乐成了,他的主键不是31而是33

https://i-blog.csdnimg.cn/img_convert/6536b666d6bf2dfcefca47d0443b0cb2.png
commit的时间报错却不能同一回滚的题目

还记得我们之前的提交逻辑吗
我们是多个异步线程都没错,也就是我们的status变量为true,我们的多个线程再提交事务

但是有个题目?我们要是commit()的时间错误了怎么办?我们这时间没有通知机制让其他事务都回滚,因为我们阻塞结束后我们其实是并发来提交我们的事务的。

那我换个思绪串行提交不就好了?例如List<TransactionStatus>,然后我们在主线程同一for循环提交事务???
你觉得这样子就对了???那我for循环串行提交突然某一个事务因为commmit()堕落了那该怎么办?我们还是会导致部门事务回滚。
因为我们的事务commit()完之后就不能回滚了
以是说现在就是,假如我们的commit()不出现报错,那我们的业务逻辑是完全对的
假如我们的commit()出现报错,那么这个业务逻辑就有题目了
例如我们10个事务我们for循环提交了5个,但是第6个事务我们commit()堕落了咋办?我们commit()完后是不能回滚的对吧?

以是这种commit()报错导致的极端错误我们是无法解决的,什么时间会commit()堕落呢?那我也不知道哈哈哈,我想到一个场景就是我们有10次commit(),前面5次commit()乐成,第六次commit()报错因为Mysql宕机了,以是commit()报错
这就就类似于宕机错误,例如我们10个事务我们提交了5个然后java程序宕机了,后面5个事务也没提交,这种情况也会导致部门事务回滚。
我们并不能包管这10个事务提交的原子性
以是不思量commit()和java程序宕机,我们的这个逻辑就是原子性的,但是这两种极端情况我们处理不了
https://i-blog.csdnimg.cn/direct/2693caf3153346cea19665dc4d65d7c4.png 

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 基于Mybtais的多线程事务的手动提交