Title: Basic Usage of Debezium (Using MySQL as an Example)

Author: 九天猎人    Posted: 2022-9-16 17:18
I. Introduction to Debezium

From the official website:
Debezium is a set of distributed services to capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a change event stream, and applications simply read these streams to see the change events in the same order in which they occurred.
Put simply, Debezium captures every row-level change in a database and emits those changes, in order, as an event stream.
II. Basic Usage

The following uses MySQL as an example to walk through basic Debezium usage.
1. Preparing MySQL

Create a dedicated user for Debezium and grant it the privileges the connector needs:
    CREATE USER 'dbz'@'%' IDENTIFIED BY 'dbzpwd';
    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dbz'@'%';
Check whether binary logging is enabled:
    SELECT variable_value AS "BINARY LOGGING STATUS (log-bin) ::" FROM information_schema.global_variables WHERE variable_name='log_bin';
    -- If this fails with: The 'INFORMATION_SCHEMA.GLOBAL_VARIABLES' feature is disabled...
    -- run the following statement first, then execute the query above again: SET GLOBAL show_compatibility_56=ON;
    -- On MySQL 8.0, where information_schema.global_variables no longer exists, use instead: SHOW VARIABLES LIKE 'log_bin';
If the result is OFF, edit the MySQL configuration file along these lines:
    server-id         = 223344      # required
    log_bin           = mysql-bin   # base name of the binlog file sequence
    binlog_format     = ROW         # must be ROW
    binlog_row_image  = FULL        # must be FULL
    expire_logs_days  = 10          # adjust to your environment
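The requirements listed in the configuration above can be checked mechanically before starting a connector. The following is a minimal sketch, not part of Debezium: the class name `BinlogConfigCheck` and its methods are hypothetical, it only reads `key = value` lines with optional `#` comments, and it treats a missing entry as a problem even where the server default might already be acceptable.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BinlogConfigCheck {

    /** Parses "key = value  # comment" lines; lines without '=' (e.g. [mysqld]) are skipped. */
    static Map<String, String> parse(String cnf) {
        Map<String, String> settings = new LinkedHashMap<>();
        for (String line : cnf.split("\\R")) {
            int hash = line.indexOf('#');
            if (hash >= 0) {
                line = line.substring(0, hash); // drop the inline comment
            }
            int eq = line.indexOf('=');
            if (eq < 0) {
                continue;
            }
            // MySQL option files accept '-' and '_' interchangeably in option names
            String key = line.substring(0, eq).trim().replace('-', '_').toLowerCase();
            settings.put(key, line.substring(eq + 1).trim());
        }
        return settings;
    }

    /** Returns a description of every setting that does not match the requirements above. */
    static List<String> problems(Map<String, String> s) {
        List<String> out = new ArrayList<>();
        if (!s.containsKey("server_id")) out.add("server-id is missing");
        if (!s.containsKey("log_bin")) out.add("log_bin is missing");
        if (!"ROW".equalsIgnoreCase(s.get("binlog_format"))) out.add("binlog_format must be ROW");
        if (!"FULL".equalsIgnoreCase(s.get("binlog_row_image"))) out.add("binlog_row_image must be FULL");
        return out;
    }

    public static void main(String[] args) {
        String cnf = String.join("\n",
                "server-id         = 223344",
                "log_bin           = mysql-bin",
                "binlog_format     = MIXED   # wrong on purpose",
                "binlog_row_image  = FULL");
        System.out.println(problems(parse(cnf))); // prints [binlog_format must be ROW]
    }
}
```

An empty list means the file satisfies all four checks; a real deployment would still confirm the live values on the server, since the file may not be the one MySQL actually loaded.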
Create a test database and table with a few rows:
    create database inventory;
    create table inventory.a (id bigint primary key auto_increment, name varchar(32));
    insert into inventory.a values (null, 'n1'),(null, 'n2'),(null, 'n3');
2. Writing the Program

2.1. Project Dependencies (Maven)

pom.xml
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-api</artifactId>
        <version>${version.debezium}</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-embedded</artifactId>
        <version>${version.debezium}</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-connector-mysql</artifactId>
        <version>${version.debezium}</version>
    </dependency>
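At the time of writing, the latest stable Debezium release is 1.9.5.Final (the sample output in section 3 was produced with 1.8.1.Final).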
2.2. Preparing the Database and Table
Skip this step if inventory.a was already created in section 1.
    create database inventory;
    create table inventory.a (id bigint primary key, name varchar(32));
    insert into inventory.a values (1, 'n1'),(2, 'n2'),(3, 'n3');
2.3. Writing the Code
    package com.greatdb.dbzdemo;

    import java.io.IOException;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import io.debezium.engine.ChangeEvent;
    import io.debezium.engine.DebeziumEngine;
    import io.debezium.engine.format.Json;

    /**
     * @author wang.jianwen
     * @version 1.0
     * @date 2022/07/29
     */
    public class DebeziumTest {
        private static DebeziumEngine<ChangeEvent<String, String>> engine;

        public static void main(String[] args) throws Exception {
            final Properties props = new Properties();
            props.setProperty("name", "dbz-engine");
            props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
            // offset config begin: store the processed binlog offsets in a file
            props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
            props.setProperty("offset.storage.file.filename", "/tmp/dbz/storage/mysql_offsets.dat");
            props.setProperty("offset.flush.interval.ms", "0");
            // offset config end
            props.setProperty("database.server.name", "mysql-connector");
            props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");
            props.setProperty("database.history.file.filename", "/tmp/dbz/storage/mysql_dbhistory.txt");
            props.setProperty("database.server.id", "122112"); // must differ from the MySQL server's own server-id
            props.setProperty("database.hostname", "tmg");     // replace with your MySQL host
            props.setProperty("database.port", "3306");
            props.setProperty("database.user", "dbz");         // the account created in section 1
            props.setProperty("database.password", "dbzpwd");
            props.setProperty("database.include.list", "inventory"); // database to capture
            props.setProperty("table.include.list", "inventory.a");  // table to capture
            props.setProperty("snapshot.mode", "initial");           // full snapshot first, then incremental changes

            // Create the Debezium engine from the configuration above; events are emitted as JSON strings
            engine = DebeziumEngine.create(Json.class)
                    .using(props)
                    .notifying(record -> {
                        System.out.println(record); // print each event to the console
                    })
                    .using((success, message, error) -> {
                        if (error != null) {
                            // completion callback: report the failure
                            System.out.println("------------error, message: " + message + ", exception: " + error);
                        }
                        closeEngine(engine);
                    })
                    .build();

            ExecutorService executor = Executors.newSingleThreadExecutor();
            executor.execute(engine);
            addShutdownHook(engine);
            awaitTermination(executor);
            System.out.println("------------main finished.");
        }

        private static void closeEngine(DebeziumEngine<ChangeEvent<String, String>> engine) {
            try {
                engine.close();
            } catch (IOException ignored) {
            }
        }

        private static void addShutdownHook(DebeziumEngine<ChangeEvent<String, String>> engine) {
            Runtime.getRuntime().addShutdownHook(new Thread(() -> closeEngine(engine)));
        }

        private static void awaitTermination(ExecutorService executor) {
            if (executor != null) {
                try {
                    executor.shutdown();
                    while (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                        // keep waiting until the engine thread exits
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
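The start, shutdown, and wait sequence used in the program above can be tried without a database. The sketch below is an illustration under stated assumptions: `FakeEngine` is a hypothetical stand-in that, like the engine above, is both a `Runnable` and a `Closeable`, and `runAndStop` is a made-up helper. It shows that `executor.shutdown()` only stops new tasks from being accepted, while the running task ends once `close()` is called.

```java
import java.io.Closeable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LifecycleSketch {

    /** Hypothetical stand-in for the engine: run() blocks until close() is called. */
    static class FakeEngine implements Runnable, Closeable {
        private final CountDownLatch stop = new CountDownLatch(1);
        volatile boolean finished = false;

        @Override
        public void run() {
            try {
                stop.await(); // a real engine would read change events here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                finished = true;
            }
        }

        @Override
        public void close() {
            stop.countDown();
        }
    }

    /** Starts the engine, closes it after delayMillis, and reports whether the worker exited. */
    static boolean runAndStop(FakeEngine engine, long delayMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);
        executor.shutdown(); // no new tasks are accepted; the running engine keeps going
        try {
            Thread.sleep(delayMillis);
            engine.close();  // plays the role of closeEngine(engine) in the program above
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            engine.close();
            return false;
        }
    }

    public static void main(String[] args) {
        FakeEngine engine = new FakeEngine();
        System.out.println("terminated=" + runAndStop(engine, 100));
    }
}
```

In the real program the close call comes from the JVM shutdown hook or the completion callback rather than from a timer.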
3. Testing

Once the program is running, the console shows:
    ... (omitted)
    EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":1}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":1,"name":"n1"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005186,"snapshot":"true","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005191,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154, snapshot=true}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=1,name=n1},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005186,snapshot=true,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005191}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
    EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":2}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"n2"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005195,"snapshot":"true","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005196,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154, snapshot=true}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=2}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=2,name=n2},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005195,snapshot=true,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005196}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
    EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":3}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":3,"name":"n3"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005196,"snapshot":"last","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005196,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=3,name=n3},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005196,snapshot=last,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005196}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
    ... (omitted)
The full snapshot of the existing rows has been emitted. The key parts are:
    ..."payload":{"before":null,"after":{"id":1,"name":"n1"}..."op":"r"...
    ..."payload":{"before":null,"after":{"id":2,"name":"n2"}..."op":"r"...
    ..."payload":{"before":null,"after":{"id":3,"name":"n3"}..."op":"r"...
Insert a row:
    insert into inventory.a values (4, 'n4');
Console output:
    ..."payload":{"before":null,"after":{"id":4,"name":"n4"}..."op":"c"...
Update the row:
    update inventory.a set name = 'n4-upd' where id = 4;
Console output:
    ..."payload":{"before":{"id":4,"name":"n4"},"after":{"id":4,"name":"n4-upd"}..."op":"u"...
Delete a row:
    delete from inventory.a where id = 1;
Console output:
    ..."payload":{"before":{"id":1,"name":"n1"},"after":null..."op":"d"...
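The "op" values in the outputs of this section ("r", "c", "u", "d") identify the kind of change, so a consumer will usually branch on them. The following is a rough sketch of that step: the class `OpCodeSketch` and its method are hypothetical, and a regular expression is used only to keep the example free of extra dependencies. A real consumer would more likely parse the JSON value with a proper JSON library.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class OpCodeSketch {

    // Matches "op":"x" as it appears in the payload. The schema section contains
    // "field":"op", where "op" is not followed by a colon, so it is not matched.
    private static final Pattern OP = Pattern.compile("\"op\"\\s*:\\s*\"([a-z])\"");

    /** Returns a readable name for the operation code found in a JSON change event. */
    static String describe(String json) {
        Matcher m = OP.matcher(json);
        if (!m.find()) {
            return "unknown";
        }
        switch (m.group(1)) {
            case "r": return "snapshot read";
            case "c": return "insert";
            case "u": return "update";
            case "d": return "delete";
            default:  return "other (" + m.group(1) + ")";
        }
    }

    public static void main(String[] args) {
        // Shortened events modelled on the console output of this section
        String inserted = "{\"payload\":{\"before\":null,\"after\":{\"id\":4,\"name\":\"n4\"},\"op\":\"c\"}}";
        String deleted = "{\"payload\":{\"before\":{\"id\":1,\"name\":\"n1\"},\"after\":null,\"op\":\"d\"}}";
        System.out.println(describe(inserted)); // prints insert
        System.out.println(describe(deleted));  // prints delete
    }
}
```

Inside the program from section 2.3, the string passed to such a helper would be the event's JSON value rather than the whole printed record.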
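III. Summary

This article used MySQL as an example to walk through the basic flow of using Debezium from code. When common insert, update, and delete operations are performed on MySQL data, Debezium captures the row changes, records the row data before and after each change, and exposes the changes as an event stream that external consumers can read and process.
Reference: https://debezium.io/documentation/reference/1.8/index.html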
