ToB企服应用市场:ToB评测及商务社交产业平台

标题: 芝法酱学习笔记(2.6)——flink-cdc监听mysql binlog并同步数据至elastic- [打印本页]

作者: 火影    时间: 昨天 20:51
标题: 芝法酱学习笔记(2.6)——flink-cdc监听mysql binlog并同步数据至elastic-
一、需求背景

在有的项目中,尤其是进销存类的saas软件,一开始为了快速把产物做出来,并没有考虑缓存问题。而这类软件,有着复杂的业务逻辑。假如想在原先的代码中,添加redis缓存,改动面将非常大,还需要大量的测试工作。有些时间会有更离谱的环境,比如一些一些项目可能用JDK1.6写的,想要在这个框架下接入redis缓存,也会变得非常困难。
这时我们就会想到,能否像mysql的主从复制一样,监听mysql的binlog,对数据进行更新呢?Flink CDC就呼之欲出。
二、mysql环境搭建

需要留意的是,当前的flink-cdc,仅仅支持mysql8.0,8.4是完全不支持的。
由于我的mysql装的是8.4,为了方便起见,我们使用docker安装mysql8.0
2.1 docker-compose.yml

  1. services:
  2.   master:
  3.     image: mysql:8.0.41
  4.     container_name: mysql-8
  5.     restart: always
  6.     #mem_limit: 512M
  7.     environment:
  8.       MYSQL_ROOT_PASSWORD: study@2025
  9.       TZ: Asia/Shanghai
  10.     ports:
  11.       - "3307:3306"
  12.     volumes:
  13.       - ./cfg/my.cnf:/etc/my.cnf
  14.       - ./data:/var/lib/mysql
  15.       - ./initdb:/docker-entrypoint-initdb.d
  16.       - ./dump:/var/dump
  17.       - ./log:/var/log
  18.     networks:
  19.       - mysql-cluster
  20. networks:
  21.   mysql-cluster:
复制代码
2.2 初始化sql

  1. -- 创建复制用户
  2. create role role_app;
  3. GRANT SELECT,UPDATE,INSERT,DELETE ON *.* to role_app;
  4. GRANT REPLICATION SLAVE,REPLICATION CLIENT ON *.* TO role_app;
  5. CREATE USER 'app'@'%' IDENTIFIED WITH caching_sha2_password by 'study@2025' DEFAULT ROLE role_app COMMENT 'app user';
  6. FLUSH PRIVILEGES;
  7. -- 创建两个数据库,用于测试
  8. CREATE SCHEMA `shop-center`;
  9. FLUSH TABLES WITH READ LOCK;
复制代码
2.3 留意点

起首把容器卷 - ./cfg/my.cnf:/etc/my.cnf的这一句注释掉,启动服务
而后使用下面语句,把配置文件粘出来
  1. docker exec <id> cp /etc/my.cnf ./cfg/my.cnf
复制代码
之后把注释打开,再重新启动
三、工程搭建与pom引用

3.1 主模块pom引用

flink步伐不需要接入Spring框架,直接一个main就可运行。
但我们还想使用一些我们熟悉的接口,来操作redis和el。
  1.                 <dependency>
  2.             <groupId>org.apache.flink</groupId>
  3.             <artifactId>flink-core</artifactId>
  4.             <version>1.20.0</version>
  5.         </dependency>
  6.         <dependency>
  7.             <groupId>org.apache.flink</groupId>
  8.             <artifactId>flink-streaming-java</artifactId>
  9.             <version>1.20.0</version>
  10.         </dependency>
  11.         <dependency>
  12.             <groupId>org.apache.flink</groupId>
  13.             <artifactId>flink-clients</artifactId>
  14.             <version>1.20.0</version>
  15.         </dependency>
  16.         <dependency>
  17.             <groupId>org.apache.flink</groupId>
  18.             <artifactId>flink-runtime</artifactId>
  19.             <version>1.20.0</version>
  20.         </dependency>
  21.         
  22.                 <dependency>
  23.             <groupId>org.apache.flink</groupId>
  24.             <artifactId>flink-connector-mysql-cdc</artifactId>
  25.             <version>3.3.0</version>
  26.         </dependency>       
