ToB企服应用市场:ToB评测及商务社交产业平台

标题: 事务中无法切换数据源?DataSourceSwitchInvoker:轻松实现多数据源切换执 [打印本页]

作者: 丝    时间: 4 天前
标题: 事务中无法切换数据源?DataSourceSwitchInvoker:轻松实现多数据源切换执
配景:

在有标注为@Transactional的类或公共方法中(传播特性,如:NOT_SUPPORTED、SUPPORTS、REQUIRED【默认值】、REQUIRES_NEW)实行数据源切换大概不乐成(好比:主从数据源切换,多数据源切换等,均会发现切换不乐成,或“偶然又切换乐成”),导致本应该需要查主库却查了从库,本应该查B库却仍查了A库导致表不存在等各种查询问题。
原因是什么呢?

本质原因是:因为只要添加了@Transactional (传播特性,如:NOT_SUPPORTED、SUPPORTS、REQUIRED【默认值】、REQUIRES_NEW),在事务同步上下文类型为:SYNCHRONIZATION_ALWAYS时 ,那么会在事务切面中进行初始化事务同步上下文状态【prepareTransactionStatus】(详细可分析代码位置:org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction),此时org.springframework.transaction.support.TransactionSynchronizationManager#isSynchronizationActive 是true,若需要事务时(EQUIRED【默认值】、REQUIRES_NEW)则还会org.springframework.transaction.support.AbstractPlatformTransactionManager#doBegin获取connection并开启事务且构建ConnectionHolder注册生存于事务同步上下文中,当mybatis 的SqlSessionTemplate.SqlSessionInterceptor.invoke实行时,第一次会将获取的SqlSession通过SqlSessionUtils.registerSessionHolder注册生存于事务同步上下文中,后续只要是同一个SqlSession,那么间接的就是持有同一个SpringManagedTransaction,SpringManagedTransaction是优先从ConnectionHolder获取已有connection对象,若不存在才会创建新的connection对象,并构建ConnectionHolder注册生存于事务同步上下文中,后续只要是在同一个事务同步上下文中,那么都是复用相同的SqlSession、SpringManagedTransaction、ConnectionHolder,所以单纯的改DataSource(ThreadLocal的线程变量)没有效,因为此时ConnectionHolder中生存的是Connection,而不是DataSource
Spring声明式事务源代码分析流程图


为何偶然切换数据源乐成?

当为事务传播特性为NOT_SUPPORTED、SUPPORTS时,由于此时事务管理器并不会提前打开Conneciton并开启事务(即:也不会生存到ConnectionHolder)【从上图中就可以看出】,而是在实行一条SQL语句时,触发了MyBatis的第一次获取SqlSession,间接的实行了DataSourceUtils.doGetConnection(会生存到ConnectionHolder中),如果在方法中的实行第一条SQL语句进步行数据源切换,那么就可以生效,若在实行第一条SQL语句后再尝试切换,那么由于SqlSession已不是最新的(ConnectionHolder中已有Connection),则只会复用。
解决方案:

新增数据源切换实行器工具类:DataSourceSwitchInvoker,作用:在实行前会检查要切换的数据源与当前已持有的数据源(ConnectionHolder.Connection)是否一致,一致则直接实行回调方法(即:不存在切换数据源),不一致则挂起当前事务(挂事务与资源后,会清空事务同步上下文,就像从来没有实行过事务方法一样,默认状态),然后实行回调方法,最后恢复被挂起的事务与资源,并恢复回实行前的数据源设置。即:相当于在事务实行过程中,撕开一个口子(无任何状态),实行完成后,再恢复回事务的原状态,不影响后续的实行。
DataSourceSwitchInvoker.invokeOn 代码逻辑流程图:

(注:图片部份位置有屏蔽删减是因为我实现了多个版本,本次是简化实用版,无需复杂的设置,直接方法入参传入即可)

