Mybatis-Plus集成Sharding-JDBC与Flyway实现多租户分库分表

打印 上一主题 下一主题

主题 937|帖子 937|积分 2815

背景

公司产品部收到了一些重要客户的需求,他们希望能够依赖独立的数据库存储来支持他们的业务数据。与此同时,仍有许多中小客户,可以继续使用公共库以满足其需求。技术实现方面,此前持久层框架使用的Mybatis-plus,部分业务场景使用到了Sharding-JDBC用于分表,另外,我们的数据库版本控制工具使用的是Flyway。
方案说明

这里将方案进行简要说明,配置统一通过Nacos管理(有需要的可以自行定义租户配置页面)。

  • 1.首先多数据源管理使用Mybatis-Plus官方推荐的dynamic-datasource-spring-boot-starter组件,需要注意的是构建动态多数据源时,我们要把Sharding-JDBC数据源也纳入管理。因为我们的库里面毕竟只有部分表用到了Sharding-JDBC,这样可以复用数据源。

  • 2.其次,租户与数据源之间在Nacos建立关系配置,确保根据租户ID能够路由到唯一的租户数据源。我们需要自定义Sharding分片策略和多数据源切换逻辑,根据http请求传入的租户ID,设置正确的数据源。

  • 3.动态数据源与Sharding数据源配置做为公共配置在Nacos维护,在业务服务启动时,读取公共配置初始化多数据源,并添加对公共多数据源配置的监听。当配置变更时,重新构造Sharding数据源,并并更新动态多数据源。另外数据库脚本通过自定义flyway配置执行。

技术实现

前提

需要在Nacos提前维护租户与数据源关系配置。
不使用Sharding-JDBC场景


  • 1.引入相关组件依赖。
  1.         <dependency>
  2.             <groupId>com.alibaba.nacos</groupId>
  3.             <artifactId>nacos-client</artifactId>
  4.             <version>2.1.0</version>
  5.         </dependency>
  6.         <dependency>
  7.             <groupId>org.flywaydb</groupId>
  8.             <artifactId>flyway-core</artifactId>
  9.             <version>7.15.0</version>
  10.         </dependency>
  11.         <dependency>
  12.             <groupId>com.baomidou</groupId>
  13.             <artifactId>mybatis-plus-boot-starter</artifactId>
  14.             <version>3.4.1</version>
  15.         </dependency>
  16.         <dependency>
  17.             <groupId>com.baomidou</groupId>
  18.             <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
  19.             <version>3.4.1</version>
  20.         </dependency>
  21.         <dependency>
  22.             <groupId>com.alibaba</groupId>
  23.             <artifactId>druid</artifactId>
  24.             <version>1.2.6</version>
  25.         </dependency>
复制代码

  • 2.关闭Flyway自动配置和配置多数据源。
  1. spring:
  2.   flyway:
  3.     #关闭flyway自动配置,自定义实现
  4.     enabled: false
  5.   datasource:
  6.     dynamic:
  7.       #默认数据源
  8.       primary: ds0
  9.       datasource:
  10.         ds0:
  11.           type: com.alibaba.druid.pool.DruidDataSource
  12.           driverClassName: org.postgresql.Driver
  13.           url: jdbc:postgresql://127.0.0.1:5432/ds0
  14.           username: ds0
  15.           password: ds0123
  16.         ds1:
  17.           type: com.alibaba.druid.pool.DruidDataSource
  18.           driverClassName: org.postgresql.Driver
  19.           url: jdbc:postgresql://127.0.0.1:5432/ds1
  20.           username: ds1
  21.           password: ds1123
