一、认识2PC - 两阶段提交
1、理论
理论性的东西,懒得再打一遍了,贴在这了:
分布式事务详解【分布式事务的几种解决方案】彻底搞懂分布式事务
关键的两张图:
下图展示了2PC的两个阶段,分成功和失败两个情况阐明:
成功情况:
失败情况:
2、手撸XA-两阶段提交
(1)时序图
(2)代码实例
- import com.mysql.cj.jdbc.JdbcConnection;
- import com.mysql.cj.jdbc.MysqlXAConnection;
- import com.mysql.cj.jdbc.MysqlXid;
- import org.junit.jupiter.api.Test;
- import org.springframework.boot.test.context.SpringBootTest;
- import javax.transaction.xa.XAException;
- import javax.transaction.xa.XAResource;
- import javax.transaction.xa.Xid;
- import java.sql.Connection;
- import java.sql.DriverManager;
- import java.sql.PreparedStatement;
- import java.sql.SQLException;
- @SpringBootTest
- public class MysqlXaTest {
- @Test
- public void testXa() {
- try {
- //获取员工库的连接以及资源管理器
- JdbcConnection employeeConnection = (JdbcConnection) DriverManager.getConnection("jdbc:mysql://localhost:3306/employee", "root", "rootroot");
- MysqlXAConnection employeeXAConnection = new MysqlXAConnection(employeeConnection, true);
- XAResource employeeXaResource = employeeXAConnection.getXAResource();
- //获取的员工薪资库的连接以及资源管理器
- JdbcConnection salaryConnection = (JdbcConnection) DriverManager.getConnection("jdbc:mysql://localhost:3306/salary", "root", "rootroot");
- MysqlXAConnection salaryXAConnection = new MysqlXAConnection(salaryConnection, true);
- XAResource salaryXaResource = salaryXAConnection.getXAResource();
- // 全局事务id
- byte[] gtrid = "g00003".getBytes();
- // 分支事务id
- byte[] bqual = "b00001".getBytes();
- // 标识,一般是个固定值
- int formatId = 1;
- //开启员工插入的分支事务
- Xid employeeXid = new MysqlXid(gtrid, bqual, formatId);
- employeeXaResource.start(employeeXid, XAResource.TMNOFLAGS);
- PreparedStatement preparedStatement = employeeConnection.prepareStatement("insert into employee (name, sex, level) values ('小10', '女', '7')");
- preparedStatement.execute();
- employeeXaResource.end(employeeXid, XAResource.TMSUCCESS);
- //开启员工薪资的分支事务
- byte[] salaryBqual = "b00002".getBytes();
- Xid salaryXid = new MysqlXid(gtrid, salaryBqual, formatId);
- salaryXaResource.start(salaryXid, XAResource.TMNOFLAGS);
- PreparedStatement salaryPreparedStatement = salaryConnection.prepareStatement("insert into employee_salary (employee_id, salary) values ('12', 7000)");
- salaryPreparedStatement.execute();
- salaryXaResource.end(salaryXid, XAResource.TMSUCCESS);
- //第一阶段-准备阶段
- int employeePrepareResult = employeeXaResource.prepare(employeeXid);
- int salaryPrepareResult = salaryXaResource.prepare(salaryXid);
- //第二阶段-根据准备阶段的结果。判断是要执行commit还是rollback
- if (employeePrepareResult == XAResource.XA_OK && salaryPrepareResult == XAResource.XA_OK) {
- employeeXaResource.commit(employeeXid, false);
- salaryXaResource.commit(salaryXid, false);
- } else {
- employeeXaResource.rollback(employeeXid);
- salaryXaResource.rollback(salaryXid);
- }
- } catch (SQLException | XAException e) {
- throw new RuntimeException(e);
- }
- }
- }
复制代码 3、认识JTA
JTA(Java Transaction API):是Java平台上一个尺度API,用于管理和控制分布式事务的实行流程。
核心类:
javax.transaction.UserTransaction:袒露给应用使用,用来启动、提交、回滚事务。
javax.transaction.TransactionManager:提供给事务管理器的接口,用于协调和控制分布式事务的实行过程。
javax.transaction.XAResource:表示一个资源管理器,用于管理和操纵资源。
javax.transaction.Xid:用于唯一标识一个分布式事务。
4、今天的主角:Atomikos
Atomikos是一个开源的事务管理器,用于管理和控制分布式事务的实行过程。提供了一个可靠的、高性能的事务管明白决方案,可以与多种应用程序和数据库集成。
简单明白就是,Atomikos是可以集成在我们Java代码里面,和我们的业务代码绑定到同一个Java进程里面的一个事务管理器的框架,可以帮助我们业务程序去自行实现分布式事务。
Atomikos特点:支持分布式事务、支持多种web服务器、支持多种数据库、支持XA协议、提供高性能的事务管理。
Atomikos可以解决,在同一个应用下,毗连多个数据库,实现分布式事务。
5、2PC存在的问题
1、TM单点问题。TM挂掉之后,无法回滚和提交。
2、资源锁定的问题。资源锁定之后,TM挂掉无法回滚和提交。
3、性能瓶颈。资源锁定时间长。
4、数据不同等问题。commit时成功状态不同等就会造成数据不同等。
二、Atomikos使用
1、依赖+设置
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-jta-atomikos</artifactId>
- </dependency>
复制代码- server.port=8080
- spring.employee-datasource.driverClassName = com.mysql.jdbc.Driver
- spring.employee-datasource.jdbc-url = jdbc:mysql://localhost:3306/employee?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
- spring.employee-datasource.username = root
- spring.employee-datasource.password = rootroot
- spring.salary-datasource.driverClassName = com.mysql.jdbc.Driver
- spring.salary-datasource.jdbc-url = jdbc:mysql://localhost:3306/salary?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
- spring.salary-datasource.username = root
- spring.salary-datasource.password = rootroot
- logging.level.com.atomikos = debug
复制代码 2、定义AtomikosDataSourceBean数据源
- import com.atomikos.jdbc.AtomikosDataSourceBean;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import javax.sql.DataSource;
- import java.util.Properties;
- @Configuration
- public class AtomikosDataSourceConfig {
- @Value("${spring.employee-datasource.jdbc-url}")
- private String employeeUrl;
- @Value("${spring.employee-datasource.username}")
- private String employeeUser;
- @Value("${spring.employee-datasource.password}")
- private String employeePassword;
- @Value("${spring.salary-datasource.jdbc-url}")
- private String salaryUrl;
- @Value("${spring.salary-datasource.username}")
- private String salaryUser;
- @Value("${spring.salary-datasource.password}")
- private String salaryPassword;
- /**
- * 定义两个数据源,分别对应两个数据库
- */
- @Bean(name = "employeeDataSource")
- public DataSource employeeDataSource(){
- AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
- atomikosDataSourceBean.setUniqueResourceName("employeeDataSource");
- atomikosDataSourceBean.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");
- Properties properties = new Properties();
- properties.setProperty("URL", employeeUrl);
- properties.setProperty("user", employeeUser);
- properties.setProperty("password", employeePassword);
- atomikosDataSourceBean.setXaProperties(properties);
- return atomikosDataSourceBean;
- }
- /**
- * 定义两个数据源,分别对应两个数据库
- */
- @Bean(name = "salaryDataSource")
- public DataSource salaryDataSource(){
- AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
- atomikosDataSourceBean.setUniqueResourceName("salaryDataSource");
- atomikosDataSourceBean.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");
- Properties properties = new Properties();
- properties.setProperty("URL", salaryUrl);
- properties.setProperty("user", salaryUser);
- properties.setProperty("password", salaryPassword);
- atomikosDataSourceBean.setXaProperties(properties);
- return atomikosDataSourceBean;
- }
- }
复制代码 3、定义事务管理器JtaTransactionManager
- import com.atomikos.icatch.jta.UserTransactionImp;
- import com.atomikos.icatch.jta.UserTransactionManager;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.annotation.DependsOn;
- import org.springframework.transaction.PlatformTransactionManager;
- import org.springframework.transaction.jta.JtaTransactionManager;
- import javax.transaction.TransactionManager;
- import javax.transaction.UserTransaction;
- @Configuration
- public class AtomikosConfig {
- // JTA的事务管理
- @Bean(name = "userTransaction")
- public UserTransaction userTransaction() {
- return new UserTransactionImp();
- }
- @Bean(name = "atomikosTransactionManager")
- public TransactionManager atomikosTransactionManager() {
- return new UserTransactionManager();
- }
- /**
- * 事务管理器
- */
- @Bean(name = "platformTransactionManager")
- @DependsOn({"userTransaction", "atomikosTransactionManager"})
- public PlatformTransactionManager transactionManager() {
- UserTransaction userTransaction = userTransaction();
- TransactionManager transactionManager = atomikosTransactionManager();
- return new JtaTransactionManager(userTransaction, transactionManager);
- }
- }
复制代码 4、MyBatis设置
- import lombok.SneakyThrows;
- import org.apache.ibatis.session.SqlSessionFactory;
- import org.mybatis.spring.SqlSessionFactoryBean;
- import org.mybatis.spring.annotation.MapperScan;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import javax.sql.DataSource;
- @Configuration
- @MapperScan(basePackages = "com.example.distributetransaction.dao", sqlSessionFactoryRef = "sqlSessionFactoryEmployee")
- public class EmployeeMybatisConfig {
- @SneakyThrows
- @Bean
- public SqlSessionFactory sqlSessionFactoryEmployee(@Qualifier("employeeDataSource") DataSource dataSource) throws Exception {
- SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
- factoryBean.setDataSource(dataSource);
- return factoryBean.getObject();
- }
- }
复制代码- import lombok.SneakyThrows;
- import org.apache.ibatis.session.SqlSessionFactory;
- import org.mybatis.spring.SqlSessionFactoryBean;
- import org.mybatis.spring.annotation.MapperScan;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import javax.sql.DataSource;
- @Configuration
- @MapperScan(basePackages = "com.example.distributetransaction.dao1", sqlSessionFactoryRef = "sqlSessionFactorySalary")
- public class SalaryMybatisConfig {
- @SneakyThrows
- @Bean
- public SqlSessionFactory sqlSessionFactorySalary(@Qualifier("salaryDataSource") DataSource dataSource) throws Exception {
- SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
- factoryBean.setDataSource(dataSource);
- return factoryBean.getObject();
- }
- }
复制代码 5、验证
- @Transactional(rollbackFor = Exception.class)
- public String join(EmployeeEntity employeeEntity) {
- //第一步,插入员工基础信息
- employeeDao.insertEmployee(employeeEntity);
- //第二步,插入员工薪资
- employeeSalaryDao.insertEmployeeSalary(employeeEntity.getId(), employeeEntity.getSalary());
- int i = 1 / 0;
- return "员工入职成功";
- }
复制代码 三、Atomikos源码分析
1、@Transactional入口:TransactionInterceptor创建事务流程
- (1)Spring事务入口:@Transactional
- (2)TransactionInterceptor#invoke:Spring事务的署理拦截方法
- (3)TransactionAspectSupport#determineTransactionManager:确定事务管理器=>我们创建的JtaTransactionManager
- (4)TransactionAspectSupport#createTransactionIfNecessary:创建事务
- (5)AbstractPlatformTransactionManager#getTransaction:获取事务
- (6)JtaTransactionManager#doGetTransaction:获取事务,拿到JtaTransactionObject,里面封装了UserTransactionImp
- (7)JtaTransactionManager获取我们设置的UserTransactionImp
JtaTransactionManager#doGetTransaction:获取事务
2、启动事务
- (1)从AbstractPlatformTransactionManager#handleExistingTransaction调用AbstractPlatformTransactionManager#startTransaction开启事务
- (2)调用JtaTransactionManager#doBegin开启事务
- (3)调用JtaTransactionManager#doJtaBegin开启事务
- (4)调用UserTransactionImp#begin开启事务
- (5)最终调用的是UserTransactionManager#begin开启事务
- (6)调用TransactionManagerImp#begin()开启事务
- (7)调用CompositeTransactionManagerImp#createCompositeTransaction创建分布式事务
3、小总结:启动全局事务流程图
4、分支事务,业务流程实行过程
5、事务提交与回滚
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |