ToB企服应用市场:ToB评测及商务社交产业平台

标题: 小项目不想引入 MQ?试试 Debezium! [打印本页]

作者: 瑞星    时间: 2024-5-13 03:19
标题: 小项目不想引入 MQ?试试 Debezium!
作者:是奉壹呀
链接:https://juejin.cn/post/7264791359839223823
奥卡姆剃刀原理,“如无必要,勿增实体"。
在一些小型项目当中,没有引入消息中间件,也不想引入,但有一些业务逻辑想要解耦异步,那怎么办呢?
我们的web项目,单独内网部署,由于大数据配景,公司消息中间件统一利用的kafka,在一些小项目上kafka就显得很笨重。 引入rocketmq或rabittmq也没必要。 变乱或多线程也不适合。
详细一点的,之前对接的一个体系,一张记录表有10+以上的范例状态,新的需求是,针对每种状态做出对应的差别的操作。
之前写入这张记录表的时间,方式也是五花八门,有的是单条记录写入,有的是批量写入,有的调用了统一的service,有的呢直接调用了DAO层mapper直接写入。
所以想找到一个统一入口进行切入处理,就不行了。
这个时间就算引入消息队列,也必要在差别的业务方法里进行写入消息的操作。业务方也不太乐意配合改。
可以利用触发器,但它是属于上个时代的产物,槽点太多。(这里并不是完全不主张利用触发器,技术永远是为业务服务的,只要评估觉得可行,就可以利用)那么这个时间,CDC技术就可以粉墨登场了。
CDC(change data capture)数据更改捕获。
常见的数据更改捕获都是通过数据库比如mysql的binlog来达到目的。
我们可以监控mysql binlog日记,当写入一条数据的时间,吸收到数据变更日记,做出相应的操作。这样的好处是,只需导入依靠,不额外引入组件,同时无需改动之前的代码。 两边完全解耦,互不干扰。
常见的CDC框架,比如,canal (非Camel)
数据库镜像
数据库实时备份
索引构建和实时维护(拆分异构索引、倒排索引等)
业务 cache 刷新
带业务逻辑的增量数据处理

它的原理:
再比如,debezium(音同 dbzm 滴BZ姆)很多人可能不太了解. 包括databus,maxwell,flink cdc(大数据范畴)等等,它们同属CDC捕获数据更改(change data capture)类的技术。

为什么是debezium

这么多技术框架,为什么选debezium?
看起来很多。但一一排除下来就debezium和canal。
sqoop,kettle,datax之类的工具,属于前大数据时代的产物,地位类似于web范畴的structs2。而且,它们基于查询而非binlog日记,实在不属于CDC。首先排除。
flink cdc是大数据范畴的框架,一般web项目的数据量属于大材小用了。
同时databus,maxwell相对比力冷门,用得比力少。
最后不用canal的缘故原由有以下几点。

Debezium是一个捕获数据更改(CDC)平台,并且利用Kafka和Kafka Connect实现了本身的持久性、可靠性和容错性。
每一个部署在Kafka Connect分布式的、可扩展的、容错性的服务中的connector监控一个上游数据库服务器,捕获所有的数据库更改,然后记录到一个或者多个Kafka topic(通常一个数据库表对应一个kafka topic)。
Kafka确保所有这些数据更改变乱都能够多副本并且总体上有序(Kafka只能保证一个topic的单个分区内有序),这样,更多的客户端可以独立消耗同样的数据更改变乱而对上游数据库体系造成的影响降到很小(如果N个应用都直接去监控数据库更改,对数据库的压力为N,而用debezium报告数据库更改变乱到kafka,所有的应用都去消耗kafka中的消息,可以把对数据库的压力降到1)。
另外,客户端可以随时停止消耗,然后重启,从前次停止消耗的地方接着消耗。每个客户端可以自行决定他们是否必要exactly-once或者at-least-once消息交付语义保证,并且所有的数据库或者表的更改变乱是按照上游数据库发生的顺序被交付的。