复制代码
3.2 common-data模块

一些entity数据,为了保持各模块共通,最好独立到一个common模块。
同时,我还会把redis和el-search的操作,在这个模块接入并封装
3.2.1 pom引用

  1. <dependencies>
  2.         <dependency>
  3.             <groupId>org.yaml</groupId>
  4.             <artifactId>snakeyaml</artifactId>
  5.             <version>2.3</version>
  6.         </dependency>
  7.         <dependency>
  8.             <groupId>co.elastic.clients</groupId>
  9.             <artifactId>elasticsearch-java</artifactId>
  10.             <version>8.17.0</version>
  11.         </dependency>
  12.         <dependency>
  13.             <groupId>org.elasticsearch</groupId>
  14.             <artifactId>elasticsearch-x-content</artifactId>
  15.             <version>8.17.0</version>
  16.         </dependency>
  17.         <dependency>
  18.             <groupId>cn.hutool</groupId>
  19.             <artifactId>hutool-core</artifactId>
  20.             <version>5.8.32</version>
  21.         </dependency>
  22.         <dependency>
  23.             <groupId>org.projectlombok</groupId>
  24.             <artifactId>lombok</artifactId>
  25.             <optional>true</optional>
  26.             <scope>provided</scope>
  27.         </dependency>
  28.         <dependency>
  29.             <groupId>org.springframework.data</groupId>
  30.             <artifactId>spring-data-redis</artifactId>
  31.             <version>3.4.2</version>
  32.         </dependency>
  33.         <dependency>
  34.             <groupId>com.alibaba.fastjson2</groupId>
  35.             <artifactId>fastjson2-extension-spring6</artifactId>
  36.             <version>2.0.54</version>
  37.         </dependency>
  38.         <dependency>
  39.             <groupId>org.apache.commons</groupId>
  40.             <artifactId>commons-pool2</artifactId>
  41.             <version>2.12.1</version>
  42.         </dependency>
  43.         <dependency>
  44.             <groupId>com.alibaba.fastjson2</groupId>
  45.             <artifactId>fastjson2</artifactId>
  46.             <version>2.0.54</version>
  47.         </dependency>
  48.         <dependency>
  49.             <groupId>io.lettuce</groupId>
  50.             <artifactId>lettuce-core</artifactId>
  51.             <version>6.4.2.RELEASE</version>
  52.         </dependency>
  53.         <!-- Flink Redis Connector -->
  54.         <!--        <dependency>-->
  55.         <!--            <groupId>org.apache.bahir</groupId>-->
  56.         <!--            <artifactId>flink-connector-redis_2.12</artifactId>-->
  57.         <!--            <version>1.1.0</version>-->
  58.         <!--        </dependency>-->
  59.     </dependencies>
复制代码
3.2.2 一些根本的entity类

  1. @Data
  2. public class GenItemEntity{
  3.     Long id;
  4.     String name;
  5.     Long price;
  6.     String brand;
  7.     String specification;
  8.     Integer version;
  9. }
复制代码
四、 redis操作和elsearch操作的封装

4.1 redis操作的封装

