Atomikos详解:数据库XA规范与Atomikos使用与源码分析

打印 上一主题 下一主题

主题 831|帖子 831|积分 2493

一、认识2PC - 两阶段提交

1、理论

理论性的东西,懒得再打一遍了,贴在这了:
分布式事务详解【分布式事务的几种解决方案】彻底搞懂分布式事务
关键的两张图:
下图展示了2PC的两个阶段,分成功和失败两个情况阐明:
成功情况:

失败情况:

2、手撸XA-两阶段提交

(1)时序图


(2)代码实例

  1. import com.mysql.cj.jdbc.JdbcConnection;
  2. import com.mysql.cj.jdbc.MysqlXAConnection;
  3. import com.mysql.cj.jdbc.MysqlXid;
  4. import org.junit.jupiter.api.Test;
  5. import org.springframework.boot.test.context.SpringBootTest;
  6. import javax.transaction.xa.XAException;
  7. import javax.transaction.xa.XAResource;
  8. import javax.transaction.xa.Xid;
  9. import java.sql.Connection;
  10. import java.sql.DriverManager;
  11. import java.sql.PreparedStatement;
  12. import java.sql.SQLException;
  13. @SpringBootTest
  14. public class MysqlXaTest {
  15.     @Test
  16.     public void testXa() {
  17.         try {
  18.             //获取员工库的连接以及资源管理器
  19.             JdbcConnection employeeConnection = (JdbcConnection) DriverManager.getConnection("jdbc:mysql://localhost:3306/employee", "root", "rootroot");
  20.             MysqlXAConnection employeeXAConnection = new MysqlXAConnection(employeeConnection, true);
  21.             XAResource employeeXaResource = employeeXAConnection.getXAResource();
  22.             //获取的员工薪资库的连接以及资源管理器
  23.             JdbcConnection salaryConnection = (JdbcConnection) DriverManager.getConnection("jdbc:mysql://localhost:3306/salary", "root", "rootroot");
  24.             MysqlXAConnection salaryXAConnection = new MysqlXAConnection(salaryConnection, true);
  25.             XAResource salaryXaResource = salaryXAConnection.getXAResource();
  26.             // 全局事务id
  27.             byte[] gtrid = "g00003".getBytes();
  28.             // 分支事务id
  29.             byte[] bqual = "b00001".getBytes();
  30.             // 标识,一般是个固定值
  31.             int formatId = 1;
  32.             //开启员工插入的分支事务
  33.             Xid employeeXid = new MysqlXid(gtrid, bqual, formatId);
  34.             employeeXaResource.start(employeeXid, XAResource.TMNOFLAGS);
  35.             PreparedStatement preparedStatement = employeeConnection.prepareStatement("insert into employee (name, sex, level) values ('小10', '女', '7')");
  36.             preparedStatement.execute();
  37.             employeeXaResource.end(employeeXid, XAResource.TMSUCCESS);
  38.             //开启员工薪资的分支事务
  39.             byte[] salaryBqual = "b00002".getBytes();
  40.             Xid salaryXid = new MysqlXid(gtrid, salaryBqual, formatId);
  41.             salaryXaResource.start(salaryXid, XAResource.TMNOFLAGS);
  42.             PreparedStatement salaryPreparedStatement = salaryConnection.prepareStatement("insert into employee_salary (employee_id, salary) values ('12', 7000)");
  43.             salaryPreparedStatement.execute();
  44.             salaryXaResource.end(salaryXid, XAResource.TMSUCCESS);
  45.             //第一阶段-准备阶段
  46.             int employeePrepareResult = employeeXaResource.prepare(employeeXid);
  47.             int salaryPrepareResult = salaryXaResource.prepare(salaryXid);
  48.             //第二阶段-根据准备阶段的结果。判断是要执行commit还是rollback
  49.             if (employeePrepareResult == XAResource.XA_OK && salaryPrepareResult == XAResource.XA_OK) {
  50.                 employeeXaResource.commit(employeeXid, false);
  51.                 salaryXaResource.commit(salaryXid, false);
  52.             } else {
  53.                 employeeXaResource.rollback(employeeXid);
  54.                 salaryXaResource.rollback(salaryXid);
  55.             }
  56.         } catch (SQLException | XAException e) {
  57.             throw new RuntimeException(e);
  58.         }
  59.     }
  60. }
复制代码
3、认识JTA

JTA(Java Transaction API):是Java平台上一个尺度API,用于管理和控制分布式事务的实行流程。
核心类:
javax.transaction.UserTransaction:袒露给应用使用,用来启动、提交、回滚事务。
javax.transaction.TransactionManager:提供给事务管理器的接口,用于协调和控制分布式事务的实行过程。
javax.transaction.XAResource:表示一个资源管理器,用于管理和操纵资源。
javax.transaction.Xid:用于唯一标识一个分布式事务。
4、今天的主角:Atomikos