对于不必要或者不想要这种容错级别、性能、可扩展性、可靠性的应用,他们可以利用内嵌的Debezium connector引擎来直接在应用内部运行connector。
这种应用仍必要消耗数据库更改变乱,但更盼望connector直接传递给它,而不是持久化到Kafka里。
简介
Debezium是一个开源项目,为捕获数据更改(change data capture,CDC)提供了一个低延迟的流式处理平台。你可以安装并且设置Debezium去监控你的数据库,然后你的应用就可以消耗对数据库的每一个行级别(row-level)的更改。只有已提交的更改才是可见的,所以你的应用不用担心事务(transaction)或者更改被回滚(roll back)。Debezium为所有的数据库更改变乱提供了一个统一的模子,所以你的应用不用担心每一种数据库管理体系的错综复杂性。另外,由于Debezium用持久化的、有副本备份的日记来记录数据库数据变化的历史,因此,你的应用可以随时停止再重启,而不会错过它停止运行时发生的变乱,保证了所有的变乱都能被正确地、完全地处理掉。
监控数据库,并且在数据变动的时间获得通知一直是很复杂的变乱。关系型数据库的触发器可以做到,但是只对特定的数据库有用,而且通常只能更新数据库内的状态(无法和外部的进程通信)。一些数据库提供了监控数据变动的API或者框架,但是没有一个标准,每种数据库的实现方式都是差别的,并且必要大量特定的知识和明白特定的代码才能运用。确保以相同的顺序查看和处理所有更改,同时最小化影响数据库仍然非常具有挑战性。
Debezium提供了模块为你做这些复杂的工作。一些模块是通用的,并且能够适用多种数据库管理体系,但在功能和性能方面仍有一些限制。另一些模块是为特定的数据库管理体系定制的,所以他们通常可以更多地利用数据库体系本身的特性来提供更多功能。
github官网上摆列的一些
典型应用场景




springboot 整合 Debezium

推荐一个开源免费的 Spring Boot 实战项目:
https://github.com/javastacks/spring-boot-best-practice
依靠
  1. <debezium.version>1.7.0.Final</debezium.version>
  2. <mysql.connector.version>8.0.26</mysql.connector.version>
  3. <dependency>
  4.     <groupId>mysql</groupId>
  5.     <artifactId>mysql-connector-java</artifactId>
  6.     <version>${mysql.connector.version}</version>
  7.     <scope>runtime</scope>
  8. </dependency>
  9. <dependency>
  10.     <groupId>io.debezium</groupId>
  11.     <artifactId>debezium-api</artifactId>
  12.     <version>${debezium.version}</version>
  13. </dependency>
  14. <dependency>
  15.     <groupId>io.debezium</groupId>
  16.     <artifactId>debezium-embedded</artifactId>
  17.     <version>${debezium.version}</version>
  18. </dependency>
  19. <dependency>
  20.     <groupId>io.debezium</groupId>
  21.     <artifactId>debezium-connector-mysql</artifactId>
  22.     <version>${debezium.version}</version>
  23.     <exclusions>
  24.         <exclusion>
  25.             <groupId>mysql</groupId>
  26.             <artifactId>mysql-connector-java</artifactId>
  27.         </exclusion>
  28.     </exclusions>
  29. </dependency>
复制代码
注意debezium版本为1.7.0.Final,对应mysql驱动为8.0.26,低于这个版本会报兼容错误。
设置

相应的设置
  1. debezium.datasource.hostname = localhost
  2. debezium.datasource.port = 3306
  3. debezium.datasource.user = root
  4. debezium.datasource.password = 123456
  5. debezium.datasource.tableWhitelist = test.test
  6. debezium.datasource.storageFile = E:/debezium/test/offsets/offset.dat
  7. debezium.datasource.historyFile = E:/debezium/test/history/custom-file-db-history.dat
  8. debezium.datasource.flushInterval = 10000
  9. debezium.datasource.serverId = 1
  10. debezium.datasource.serverName = name-1
