基于Mybtais的多线程事务的手动提交
解释业务需求
我们的要将数据同步到两个表里面,这两个表是不相关的,因为我们传输的数据量过大以是接口超时了,为了优化进步接口效率+这两个表是不相关的表,以是我们可以上我们的多线程分别插入数据这样子来优化
题目所在
Spring的@Transactional注解只能控制当火线程的事务而控制不到异步线程的事务,那就会出现部门事务回滚情况。例如A表乐成插入但是B表插入失败了,这个部门事务回滚的情况是不可的,不符合我们的原子性。以是我们要用编程式事务手动提交来控制我们的事务,末了来实现一个同一回滚大概同一提交,不能出现一部门提交一部门回滚的题目
代码逻辑
因为我们是数据库操作又是基于Spring-mybatis,在异步线程中我们不能用Autowired来注入我们的Mapper对象,以是就有了两个要使用的类
一个是操作数据库的SqlsessionFactory类
一个是我们操作Spring事务的PlatformTransactionManager类
另有三个重要的类
一个是用来提交回滚事物的TransactionStatus类
一个是用来控制和阻塞线程的CountDownLatch类
一个是多线程下包管线程安全的AtomicBoolean类
我们在异步线程开启手动提交事务然后操作数据库插入数据,此时我们的事务还没有提交
我们的异步线程被countDownLatch.await()阻塞,等待全部的异步线程都实行了countDownLatch.countDown(),也就是异步线程都实行完,我们的异步线程才开始提交事务
我们会有个AtomicBoolean类型的status变量,我们异步线程的其中一个报错后我们都会try-catch把这个变量改成false
末了我们的事务提交逻辑是,假如这个status是false,我们就同一提交事务,假如这个satus是false,我们就同一回滚事务
代码
package com.example.transational.Test;
import jakarta.annotation.Resource;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@Component
public class Test {
//两个重要类,SqlsessionFactory和PlatformTransactionManager
@Resource
private SqlSessionFactory sqlSessionFactory;
@Autowired
private PlatformTransactionManager transactionManager;
public static final int CORE_POOL_SIZE = 5;
public static final int MAX_POOL_SIZE = 10;
public static final int QUEUE_CAPACITY = 100;
public static final Long KEEP_ALIVE_TIME = 1L;
public static final ThreadPoolExecutor kiraExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.AbortPolicy()
);
public void insert() {
//状态变量,原子的Boolean类型防止多线程下出问题
AtomicBoolean status = new AtomicBoolean(true);
//countDwonLatch,我们有两个异步任务,当两个都执行完后我们的主线程再往下执行
CountDownLatch countDownLatch = new CountDownLatch(2);
//等两个事务都提交或回滚完我们再放行我们的主线程
CountDownLatch waitCountDownLatch=new CountDownLatch(2);
kiraExecutor.execute(() -> {
TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionDefinition());
try{
//我们要openSession设置为false,设置为手动提交
SqlSession sqlSession = sqlSessionFactory.openSession(false);
BookMapper bookMapper = sqlSession.getMapper(BookMapper.class);
bookMapper.bookInsert(87979, 1);
// 手动抛出异常进行测试
// throw new RuntimeException();
} catch (Exception e) {
status.set(false);
} finally {
countDownLatch.countDown();
}
try{
countDownLatch.await();//等待所有的异步线程都执行完我们再提交
if(status.get())
{
transactionManager.commit(transaction);
}
else
{
transactionManager.rollback(transaction);
}
}catch (Exception e)
{
}
finally {
waitCountDownLatch.countDown();
}
});
kiraExecutor.execute(() -> {
TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionDefinition());
try{
//我们要openSession设置为false,设为手动提交
SqlSession sqlSession = sqlSessionFactory.openSession(false);
BookMapper bookMapper = sqlSession.getMapper(BookMapper.class);
bookMapper.bookInsert2(84614, 1);
} catch (Exception e) {
status.set(false);
} finally {
countDownLatch.countDown();
}
try{
countDownLatch.await();//我们要所有的异步线程都执行完我们再提交
if(status.get())
{
transactionManager.commit(transaction);
}
else
{
transactionManager.rollback(transaction);
}
}catch (Exception e)
{
}
finally {
waitCountDownLatch.countDown();
}
});
try{
waitCountDownLatch.await();//等待所有的异步线程都执行完我们的主线程才继续往下执行
}
catch (Exception e)
{
}
}
} 手动回滚场景存在的小题目
主键连续断开题目
主键连续断开了
因为我们是自增主键,但是我们这样子手动回滚的时间我们mysql不知道,以是我们的主键还是+了,以是另一个数据插入进来的时间,他的主键会断开不会连续
例如30,然后我们插入失败两次回滚了两次,第三次乐成了,他的主键不是31而是33
https://i-blog.csdnimg.cn/img_convert/6536b666d6bf2dfcefca47d0443b0cb2.png
commit的时间报错却不能同一回滚的题目
还记得我们之前的提交逻辑吗
我们是多个异步线程都没错,也就是我们的status变量为true,我们的多个线程再提交事务
但是有个题目?我们要是commit()的时间错误了怎么办?我们这时间没有通知机制让其他事务都回滚,因为我们阻塞结束后我们其实是并发来提交我们的事务的。
那我换个思绪串行提交不就好了?例如List<TransactionStatus>,然后我们在主线程同一for循环提交事务???
你觉得这样子就对了???那我for循环串行提交突然某一个事务因为commmit()堕落了那该怎么办?我们还是会导致部门事务回滚。
因为我们的事务commit()完之后就不能回滚了
以是说现在就是,假如我们的commit()不出现报错,那我们的业务逻辑是完全对的
假如我们的commit()出现报错,那么这个业务逻辑就有题目了
例如我们10个事务我们for循环提交了5个,但是第6个事务我们commit()堕落了咋办?我们commit()完后是不能回滚的对吧?
以是这种commit()报错导致的极端错误我们是无法解决的,什么时间会commit()堕落呢?那我也不知道哈哈哈,我想到一个场景就是我们有10次commit(),前面5次commit()乐成,第六次commit()报错因为Mysql宕机了,以是commit()报错
这就就类似于宕机错误,例如我们10个事务我们提交了5个然后java程序宕机了,后面5个事务也没提交,这种情况也会导致部门事务回滚。
我们并不能包管这10个事务提交的原子性
以是不思量commit()和java程序宕机,我们的这个逻辑就是原子性的,但是这两种极端情况我们处理不了
https://i-blog.csdnimg.cn/direct/2693caf3153346cea19665dc4d65d7c4.png
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]