复制代码

  • 3.自定义实现Flyway配置类,对应的flyway脚本目录结构见下图,主库和租户库SQL脚本独立维护。
  1. Java
  2. @Slf4j
  3. @Configuration
  4. @EnableTransactionManagement
  5. public class FlywayConfig {
  6.     @Value("${spring.application.name}")
  7.     private String appName;
  8.     @Autowired
  9.     private DataSource dataSource;
  10.     @Bean
  11.     public void migrate() {
  12.         log.info("flyway开始逐数据源执行脚本");
  13.         DynamicRoutingDataSource ds = (DynamicRoutingDataSource) dataSource;
  14.         Map<String, DataSource> dataSources = ds.getDataSources();
  15.         dataSources.forEach((k, v) -> {
  16.             if (!"sharding".equals(k)) {
  17.                                                     // Flyway相关参数建议通过配置管理,以下代码仅供参考
  18.                 Flyway flyway = Flyway.configure()
  19.                         .dataSource(v)
  20.                         .table("t_" + k + "_" + appName + "_version")
  21.                         .baselineOnMigrate(true)
  22.                         .outOfOrder(true)
  23.                         .baselineVersion("1.0.0")
  24.                         .baselineDescription(k + "初始化")
  25.                         .locations(CommonConstant.SQL_BASE_LOCATION + (CommonConstant.DEFAULT_DS_NAME.equals(k) ? CommonConstant.MASTER_DB : CommonConstant.TENANT_DB))
  26.                         .load();
  27.                 flyway.migrate();
  28.                 log.info("flyway在 {} 数据源执行脚本成功", k);
  29.             }
  30.         });
  31.     }
  32. }
复制代码


  • 4.自定义实现数据源切换Filter类。
  1. @Slf4j
  2. @Component
  3. @WebFilter(filterName = "dynamicDatasourceFilter", urlPatterns = {"/*"})
  4. public class DynamicDatasourceFilter implements Filter {
  5.     // 构建演示用租户与数据源关系配置
  6.                 private static Map<String, String> tenantDsMap = new HashMap<>();
  7.     static {
  8.         tenantDsMap.put("tenant123", "ds0");
  9.         tenantDsMap.put("tenant456", "ds0");
  10.                                 tenantDsMap.put("tenant789", "ds1");
  11.     }
  12.     @Override
  13.     public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
  14.         HttpServletRequest httpRequest = (HttpServletRequest) request;
  15.         // 从请求头获取租户ID
  16.         String tenantId = httpRequest.getHeader(CommonConstant.TENANT_HEADER);
  17.         try {
  18.             // 设置数据源
  19.             if (tenantDsMap.get(tenantId) == null) {
  20.                 // 如果根据租户ID未找到租户数据源配置,默认走主库
  21.                 DynamicDataSourceContextHolder.push(CommonConstant.DEFAULT_DS_NAME);
  22.             } else {
  23.                 //注意,如果是分片表,那么需要在分片表Service类或方法上加@DS("sharding")注解,最终由sharding的库分片策略决定SQL在哪个库执行。而这里的设置将会被@DS注解配置覆盖
  24.                 DynamicDataSourceContextHolder.push(tenantDsMap.get(tenantId));
  25.             }
  26.             // 执行
  27.             chain.doFilter(request, response);
  28.         } catch (Exception e) {
  29.             log.error("切换数据源失败,tenantId={},请求接口uri={},异常原因:{}", tenantId, httpRequest.getRequestURI(), ExceptionUtils.getStackTrace(e));
  30.         } finally {
  31.             // 清空当前线程数据源
  32.             DynamicDataSourceContextHolder.poll();
  33.         }
  34.     }
复制代码

使用Sharding-JDBC

如果微服务还需要使用Sharding分片,那么还需要引入sharding-jdbc组件依赖,并配置sharding数据源和分片规则。如果是多个服务共用数据库,那么建议将Sharding数据源配置做为公共配置在Nacos管理,而Sharding分片规则则做为服务个性化配置单独维护(分片规则基本不需要动态变更),这样当有新租户需要申请开通独立租户库的时候,直接变更Sharding数据源公共配置,服务在监听到公共配置变更后,即可重新构建新的Sharding数据源实例和动态数据源更新,无需重启服务。

  • 1.引入sharding-jdbc组件依赖。
  1.         <dependency>
  2.             <groupId>org.apache.shardingsphere</groupId>
  3.             <artifactId>sharding-jdbc-core</artifactId>
  4.             <version>4.1.1</version>
  5.         </dependency>