复制代码
然后进行设置初始化。
主要的设置项:
  1. connector.class
复制代码
  1. offset.storage
复制代码
  1. offset.storage.file.filename
复制代码
  1. offset.flush.interval.ms
复制代码
  1. table.whitelist
复制代码
  1. database.whitelist
复制代码
  1. import io.debezium.connector.mysql.MySqlConnector;
  2. import io.debezium.relational.history.FileDatabaseHistory;
  3. import lombok.Data;
  4. import org.apache.kafka.connect.storage.FileOffsetBackingStore;
  5. import org.springframework.boot.context.properties.ConfigurationProperties;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import java.io.File;
  9. import java.io.IOException;
  10. /**
  11. * @className: MysqlConfig
  12. * @author: nyp
  13. * @description: TODO
  14. * @date: 2023/8/7 13:53
  15. * @version: 1.0
  16. */
  17. @Configuration
  18. @ConfigurationProperties(prefix ="debezium.datasource")
  19. @Data
  20. public class MysqlBinlogConfig {
  21.     private String hostname;
  22.     private String port;
  23.     private String user;
  24.     private String password;
  25.     private String tableWhitelist;
  26.     private String storageFile;
  27.     private String historyFile;
  28.     private Long flushInterval;
  29.     private String serverId;
  30.     private String serverName;
  31.     @Bean
  32.     public io.debezium.config.Configuration MysqlBinlogConfig () throws Exception {
  33.         checkFile();
  34.         io.debezium.config.Configuration configuration = io.debezium.config.Configuration.create()
  35.                 .with("name", "mysql_connector")
  36.                 .with("connector.class", MySqlConnector.class)
  37.                 // .with("offset.storage", KafkaOffsetBackingStore.class)
  38.                 .with("offset.storage", FileOffsetBackingStore.class)
  39.                 .with("offset.storage.file.filename", storageFile)
  40.                 .with("offset.flush.interval.ms", flushInterval)
  41.                 .with("database.history", FileDatabaseHistory.class.getName())
  42.                 .with("database.history.file.filename", historyFile)
  43.                 .with("snapshot.mode", "Schema_only")
  44.                 .with("database.server.id", serverId)
  45.                 .with("database.server.name", serverName)
  46.                 .with("database.hostname", hostname)
  47. //                .with("database.dbname", dbname)
  48.                 .with("database.port", port)
  49.                 .with("database.user", user)
  50.                 .with("database.password", password)
  51. //                .with("database.whitelist", "test")
  52.                 .with("table.whitelist", tableWhitelist)
  53.                 .build();
  54.         return configuration;
  55.     }
  56.     private void checkFile() throws IOException {
  57.         String dir = storageFile.substring(0, storageFile.lastIndexOf("/"));
  58.         File dirFile = new File(dir);
  59.         if(!dirFile.exists()){
  60.             dirFile.mkdirs();
  61.         }
  62.         File file = new File(storageFile);
  63.         if(!file.exists()){
  64.             file.createNewFile();
  65.         }
  66.     }
  67. }
复制代码
snapshot.mode 快照模式,指定连接器启动时运行快照的条件。可能的设置有:
  1. database.server.id
复制代码
  1. io.debezium.DebeziumException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event 'binlog.000013' at 46647257, the last event read from './binlog.000013' at 125, the last byte read from './binlog.000013' at 46647257. Error code: 1236; SQLSTATE: HY000.
  2.         at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1167)
  3.         at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1212)
  4.         at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:980)
  5.         at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:599)
  6.         at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:857)
  7.         at java.lang.Thread.run(Thread.java:750)
  8. Caused by: com.github.shyiko.mysql.binlog.network.ServerException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event 'binlog.000013' at 46647257, the last event read from './binlog.000013' at 125, the last byte read from './binlog.000013' at 46647257.
  9.         at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:944)
  10.         ... 3 common frames omitted
