SpringBoot 整合多数据源的事务问题

打印 上一主题 下一主题

主题 863|帖子 863|积分 2589

代码

先贴代码:
核心就是:Spring给我们提供的一个类 AbstractRoutingDataSource,然后我们再写一个切面来切换数据源,肯定要有一个地方存储key还要保证上下文都可用,所以我们使用 ThreadLocal 来存储数据源的key
pom.xml
  1.    <dependency>
  2.        <groupId>org.springframework.boot</groupId>
  3.        <artifactId>spring-boot-starter-aop</artifactId>
  4.    </dependency>
  5.    <dependency>
  6.        <groupId>org.springframework.boot</groupId>
  7.        <artifactId>spring-boot-starter-web</artifactId>
  8.    </dependency>
  9.    
  10.    <dependency>
  11.        <groupId>com.alibaba</groupId>
  12.        <artifactId>druid-spring-boot-starter</artifactId>
  13.        <version>1.2.6</version>
  14.    </dependency>
  15.    <dependency>
  16.        <groupId>org.projectlombok</groupId>
  17.        <artifactId>lombok</artifactId>
  18.    </dependency>
复制代码
注解:
  1. @Target({ElementType.TYPE, ElementType.METHOD})
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Documented
  4. @Inherited
  5. public @interface DS {
  6.     String value() default DataSourceCons.DS1;
  7. }
复制代码
常量类:
  1. public interface DataSourceCons {
  2.     String DS1 ="ds1";
  3.    
  4.     String DS2 ="ds2";
  5. }
复制代码
上下文存储对象:
  1. @Slf4j
  2. public class DynamicDataSourceContextHolder {
  3.     private static final ThreadLocal<String> CONTEXT_HOLDER = new ThreadLocal<>();
  4.     public static void setContextHolder(String dsName) {
  5.         log.info("切换到 ===> {} 数据源", dsName);
  6.         CONTEXT_HOLDER.set(dsName);
  7.     }
  8.     public static String get() {
  9.         return CONTEXT_HOLDER.get();
  10.     }
  11.     public static void clear() {
  12.         CONTEXT_HOLDER.remove();
  13.     }
  14. }
复制代码
切面:后续这个Order注解有大用
  1. @Aspect
  2. @Component
  3. // @Order(-1)
  4. public class DynamicDataSourceAspect {
  5.     @Pointcut("@annotation(com.lizhi.dds.anno.DS)")
  6.     public void point1() {
  7.     }
  8.     @Around("point1()")
  9.     public Object switchDS(ProceedingJoinPoint pjp) throws Throwable {
  10.         try {
  11.             DS ds = getDataSource(pjp);
  12.             if (ds != null){
  13.                 DynamicDataSourceContextHolder.setContextHolder(ds.value());
  14.             }
  15.             return pjp.proceed();
  16.         } finally {
  17.             DynamicDataSourceContextHolder.clear();
  18.         }
  19.     }
  20.     private DS getDataSource(ProceedingJoinPoint pjp) {
  21.         MethodSignature signature = (MethodSignature) pjp.getSignature();
  22.         DS methodAnno = signature.getMethod().getAnnotation(DS.class);
  23.         if (methodAnno != null) {
  24.             return methodAnno;
  25.         } else {
  26.             return pjp.getTarget().getClass().getAnnotation(DS.class);
  27.         }
  28.     }
  29. }
复制代码
存储数据源的类:我们这里就写一个map来存储数据源,大家也可以加其它的属性
  1. @Data
  2. @Configuration
  3. @ConfigurationProperties(prefix = "spring.datasource.druid")
  4. public class DruidProperties {
  5.     private Map<String, Map<String, String>> ds;
  6. }
