涛声依旧在 发表于 2024-7-29 22:42:25

Atomikos详解:数据库XA规范与Atomikos使用与源码分析

一、认识2PC - 两阶段提交

1、理论

理论性的东西,懒得再打一遍了,贴在这了:
分布式事务详解【分布式事务的几种解决方案】彻底搞懂分布式事务
关键的两张图:
下图展示了2PC的两个阶段,分成功和失败两个情况阐明:
成功情况:
https://i-blog.csdnimg.cn/blog_migrate/9b35280fdca5ab8df56d110b4f692d73.png
失败情况:
https://i-blog.csdnimg.cn/blog_migrate/4a67e06f5f7abb6f55d2019b3a4ac901.png
2、手撸XA-两阶段提交

(1)时序图

https://i-blog.csdnimg.cn/blog_migrate/03a9642e86861a831b3d01e983dff5a8.png
(2)代码实例

import com.mysql.cj.jdbc.JdbcConnection;
import com.mysql.cj.jdbc.MysqlXAConnection;
import com.mysql.cj.jdbc.MysqlXid;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

@SpringBootTest
public class MysqlXaTest {

    @Test
    public void testXa() {
      try {
            //获取员工库的连接以及资源管理器
            JdbcConnection employeeConnection = (JdbcConnection) DriverManager.getConnection("jdbc:mysql://localhost:3306/employee", "root", "rootroot");
            MysqlXAConnection employeeXAConnection = new MysqlXAConnection(employeeConnection, true);
            XAResource employeeXaResource = employeeXAConnection.getXAResource();

            //获取的员工薪资库的连接以及资源管理器
            JdbcConnection salaryConnection = (JdbcConnection) DriverManager.getConnection("jdbc:mysql://localhost:3306/salary", "root", "rootroot");
            MysqlXAConnection salaryXAConnection = new MysqlXAConnection(salaryConnection, true);
            XAResource salaryXaResource = salaryXAConnection.getXAResource();


            // 全局事务id
            byte[] gtrid = "g00003".getBytes();
            // 分支事务id
            byte[] bqual = "b00001".getBytes();
            // 标识,一般是个固定值
            int formatId = 1;

            //开启员工插入的分支事务
            Xid employeeXid = new MysqlXid(gtrid, bqual, formatId);
            employeeXaResource.start(employeeXid, XAResource.TMNOFLAGS);
            PreparedStatement preparedStatement = employeeConnection.prepareStatement("insert into employee (name, sex, level) values ('小10', '女', '7')");
            preparedStatement.execute();
            employeeXaResource.end(employeeXid, XAResource.TMSUCCESS);

            //开启员工薪资的分支事务
            byte[] salaryBqual = "b00002".getBytes();
            Xid salaryXid = new MysqlXid(gtrid, salaryBqual, formatId);
            salaryXaResource.start(salaryXid, XAResource.TMNOFLAGS);
            PreparedStatement salaryPreparedStatement = salaryConnection.prepareStatement("insert into employee_salary (employee_id, salary) values ('12', 7000)");
            salaryPreparedStatement.execute();
            salaryXaResource.end(salaryXid, XAResource.TMSUCCESS);

            //第一阶段-准备阶段
            int employeePrepareResult = employeeXaResource.prepare(employeeXid);
            int salaryPrepareResult = salaryXaResource.prepare(salaryXid);

            //第二阶段-根据准备阶段的结果。判断是要执行commit还是rollback
            if (employeePrepareResult == XAResource.XA_OK && salaryPrepareResult == XAResource.XA_OK) {
                employeeXaResource.commit(employeeXid, false);
                salaryXaResource.commit(salaryXid, false);
            } else {
                employeeXaResource.rollback(employeeXid);
                salaryXaResource.rollback(salaryXid);
            }
      } catch (SQLException | XAException e) {
            throw new RuntimeException(e);
      }
    }
}

3、认识JTA

JTA(Java Transaction API):是Java平台上一个尺度API,用于管理和控制分布式事务的实行流程。
核心类:
javax.transaction.UserTransaction:袒露给应用使用,用来启动、提交、回滚事务。
javax.transaction.TransactionManager:提供给事务管理器的接口,用于协调和控制分布式事务的实行过程。
javax.transaction.XAResource:表示一个资源管理器,用于管理和操纵资源。
javax.transaction.Xid:用于唯一标识一个分布式事务。
4、今天的主角:Atomikos