在pom上,接入spring-data-redis
而后,我们可以使用我们熟悉的RedisTemplate来操作redis
  1. public class RedisConfig {
  2.     public RedisConfig(){
  3.         init();
  4.     }
  5.     protected FastJsonConfig redisFastJson(){
  6.         FastJsonConfig config = new FastJsonConfig();
  7.         config.setWriterFeatures(
  8.                 JSONWriter.Feature.WriteNullListAsEmpty,
  9.                 // 写入类名
  10.                 JSONWriter.Feature.WriteClassName,
  11.                 // 将 Boolean 类型的 null 转成 false
  12.                 JSONWriter.Feature.WriteNullBooleanAsFalse,
  13.                 JSONWriter.Feature.WriteEnumsUsingName);
  14.         config.setReaderFeatures(
  15.                 JSONReader.Feature.SupportClassForName,
  16.                 // 支持autoType
  17.                 JSONReader.Feature.SupportAutoType);
  18.         return config;
  19.     }
  20.     protected FastJsonRedisSerializer fastJsonRedisSerializer(FastJsonConfig pFastJsonConfig) {
  21.         FastJsonRedisSerializer fastJsonRedisSerializer = new FastJsonRedisSerializer(Object.class);
  22.         fastJsonRedisSerializer.setFastJsonConfig(pFastJsonConfig);
  23.         return fastJsonRedisSerializer;
  24.     }
  25.     protected RedisConnectionFactory redisConnectionFactory(){
  26.             // 这里最好读配置,我懒得搞了
  27.         RedisStandaloneConfiguration redisConfiguration = new RedisStandaloneConfiguration("192.168.0.64",6379);
  28.         redisConfiguration.setPassword("study@2025");
  29.         GenericObjectPoolConfig<?> poolConfig = new GenericObjectPoolConfig<>();
  30.         poolConfig.setMaxTotal(2);  // 最大连接数
  31.         poolConfig.setMaxIdle(2);    // 最大空闲连接数
  32.         poolConfig.setMinIdle(2);    // 最小空闲连接数
  33.         poolConfig.setMaxWait(Duration.ofMillis(3000)); // 连接等待时间
  34.         ClientResources clientResources = DefaultClientResources.create();
  35.         LettucePoolingClientConfiguration lettucePoolingClientConfiguration = LettucePoolingClientConfiguration.builder()
  36.                 .poolConfig(poolConfig)
  37.                 .build();
  38.         LettucePoolingClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
  39.                 .clientResources(clientResources)
  40.                 .commandTimeout(Duration.ofSeconds(5))
  41.                 .poolConfig(poolConfig)
  42.                 .build();
  43.         LettuceConnectionFactory redisConnectionFactory = new LettuceConnectionFactory(redisConfiguration,lettucePoolingClientConfiguration);
  44.         redisConnectionFactory.afterPropertiesSet(); // 初始化连接工厂
  45.         return redisConnectionFactory;
  46.     }
  47.     protected RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory, FastJsonRedisSerializer pFastJsonRedisSerializer) {
  48.         RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
  49.         redisTemplate.setConnectionFactory(factory);
  50.         redisTemplate.setEnableTransactionSupport(true);
  51.         redisTemplate.setKeySerializer(new StringRedisSerializer());
  52.         redisTemplate.setValueSerializer(pFastJsonRedisSerializer);
  53.         redisTemplate.setHashKeySerializer(new StringRedisSerializer());
  54.         redisTemplate.setHashValueSerializer(pFastJsonRedisSerializer);
  55.         return redisTemplate;
  56.     }
  57.     protected void init(){
  58.         mFastJsonConfig = redisFastJson();
  59.         mFastJsonRedisSerializer = fastJsonRedisSerializer(mFastJsonConfig);
  60.         mRedisConnectionFactory = redisConnectionFactory();
  61.         mRedisTemplate = redisTemplate(mRedisConnectionFactory,mFastJsonRedisSerializer);
  62.         mRedisTemplate.afterPropertiesSet();
  63.     }
  64.     private FastJsonConfig mFastJsonConfig;
  65.     private FastJsonRedisSerializer mFastJsonRedisSerializer;
  66.     private RedisConnectionFactory mRedisConnectionFactory;
  67.     private RedisTemplate<String, Object> mRedisTemplate;
  68.     public static RedisTemplate<String, Object> redisTemplate(){
  69.         return Holder.INSTANCE.mRedisTemplate;
  70.     }
  71.     public static <T> String serialize(T entity){
  72.         return JSON.toJSONString(entity,Holder.INSTANCE.mFastJsonConfig.getWriterFeatures());
  73.     }
  74.     private static class Holder {
  75.         private static final RedisConfig INSTANCE = new RedisConfig();
  76.     }
  77. }