复制代码
核心类:我们写一个类来继承这个核心类来自定义我们自己的东西
  1. @Component
  2. public class DynamicDataSource extends AbstractRoutingDataSource {
  3.     @Autowired
  4.     private DruidProperties druidProperties;
  5.     /**
  6.      * 初始化数据源
  7.      *
  8.      * @throws Exception
  9.      */
  10.     @PostConstruct
  11.     public void init() throws Exception {
  12.         Map<String, Map<String, String>> ds = druidProperties.getDs();
  13.         Map<Object, Object> dataSources = new HashMap<>();
  14.         for (Map.Entry<String, Map<String, String>> entry : ds.entrySet()) {
  15.             DataSource dataSource = DruidDataSourceFactory.createDataSource(entry.getValue());
  16.             dataSources.put(entry.getKey(), dataSource);
  17.         }
  18.         setTargetDataSources(dataSources);
  19.         setDefaultTargetDataSource(dataSources.get(DataSourceCons.DS1));
  20.         afterPropertiesSet();
  21.     }
  22.        
  23.     /**
  24.      * 拿数据源的时候会调用此方法来找key
  25.      *
  26.      * @return
  27.      */
  28.     @Override
  29.     protected Object determineCurrentLookupKey() {
  30.         return DynamicDataSourceContextHolder.get();
  31.     }
  32. }
复制代码
直接来看一下这个类的源码:就知道动态数据源是怎么来的了
  1. public abstract class AbstractRoutingDataSource extends AbstractDataSource implements InitializingBean {
  2.         // 我们所有的数据源都会存储在这里面
  3.         @Nullable
  4.         private Map<Object, Object> targetDataSources;
  5.         // 默认用哪个数据源
  6.         @Nullable
  7.         private Object defaultTargetDataSource;
  8.         private boolean lenientFallback = true;
  9.         private DataSourceLookup dataSourceLookup = new JndiDataSourceLookup();
  10.         // 解析过后的所有数据源
  11.         @Nullable
  12.         private Map<Object, DataSource> resolvedDataSources;
  13.         // 解析过后的默认数据源
  14.         @Nullable
  15.         private DataSource resolvedDefaultDataSource;
  16.         /**
  17.          * Specify the map of target DataSources, with the lookup key as key.
  18.          * The mapped value can either be a corresponding {@link javax.sql.DataSource}
  19.          * instance or a data source name String (to be resolved via a
  20.          * {@link #setDataSourceLookup DataSourceLookup}).
  21.          * <p>The key can be of arbitrary type; this class implements the
  22.          * generic lookup process only. The concrete key representation will
  23.          * be handled by {@link #resolveSpecifiedLookupKey(Object)} and
  24.          * {@link #determineCurrentLookupKey()}.
  25.          */
  26.         public void setTargetDataSources(Map<Object, Object> targetDataSources) {
  27.                 this.targetDataSources = targetDataSources;
  28.         }
  29.         /**
  30.          * Specify the default target DataSource, if any.
  31.          * <p>The mapped value can either be a corresponding {@link javax.sql.DataSource}
  32.          * instance or a data source name String (to be resolved via a
  33.          * {@link #setDataSourceLookup DataSourceLookup}).
  34.          * <p>This DataSource will be used as target if none of the keyed
  35.          * {@link #setTargetDataSources targetDataSources} match the
  36.          * {@link #determineCurrentLookupKey()} current lookup key.
  37.          */
  38.         public void setDefaultTargetDataSource(Object defaultTargetDataSource) {
  39.                 this.defaultTargetDataSource = defaultTargetDataSource;
  40.         }
  41.         @Override
  42.         public void afterPropertiesSet() {
  43.                 if (this.targetDataSources == null) {
  44.                         throw new IllegalArgumentException("Property 'targetDataSources' is required");
  45.                 }
  46.                 this.resolvedDataSources = CollectionUtils.newHashMap(this.targetDataSources.size());
  47.                 this.targetDataSources.forEach((key, value) -> {
  48.                         Object lookupKey = resolveSpecifiedLookupKey(key);
  49.                         DataSource dataSource = resolveSpecifiedDataSource(value);
  50.                         this.resolvedDataSources.put(lookupKey, dataSource);
  51.                 });
  52.                 if (this.defaultTargetDataSource != null) {
  53.                         this.resolvedDefaultDataSource = resolveSpecifiedDataSource(this.defaultTargetDataSource);
  54.                 }
  55.         }
  56.         /**
  57.          * Resolve the given lookup key object, as specified in the
  58.          * {@link #setTargetDataSources targetDataSources} map, into
  59.          * the actual lookup key to be used for matching with the
  60.          * {@link #determineCurrentLookupKey() current lookup key}.
  61.          * <p>The default implementation simply returns the given key as-is.
  62.          * @param lookupKey the lookup key object as specified by the user
  63.          * @return the lookup key as needed for matching
  64.          */
  65.         protected Object resolveSpecifiedLookupKey(Object lookupKey) {
  66.                 return lookupKey;
  67.         }
  68.         /**
  69.          * Resolve the specified data source object into a DataSource instance.
  70.          * <p>The default implementation handles DataSource instances and data source
  71.          * names (to be resolved via a {@link #setDataSourceLookup DataSourceLookup}).
  72.          * @param dataSource the data source value object as specified in the
  73.          * {@link #setTargetDataSources targetDataSources} map
  74.          * @return the resolved DataSource (never {@code null})
  75.          * @throws IllegalArgumentException in case of an unsupported value type
  76.          */
  77.         protected DataSource resolveSpecifiedDataSource(Object dataSource) throws IllegalArgumentException {
  78.                 if (dataSource instanceof DataSource) {
  79.                         return (DataSource) dataSource;
  80.                 }
  81.                 else if (dataSource instanceof String) {
  82.                         return this.dataSourceLookup.getDataSource((String) dataSource);
  83.                 }
  84.                 else {
  85.                         throw new IllegalArgumentException(
  86.                                         "Illegal data source value - only [javax.sql.DataSource] and String supported: " + dataSource);
  87.                 }
  88.         }
  89.         /**
  90.          * Return the resolved default target DataSource, if any.
  91.          * @return the default DataSource, or {@code null} if none or not resolved yet
  92.          * @since 5.2.9
  93.          * @see #setDefaultTargetDataSource
  94.          */
  95.         @Nullable
  96.         public DataSource getResolvedDefaultDataSource() {
  97.                 return this.resolvedDefaultDataSource;
  98.         }
  99.         /**
  100.          * 决定要使用哪个数据源的方法(核心)
  101.          */
  102.         protected DataSource determineTargetDataSource() {
  103.                 Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
  104.                 // 先拿到数据源的key
  105.                 Object lookupKey = determineCurrentLookupKey();
  106.                 // 从我们初始化好的map里根据key拿到一个数据源
  107.                 DataSource dataSource = this.resolvedDataSources.get(lookupKey);
  108.                 // 如果没拿到就用默认的数据源
  109.                 if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
  110.                         dataSource = this.resolvedDefaultDataSource;
  111.                 }
  112.                 if (dataSource == null) {
  113.                         throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
  114.                 }
  115.                 return dataSource;
  116.         }
  117.         /**
  118.          * 找一下数据源的key(我们重写了此方法,所以拿到的就是我们上下文里存储的key)
  119.          */
  120.         @Nullable
  121.         protected abstract Object determineCurrentLookupKey();
  122. }