Atomikos是一个开源的事务管理器,用于管理和控制分布式事务的实行过程。提供了一个可靠的、高性能的事务管明白决方案,可以与多种应用程序和数据库集成。
简单明白就是,Atomikos是可以集成在我们Java代码里面,和我们的业务代码绑定到同一个Java进程里面的一个事务管理器的框架,可以帮助我们业务程序去自行实现分布式事务。
Atomikos特点:支持分布式事务、支持多种web服务器、支持多种数据库、支持XA协议、提供高性能的事务管理。
Atomikos可以解决,在同一个应用下,毗连多个数据库,实现分布式事务。
5、2PC存在的问题

1、TM单点问题。TM挂掉之后,无法回滚和提交。
2、资源锁定的问题。资源锁定之后,TM挂掉无法回滚和提交。
3、性能瓶颈。资源锁定时间长。
4、数据不同等问题。commit时成功状态不同等就会造成数据不同等。
https://i-blog.csdnimg.cn/blog_migrate/7172e14ff7243bd87ec5ab6ef2322e92.png
二、Atomikos使用

1、依赖+设置

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
server.port=8080

spring.employee-datasource.driverClassName = com.mysql.jdbc.Driver
spring.employee-datasource.jdbc-url = jdbc:mysql://localhost:3306/employee?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
spring.employee-datasource.username = root
spring.employee-datasource.password = rootroot

spring.salary-datasource.driverClassName = com.mysql.jdbc.Driver
spring.salary-datasource.jdbc-url = jdbc:mysql://localhost:3306/salary?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
spring.salary-datasource.username = root
spring.salary-datasource.password = rootroot

logging.level.com.atomikos = debug
2、定义AtomikosDataSourceBean数据源

import com.atomikos.jdbc.AtomikosDataSourceBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;
import java.util.Properties;

@Configuration
public class AtomikosDataSourceConfig {

    @Value("${spring.employee-datasource.jdbc-url}")
    private String employeeUrl;

    @Value("${spring.employee-datasource.username}")
    private String employeeUser;

    @Value("${spring.employee-datasource.password}")
    private String employeePassword;

    @Value("${spring.salary-datasource.jdbc-url}")
    private String salaryUrl;

    @Value("${spring.salary-datasource.username}")
    private String salaryUser;

    @Value("${spring.salary-datasource.password}")
    private String salaryPassword;

    /**
   * 定义两个数据源,分别对应两个数据库
   */
    @Bean(name = "employeeDataSource")
    public DataSource employeeDataSource(){
      AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
      atomikosDataSourceBean.setUniqueResourceName("employeeDataSource");
      atomikosDataSourceBean.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");

      Properties properties = new Properties();
      properties.setProperty("URL", employeeUrl);
      properties.setProperty("user", employeeUser);
      properties.setProperty("password", employeePassword);
      atomikosDataSourceBean.setXaProperties(properties);
      return atomikosDataSourceBean;
    }

    /**
   * 定义两个数据源,分别对应两个数据库
   */
    @Bean(name = "salaryDataSource")
    public DataSource salaryDataSource(){
      AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
      atomikosDataSourceBean.setUniqueResourceName("salaryDataSource");
      atomikosDataSourceBean.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");

      Properties properties = new Properties();
      properties.setProperty("URL", salaryUrl);
      properties.setProperty("user", salaryUser);
      properties.setProperty("password", salaryPassword);
      atomikosDataSourceBean.setXaProperties(properties);
      return atomikosDataSourceBean;
    }
}

3、定义事务管理器JtaTransactionManager

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.jta.JtaTransactionManager;

import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;

@Configuration
public class AtomikosConfig {

    // JTA的事务管理
    @Bean(name = "userTransaction")
    public UserTransaction userTransaction() {
      return new UserTransactionImp();
    }

    @Bean(name = "atomikosTransactionManager")
    public TransactionManager atomikosTransactionManager() {
      return new UserTransactionManager();
    }

    /**
   * 事务管理器
   */
    @Bean(name = "platformTransactionManager")
    @DependsOn({"userTransaction", "atomikosTransactionManager"})
    public PlatformTransactionManager transactionManager() {
      UserTransaction userTransaction = userTransaction();
      TransactionManager transactionManager = atomikosTransactionManager();
      return new JtaTransactionManager(userTransaction, transactionManager);
    }
}

4、MyBatis设置

import lombok.SneakyThrows;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;