DataSourceSwitchInvoker 实现CODE:
  1. /**
  2. * @author: zuowenjun
  3. * @description:数据源切换后执行器,解决在多数据源项目中,无法在事务方法中进行数据源切换问题
  4. */
  5. @Component
  6. public class DataSourceSwitchInvoker {
  7.    
  8.     private static final Logger LOGGER = LoggerFactory.getLogger(DataSourceSwitchInvoker.class);
  9.     private static final Map<String, String> DATA_SOURCE_NAME_WITH_URL_MAP = new HashMap<>();
  10.     private static final String SET_BEFORE = "BEFORE";
  11.     private static final String SET_AFTER = "AFTER";
  12.     @Value("${dataSourceSwitchInvoker.settings.datasourceJdbcUrlPattern:}")
  13.     private String datasourceJdbcUrlPattern;
  14.     /**
  15.      * 初始化必要条件:数据源配置集合(数据源名称与jdbcUrl对应关系)
  16.      */
  17.     @PostConstruct
  18.     public void initializeRequirement() {
  19.         if (StringUtils.isBlank(datasourceJdbcUrlPattern)) {
  20.             LOGGER.warn("datasourceJdbcUrlPattern is null");
  21.             return;
  22.         }
  23.         DATA_SOURCE_NAME_WITH_URL_MAP.clear();
  24.         Map<String, String> configMap = getPropertiesByPattern(datasourceJdbcUrlPattern, value -> ObjectUtils.defaultIfNull(value, "").toString().trim(), (k, v) -> StringUtils.isNotEmpty(v));
  25.         if (MapUtils.isEmpty(configMap)) {
  26.             LOGGER.error("DataSourceSwitchInvoker.initializeRequirement configMap is empty ,datasourceJdbcUrlPattern: {}", datasourceJdbcUrlPattern);
  27.             return;
  28.         }
  29.         DATA_SOURCE_NAME_WITH_URL_MAP.putAll(configMap);
  30.         LOGGER.info("DataSourceSwitchInvoker.initializeRequirement ok");
  31.     }
  32.     /**
  33.      * 在指定的数据源下执行回调方法
  34.      *
  35.      * @param getCurrentDsNameFunc
  36.      * @param setCurrentDsNameFunc
  37.      * @param invokeCallback
  38.      * @return
  39.      */
  40.     public static <T> T invokeOn(String newDataSourceName, Supplier<String> getCurrentDsNameFunc, Consumer<String> setCurrentDsNameFunc, BiFunction<String, String, Boolean> checkSameDsNameFunc, Supplier<T> invokeCallback) {
  41.         Assert.notNull(getCurrentDsNameFunc, "执行前获取数据源配置回调方法不能为空");
  42.         Assert.notNull(setCurrentDsNameFunc, "执行前要设置的数据源配置回调方法不能为空");
  43.         Assert.notNull(invokeCallback, "具体执行回调方法不能为空");
  44.         String invokeId = "DSI" + System.currentTimeMillis();
  45.         String oldDataSourceName = getCurrentDsNameFunc.get();
  46.         setCurrentDsNameFunc.accept(newDataSourceName);
  47.         LOGGER.info("DataSourceSwitchInvoker.invokeOn setCurrentDsName {} --> {} ,invokeId: {}", oldDataSourceName, newDataSourceName, invokeId);
  48.         Object currentTransaction = null;
  49.         Object suspendedResourcesHolder = null;
  50.         PlatformTransactionManagerDelegateInner platformTransactionManagerDelegate = null;
  51.         try {
  52.             String currentDbConnectionUrl = TransactionManagerUtils.getCurrentDbConnectionUrl(null);
  53.             if (StringUtils.isEmpty(currentDbConnectionUrl) || currentDbConnectionUrl.equalsIgnoreCase(DATA_SOURCE_NAME_WITH_URL_MAP.get(newDataSourceName))) {
  54.                 //若当前没有持有DB连接 或持有的DB连接与当前要设置的DB数据源相同,则表明无需额外处理,只需正常执行即可
  55.                 return invokeCallback.get();
  56.             } else if (StringUtils.isNotEmpty(currentDbConnectionUrl) && checkSameDsNameFunc != null) {
  57.                 String currentUsedDataSourceName = DATA_SOURCE_NAME_WITH_URL_MAP.entrySet().stream().filter(kv -> currentDbConnectionUrl.equalsIgnoreCase(kv.getValue())).map(Map.Entry::getKey).findFirst().orElse(null);
  58.                 if (Boolean.TRUE.equals(checkSameDsNameFunc.apply(currentUsedDataSourceName, newDataSourceName))) {
  59.                     //若当前事务连接对应的已实际使用的数据源与要设置的数据源一致,则表明无需额外处理,只需正常执行即可
  60.                     return invokeCallback.get();
  61.                 }
  62.             }
  63.             //若持有DB连接,则需要先挂起当前事务或资源
  64.             AbstractPlatformTransactionManager platformTransactionManager = SpringUtils.getBean(AbstractPlatformTransactionManager.class);
  65.             Assert.notNull(platformTransactionManager, "not found AbstractPlatformTransactionManager bean");
  66.             platformTransactionManagerDelegate = new PlatformTransactionManagerDelegateInner(platformTransactionManager);
  67.             currentTransaction = TransactionManagerUtils.getCurrentTransaction(platformTransactionManager);
  68.             if (!platformTransactionManagerDelegate.isExistingTransaction(currentTransaction)) {
  69.                 currentTransaction = null;
  70.             }
  71.             suspendedResourcesHolder = platformTransactionManagerDelegate.suspend(currentTransaction);
  72.             LOGGER.debug("DataSourceSwitchInvoker.invokeOn suspend result is {} ,invokeId: {}", suspendedResourcesHolder != null, invokeId);
  73.             return invokeCallback.get();
  74.         } finally {
  75.             String resumeSuspendedResources = null;
  76.             //前面若有挂起事务或资源,则需在执行完方法后需恢复到当前事务状态
  77.             if (currentTransaction != null || suspendedResourcesHolder != null) {
  78.                 platformTransactionManagerDelegate.resume(currentTransaction, suspendedResourcesHolder);
  79.                 resumeSuspendedResources = "resume suspendedResources ok";
  80.             }
  81.             setCurrentDsNameFunc.accept(oldDataSourceName);
  82.             LOGGER.info("DataSourceSwitchInvoker.invokeOn end {} , recover setCurrentDsName {} --> {} ,invokeId: {}", resumeSuspendedResources, newDataSourceName, oldDataSourceName, invokeId);
  83.         }
  84.     }
  85.     /**
  86.      * 在指定的数据源下执行回调方法
  87.      *
  88.      * @param setCurrentDsNameFunc
  89.      * @param invokeCallback
  90.      * @param <T>
  91.      * @return
  92.      */
  93.     public static <T> T invokeOn(Consumer<String> setCurrentDsNameFunc, Supplier<T> invokeCallback) {
  94.         return invokeOn(SET_BEFORE, () -> SET_AFTER, setCurrentDsNameFunc, null, invokeCallback);
  95.     }
  96.     private static <T> Map<String, T> getPropertiesByPattern(String configPath, Function<Object, T> convertValueFunc, BiFunction<String, T, Boolean> filterFunc) {
  97.         Assert.notNull(configPath, "param configPath not be null");
  98.         Assert.notNull(convertValueFunc, "param convertValueFunc not be null");
  99.         Map<String, T> resultMap = new HashMap<>();
  100.         if (!(SpringUtils.getApplicationContext().getEnvironment() instanceof ConfigurableEnvironment)) {
  101.             return resultMap;
  102.         }
  103.         ConfigurableEnvironment environment = (ConfigurableEnvironment) SpringUtils.getApplicationContext().getEnvironment();
  104.         AntPathMatcher antPathMatcher = new AntPathMatcher(".");
  105.         String configKey = "{configKey}";
  106.         // 遍历所有的属性源
  107.         for (PropertySource<?> propertySource : environment.getPropertySources()) {
  108.             if (propertySource instanceof EnumerablePropertySource) {
  109.                 EnumerablePropertySource<?> enumerablePropertySource = (EnumerablePropertySource<?>) propertySource;
  110.                 // 遍历当前属性源中的所有属性
  111.                 for (String propertyName : enumerablePropertySource.getPropertyNames()) {
  112.                     if (antPathMatcher.match(configPath, propertyName)) {
  113.                         String key = propertyName;
  114.                         if (configPath.contains(configKey)) {
  115.                             key = antPathMatcher.extractUriTemplateVariables(configPath, propertyName).getOrDefault(configKey.replaceAll("[{}]", ""), "<null>");
  116.                         }
  117.                         T value = convertValueFunc.apply(enumerablePropertySource.getProperty(propertyName));
  118.                         if (filterFunc == null || filterFunc.apply(key, value)) {
  119.                             resultMap.put(key, convertValueFunc.apply(value));
  120.                         }
  121.                     }
  122.                 }
  123.             }
  124.         }
  125.         return resultMap;
  126.     }
  127.     /**
  128.      * 通过内部类在不破坏封装性、访问性的前提下,提供当前类内部的protected方法的访问能力
  129.      */
  130.     private static class PlatformTransactionManagerDelegateInner extends PlatformTransactionManagerDelegate {
  131.         public PlatformTransactionManagerDelegateInner(AbstractPlatformTransactionManager transactionManager) {
  132.             super(transactionManager);
  133.         }
  134.         @Override
  135.         protected Object suspend(Object transaction) throws TransactionException {
  136.             return super.suspend(transaction);
  137.         }
  138.         @Override
  139.         protected void resume(Object transaction, Object resourcesHolderObj) {
  140.             super.resume(transaction, resourcesHolderObj);
  141.         }
  142.         @Override
  143.         protected boolean isExistingTransaction(Object transaction) {
  144.             return super.isExistingTransaction(transaction);
  145.         }
  146.     }
  147. }