Atomikos是一个开源的事务管理器,用于管理和控制分布式事务的实行过程。提供了一个可靠的、高性能的事务管明白决方案,可以与多种应用程序和数据库集成。
简单明白就是,Atomikos是可以集成在我们Java代码里面,和我们的业务代码绑定到同一个Java进程里面的一个事务管理器的框架,可以帮助我们业务程序去自行实现分布式事务。
Atomikos特点:支持分布式事务、支持多种web服务器、支持多种数据库、支持XA协议、提供高性能的事务管理。
Atomikos可以解决,在同一个应用下,毗连多个数据库,实现分布式事务。
5、2PC存在的问题

1、TM单点问题。TM挂掉之后,无法回滚和提交。
2、资源锁定的问题。资源锁定之后,TM挂掉无法回滚和提交。
3、性能瓶颈。资源锁定时间长。
4、数据不同等问题。commit时成功状态不同等就会造成数据不同等。

二、Atomikos使用

1、依赖+设置

  1. <dependency>
  2.     <groupId>org.springframework.boot</groupId>
  3.     <artifactId>spring-boot-starter-jta-atomikos</artifactId>
  4. </dependency>
复制代码
  1. server.port=8080
  2. spring.employee-datasource.driverClassName = com.mysql.jdbc.Driver
  3. spring.employee-datasource.jdbc-url = jdbc:mysql://localhost:3306/employee?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
  4. spring.employee-datasource.username = root
  5. spring.employee-datasource.password = rootroot
  6. spring.salary-datasource.driverClassName = com.mysql.jdbc.Driver
  7. spring.salary-datasource.jdbc-url = jdbc:mysql://localhost:3306/salary?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
  8. spring.salary-datasource.username = root
  9. spring.salary-datasource.password = rootroot
  10. logging.level.com.atomikos = debug
复制代码
2、定义AtomikosDataSourceBean数据源

  1. import com.atomikos.jdbc.AtomikosDataSourceBean;
  2. import org.springframework.beans.factory.annotation.Value;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import javax.sql.DataSource;
  6. import java.util.Properties;
  7. @Configuration
  8. public class AtomikosDataSourceConfig {
  9.     @Value("${spring.employee-datasource.jdbc-url}")
  10.     private String employeeUrl;
  11.     @Value("${spring.employee-datasource.username}")
  12.     private String employeeUser;
  13.     @Value("${spring.employee-datasource.password}")
  14.     private String employeePassword;
  15.     @Value("${spring.salary-datasource.jdbc-url}")
  16.     private String salaryUrl;
  17.     @Value("${spring.salary-datasource.username}")
  18.     private String salaryUser;
  19.     @Value("${spring.salary-datasource.password}")
  20.     private String salaryPassword;
  21.     /**
  22.      * 定义两个数据源,分别对应两个数据库
  23.      */
  24.     @Bean(name = "employeeDataSource")
  25.     public DataSource employeeDataSource(){
  26.         AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
  27.         atomikosDataSourceBean.setUniqueResourceName("employeeDataSource");
  28.         atomikosDataSourceBean.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");
  29.         Properties properties = new Properties();
  30.         properties.setProperty("URL", employeeUrl);
  31.         properties.setProperty("user", employeeUser);
  32.         properties.setProperty("password", employeePassword);
  33.         atomikosDataSourceBean.setXaProperties(properties);
  34.         return atomikosDataSourceBean;
  35.     }
  36.     /**
  37.      * 定义两个数据源,分别对应两个数据库
  38.      */
  39.     @Bean(name = "salaryDataSource")
  40.     public DataSource salaryDataSource(){
  41.         AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
  42.         atomikosDataSourceBean.setUniqueResourceName("salaryDataSource");
  43.         atomikosDataSourceBean.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");
  44.         Properties properties = new Properties();
  45.         properties.setProperty("URL", salaryUrl);
  46.         properties.setProperty("user", salaryUser);
  47.         properties.setProperty("password", salaryPassword);
  48.         atomikosDataSourceBean.setXaProperties(properties);
  49.         return atomikosDataSourceBean;
  50.     }
  51. }