复制代码
application.yml:
  1. server:
  2.   port: 9999
  3. spring:
  4.   datasource:
  5.     type: com.alibaba.druid.pool.DruidDataSource
  6.     druid:
  7.       ds:
  8.         ds1:
  9.           url: jdbc:mysql://localhost:3306/ry-vue?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
  10.           username: root
  11.           password: root
  12.           driverClassName: com.mysql.cj.jdbc.Driver
  13.         ds2:
  14.           url: jdbc:mysql://localhost:3306/atguigudb?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
  15.           username: root
  16.           password: root
  17.           driverClassName: com.mysql.cj.jdbc.Driver
复制代码
在org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource#determineTargetDataSource 方法上打一个断点,然后发一个请求过去就能看到了,肯定一看就懂。


然后我们来说遇到的问题。
发现问题

问题1:两个库用同一个事务的问题

当我们使用如下代码时就会出现事务问题:
代码:
  1.     @Transactional
  2.     @Override
  3.     public List<SysUser> listAll() {
  4.         System.out.println("employeeService.listAll() = " + employeeService.listAll());
  5.         return sysUserMapper.selectList(null);
  6.     }
  7.     @DS(DataSourceCons.DS2)
  8.     @Override
  9.     public List<Employees> listAll() {
  10.         return employeeMapper.selectList(null);
  11.     }