复制代码
监听

设置监听服务
  1. import com.alibaba.fastjson.JSON;
  2. import io.debezium.config.Configuration;
  3. import io.debezium.data.Envelope;
  4. import io.debezium.engine.ChangeEvent;
  5. import io.debezium.engine.DebeziumEngine;
  6. import io.debezium.engine.format.Json;
  7. import lombok.Builder;
  8. import lombok.Data;
  9. import lombok.extern.slf4j.Slf4j;
  10. import org.apache.commons.lang3.StringUtils;
  11. import org.springframework.beans.factory.annotation.Qualifier;
  12. import org.springframework.stereotype.Component;
  13. import javax.annotation.PostConstruct;
  14. import javax.annotation.PreDestroy;
  15. import javax.annotation.Resource;
  16. import java.io.IOException;
  17. import java.util.ArrayList;
  18. import java.util.List;
  19. import java.util.Map;
  20. import java.util.Objects;
  21. import java.util.concurrent.Executor;
  22. /**
  23. * @projectName: test
  24. * @package: com.test.config
  25. * @className: MysqlBinlogListener
  26. * @author: nyp
  27. * @description: TODO
  28. * @date: 2023/8/7 13:56
  29. * @version: 1.0
  30. */
  31. @Component
  32. @Slf4j
  33. public class MysqlBinlogListener {
  34.     @Resource
  35.     private Executor taskExecutor;
  36.     private final List<DebeziumEngine<ChangeEvent<String, String>>> engineList = new ArrayList<>();
  37.     private MysqlBinlogListener (@Qualifier("mysqlConnector") Configuration configuration) {
  38.             this.engineList.add(DebeziumEngine.create(Json.class)
  39.                     .using(configuration.asProperties())
  40.                     .notifying(record -> receiveChangeEvent(record.value()))
  41.                     .build());
  42.     }
  43.     private void receiveChangeEvent(String value) {
  44.         if (Objects.nonNull(value)) {
  45.             Map<String, Object> payload = getPayload(value);
  46.             String op = JSON.parseObject(JSON.toJSONString(payload.get("op")), String.class);
  47.             if (!(StringUtils.isBlank(op) || Envelope.Operation.READ.equals(op))) {
  48.                 ChangeData changeData = getChangeData(payload);
  49.                // 这里抛出异常会导致后面的日志监听失败
  50.                 try {
  51.                     mysqlBinlogService.service(changeData);
  52.                 }catch (Exception e){
  53.                     log.error("binlog处理异常,原数据: " + changeData, e);
  54.                 }
  55.             }
  56.         }
  57.     }
  58.     @PostConstruct
  59.     private void start() {
  60.         for (DebeziumEngine<ChangeEvent<String, String>> engine : engineList) {
  61.             taskExecutor.execute(engine);
  62.         }
  63.     }
  64.     @PreDestroy
  65.     private void stop() {
  66.         for (DebeziumEngine<ChangeEvent<String, String>> engine : engineList) {
  67.             if (engine != null) {
  68.                 try {
  69.                     engine.close();
  70.                 } catch (IOException e) {
  71.                     log.error("", e);
  72.                 }
  73.             }
  74.         }
  75.     }
  76.     public static Map<String, Object> getPayload(String value) {
  77.         Map<String, Object> map = JSON.parseObject(value, Map.class);
  78.         Map<String, Object> payload = JSON.parseObject(JSON.toJSONString(map.get("payload")), Map.class);
  79.         return payload;
  80.     }
  81.     public static ChangeData getChangeData(Map<String, Object> payload) {
  82.         Map<String, Object> source = JSON.parseObject(JSON.toJSONString(payload.get("source")), Map.class);
  83.         return ChangeData.builder()
  84.                 .op(payload.get("op").toString())
  85.                 .table(source.get("table").toString())
  86.                 .after(JSON.parseObject(JSON.toJSONString(payload.get("after")), Map.class))
  87.                 .source(JSON.parseObject(JSON.toJSONString(payload.get("source")), Map.class))
  88.                 .before(JSON.parseObject(JSON.toJSONString(payload.get("before")), Map.class))
  89.                 .build();
  90.     }
  91.     @Data
  92.     @Builder
  93.     public static class ChangeData {
  94.         /**
  95.          * 更改前数据
  96.          */
  97.         private Map<String, Object> after;
  98.         private Map<String, Object> source;
  99.         /**
  100.          * 更改后数据
  101.          */
  102.         private Map<String, Object> before;
  103.         /**
  104.          * 更改的表名
  105.          */
  106.         private String table;
  107.         /**
  108.          * 操作类型, 枚举 Envelope.Operation
  109.          */
  110.         private String op;
  111.     }
  112. }