复制代码
依赖CODE(注意包名路径需与AbstractPlatformTransactionManager、DataSourceTransactionManager一致):
  1. //author: zuowenjun
  2. //注意包名必需是如下,因为要访问protected方法
  3. package org.springframework.jdbc.datasource;
  4. public class PlatformTransactionManagerDelegate {
  5.     private final AbstractPlatformTransactionManager delegate;
  6.     public PlatformTransactionManagerDelegate(AbstractPlatformTransactionManager transactionManager) {
  7.         this.delegate = transactionManager;
  8.     }
  9.     protected Object suspend(Object transaction) throws TransactionException {
  10.         return delegate.suspend(transaction);
  11.     }
  12.     protected void resume(Object transaction, Object resourcesHolderObj) {
  13.         AbstractPlatformTransactionManager.SuspendedResourcesHolder resourcesHolder = (AbstractPlatformTransactionManager.SuspendedResourcesHolder) resourcesHolderObj;
  14.         delegate.resume(transaction, resourcesHolder);
  15.     }
  16.     protected boolean isExistingTransaction(Object transaction) {
  17.         return delegate.isExistingTransaction(transaction);
  18.     }
  19. }
  20. //author: zuowenjun
  21. //注意包名必需是如下,因为要访问protected方法
  22. package org.springframework.transaction.support;
  23. public class TransactionManagerUtils {
  24.    
  25.     public static String getCurrentDbConnectionUrl(String threadLocalDbNameIfNoSet) {
  26.         DataSource dataSource = SpringUtils.getBean(DataSource.class);
  27.         if (dataSource == null) {
  28.             return threadLocalDbNameIfNoSet;
  29.         }
  30.         ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
  31.         if (conHolder == null || !conHolder.hasConnection()) {
  32.             return threadLocalDbNameIfNoSet;
  33.         }
  34.         try {
  35.             return conHolder.getConnection().getMetaData().getURL();
  36.         } catch (Throwable e) {
  37.             LOGGER.warn("TransactionManagerUtils.getCurrentDbConnectionUrl error", e);
  38.         }
  39.         return threadLocalDbNameIfNoSet;
  40.     }
  41.     public static Object getCurrentTransaction(AbstractPlatformTransactionManager transactionManager) {
  42.         if (!(transactionManager instanceof DataSourceTransactionManager)) {
  43.             throw new RuntimeException("only support DataSourceTransactionManager doGetTransaction");
  44.         }
  45.         DataSourceTransactionManager dsTransactionManager = (DataSourceTransactionManager) transactionManager;
  46.         return dsTransactionManager.doGetTransaction();
  47.     }
  48.    
  49. }