复制代码
问题截图:

我们发现的确出现了问题,发现第二张表的数据源使用的还是上一个的数据源,那我们就会想,是不是数据源切换没成功啊?别想了,我们看一下控制台就可以发现的确有打印(蓝色框里),说明走切面了。
那是哪的问题呢。
我们又会想到两个库使用同一个事务肯定会有问题。那我们就直接用Spring的事务的传播特性来解决不就好了,于是我们就直接上手改传播特性为 REQUIRES_NEW(默认的传播特性REQUIRED 也就是使用同一个事务,那我们就直接在第二个方法上加上事务注解并设置传播特性REQUIRES_NEW 即可,就是创建一个新事务,使用不同的事务)。我们改完之后发现还是没什么卵用。那我们就会疑惑了那是哪的问题呢?
解决方案:事务传播特性设置为 REQUIRES_NEW
问题2:优先级问题

这个问题的答案我就直接说了:事务的原理是代理,我们切换数据源的切面的原理也是代理,它俩的执行的前后顺序是有问题的。我们需要把切换数据源的切面让它在事务前执行即可。
解决方案:也就是需要加个Order注解来提升优先级。
源码分析

第一次Debug

先在第二个事务的方法设置传播特性为 REQUIRES_NEW,然后来分析。最后会分析 REQUIRED 为啥不行
我们直接在事务的方法上打一个断点:

那我们就会想了,都不知道从哪看,那该怎么办,没关系:不知道从哪看很简单,我们直接看调用栈来找。

我们通过调用栈可以发现:

  • 红框里是我们自己的方法,所以不用管
  • 紫框里是代理,所以也不用管
  • 蓝框里我们一看有 Transaction 关键字,那不就是事务相关的吗,好家伙,这不就找到了
那我们直接就在上面打个断点org.springframework.transaction.interceptor.TransactionInterceptor#invoke
发现它调用了 org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction 方法
  1.         protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
  2.                         final InvocationCallback invocation) throws Throwable {
  3.                 // If the transaction attribute is null, the method is non-transactional.
  4.                 // 如果transaction属性为null,则该方法为非事务性方法。
  5.                 TransactionAttributeSource tas = getTransactionAttributeSource();
  6.                 final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
  7.                 if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
  8.                         // 如果需要的话就创建一个事务(**核心**)
  9.                         TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
  10.                         Object retVal;
  11.                         try {
  12.                                 // 执行目标方法
  13.                                 retVal = invocation.proceedWithInvocation();
  14.                         }
  15.                         catch (Throwable ex) {
  16.                                 // target invocation exception
  17.                                 // 出现异常就处理异常
  18.                                 completeTransactionAfterThrowing(txInfo, ex);
  19.                                 throw ex;
  20.                         }
  21.                         finally {
  22.                                 cleanupTransactionInfo(txInfo);
  23.                         }
  24.                         if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
  25.                                 // Set rollback-only in case of Vavr failure matching our rollback rules...
  26.                                 TransactionStatus status = txInfo.getTransactionStatus();
  27.                                 if (status != null && txAttr != null) {
  28.                                         retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
  29.                                 }
  30.                         }
  31.                        
  32.                         // 提交事务
  33.                         commitTransactionAfterReturning(txInfo);
  34.                         return retVal;
  35.                 }
  36.         }
复制代码
前面的都不需要看,直接看 createTransactionIfNecessary() 方法:TransactionAspectSupport类
  1.         protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
  2.                         @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
  3.                 TransactionStatus status = null;
  4.                 if (txAttr != null) {
  5.                         if (tm != null) {
  6.                                 // 获取一个事务(核心)
  7.                                 status = tm.getTransaction(txAttr);
  8.                         }
  9.                         else {
  10.                                 if (logger.isDebugEnabled()) {
  11.                                         logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
  12.                                                         "] because no transaction manager has been configured");
  13.                                 }
  14.                         }
  15.                 }
  16.                 return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
  17.         }