复制代码
4.2 elasticsearch操作的封装

由于el-search的毗连器,需要配置apikey,以及https,我们最好使用yml配置,并且把http_ca.crt放进该模块的resouce中。
在IDEA环境下,有可能找不到子模块的资源,这时在主模块引入子模块时,只需要如许配置即可:
  1.         <dependency>
  2.             <groupId>indi.zhifa.study2025</groupId>
  3.             <artifactId>common-data</artifactId>
  4.             <version>${project.version}</version>
  5.             <scope>compile</scope>
  6.         </dependency>
复制代码
留意,重点是<scope>compile</scope>
  1. public class EsClientConfig {
  2.     @Setter
  3.     @Getter
  4.     private String host;
  5.     @Setter
  6.     @Getter
  7.     private Integer port;
  8.     @Setter
  9.     @Getter
  10.     private String apiKey;
  11. }
复制代码
  1. public class ElasticSearchClientProvider {
  2.     private EsClientConfig esClientConfig;
  3.     private RestClientBuilder builder;
  4.     public ElasticSearchClientProvider() {
  5.         try{
  6.             init();
  7.         }catch (Exception e){
  8.             e.printStackTrace();
  9.         }
  10.     }
  11.     public void init() throws IOException {
  12.         Yaml yaml = new Yaml();
  13.         try (InputStream inputStream = FileUtil.class.getClassLoader().getResourceAsStream("el-config.yml")) {
  14.             if (inputStream == null) {
  15.                 throw new IllegalArgumentException("File not found: el-config.yml");
  16.             }
  17.             esClientConfig = yaml.loadAs(inputStream, EsClientConfig.class);
  18.         } catch (Exception e) {
  19.             throw new RuntimeException("Failed to load YAML file", e);
  20.         }
  21.         SSLContext sslContext;
  22.         try (InputStream inputStream = FileUtil.class.getClassLoader().getResourceAsStream("http_ca.crt")){
  23.             sslContext = TransportUtils.sslContextFromHttpCaCrt(inputStream);
  24.         }catch (Exception e) {
  25.             throw new RuntimeException("Failed to load http_ca.crt", e);
  26.         }
  27.         builder = RestClient.builder(
  28.                         new HttpHost(esClientConfig.getHost(), esClientConfig.getPort(), "https") // 替换为你的Elasticsearch地址
  29.                 ).setDefaultHeaders(new Header[]{
  30.                         new BasicHeader("Authorization", "ApiKey " + esClientConfig.getApiKey())
  31.                 })
  32.                 .setFailureListener(new RestClient.FailureListener(){
  33.                     @Override
  34.                     public void onFailure(Node node) {
  35.                         super.onFailure(node);
  36.                     }
  37.                 }).setHttpClientConfigCallback(hc->
  38.                    hc.setSSLContext(sslContext)
  39.                 );
  40.     }
  41.     public ElasticsearchClient get(){
  42.         RestClient restClient = builder.build();
  43.         ElasticsearchTransport transport = new RestClientTransport(
  44.                 restClient, new JacksonJsonpMapper());
  45.         ElasticsearchClient esClient = new ElasticsearchClient(transport);
  46.         return esClient;
  47.     }
  48.     public static ElasticSearchClientProvider getInstance(){
  49.         return Holder.INSTANCE;
  50.     }
  51.     private static class Holder {
  52.         private static final ElasticSearchClientProvider INSTANCE = new ElasticSearchClientProvider();
  53.     }
  54. }
复制代码
五、 redis和elsearch的自定义sink编写

5.1 redis的sink编写