复制代码

  • 2.配置Sharding数据源和分片规则。
  1. # sharding数据源配置
  2. dataSources:
  3.   ds0: !!com.alibaba.druid.pool.DruidDataSource
  4.     driverClassName: org.postgresql.Driver
  5.     url: jdbc:postgresql://127.0.0.1:5432/ds0
  6.     username: ds0
  7.     password: ds0123
  8.   ds1: !!com.alibaba.druid.pool.DruidDataSource
  9.     driverClassName: org.postgresql.Driver
  10.     url: jdbc:postgresql://127.0.0.1:5432/ds1
  11.     username: ds1
  12.     password: ds1123
  13.   ds2: !!com.alibaba.druid.pool.DruidDataSource
  14.     driverClassName: org.postgresql.Driver
  15.     url: jdbc:postgresql://127.0.0.1:5432/ds2
  16.     username: ds2
  17.     password: ds2123
  18. # sharding分片规则配置
  19. shardingRule:
  20.   tables:
  21.     t_order:
  22.       actualDataNodes: ds$->{0..2}.t_order$->{0..1}
  23.       tableStrategy:
  24.         inline:
  25.           shardingColumn: order_no
  26.           algorithmExpression: t_order$->{order_no.toBigInteger() % 2}
  27.   defaultDataSourceName: ds0
  28.   # 默认库分片策略
  29.   defaultDatabaseStrategy:
  30.     standard:
  31.       shardingColumn: tenant_id
  32.                         # 自定义精确分片策略
  33.       preciseAlgorithmClassName: cn.xtstu.demo.config.CustomDataSourcePreciseShardingAlgorithm
  34.     #hint:
  35.                     #
  36.     #  algorithmClassName: cn.xtstu.demo.config.CustomHintShardingAlgorithm
  37.   defaultTableStrategy:
  38.     none:
  39. props:
  40.   sql.show: true
复制代码

  • 3.自定义精确分片策略。
  1. public class CustomDataSourcePreciseShardingAlgorithm implements PreciseShardingAlgorithm<String> {
  2.     // 构建演示用租户与数据源关系配置
  3.                 private static Map<String, String> tenantDsMap = new HashMap<>();
  4.     static {
  5.         tenantDsMap.put("tenant123", "ds0");
  6.         tenantDsMap.put("tenant456", "ds0");
  7.                                 tenantDsMap.put("tenant789", "ds1");
  8.     }
  9.                
  10.     @Override
  11.     public String doSharding(Collection<String> dataSourceNames, PreciseShardingValue<String> shardingValue) {
  12.                     // 库分片策略配置的分片键是字段tenant_id,根据分片键查询配置的数据源
  13.                     String dsName = tenantDsMap.get(shardingValue.getValue());
  14.         // 如果如前文所属,Sharding子数据源key与dynamic数据源key保持一致的话,这里直接返回就行了
  15.                                 return dsName;
  16.         // TODO 需要处理未匹配到数据源的情况
  17.     }
  18. }
复制代码

  • 4.自定义Hint分片策略(可选),适用于分片键与SQL无关的场景。
  1. public class CustomHintShardingAlgorithm implements HintShardingAlgorithm<Integer> {
  2.     // 构建演示用租户与数据源关系配置
  3.                 private static Map<String, String> tenantDsMap = new HashMap<>();
  4.     static {
  5.         tenantDsMap.put("tenant123", "ds0");
  6.         tenantDsMap.put("tenant456", "ds0");
  7.                                 tenantDsMap.put("tenant789", "ds1");
  8.     }
  9.                
  10.     @Override
  11.     public Collection<String> doSharding(Collection<String> collection, HintShardingValue<Integer> hintShardingValue) {
  12.         Collection<String> result = new ArrayList<>();
  13.         // 从请求头取到当前租户ID
  14.                                 HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
  15.         result.add(tenantDsMap.get(request.getHeader("tenantId")));
  16.                                 // TODO  需要处理未匹配到数据源的情况
  17.         return result;
  18.     }
  19. }