复制代码
来到真正的核心 getTransaction() 方法:AbstractPlatformTransactionManager类
  1.         public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
  2.                         throws TransactionException {
  3.                 // 事务的定义信息
  4.                 TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
  5.                 // 获取一个事务
  6.                 Object transaction = doGetTransaction();
  7.                 boolean debugEnabled = logger.isDebugEnabled();
  8.                 // 已经有事务的处理
  9.                 if (isExistingTransaction(transaction)) {
  10.                         // Existing transaction found -> check propagation behavior to find out how to behave.
  11.                         return handleExistingTransaction(def, transaction, debugEnabled);
  12.                 }
  13.                 // 没有事务的处理
  14.                 if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
  15.                         throw new IllegalTransactionStateException(
  16.                                         "No existing transaction found for transaction marked with propagation 'mandatory'");
  17.                 }
  18.                 else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
  19.                                 def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
  20.                                 def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
  21.                         SuspendedResourcesHolder suspendedResources = suspend(null);
  22.                         try {
  23.                                 // 开启一个事务
  24.                                 return startTransaction(def, transaction, debugEnabled, suspendedResources);
  25.                         }
  26.                 }
  27.         }
复制代码
三个方法:doGetTransaction() 方法、handleExistingTransaction() 方法、startTransaction() 方法。我们一个一个来看。
第一个方法:
org.springframework.jdbc.datasource.DataSourceTransactionManager#doGetTransaction
  1.         protected Object doGetTransaction() {
  2.                 DataSourceTransactionObject txObject = new DataSourceTransactionObject();
  3.                 txObject.setSavepointAllowed(isNestedTransactionAllowed());
  4.                 ConnectionHolder conHolder =
  5.                                 (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
  6.                 txObject.setConnectionHolder(conHolder, false);
  7.                 return txObject;
  8.         }
复制代码

这是第一次这三个的值。
核心呢其实就是从缓存里获取了ConnectionHolder对象,连接缓存
来看一下这个方法:
org.springframework.transaction.support.TransactionSynchronizationManager#getResource
  1.         private static final ThreadLocal<Map<Object, Object>> resources =
  2.                         new NamedThreadLocal<>("Transactional resources");
  3.         public static Object getResource(Object key) {
  4.                 Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
  5.                 return doGetResource(actualKey);
  6.         }
  7.         /**
  8.          * Actually check the value of the resource that is bound for the given key.
  9.          */
  10.         @Nullable
  11.         private static Object doGetResource(Object actualKey) {
  12.                 Map<Object, Object> map = resources.get();
  13.                 if (map == null) {
  14.                         return null;
  15.                 }
  16.                 Object value = map.get(actualKey);
  17.                 // Transparently remove ResourceHolder that was marked as void...
  18.                 if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
  19.                         map.remove(actualKey);
  20.                         // Remove entire ThreadLocal if empty...
  21.                         if (map.isEmpty()) {
  22.                                 resources.remove();
  23.                         }
  24.                         value = null;
  25.                 }
  26.                 return value;
  27.         }
复制代码
我们不是继承了AbstractRoutingDataSource这个类吗,这个key呢就是我们的这个类。

第一次肯定是拿不到连接缓存的,所以一路返回。又到了 getTransaction() 这个大方法
第一次肯定也是不存在事务的,所以也不会走我们的 第二个核心方法 handleExistingTransaction()
所以就来到了我们的第三个核心方法
org.springframework.transaction.support.AbstractPlatformTransactionManager#startTransaction
  1.         private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
  2.                         boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
  3.                 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  4.                 DefaultTransactionStatus status = newTransactionStatus(
  5.                                 definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
  6.                 // 开始
  7.                 doBegin(transaction, definition);
  8.                 prepareSynchronization(status, definition);
  9.                 return status;
  10.         }