@Configuration
@MapperScan(basePackages = "com.example.distributetransaction.dao", sqlSessionFactoryRef = "sqlSessionFactoryEmployee")
public class EmployeeMybatisConfig {

    @SneakyThrows
    @Bean
    public SqlSessionFactory sqlSessionFactoryEmployee(@Qualifier("employeeDataSource") DataSource dataSource) throws Exception {
      SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
      factoryBean.setDataSource(dataSource);
      return factoryBean.getObject();
    }
}

import lombok.SneakyThrows;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;

@Configuration
@MapperScan(basePackages = "com.example.distributetransaction.dao1", sqlSessionFactoryRef = "sqlSessionFactorySalary")
public class SalaryMybatisConfig {

    @SneakyThrows
    @Bean
    public SqlSessionFactory sqlSessionFactorySalary(@Qualifier("salaryDataSource") DataSource dataSource) throws Exception {
      SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
      factoryBean.setDataSource(dataSource);
      return factoryBean.getObject();
    }
}

5、验证

@Transactional(rollbackFor = Exception.class)
public String join(EmployeeEntity employeeEntity) {
    //第一步,插入员工基础信息
    employeeDao.insertEmployee(employeeEntity);
    //第二步,插入员工薪资
    employeeSalaryDao.insertEmployeeSalary(employeeEntity.getId(), employeeEntity.getSalary());

    int i = 1 / 0;
    return "员工入职成功";
}
三、Atomikos源码分析

1、@Transactional入口:TransactionInterceptor创建事务流程



[*](1)Spring事务入口:@Transactional
[*](2)TransactionInterceptor#invoke:Spring事务的署理拦截方法
[*](3)TransactionAspectSupport#determineTransactionManager:确定事务管理器=>我们创建的JtaTransactionManager
[*](4)TransactionAspectSupport#createTransactionIfNecessary:创建事务
[*](5)AbstractPlatformTransactionManager#getTransaction:获取事务
[*](6)JtaTransactionManager#doGetTransaction:获取事务,拿到JtaTransactionObject,里面封装了UserTransactionImp
[*](7)JtaTransactionManager获取我们设置的UserTransactionImp
https://i-blog.csdnimg.cn/blog_migrate/0d2aee97fa3266533ed6b7b22bcd1ef0.png
https://i-blog.csdnimg.cn/blog_migrate/8a414b53df7d7e46028f4e37468e54eb.png
https://i-blog.csdnimg.cn/blog_migrate/b1652e5b4243e8413745eb7fbfdace05.png
JtaTransactionManager#doGetTransaction:获取事务
https://i-blog.csdnimg.cn/blog_migrate/dc35679f4155fcda4d196c7c2937fa50.png
https://i-blog.csdnimg.cn/blog_migrate/7f4af8c56b4545bc01b60f3175e5ca2d.png
2、启动事务



[*](1)从AbstractPlatformTransactionManager#handleExistingTransaction调用AbstractPlatformTransactionManager#startTransaction开启事务
[*](2)调用JtaTransactionManager#doBegin开启事务
[*](3)调用JtaTransactionManager#doJtaBegin开启事务
[*](4)调用UserTransactionImp#begin开启事务
[*](5)最终调用的是UserTransactionManager#begin开启事务
[*](6)调用TransactionManagerImp#begin()开启事务
[*](7)调用CompositeTransactionManagerImp#createCompositeTransaction创建分布式事务
https://i-blog.csdnimg.cn/blog_migrate/597ffdfa13a464ce99dc9ae75d835eab.png
https://i-blog.csdnimg.cn/blog_migrate/6d65602316d228c380366d06bba822f2.png
https://i-blog.csdnimg.cn/blog_migrate/3ccdc30060a6d4bdd26ff77df675b045.png
https://i-blog.csdnimg.cn/blog_migrate/3d52b6128fa7ed5d1d50b564f7683006.png
3、小总结:启动全局事务流程图

https://i-blog.csdnimg.cn/blog_migrate/7e8edc3c96b2eea33123c5daa1873298.jpeg
4、分支事务,业务流程实行过程

https://i-blog.csdnimg.cn/blog_migrate/e55b3e6b63ac964881b989aeecb6cb61.jpeg
5、事务提交与回滚

https://i-blog.csdnimg.cn/blog_migrate/9a6d5f3b0ffbd742584cd5a752b53560.png
https://i-blog.csdnimg.cn/blog_migrate/0c8dab67a16579044f9445e39b2407c4.png



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Atomikos详解:数据库XA规范与Atomikos使用与源码分析