复制代码

  • 5.自定义动态数据源配置(核心就是将sharding数据源及其子数据源添加到动态数据源一起管理)。
  1. @Slf4j
  2. @Configuration
  3. public class CustomDynamicDataSourceConfig {
  4.     @Value("${spring.cloud.nacos.config.extension-configs[0].data-id}")
  5.     private String dataId;
  6.     @Value("${spring.cloud.nacos.config.group:DEFAULT_GROUP}")
  7.     private String group;
  8.     @Resource
  9.     private DynamicDataSourceProperties properties;
  10.     @Resource
  11.     private NacosHelper nacosHelper;
  12.     /**
  13.      * 启动时通过查询Nacos上sharding数据源及分片规则yaml配置初始化sharding-jdbc数据源
  14.      *
  15.      * @return
  16.      */
  17.     @Bean
  18.     public ShardingDataSource shardingDataSource() {
  19.         ConfigService configService = nacosHelper.getConfigService();
  20.         if (configService == null) {
  21.             log.error("连接nacos失败");
  22.         }
  23.         String configInfo = null;
  24.         try {
  25.             configInfo = configService.getConfig(dataId, group, 5000);
  26.         } catch (NacosException e) {
  27.             log.error("获取{}配置失败,异常原因:{}", dataId, ExceptionUtils.getStackTrace(e));
  28.         }
  29.         if (StringUtils.isBlank(configInfo)) {
  30.             log.error("{}配置为空,启动失败", dataId);
  31.             throw new NullPointerException(dataId + "配置为空");
  32.         }
  33.         try {
  34.             // 通过工厂类和yaml配置创建Sharding数据源
  35.             return (ShardingDataSource) YamlShardingDataSourceFactory.createDataSource(configInfo.getBytes(StandardCharsets.UTF_8));
  36.         } catch (Exception e) {
  37.             log.error("创建sharding-jdbc数据源异常:{}", ExceptionUtils.getStackTrace(e));
  38.             throw new NullPointerException("sharding-jdbc数据源为空");
  39.         }
  40.     }
  41.     /**
  42.      * 将动态数据源设置为首选的
  43.      * 当spring存在多个数据源时, 自动注入的是首选的对象
  44.      * 设置为主要的数据源之后,就可以支持shardingJdbc原生的配置方式了
  45.      */
  46.     @Primary
  47.     @Bean
  48.     public DataSource dataSource() {
  49.         DynamicRoutingDataSource dataSource = new DynamicRoutingDataSource();
  50.         dataSource.setPrimary(properties.getPrimary());
  51.         dataSource.setStrict(properties.getStrict());
  52.         dataSource.setStrategy(properties.getStrategy());
  53.         dataSource.setP6spy(properties.getP6spy());
  54.         dataSource.setSeata(properties.getSeata());
  55.         return dataSource;
  56.     }
  57.     /**
  58.      * 初始化动态数据源
  59.      *
  60.      * @return
  61.      */
  62.     @Bean
  63.     public DynamicDataSourceProvider dynamicDataSourceProvider(ShardingDataSource shardingDataSource) {
  64.         return new AbstractDataSourceProvider() {
  65.             @Override
  66.             public Map<String, DataSource> loadDataSources() {
  67.                 Map<String, DataSource> dataSourceMap = new HashMap<>();
  68.                 // 将sharding数据源整体添加到动态数据源里
  69.                 dataSourceMap.put(CommonConstant.SHARDING_DS_NAME, shardingDataSource);
  70.                 // 同时把sharding内部管理的子数据源也添加到动态数据源里
  71.                 Map<String, DataSource> shardingInnerDataSources = shardingDataSource.getDataSourceMap();
  72.                 dataSourceMap.putAll(shardingInnerDataSources);
  73.                 return dataSourceMap;
  74.             }
  75.         };
  76.     }
  77. }