复制代码
直接看 doBegin() 方法
org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin
  1.         protected void doBegin(Object transaction, TransactionDefinition definition) {
  2.                 DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
  3.                 Connection con = null;
  4.                 try {
  5.                         // 如果事务没有连接缓存,那就给它一个
  6.                         if (!txObject.hasConnectionHolder() ||
  7.                                         txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
  8.                                 // 根据当前数据源来获取一个连接 这个也是导致我们的问题所在处一
  9.                                 Connection newCon = obtainDataSource().getConnection();
  10.                                 if (logger.isDebugEnabled()) {
  11.                                         logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
  12.                                 }
  13.                                 // 给当前事务设置一个连接缓存
  14.                                 txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
  15.                         }
  16.                         // 设置一堆事务的特性:隔离级别啊、自动提交啊什么的
  17.                         txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
  18.                         con = txObject.getConnectionHolder().getConnection();
  19.                         Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
  20.                         txObject.setPreviousIsolationLevel(previousIsolationLevel);
  21.                         txObject.setReadOnly(definition.isReadOnly());
  22.                         // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
  23.                         // so we don't want to do it unnecessarily (for example if we've explicitly
  24.                         // configured the connection pool to set it already).
  25.                         if (con.getAutoCommit()) {
  26.                                 txObject.setMustRestoreAutoCommit(true);
  27.                                 if (logger.isDebugEnabled()) {
  28.                                         logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
  29.                                 }
  30.                                 con.setAutoCommit(false);
  31.                         }
  32.                         prepareTransactionalConnection(con, definition);
  33.                         txObject.getConnectionHolder().setTransactionActive(true);
  34.                         int timeout = determineTimeout(definition);
  35.                         if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
  36.                                 txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
  37.                         }
  38.                         if (txObject.isNewConnectionHolder()) {
  39.                                 // 给当前线程绑定一下连接缓存,后续再来方便获取
  40.                                 // 也就是往刚才的那个map里放一下
  41.                                 // key还是我们的那个数据源的类,value就是连接缓存对象
  42.                                 // 这个也是导致我们的问题所在处二
  43.                                 TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
  44.                         }
  45.                 }
  46.         }
复制代码
总结:就是根据我们的数据源来获取一个连接,并缓存起来,以供后续使用。并会设置一些事务的相关信息。
我们当前获取到的连接:

绑定的缓存:key还是我们的那个类,value就是我们根据当前数据源获取到的连接,当前数据源是第一个数据源。我们
接下来看一下第二个数据源的连接是什么。

我们来看一下 obtainDataSource().getConnection() 这个方法是怎么做的:AbstractRoutingDataSource类
  1.         @Override
  2.         public Connection getConnection() throws SQLException {
  3.                 return determineTargetDataSource().getConnection();
  4.         }
  5.        
  6.         protected DataSource determineTargetDataSource() {
  7.                 Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
  8.                 Object lookupKey = determineCurrentLookupKey();
  9.                 DataSource dataSource = this.resolvedDataSources.get(lookupKey);
  10.                 if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
  11.                         dataSource = this.resolvedDefaultDataSource;
  12.                 }
  13.                 if (dataSource == null) {
  14.                         throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
  15.                 }
  16.                 return dataSource;
  17.         }
  18.        
  19.         // 我们写的实现类
  20.     @Override
  21.     protected Object determineCurrentLookupKey() {
  22.         return DynamicDataSourceContextHolder.get();
  23.     }
复制代码
是不是很熟悉?这不就是Spring给我们提供的来做动态数据源那个类的源码吗!前面已经分析过了

由于我们上下文存储的key是null,所以就给了一个默认数据源。所以获取到的是第一个数据源的连接。我们看一下第二个方法是否还是这个数据源,让我们进入第二次 Debug吧。
第二次Debug