复制代码
3、定义事务管理器JtaTransactionManager

  1. import com.atomikos.icatch.jta.UserTransactionImp;
  2. import com.atomikos.icatch.jta.UserTransactionManager;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import org.springframework.context.annotation.DependsOn;
  6. import org.springframework.transaction.PlatformTransactionManager;
  7. import org.springframework.transaction.jta.JtaTransactionManager;
  8. import javax.transaction.TransactionManager;
  9. import javax.transaction.UserTransaction;
  10. @Configuration
  11. public class AtomikosConfig {
  12.     // JTA的事务管理
  13.     @Bean(name = "userTransaction")
  14.     public UserTransaction userTransaction() {
  15.         return new UserTransactionImp();
  16.     }
  17.     @Bean(name = "atomikosTransactionManager")
  18.     public TransactionManager atomikosTransactionManager() {
  19.         return new UserTransactionManager();
  20.     }
  21.     /**
  22.      * 事务管理器
  23.      */
  24.     @Bean(name = "platformTransactionManager")
  25.     @DependsOn({"userTransaction", "atomikosTransactionManager"})
  26.     public PlatformTransactionManager transactionManager() {
  27.         UserTransaction userTransaction = userTransaction();
  28.         TransactionManager transactionManager = atomikosTransactionManager();
  29.         return new JtaTransactionManager(userTransaction, transactionManager);
  30.     }
  31. }
复制代码
4、MyBatis设置

  1. import lombok.SneakyThrows;
  2. import org.apache.ibatis.session.SqlSessionFactory;
  3. import org.mybatis.spring.SqlSessionFactoryBean;
  4. import org.mybatis.spring.annotation.MapperScan;
  5. import org.springframework.beans.factory.annotation.Qualifier;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import javax.sql.DataSource;
  9. @Configuration
  10. @MapperScan(basePackages = "com.example.distributetransaction.dao", sqlSessionFactoryRef = "sqlSessionFactoryEmployee")
  11. public class EmployeeMybatisConfig {
  12.     @SneakyThrows
  13.     @Bean
  14.     public SqlSessionFactory sqlSessionFactoryEmployee(@Qualifier("employeeDataSource") DataSource dataSource) throws Exception {
  15.         SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
  16.         factoryBean.setDataSource(dataSource);
  17.         return factoryBean.getObject();
  18.     }
  19. }
复制代码
  1. import lombok.SneakyThrows;
  2. import org.apache.ibatis.session.SqlSessionFactory;
  3. import org.mybatis.spring.SqlSessionFactoryBean;
  4. import org.mybatis.spring.annotation.MapperScan;
  5. import org.springframework.beans.factory.annotation.Qualifier;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import javax.sql.DataSource;
  9. @Configuration
  10. @MapperScan(basePackages = "com.example.distributetransaction.dao1", sqlSessionFactoryRef = "sqlSessionFactorySalary")
  11. public class SalaryMybatisConfig {
  12.     @SneakyThrows
  13.     @Bean
  14.     public SqlSessionFactory sqlSessionFactorySalary(@Qualifier("salaryDataSource") DataSource dataSource) throws Exception {
  15.         SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
  16.         factoryBean.setDataSource(dataSource);
  17.         return factoryBean.getObject();
  18.     }
  19. }
复制代码
5、验证

  1. @Transactional(rollbackFor = Exception.class)
  2. public String join(EmployeeEntity employeeEntity) {
  3.     //第一步,插入员工基础信息
  4.     employeeDao.insertEmployee(employeeEntity);
  5.     //第二步,插入员工薪资
  6.     employeeSalaryDao.insertEmployeeSalary(employeeEntity.getId(), employeeEntity.getSalary());
  7.     int i = 1 / 0;
  8.     return "员工入职成功";
  9. }
复制代码
三、Atomikos源码分析

1、@Transactional入口:TransactionInterceptor创建事务流程



  • (1)Spring事务入口:@Transactional
  • (2)TransactionInterceptor#invoke:Spring事务的署理拦截方法
  • (3)TransactionAspectSupport#determineTransactionManager:确定事务管理器=>我们创建的JtaTransactionManager
  • (4)TransactionAspectSupport#createTransactionIfNecessary:创建事务
  • (5)AbstractPlatformTransactionManager#getTransaction:获取事务
  • (6)JtaTransactionManager#doGetTransaction:获取事务,拿到JtaTransactionObject,里面封装了UserTransactionImp
  • (7)JtaTransactionManager获取我们设置的UserTransactionImp



JtaTransactionManager#doGetTransaction:获取事务


2、启动事务



  • (1)从AbstractPlatformTransactionManager#handleExistingTransaction调用AbstractPlatformTransactionManager#startTransaction开启事务
  • (2)调用JtaTransactionManager#doBegin开启事务
  • (3)调用JtaTransactionManager#doJtaBegin开启事务
  • (4)调用UserTransactionImp#begin开启事务
  • (5)最终调用的是UserTransactionManager#begin开启事务
  • (6)调用TransactionManagerImp#begin()开启事务
  • (7)调用CompositeTransactionManagerImp#createCompositeTransaction创建分布式事务




3、小总结:启动全局事务流程图


4、分支事务,业务流程实行过程


5、事务提交与回滚






免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

涛声依旧在

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表