复制代码

  • 6.最后给出一份通过监听Nacos配置变更动态更新数据源的示例代码。注意:这份示例代码中只给出了Sharding配置变更时的处理逻辑,如果是dynamic数据源配置的话,有需要的可以参考着自行实现。
  1. @Slf4j
  2. @Configuration
  3. public class NacosShardingConfigListener {
  4.     @Value("${spring.cloud.nacos.config.extension-configs[0].data-id}")
  5.     private String dataId;
  6.     @Value("${spring.cloud.nacos.config.group:DEFAULT_GROUP}")
  7.     private String group;
  8.     @Value("${spring.application.name}")
  9.     private String appName;
  10.     @Autowired
  11.     private DataSource dataSource;
  12.     @Autowired
  13.     private NacosHelper nacosHelper;
  14.     @PostConstruct
  15.     public void shardingConfigListener() throws Exception {
  16.         ConfigService configService = nacosHelper.getConfigService();
  17.         if (configService == null) {
  18.             return;
  19.         }
  20.         configService.addListener(dataId, group, new Listener() {
  21.             @Override
  22.             public Executor getExecutor() {
  23.                 return null;
  24.             }
  25.             @Override
  26.             public void receiveConfigInfo(String configInfo) {
  27.                 log.info("configInfo:\n{}", configInfo);
  28.                 if (StringUtils.isBlank(configInfo)) {
  29.                     log.warn("sharding-jdbc配置为空,不会刷新数据源");
  30.                     return;
  31.                 }
  32.                 try {
  33.                     if (StringUtils.isNotBlank(configInfo)) {
  34.                         // 通过yaml配置创建sharding数据源(注意:如果分片规则是独立配置文件,那么需要提前合并数据源和分片规则配置)
  35.                         ShardingDataSource shardingDataSource = (ShardingDataSource) YamlShardingDataSourceFactory.createDataSource(configInfo.getBytes(StandardCharsets.UTF_8));
  36.                         Map<String, DataSource> shardingInnerDataSources = shardingDataSource.getDataSourceMap();
  37.                         DynamicRoutingDataSource ds = (DynamicRoutingDataSource) dataSource;
  38.                         // 遍历sharding子数据源
  39.                         for (String poolName : shardingInnerDataSources.keySet()) {
  40.                             // TODO 这里还有个细节,如果yaml配置删减了数据源,对应数据源应该要从ds中remove掉,且主数据源不能被remove。另外其实只有新增的数据源才需要执行flyway脚本
  41.                             // 将sharding子数据源逐个添加到动态数据源
  42.                             ds.addDataSource(poolName, shardingInnerDataSources.get(poolName));
  43.                             // 通过代码完成数据源Flyway配置,并执行迁移操作
  44.                                                                                                                 Flyway flyway = Flyway.configure()
  45.                                     .dataSource(dataSource)
  46.                                     .table("t_" + poolName + "_" + appName + "_version")
  47.                                     .baselineOnMigrate(true)
  48.                                     .outOfOrder(true)
  49.                                     .baselineVersion("1.0.0")
  50.                                     .baselineDescription(poolName + "初始化")
  51.                                     .locations(CommonConstant.SQL_BASE_LOCATION + CommonConstant.TENANT_DB)
  52.                                     .load();
  53.                             flyway.migrate();
  54.                         }
  55.                         // 将sharding数据源自身也添加到动态数据源
  56.                         ds.addDataSource(CommonConstant.SHARDING_DS_NAME, shardingDataSource);
  57.                         log.info("动态数据源刷新完成,现有数据源:{}", JSONUtil.toJsonStr(ds.getDataSources().keySet()));
  58.                     }
  59.                 } catch (Exception e) {
  60.                     log.error("创建sharding-jdbc数据源异常:{}", ExceptionUtils.getStackTrace(e));
  61.                 }
  62.             }
  63.         });
  64.     }
  65. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

王國慶

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表