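1. Background
In some projects, especially inventory-management (purchase, sales, stock) SaaS products, caching is not considered at the start because the priority is shipping the product quickly. Software of this kind has complex business logic, so retrofitting Redis caching into the existing code touches a lot of places and requires extensive testing. Sometimes the environment is even worse: a project may be written against JDK 1.6, and wiring Redis caching into that framework becomes very difficult.
This raises the question: could we listen to the MySQL binlog, the way MySQL primary-replica replication does, and update the data from it? That is exactly the use case for Flink CDC.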
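2. MySQL environment setup
Note that, at the time of writing, flink-cdc supports MySQL only up to 8.0.x; 8.4 is not supported at all.
My local MySQL is 8.4, so for convenience we install MySQL 8.0 with Docker.
2.1 docker-compose.yml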
- services:
-   master:
-     image: mysql:8.0.41
-     container_name: mysql-8
-     restart: always
-     #mem_limit: 512M
-     environment:
-       MYSQL_ROOT_PASSWORD: study@2025
-       TZ: Asia/Shanghai
-     ports:
-       - "3307:3306"
-     volumes:
-       - ./cfg/my.cnf:/etc/my.cnf
-       - ./data:/var/lib/mysql
-       - ./initdb:/docker-entrypoint-initdb.d
-       - ./dump:/var/dump
-       - ./log:/var/log
-     networks:
-       - mysql-cluster
- networks:
-   mysql-cluster:
2.2 Initialization SQL
- -- Create the replication user
- create role role_app;
- GRANT SELECT,UPDATE,INSERT,DELETE ON *.* to role_app;
- GRANT REPLICATION SLAVE,REPLICATION CLIENT ON *.* TO role_app;
- CREATE USER 'app'@'%' IDENTIFIED WITH caching_sha2_password by 'study@2025' DEFAULT ROLE role_app COMMENT 'app user';
- FLUSH PRIVILEGES;
- -- Create the database used for testing
- CREATE SCHEMA `shop-center`;
- FLUSH TABLES WITH READ LOCK;
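2.3 Notes
First comment out the volume line - ./cfg/my.cnf:/etc/my.cnf and start the service.
Then copy the configuration file out of the container with the following command:
- docker cp <id>:/etc/my.cnf ./cfg/my.cnf
After that, uncomment the volume line and restart the service.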
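3. Project setup and pom dependencies
3.1 Main module pom dependencies
A Flink program does not need the Spring framework; a plain main method is enough to run it.
We would still like to use familiar APIs to work with Redis and Elasticsearch.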
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-core</artifactId>
- <version>1.20.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java</artifactId>
- <version>1.20.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients</artifactId>
- <version>1.20.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime</artifactId>
- <version>1.20.0</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-mysql-cdc</artifactId>
- <version>3.3.0</version>
- </dependency>
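3.2 common-data module
Entity classes are best moved into a separate common module so that all modules share the same definitions.
I also integrate and wrap the Redis and Elasticsearch operations in this module.
3.2.1 pom dependencies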
- <dependencies>
- <dependency>
- <groupId>org.yaml</groupId>
- <artifactId>snakeyaml</artifactId>
- <version>2.3</version>
- </dependency>
- <dependency>
- <groupId>co.elastic.clients</groupId>
- <artifactId>elasticsearch-java</artifactId>
- <version>8.17.0</version>
- </dependency>
- <dependency>
- <groupId>org.elasticsearch</groupId>
- <artifactId>elasticsearch-x-content</artifactId>
- <version>8.17.0</version>
- </dependency>
- <dependency>
- <groupId>cn.hutool</groupId>
- <artifactId>hutool-core</artifactId>
- <version>5.8.32</version>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <optional>true</optional>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.springframework.data</groupId>
- <artifactId>spring-data-redis</artifactId>
- <version>3.4.2</version>
- </dependency>
- <dependency>
- <groupId>com.alibaba.fastjson2</groupId>
- <artifactId>fastjson2-extension-spring6</artifactId>
- <version>2.0.54</version>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-pool2</artifactId>
- <version>2.12.1</version>
- </dependency>
- <dependency>
- <groupId>com.alibaba.fastjson2</groupId>
- <artifactId>fastjson2</artifactId>
- <version>2.0.54</version>
- </dependency>
- <dependency>
- <groupId>io.lettuce</groupId>
- <artifactId>lettuce-core</artifactId>
- <version>6.4.2.RELEASE</version>
- </dependency>
- <!-- Flink Redis Connector -->
- <!-- <dependency>-->
- <!-- <groupId>org.apache.bahir</groupId>-->
- <!-- <artifactId>flink-connector-redis_2.12</artifactId>-->
- <!-- <version>1.1.0</version>-->
- <!-- </dependency>-->
- </dependencies>
3.2.2 Some basic entity classes
- @Data
- public class GenItemEntity{
- Long id;
- String name;
- Long price;
- String brand;
- String specification;
- Integer version;
- }
4. Wrapping Redis and Elasticsearch operations
4.1 Wrapping Redis operations
Add spring-data-redis to the pom.
We can then use the familiar RedisTemplate to work with Redis.
- public class RedisConfig {
- public RedisConfig(){
- init();
- }
- protected FastJsonConfig redisFastJson(){
- FastJsonConfig config = new FastJsonConfig();
- config.setWriterFeatures(
- JSONWriter.Feature.WriteNullListAsEmpty,
- // write the class name
- JSONWriter.Feature.WriteClassName,
- // write a null Boolean as false
- JSONWriter.Feature.WriteNullBooleanAsFalse,
- JSONWriter.Feature.WriteEnumsUsingName);
- config.setReaderFeatures(
- JSONReader.Feature.SupportClassForName,
- // enable autoType support
- JSONReader.Feature.SupportAutoType);
- return config;
- }
- protected FastJsonRedisSerializer fastJsonRedisSerializer(FastJsonConfig pFastJsonConfig) {
- FastJsonRedisSerializer fastJsonRedisSerializer = new FastJsonRedisSerializer(Object.class);
- fastJsonRedisSerializer.setFastJsonConfig(pFastJsonConfig);
- return fastJsonRedisSerializer;
- }
- protected RedisConnectionFactory redisConnectionFactory(){
- // Ideally these values are read from configuration; they are hard-coded here for brevity
- RedisStandaloneConfiguration redisConfiguration = new RedisStandaloneConfiguration("192.168.0.64",6379);
- redisConfiguration.setPassword("study@2025");
- GenericObjectPoolConfig<?> poolConfig = new GenericObjectPoolConfig<>();
- poolConfig.setMaxTotal(2); // maximum number of connections
- poolConfig.setMaxIdle(2); // maximum number of idle connections
- poolConfig.setMinIdle(2); // minimum number of idle connections
- poolConfig.setMaxWait(Duration.ofMillis(3000)); // maximum wait for a connection
- ClientResources clientResources = DefaultClientResources.create();
- // A single client configuration carrying the client resources, command timeout, and pool settings
- LettucePoolingClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
- .clientResources(clientResources)
- .commandTimeout(Duration.ofSeconds(5))
- .poolConfig(poolConfig)
- .build();
- LettuceConnectionFactory redisConnectionFactory = new LettuceConnectionFactory(redisConfiguration,clientConfig);
- redisConnectionFactory.afterPropertiesSet(); // initialize the connection factory
- return redisConnectionFactory;
- }
- protected RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory, FastJsonRedisSerializer pFastJsonRedisSerializer) {
- RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
- redisTemplate.setConnectionFactory(factory);
- redisTemplate.setEnableTransactionSupport(true);
- redisTemplate.setKeySerializer(new StringRedisSerializer());
- redisTemplate.setValueSerializer(pFastJsonRedisSerializer);
- redisTemplate.setHashKeySerializer(new StringRedisSerializer());
- redisTemplate.setHashValueSerializer(pFastJsonRedisSerializer);
- return redisTemplate;
- }
- protected void init(){
- mFastJsonConfig = redisFastJson();
- mFastJsonRedisSerializer = fastJsonRedisSerializer(mFastJsonConfig);
- mRedisConnectionFactory = redisConnectionFactory();
- mRedisTemplate = redisTemplate(mRedisConnectionFactory,mFastJsonRedisSerializer);
- mRedisTemplate.afterPropertiesSet();
- }
- private FastJsonConfig mFastJsonConfig;
- private FastJsonRedisSerializer mFastJsonRedisSerializer;
- private RedisConnectionFactory mRedisConnectionFactory;
- private RedisTemplate<String, Object> mRedisTemplate;
- public static RedisTemplate<String, Object> redisTemplate(){
- return Holder.INSTANCE.mRedisTemplate;
- }
- public static <T> String serialize(T entity){
- return JSON.toJSONString(entity,Holder.INSTANCE.mFastJsonConfig.getWriterFeatures());
- }
- private static class Holder {
- private static final RedisConfig INSTANCE = new RedisConfig();
- }
- }
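4.2 Wrapping Elasticsearch operations
The Elasticsearch client needs an API key and HTTPS, so it is best to use a yml configuration file and put http_ca.crt in this module's resources directory.
In IDEA, the submodule's resources are sometimes not found. In that case, declare the submodule dependency in the main module like this: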
- <dependency>
- <groupId>indi.zhifa.study2025</groupId>
- <artifactId>common-data</artifactId>
- <version>${project.version}</version>
- <scope>compile</scope>
- </dependency>
Note that the key part is <scope>compile</scope>.
- public class EsClientConfig {
- @Setter
- @Getter
- private String host;
- @Setter
- @Getter
- private Integer port;
- @Setter
- @Getter
- private String apiKey;
- }
- public class ElasticSearchClientProvider {
- private EsClientConfig esClientConfig;
- private RestClientBuilder builder;
- public ElasticSearchClientProvider() {
- try{
- init();
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- public void init() throws IOException {
- Yaml yaml = new Yaml();
- try (InputStream inputStream = FileUtil.class.getClassLoader().getResourceAsStream("el-config.yml")) {
- if (inputStream == null) {
- throw new IllegalArgumentException("File not found: el-config.yml");
- }
- esClientConfig = yaml.loadAs(inputStream, EsClientConfig.class);
- } catch (Exception e) {
- throw new RuntimeException("Failed to load YAML file", e);
- }
- SSLContext sslContext;
- try (InputStream inputStream = FileUtil.class.getClassLoader().getResourceAsStream("http_ca.crt")){
- sslContext = TransportUtils.sslContextFromHttpCaCrt(inputStream);
- }catch (Exception e) {
- throw new RuntimeException("Failed to load http_ca.crt", e);
- }
- builder = RestClient.builder(
- new HttpHost(esClientConfig.getHost(), esClientConfig.getPort(), "https") // replace with your Elasticsearch address
- ).setDefaultHeaders(new Header[]{
- new BasicHeader("Authorization", "ApiKey " + esClientConfig.getApiKey())
- })
- .setFailureListener(new RestClient.FailureListener(){
- @Override
- public void onFailure(Node node) {
- super.onFailure(node);
- }
- }).setHttpClientConfigCallback(hc->
- hc.setSSLContext(sslContext)
- );
- }
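- private ElasticsearchClient esClient;
- public synchronized ElasticsearchClient get(){
- // Build the client once and reuse it; every RestClient owns its own connection pool and threads
- if (esClient == null) {
- RestClient restClient = builder.build();
- ElasticsearchTransport transport = new RestClientTransport(
- restClient, new JacksonJsonpMapper());
- esClient = new ElasticsearchClient(transport);
- }
- return esClient;
- }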
- public static ElasticSearchClientProvider getInstance(){
- return Holder.INSTANCE;
- }
- private static class Holder {
- private static final ElasticSearchClientProvider INSTANCE = new ElasticSearchClientProvider();
- }
- }
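5. Custom sinks for Redis and Elasticsearch
5.1 The Redis sink
We want the data to be fully processed before it reaches Redis: the Redis sink handles no business logic and only updates or deletes cache entries.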
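To make that contract concrete, here is a minimal in-memory sketch of the only two operations the sink is expected to perform: a SET with an expiry and a DEL. TtlCacheSketch and its injected clock are hypothetical names introduced for illustration; the class is a stand-in for Redis, not a real client, and it assumes the expiry is expressed in seconds.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** In-memory stand-in for Redis (hypothetical); it only illustrates SET-with-TTL and DEL. */
final class TtlCacheSketch {

    /** A stored value together with the instant (in millis) at which it expires. */
    private static final class Entry {
        final Object value;
        final long expiresAtMillis;

        Entry(Object value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Entry> store = new HashMap<>();
    private final LongSupplier clock; // injected so expiry can be exercised without sleeping

    TtlCacheSketch(LongSupplier clock) {
        this.clock = clock;
    }

    /** SET key value with an expiry; ttlSeconds is assumed to be in seconds. */
    void set(String key, Object value, long ttlSeconds) {
        store.put(key, new Entry(value, clock.getAsLong() + ttlSeconds * 1000L));
    }

    /** DEL key; deleting a missing key is a no-op. */
    void del(String key) {
        store.remove(key);
    }

    /** Returns the value, or null if the key is absent or has expired. */
    Object get(String key) {
        Entry entry = store.get(key);
        if (entry == null) {
            return null;
        }
        if (clock.getAsLong() >= entry.expiresAtMillis) {
            store.remove(key); // lazily evict the expired entry
            return null;
        }
        return entry.value;
    }

    public static void main(String[] args) {
        TtlCacheSketch cache = new TtlCacheSketch(System::currentTimeMillis);
        cache.set("item:1", "demo value", 300);
        System.out.println(cache.get("item:1"));
        cache.del("item:1");
        System.out.println(cache.get("item:1"));
    }
}
```

Everything upstream of the sink decides which key and value to send; the sink itself only needs these two verbs.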
5.1.1 RedisSinkCommand
- public class RedisSinkCommand<T> {
- @Setter
- @Getter
- protected ERedisCommand command;
- @Setter
- @Getter
- protected long dua;
- @Setter
- @Getter
- protected String key;
- @Setter
- @Getter
- protected T value;
- public void initSet(String pKey, T pValue) {
- command = ERedisCommand.SET;
- dua = 300;
- key = pKey;
- value = pValue;
- }
- public void initDel(String pKey) {
- command = ERedisCommand.DEL;
- key = pKey;
- }
- }
- public enum ERedisCommand {
- SET,
- DEL
- }
5.1.2 SpringDataRedisSink
- @Slf4j
- public class SpringDataRedisSink<T> implements Sink<RedisSinkCommand<T>> {
- @Override
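- public SinkWriter<RedisSinkCommand<T>> createWriter(InitContext context) throws IOException {
- // Deprecated entry point; return a real writer rather than null in case a caller still uses it
- return new LettuceRedisSinkWriter();
- }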
- @Override
- public SinkWriter<RedisSinkCommand<T>> createWriter(WriterInitContext context){
- return new LettuceRedisSinkWriter();
- }
- class LettuceRedisSinkWriter implements SinkWriter<RedisSinkCommand<T>> {
- @Override
- public void write(RedisSinkCommand<T> pCmd, Context context) throws IOException, InterruptedException {
- RedisTemplate<String, Object> redisTemplate = RedisConfig.redisTemplate();
- switch (pCmd.getCommand()){
- case SET-> {
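- // dua is treated as a TTL in seconds (assumed unit); the three-argument set(key, value, long) is SETRANGE with an offset, not an expiry
- redisTemplate.opsForValue().set(pCmd.getKey(),pCmd.getValue(),Duration.ofSeconds(pCmd.getDua()));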
- }
- case DEL -> {
- redisTemplate.delete(pCmd.getKey());
- }
- }
- }
- @Override
- public void flush(boolean endOfInput) throws IOException, InterruptedException {
- }
- @Override
- public void close() throws Exception {
- }
- }
- }
5.2 The Elasticsearch sink
The Elasticsearch sink follows the same rule as the Redis one: it contains no business logic.
5.2.1 ElCommand
- @Data
- public class ElCommand<T> {
- protected EElCommand command;
- protected String index;
- protected T entity;
- protected String id;
- }
- public enum EElCommand {
- CREATE,UPDATE,DELETE
- }
5.2.2 ElSearchSink
- public class ElSearchSink<T> implements Sink<ElCommand<T>> {
- @Override
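- public SinkWriter<ElCommand<T>> createWriter(InitContext context) throws IOException {
- // Deprecated entry point; return a real writer rather than null in case a caller still uses it
- return new ElSearchSinkWriter();
- }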
- @Override
- public SinkWriter<ElCommand<T>> createWriter(WriterInitContext context){
- return new ElSearchSink.ElSearchSinkWriter();
- }
- class ElSearchSinkWriter implements SinkWriter<ElCommand<T>> {
- @Override
- public void write(ElCommand<T> pCmd, Context context) throws IOException, InterruptedException {
- ElasticSearchClientProvider elasticSearchClientProvider = ElasticSearchClientProvider.getInstance();
- ElasticsearchClient elClient = elasticSearchClientProvider.get();
- String index = pCmd.getIndex();
- String id = pCmd.getId();
- T entity = pCmd.getEntity();
- switch (pCmd.getCommand()){
- case CREATE,UPDATE -> {
- elClient.index(i->i.index(index).id(id).document(entity));
- }
- case DELETE -> {
- elClient.delete(d->d.index(index).id(id));
- }
- }
- }
- @Override
- public void flush(boolean endOfInput) throws IOException, InterruptedException {
- }
- @Override
- public void close() throws Exception {
- }
- }
- }
6. Main function
- public class FlinkMain {
- public static void main(String[] args) throws Exception {
- MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
- .hostname("192.168.0.64")
- .port(3307)
- .databaseList("shop-center") // set captured database
- .tableList("shop-center.item") // set captured table
- .username("app")
- .password("study@2025")
- .serverTimeZone("Asia/Shanghai")
- .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
- .startupOptions(StartupOptions.latest())
- .includeSchemaChanges(true)
- .build();
- // FlinkJedisPoolConfig jedisConfig = new FlinkJedisPoolConfig.Builder()
- // .setHost("192.168.0.64") // replace with your Redis host
- // .setPort(6379) // Redis port
- // .setPassword("ilv0404@1314") // set the password, if any
- // .build();
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- // DataStream<BinlogInfo> mysqlStream = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(),"Mysql source")
- // .map(str->{
- // BinlogInfo res =JSONObject.parseObject(str, BinlogInfo.class);
- // return res;
- // }
- // ).filter(bi->bi.getOp().equals("c")||bi.getOp().equals("u")||bi.getOp().equals("d"));
- //
- // mysqlStream.addSink(new RedisSink(jedisConfig,new RedisItemMapper()));
- DataStream<RedisSinkCommand<GenItemEntity>> newMysqlStream = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(),"Mysql source to redis")
- .map(str->JSONObject.parseObject(str, new TypeReference<BinlogInfo<GenItemEntity>>() {
- }), TypeInformation.of(new TypeHint<BinlogInfo<GenItemEntity>>() {}))
- // Null-safe checks: schema change events (includeSchemaChanges) carry no "op"
- .filter(bi->bi.getSource() != null && "item".equals(bi.getSource().getTable()) && ("c".equals(bi.getOp())||"u".equals(bi.getOp())||"d".equals(bi.getOp())))
- .map(bi->{
- String op = bi.getOp();
- // Delete events carry the row in "before" and a null "after"; getBefore() is assumed to exist on BinlogInfo
- GenItemEntity itemEntity = "d".equals(op) ? bi.getBefore() : bi.getAfter();
- String key = "item:"+itemEntity.getId();
- RedisSinkCommand<GenItemEntity> redisSinkCommand = new RedisSinkCommand<>();
- switch (op){
- case "c","u"-> redisSinkCommand.initSet(key,itemEntity);
- // "d" and anything unexpected: drop the cache entry
- default -> redisSinkCommand.initDel(key);
- }
- return redisSinkCommand;
- },TypeInformation.of(new TypeHint<RedisSinkCommand<GenItemEntity>>() {}));
- newMysqlStream.sinkTo(new SpringDataRedisSink<GenItemEntity>());
- DataStream<ElCommand<GenItemEntity>> mySqlToElStream = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(),"Mysql source to el")
- .map(str->JSONObject.parseObject(str, new TypeReference<BinlogInfo<GenItemEntity>>() {})
- , TypeInformation.of(new TypeHint<BinlogInfo<GenItemEntity>>() {}))
- // Null-safe checks: schema change events (includeSchemaChanges) carry no "op"
- .filter(bi->bi.getSource() != null && "item".equals(bi.getSource().getTable()) && ("c".equals(bi.getOp())||"u".equals(bi.getOp())||"d".equals(bi.getOp())))
- .map(bi->{
- String op = bi.getOp();
- ElCommand<GenItemEntity> elCommand = new ElCommand<>();
- // Delete events carry the row in "before" and a null "after"; getBefore() is assumed to exist on BinlogInfo
- GenItemEntity itemEntity = "d".equals(op) ? bi.getBefore() : bi.getAfter();
- elCommand.setId(itemEntity.getId().toString());
- elCommand.setEntity(itemEntity);
- elCommand.setIndex("item_npc");
- switch (op){
- case "c"->elCommand.setCommand(EElCommand.CREATE);
- case "u"->elCommand.setCommand(EElCommand.UPDATE);
- case "d"->elCommand.setCommand(EElCommand.DELETE);
- }
- return elCommand;
- },TypeInformation.of(new TypeHint<ElCommand<GenItemEntity>>() {}));
- mySqlToElStream.sinkTo(new ElSearchSink<GenItemEntity>());
- env.execute();
- }
- }
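One detail of the mapping step is easy to miss: in a Debezium change event, a delete carries the row image in "before" and leaves "after" null, so the cache key for a delete has to be derived from "before". The following is a minimal, dependency-free sketch of that decision. ChangeEvent and CacheKeyResolver are hypothetical names made up for illustration (they are not the BinlogInfo class used above), and the "item:" key prefix simply mirrors the main function.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for a Debezium change event. */
final class ChangeEvent {
    final String op;                  // "c" = create, "u" = update, "d" = delete
    final Map<String, Object> before; // row image before the change; null for inserts
    final Map<String, Object> after;  // row image after the change; null for deletes

    ChangeEvent(String op, Map<String, Object> before, Map<String, Object> after) {
        this.op = op;
        this.before = before;
        this.after = after;
    }
}

final class CacheKeyResolver {

    /** Returns the cache key for an event, or null if no usable row image is present. */
    static String keyFor(ChangeEvent event) {
        // Deletes only carry "before"; creates and updates carry "after".
        Map<String, Object> row = "d".equals(event.op) ? event.before : event.after;
        if (row == null || row.get("id") == null) {
            return null;
        }
        return "item:" + row.get("id");
    }

    /** Returns "SET" for creates and updates, "DEL" for deletes, and "SKIP" otherwise. */
    static String actionFor(ChangeEvent event) {
        if (keyFor(event) == null) {
            return "SKIP"; // e.g. a schema change event without a row image
        }
        if ("c".equals(event.op) || "u".equals(event.op)) {
            return "SET";
        }
        if ("d".equals(event.op)) {
            return "DEL";
        }
        return "SKIP";
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("id", 42L);
        ChangeEvent delete = new ChangeEvent("d", row, null);
        System.out.println(actionFor(delete) + " " + keyFor(delete));
    }
}
```

Comparing with the literal on the left ("d".equals(event.op)) also keeps events without an op field, such as schema changes, from causing a NullPointerException.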
7. Code
The full code is available on Gitee (码云).
8. Reflections on real-world practice
8.1 Redis
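The code here is for learning purposes only. In a real project, the Redis cache is usually populated on reads: if a query misses the cache, it reads MySQL and writes the result into Redis. When a change or deletion of table data is detected, the corresponding cache entry is simply deleted and gets repopulated by the next query. When many requests hit the same key concurrently, several of them may miss at once, all query MySQL, and all set the cache. This is usually called cache breakdown (缓存击穿); cache penetration (缓存穿透) refers instead to lookups for keys that do not exist at all. To avoid it, take a Redisson lock before rebuilding the cache so that only one request reads MySQL and updates Redis.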
- public class CacheService {
- @Autowired
- private RedissonClient redissonClient;
- @Autowired
- private RedisTemplate<String, Object> redisTemplate;
- @Autowired
- private DataRepository dataRepository;
- public Object getData(String key) {
- // first cache check
- Object value = redisTemplate.opsForValue().get(key);
- if (value != null) {
- return value;
- }
- RLock lock = redissonClient.getLock(key + ":LOCK");
- try {
- // try to acquire the lock; the lease time prevents a deadlock if the holder dies
- if (lock.tryLock(5, 30, TimeUnit.SECONDS)) {
- try {
- // second cache check (double-checked)
- value = redisTemplate.opsForValue().get(key);
- if (value != null) {
- return value;
- }
- // query the database
- Object dbData = dataRepository.findById(key);
- // update the cache with a reasonable expiry
- redisTemplate.opsForValue().set(key, dbData, 1, TimeUnit.HOURS);
- return dbData;
- } finally {
- lock.unlock();
- }
- } else {
- // lock not acquired: wait briefly, then read the cache again
- Thread.sleep(100);
- return redisTemplate.opsForValue().get(key);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException("Failed to acquire lock", e);
- }
- }
- }
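The double-checked locking idea above can also be tried out without Redis or Redisson. The sketch below is an in-process analogue under stated assumptions: PerKeyLockCache is a hypothetical class, a ReentrantLock per key plays the role of the distributed lock, and the loader function plays the role of the MySQL query. It only shows that concurrent misses on one key trigger a single load; it is not a replacement for a distributed lock across several application instances.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

/** In-process analogue (hypothetical) of "check cache, lock, check again, load". */
final class PerKeyLockCache<V> {
    private final ConcurrentHashMap<String, V> cache = new ConcurrentHashMap<>();
    // One lock per key; this map is never trimmed, which is acceptable for a sketch only.
    private final ConcurrentHashMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();
    private final Function<String, V> loader; // stands in for the database query

    PerKeyLockCache(Function<String, V> loader) {
        this.loader = loader;
    }

    V get(String key) {
        V value = cache.get(key); // first check, without the lock
        if (value != null) {
            return value;
        }
        ReentrantLock lock = locks.computeIfAbsent(key, k -> new ReentrantLock());
        lock.lock();
        try {
            value = cache.get(key); // second check, under the lock
            if (value != null) {
                return value;
            }
            value = loader.apply(key);
            if (value != null) {
                cache.put(key, value); // ConcurrentHashMap rejects null values
            }
            return value;
        } finally {
            lock.unlock();
        }
    }

    /** Drops the cached entry, as a change event from the binlog would. */
    void invalidate(String key) {
        cache.remove(key);
    }

    public static void main(String[] args) {
        PerKeyLockCache<String> cache = new PerKeyLockCache<>(key -> "row for " + key);
        System.out.println(cache.get("item:1"));
    }
}
```

Unlike the Redisson version, a thread that fails the first check here blocks until the lock is free instead of giving up after a timeout; which behavior is preferable depends on how slow the database query can get.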
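8.2 Elasticsearch
For Elasticsearch, I actually do not recommend updating data this way. Elasticsearch calls for a denormalized design, so the documents used for search can rarely be built from the data of a single table.
For product search in an e-commerce system, we can update Elasticsearch when a product is listed, and disallow edits while it is listed. When the product is delisted, delete its document from Elasticsearch. To modify a product, delist it first, edit it, and then list it again.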
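The listing rule can be sketched as a tiny state machine. ProductListingSketch and IndexAction are hypothetical names used only for illustration; the recorded actions stand in for the index and delete calls that would be sent to Elasticsearch, and no real client is involved.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical product that may only be edited while it is delisted. */
final class ProductListingSketch {

    /** What would be sent to the search index at each transition. */
    enum IndexAction { INDEX, DELETE }

    private String name;
    private boolean listed;
    private final List<IndexAction> actions = new ArrayList<>();

    ProductListingSketch(String name) {
        this.name = name;
    }

    /** Listing a product writes its document to the index. */
    void list() {
        if (!listed) {
            listed = true;
            actions.add(IndexAction.INDEX);
        }
    }

    /** Delisting a product removes its document from the index. */
    void delist() {
        if (listed) {
            listed = false;
            actions.add(IndexAction.DELETE);
        }
    }

    /** Edits are rejected while the product is listed, so the index never goes stale. */
    void rename(String newName) {
        if (listed) {
            throw new IllegalStateException("delist the product before editing it");
        }
        name = newName;
    }

    String name() {
        return name;
    }

    List<IndexAction> actions() {
        return Collections.unmodifiableList(actions);
    }

    public static void main(String[] args) {
        ProductListingSketch product = new ProductListingSketch("kettle");
        product.list();
        product.delist();
        product.rename("large kettle");
        product.list();
        System.out.println(product.actions());
    }
}
```

Because edits happen only while the product is absent from the index, the document written at listing time can be assembled from as many tables as the search page needs.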