我们希望传入redis时,数据是被处置惩罚好的,redis的sink不需要处置惩罚任何逻辑,只管更新缓存和删除缓存。
5.1.1 RedisSinkCommand

  1. public class RedisSinkCommand<T> {
  2.     @Setter
  3.     @Getter
  4.     protected ERedisCommand command;
  5.     @Setter
  6.     @Getter
  7.     protected long dua;
  8.     @Setter
  9.     @Getter
  10.     protected  String key;
  11.     @Setter
  12.     @Getter
  13.     protected  T value;
  14.     public void initSet(String pKey, T pValue) {
  15.         command = ERedisCommand.SET;
  16.         dua = 300;
  17.         key = pKey;
  18.         value = pValue;
  19.     }
  20.     public void initDel(String pKey) {
  21.         command = ERedisCommand.DEL;
  22.         key = pKey;
  23.     }
  24. }
复制代码
  1. public enum ERedisCommand {
  2.     SET,
  3.     DEL
  4. }
复制代码
5.1.2 SpringDataRedisSink

  1. @Slf4j
  2. public class SpringDataRedisSink<T> implements Sink<RedisSinkCommand<T>> {
  3.     @Override
  4.     public SinkWriter<RedisSinkCommand<T>> createWriter(InitContext context) throws IOException {
  5.         return null;
  6.     }
  7.     @Override
  8.     public SinkWriter<RedisSinkCommand<T>> createWriter(WriterInitContext context){
  9.         return new LettuceRedisSinkWriter();
  10.     }
  11.     class LettuceRedisSinkWriter implements SinkWriter<RedisSinkCommand<T>> {
  12.         @Override
  13.         public void write(RedisSinkCommand<T> pCmd, Context context) throws IOException, InterruptedException {
  14.             RedisTemplate<String, Object> redisTemplate = RedisConfig.redisTemplate();
  15.             switch (pCmd.getCommand()){
  16.                 case SET-> {
  17.                     redisTemplate.opsForValue().set(pCmd.getKey(),pCmd.getValue(),pCmd.getDua());
  18.                 }
  19.                 case DEL -> {
  20.                     redisTemplate.delete(pCmd.getKey());
  21.                 }
  22.             }
  23.         }
  24.         @Override
  25.         public void flush(boolean endOfInput) throws IOException, InterruptedException {
  26.         }
  27.         @Override
  28.         public void close() throws Exception {
  29.         }
  30.     }
  31. }
复制代码
5.2 elasticsearch的sink编写

elasticsearch的sink与redis的要求一致,在sink中不关心业务逻辑
5.2.1 ElCommand

  1. @Data
  2. public class ElCommand<T> {
  3.     protected EElCommand command;
  4.     protected String index;
  5.     protected T entity;
  6.     protected String id;
  7. }
复制代码
  1. public enum EElCommand {
  2.     CREATE,UPDATE,DELETE
  3. }
复制代码
5.2.2 ElSearchSink

  1. public class ElSearchSink<T> implements Sink<ElCommand<T>> {
  2.     @Override
  3.     public SinkWriter<ElCommand<T>> createWriter(InitContext context) throws IOException {
  4.         return null;
  5.     }
  6.     @Override
  7.     public SinkWriter<ElCommand<T>> createWriter(WriterInitContext context){
  8.         return new ElSearchSink.ElSearchSinkWriter();
  9.     }
  10.     class ElSearchSinkWriter implements SinkWriter<ElCommand<T>> {
  11.         @Override
  12.         public void write(ElCommand<T> pCmd, Context context) throws IOException, InterruptedException {
  13.             ElasticSearchClientProvider elasticSearchClientProvider = ElasticSearchClientProvider.getInstance();
  14.             ElasticsearchClient elClient =  elasticSearchClientProvider.get();
  15.             String index = pCmd.getIndex();
  16.             String id = pCmd.getId();
  17.             T entity = pCmd.getEntity();
  18.             switch (pCmd.getCommand()){
  19.                 case CREATE,UPDATE -> {
  20.                     elClient.index(i->i.index(index).id(id).document(entity));
  21.                 }
  22.                 case DELETE -> {
  23.                     elClient.delete(d->d.index(index).id(id));
  24.                 }
  25.             }
  26.         }
  27.         @Override
  28.         public void flush(boolean endOfInput) throws IOException, InterruptedException {
  29.         }
  30.         @Override
  31.         public void close() throws Exception {
  32.         }
  33.     }
  34. }