复制代码
其中:SpringUtils工具类是一个简单的实现了Spring上下文织入的接口然后赋值给静态字段,终极实现可以直接使用applicationContext.getBean(type)
使用示例CODE:
  1. //假设这里是数据源的设置,tips:多数据源一般都是自定义实现了AbstractRoutingDataSource,然后使用ThreadLocal来保存设置当前要使用的数据源配置名称
  2. private ThreadLocal<String> dataSourceHolder = new ThreadLocal<>();
  3. @Transactional
  4. public doWithTx(){
  5.         //第一种方法:【推荐第一种】
  6.         //假设之前是read_db 数据源,现在需要切换成master_db
  7.         DataSourceSwitchInvoker.invokeOn("master_db", () -> dataSourceHolder.get(), (dsName) -> dataSourceHolder.set(dsName), null, () -> {
  8.             Object demo = null; //模拟 demoMapper.get(123L);
  9.             return demo;
  10.         });
  11.    
  12.         //第二种方法:(重载方法,一个设置数据源方法处理执行前、执行后的数据源设置)
  13.     //假设之前是read_db 数据源,现在需要切换成master_db
  14.         AtomicReference<String> dsName = new AtomicReference<>();
  15.         DataSourceSwitchInvoker.invokeOn(eventName -> {
  16.             if (SET_BEFORE.equals(eventName)) {
  17.                 //执行前,自行记录之前的数据源
  18.                 dsName.set(dataSourceHolder.get());
  19.                 //设置新数据源
  20.                 dataSourceHolder.set("master_db");
  21.             } else if (SET_AFTER.equals(eventName)) {
  22.                 //执行后,还原设置数据源
  23.                 dataSourceHolder.set(dsName.get());
  24.             }
  25.         }, () -> {
  26.             Object demo = null; //模拟 demoMapper.get(123L);
  27.             return demo;
  28.         });
  29. }
  30.    
复制代码
编码建议:

切换虽好用,但建议不要在切换的方法中进行写数据的操作,更得当仅用于临时需要查询其他数据源的数据时使用,以免破坏spring事务的完备性,因为invokeOn方法本身就是先挂起一个事务,然后开新连接实行新的操作DB的方法,最后还原恢复事务,若在其中又进行了其他的操作,大概存在未知风险,固然理论做什么都可以但非常不建议。
经多种测试,无论是普通方法 OR 在事务中的方法,均能正常实行,简直就是YYDS!原创不易,如有资助关注+点个赞吧v

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4