复制代码
将监听到的binlog日记封装为ChangeData对象,包括表名,更改前后的数据,
以及操作范例
  1. READ("r"),
  2. CREATE("c"),
  3. UPDATE("u"),
  4. DELETE("d"),
  5. TRUNCATE("t");
复制代码
测试

update操作输出
  1. MysqlListener.ChangeData(after = {
  2.         name = Suzuki Mio2,
  3.         id = 1
  4. }, source = {
  5.         file = binlog .000013,
  6.         connector = mysql,
  7.         pos = 42587833,
  8.         name = test - 1,
  9.         row = 0,
  10.         server_id = 1,
  11.         version = 1.7 .0.Final,
  12.         ts_ms = 1691458956000,
  13.         snapshot = false,
  14.         db = test
  15.         table = test
  16. }, before = {
  17.         name = Suzuki Mio,
  18.         id = 1
  19. }, table = test, op = u)
  20. data = {
  21.         name = Suzuki Mio2,
  22.         id = 1
  23. }
复制代码
新增操作输出
  1. MysqlListener.ChangeData(after = {
  2.         name = 王五,
  3.         id = 0
  4. }, source = {
  5.         file = binlog .000013,
  6.         connector = mysql,
  7.         pos = 42588175,
  8.         name = test - 1,
  9.         row = 0,
  10.         server_id = 1,
  11.         version = 1.7 .0.Final,
  12.         ts_ms = 1691459066000,
  13.         snapshot = false,
  14.         db = test,
  15.         table = test
  16. }, before = null, table = test, op = c)
复制代码
删除操作输出
  1. MysqlListener.ChangeData(after = null, source = {
  2.         file = binlog .000013,
  3.         connector = mysql,
  4.         pos = 42588959,
  5.         name = test - 1,
  6.         row = 0,
  7.         server_id = 1,
  8.         version = 1.7 .0.Final,
  9.         ts_ms = 1691459104000,
  10.         snapshot = false,
  11.         db = test
  12.         table = test
  13. }, before = {
  14.         name = 王五,
  15.         id = 0
  16. }, table = test, op = d)
复制代码
我们之前设置的保存读取进度的文件storageFile,类似于kafka的偏移量,记录的内容如下:

停止服务,对数据库进行操作,再次重启,会根据进度重新读取。
小结

本文先容了debezium,更多的时间,我们一谈到CDC,第一想到的是大量数据同步的工具。 但实在也可以利用其数据变更捕获的特性,来达到一部份消息队列的作用。
但其毕竟不能完全替换消息队列。大家理性对待与选择。
本文的重点在先容一种思路,详细的某项技术反而不那么紧张。
更多文章推荐:
1.Spring Boot 3.x 教程,太全了!
2.2,000+ 道 Java口试题及答案整理(2024最新版)
3.免费获取 IDEA 激活码的 7 种方式(2024最新版)
觉得不错,别忘了顺手点赞+转发哦!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4