复制代码
六、主函数编写

  1. public class FlinkMain {
  2.     public static void main(String[] args) throws Exception {
  3.         MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
  4.                 .hostname("192.168.0.64")
  5.                 .port(3307)
  6.                 .databaseList("shop-center") // set captured database
  7.                 .tableList("shop-center.item") // set captured table
  8.                 .username("app")
  9.                 .password("study@2025")
  10.                 .serverTimeZone("Asia/Shanghai")
  11.                 .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
  12.                 .startupOptions(StartupOptions.latest())
  13.                 .includeSchemaChanges(true)
  14.                 .build();
  15. //        FlinkJedisPoolConfig jedisConfig = new FlinkJedisPoolConfig.Builder()
  16. //                .setHost("192.168.0.64") // 替换为 Redis 主机
  17. //                .setPort(6379) // Redis 端口
  18. //                .setPassword("ilv0404@1314") // 如果有密码,设置密码
  19. //                .build();
  20.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  21.         env.setParallelism(1);
  22. //        DataStream<BinlogInfo> mysqlStream = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(),"Mysql source")
  23. //                .map(str->{
  24. //                    BinlogInfo res =JSONObject.parseObject(str, BinlogInfo.class);
  25. //                    return res;
  26. //                    }
  27. //                 ).filter(bi->bi.getOp().equals("c")||bi.getOp().equals("u")||bi.getOp().equals("d"));
  28. //
  29. //        mysqlStream.addSink(new RedisSink(jedisConfig,new RedisItemMapper()));
  30.         DataStream<RedisSinkCommand<GenItemEntity>> newMysqlStream = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(),"Mysql source to redis")
  31.                         .map(str->JSONObject.parseObject(str, new TypeReference<BinlogInfo<GenItemEntity>>() {
  32.                         }), TypeInformation.of(new TypeHint<BinlogInfo<GenItemEntity>>() {}))
  33.                 .filter(bi->bi.getSource().getTable().equals("item") &&  (bi.getOp().equals("c")||bi.getOp().equals("u")||bi.getOp().equals("d")))
  34.                 .map(bi->{
  35.                     String op = bi.getOp();
  36.                     GenItemEntity itemEntity = bi.getAfter();
  37.                     String key = "item:"+itemEntity.getId();
  38.                     switch (op){
  39.                         case "c","u"->{
  40.                             RedisSinkCommand<GenItemEntity> redisSinkCommand = new RedisSinkCommand();
  41.                             redisSinkCommand.initSet(key,itemEntity);
  42.                             return redisSinkCommand;
  43.                         }
  44.                         case "d" ->{
  45.                             RedisSinkCommand<GenItemEntity> redisSinkCommand = new RedisSinkCommand();
  46.                             redisSinkCommand.initDel(key);
  47.                             return redisSinkCommand;
  48.                         }
  49.                         default -> {
  50.                             RedisSinkCommand<GenItemEntity> redisSinkCommand = new RedisSinkCommand();
  51.                             redisSinkCommand.initDel(key);
  52.                             return redisSinkCommand;
  53.                         }
  54.                     }
  55.                 },TypeInformation.of(new TypeHint<RedisSinkCommand<GenItemEntity>>() {}));
  56.         newMysqlStream.sinkTo(new SpringDataRedisSink<GenItemEntity>());
  57.         DataStream<ElCommand<GenItemEntity>> mySqlToElStream = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(),"Mysql source to el")
  58.                 .map(str->JSONObject.parseObject(str, new TypeReference<BinlogInfo<GenItemEntity>>() {})
  59.                         , TypeInformation.of(new TypeHint<BinlogInfo<GenItemEntity>>() {}))
  60.         .filter(bi->bi.getSource().getTable().equals("item") &&  (bi.getOp().equals("c")||bi.getOp().equals("u")||bi.getOp().equals("d")))
  61.                 .map(bi->{
  62.                     ElCommand elCommand = new ElCommand();
  63.                     GenItemEntity itemEntity = bi.getAfter();
  64.                     elCommand.setId(itemEntity.getId().toString());
  65.                     elCommand.setEntity(itemEntity);
  66.                     elCommand.setIndex("item_npc");
  67.                     String op = bi.getOp();
  68.                     switch (op){
  69.                         case "c"->elCommand.setCommand(EElCommand.CREATE);
  70.                         case "u"->elCommand.setCommand(EElCommand.UPDATE);
  71.                         case "d"->elCommand.setCommand(EElCommand.DELETE);
  72.                     }
  73.                     return elCommand;
  74.                 },TypeInformation.of(new TypeHint<ElCommand<GenItemEntity>>() {}));
  75.         mySqlToElStream.sinkTo(new ElSearchSink());
  76.         env.execute();
  77.     }
  78. }