前面都是一样的套路,但是走到这里就会发生变化了。还记得我们的三个核心方法吗?第一次 Debug的时候并没有走我们的第二个核心方法,这一次就要走了
进来的条件:判断当前事务对象是否有连接缓存并且当前连接缓存的事务是活跃的
org.springframework.transaction.support.AbstractPlatformTransactionManager#handleExistingTransaction:
  1.         private TransactionStatus handleExistingTransaction(
  2.                         TransactionDefinition definition, Object transaction, boolean debugEnabled)
  3.                         throws TransactionException {
  4.                 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
  5.                         throw new IllegalTransactionStateException(
  6.                                         "Existing transaction found for transaction marked with propagation 'never'");
  7.                 }
  8.                 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
  9.                         if (debugEnabled) {
  10.                                 logger.debug("Suspending current transaction");
  11.                         }
  12.                         Object suspendedResources = suspend(transaction);
  13.                         boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
  14.                         return prepareTransactionStatus(
  15.                                         definition, null, false, newSynchronization, debugEnabled, suspendedResources);
  16.                 }
  17.                 // 我们设置的传播特性会走这里
  18.                 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
  19.                         if (debugEnabled) {
  20.                                 logger.debug("Suspending current transaction, creating new transaction with name [" +
  21.                                                 definition.getName() + "]");
  22.                         }
  23.                         // 清除一下上一次事务的信息
  24.                         SuspendedResourcesHolder suspendedResources = suspend(transaction);
  25.                         try {
  26.                                 // 开启一个新的事务(核心)
  27.                                 return startTransaction(definition, transaction, debugEnabled, suspendedResources);
  28.                         }
  29.                         catch (RuntimeException | Error beginEx) {
  30.                                 resumeAfterBeginException(transaction, suspendedResources, beginEx);
  31.                                 throw beginEx;
  32.                         }
  33.                 }
  34.                 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
  35.                         if (!isNestedTransactionAllowed()) {
  36.                                 throw new NestedTransactionNotSupportedException(
  37.                                                 "Transaction manager does not allow nested transactions by default - " +
  38.                                                 "specify 'nestedTransactionAllowed' property with value 'true'");
  39.                         }
  40.                         if (debugEnabled) {
  41.                                 logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
  42.                         }
  43.                         if (useSavepointForNestedTransaction()) {
  44.                                 // Create savepoint within existing Spring-managed transaction,
  45.                                 // through the SavepointManager API implemented by TransactionStatus.
  46.                                 // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
  47.                                 DefaultTransactionStatus status =
  48.                                                 prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
  49.                                 status.createAndHoldSavepoint();
  50.                                 return status;
  51.                         }
  52.                         else {
  53.                                 // Nested transaction through nested begin and commit/rollback calls.
  54.                                 // Usually only for JTA: Spring synchronization might get activated here
  55.                                 // in case of a pre-existing JTA transaction.
  56.                                 return startTransaction(definition, transaction, debugEnabled, null);
  57.                         }
  58.                 }
  59.                 // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
  60.                 if (debugEnabled) {
  61.                         logger.debug("Participating in existing transaction");
  62.                 }
  63.                 if (isValidateExistingTransaction()) {
  64.                         if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
  65.                                 Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
  66.                                 if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
  67.                                         Constants isoConstants = DefaultTransactionDefinition.constants;
  68.                                         throw new IllegalTransactionStateException("Participating transaction with definition [" +
  69.                                                         definition + "] specifies isolation level which is incompatible with existing transaction: " +
  70.                                                         (currentIsolationLevel != null ?
  71.                                                                         isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
  72.                                                                         "(unknown)"));
  73.                                 }
  74.                         }
  75.                         if (!definition.isReadOnly()) {
  76.                                 if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
  77.                                         throw new IllegalTransactionStateException("Participating transaction with definition [" +
  78.                                                         definition + "] is not marked as read-only but existing transaction is");
  79.                                 }
  80.                         }
  81.                 }
  82.                 // 默认的传播特性走这里
  83.                 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  84.                 return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
  85.         }
复制代码
总结:就是根据不同的传播特性来做不同的事情。
先来看一下一个重要的事情,我们发现进来这个方法之后 Transaction 对象里还是有之前的连接缓存的,这是万万不行的,所以需要清掉。

当我们经过这个方法 suspend() 后,我们神奇的发现里面的信息没有了:

直接来看一下这个方法:
  1.         protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
  2.                 if (TransactionSynchronizationManager.isSynchronizationActive()) {
  3.                         List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
  4.                         try {
  5.                                 Object suspendedResources = null;
  6.                                 if (transaction != null) {
  7.                                         // 清除连接信息
  8.                                         suspendedResources = doSuspend(transaction);
  9.                                 }
  10.                                 String name = TransactionSynchronizationManager.getCurrentTransactionName();
  11.                                 TransactionSynchronizationManager.setCurrentTransactionName(null);
  12.                                 boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
  13.                                 TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
  14.                                 Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
  15.                                 TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
  16.                                 boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
  17.                                 TransactionSynchronizationManager.setActualTransactionActive(false);
  18.                                 return new SuspendedResourcesHolder(
  19.                                                 suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
  20.                         }
  21.                         catch (RuntimeException | Error ex) {
  22.                                 // doSuspend failed - original transaction is still active...
  23.                                 doResumeSynchronization(suspendedSynchronizations);
  24.                                 throw ex;
  25.                         }
  26.                 }
  27.                 else if (transaction != null) {
  28.                         // Transaction active but no synchronization active.
  29.                         Object suspendedResources = doSuspend(transaction);
  30.                         return new SuspendedResourcesHolder(suspendedResources);
  31.                 }
  32.                 else {
  33.                         // Neither transaction nor synchronization active.
  34.                         return null;
  35.                 }
  36.         }
复制代码
它里面又调用了这个方法来做:
org.springframework.jdbc.datasource.DataSourceTransactionManager#doSuspend
  1.         @Override
  2.         protected Object doSuspend(Object transaction) {
  3.                 DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
  4.                 // 设置为null
  5.                 txObject.setConnectionHolder(null);
  6.                 return TransactionSynchronizationManager.unbindResource(obtainDataSource());
  7.         }
复制代码
然后我们直接来看我们的传播特性做的事情。其实还是之前 startTransaction() 做的事情。就是开
启一个新的事务来做。我们可以看一下这一次的连接是谁的

我们发现这次的连接还是第一个数据源的连接,这就出大问题了。我们想要的是第二个数据源的连接。这是怎么回事呢?不要急,我们直接来看。

这是咋回事啊,我靠,切面没成功吗,key还是null,所以获取到的数据源就是默认的数据源了。来看一下控制台是否打印切换数据源的信息:确实没有信息

当我们全部放行之后,再次查看控制台就会发现,我们的数据源切换成功了!

what?这是什么情况。这个时候我们就要运用大脑里的知识来想一想了,想到原理。来回 Debug 之后,我有了个想法:Spring事务是aop,我们的这个切换数据源的也是aop,那会不会是切面之间的执行顺序还有先后啊。
于是乎我们直接实践,直接在切换数据源的切面上加上了 Order 注解,来提高优先级。并在获取数据源key的地方和切换数据源的切面上这两个地方打了断点,我们会发现,的确是有优先级的。加了之后,我们神奇的发现成功了!Amazing!
解决

优先级:
再次一路 Debug 到doBegin() 方法来验证猜想:

打开控制台查看,也会发现有信息输出:

直接成功!
同一个事务问题:
我们把第二个方法的事务的传播特性还设置回原来的 REQUIRED 或者不加事务注解。
还是来到第二个核心方法:handleExistingTransaction()
还记得我之前在此处代码上加的注释吗?如果是默认的传播特性会走这里来处理
  1.         private TransactionStatus handleExistingTransaction(
  2.                         TransactionDefinition definition, Object transaction, boolean debugEnabled)
  3.                         throws TransactionException {
  4.                  ......
  5.                 // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
  6.                 // 处理 PROPAGATION_SUPPORTS 和 PROPAGATION_REQUIRED 的传播特性
  7.                 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  8.                 return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
  9.         }
复制代码
prepareTransactionStatus():
  1.         protected final DefaultTransactionStatus prepareTransactionStatus(
  2.                         TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
  3.                         boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
  4.                 DefaultTransactionStatus status = newTransactionStatus(
  5.                                 definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
  6.                 prepareSynchronization(status, definition);
  7.                 return status;
  8.         }
复制代码
就是把当前的事务状态给返回了,所以后续拿到的连接还是上一个的
总结


  • 切面的优先级问题(Order注解)
  • 事务的传播特性(Propagation类)
文章到这里就结束了,应该还是有点长的。希望有点帮助哈。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

惊落一身雪

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表