复制代码
七、代码展示

请道友移步码云
八、相关实践的思考

8.1 redis相关

我这里的代码,仅仅是学习用的。在真实项目中,redis缓存的更新,通常源于查询时,假如发现缓存中没有数据,则查mysql,并把缓存数据加入redis。假如监听到表数据的更改或删除,则直接删除相应缓存,等待查询时重新加入缓存。固然,如许做在同一数据并发访问时,会有重复设置缓存的可能性,我们把这种征象叫缓存穿透。可以在更新缓存前,用redisson加个锁,防止重复读取mysql并更新redis。
  1. public class CacheService {
  2.     @Autowired
  3.     private RedissonClient redissonClient;
  4.     @Autowired
  5.     private RedisTemplate<String, Object> redisTemplate;
  6.     @Autowired
  7.     private DataRepository dataRepository;
  8.     public Object getData(String key) {
  9.         // 第一次检查缓存
  10.         Object value = redisTemplate.opsForValue().get(key);
  11.         if (value != null) {
  12.             return value;
  13.         }
  14.         RLock lock = redissonClient.getLock(key + ":LOCK");
  15.         try {
  16.             // 尝试加锁,设置锁超时时间防止死锁
  17.             if (lock.tryLock(5, 30, TimeUnit.SECONDS)) {
  18.                 try {
  19.                     // 双重检查缓存
  20.                     value = redisTemplate.opsForValue().get(key);
  21.                     if (value != null) {
  22.                         return value;
  23.                     }
  24.                     // 查询数据库
  25.                     Object dbData = dataRepository.findById(key);
  26.                     // 更新缓存,设置合理过期时间
  27.                     redisTemplate.opsForValue().set(key, dbData, 1, TimeUnit.HOURS);
  28.                     return dbData;
  29.                 } finally {
  30.                     lock.unlock();
  31.                 }
  32.             } else {
  33.                 // 未获取到锁,短暂等待后重试
  34.                 Thread.sleep(100);
  35.                 return redisTemplate.opsForValue().get(key);
  36.             }
  37.         } catch (InterruptedException e) {
  38.             Thread.currentThread().interrupt();
  39.             throw new RuntimeException("获取锁失败", e);
  40.         }
  41.     }
  42. }
复制代码
8.2 es相关

对于es,着实更新数据不建议接纳这种方式。由于es中需要反范式设计,不可能用1张表的数据做es查询数据的。
对于电商体系的商品查询,我们可以在商品上架的时间更新es。并且商品商家状态下,不允许修改商品。商品下架时,删除es的数据。想要修改商品数据,可以先下架,再修改,而后上架。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4