从 MySQL 到 ClickHouse 实时数据同步 —— Debezium + Kafka 表引擎 ...

打印 上一主题 下一主题

主题 891|帖子 891|积分 2673

目录
一、总体架构
二、安装配置 MySQL 主从复制
三、安装配置 ClickHouse 集群
四、安装 JDK
五、安装配置 Zookeeper 集群
六、安装配置 Kafaka 集群
七、安装配置 Debezium-Connector-MySQL 插件
1. 创建插件目录
2. 解压文件到插件目录
3. 配置 Kafka Connector
(1)配置属性文件
(2)分发到其它节点
(3)以 distributed 方式启动 Kafka connect
(4)确认 connector 插件和主动生成的 topic
4. 创建 source connector
(1)Debezium 三个须要的配置说明
(2)创建源 mysql 配置文件
(3)创建 mysql source connector
八、在 ClickHouse 中创建库表、物化视图和视图
1. 建库
2. 创建 Kafka 表
3. 创建主表
4. 创建斲丧者物化视图
5. 创建视图
6. 验证
参考:


        本文先容从 MySQL 作为源到 ClickHouse 作为目标的整个过程。MySQL 数据库更改通过 Debezium 捕获,并作为变乱发布在到 Kafka 上。ClickHouse 通过 Kafka 表引擎按部分次序应用这些更改,实时并保持最终划一性。相关软件版本如下:


  • MySQL:8.0.16
  • ClickHouse:24.1.8
  • JDK:11.0.22
  • zookeeper:3.9.1
  • Kafka:3.7.0
  • debezium-connector-mysql:2.4.2
        这种方案的优点之一是可以做到 ClickHouse 与 MySQL 的数据最终严酷划一。
一、总体架构

        总体结构如下图所示。

        ClickHouse 是由四个实例构成的两分片、每分片两副本集群,票选和协调器利用 ClickHouse 自带的 keeper 组件。分片、副本、keeper 节点、Zookeeper集群、Kafaka集群、Debezium-Connector-MySQL 插件的部署如下表所示。
IP

主机名

实例脚色

ClickHouse

Keeper

Zookeeper

Kafka

Debezium

Connector

MySQL

172.18.4.126

node1

分片1副本1

*




172.18.4.188

node2

分片1副本2

*

*

*

*

172.18.4.71

node3

分片2副本1

*

*

*

*

172.18.4.86

node4

分片2副本2


*

*

*

二、安装配置 MySQL 主从复制

        配置好主从复制后,在主库创建测试库表及数据:
  1. -- 建库
  2. create database test;
  3. -- 建表
  4. create table test.t1 (
  5.   id bigint(20) not null auto_increment,
  6.   remark varchar(32) default null comment '备注',
  7.   createtime timestamp not null default current_timestamp comment '创建时间',
  8.   primary key (id));
  9. -- 插入三条测试数据
  10. insert into test.t1 (remark) values ('第一行:row1'),('第二行:row2'),('第三行:row3');
  11. commit;
复制代码
三、安装配置 ClickHouse 集群

四、安装 JDK

五、安装配置 Zookeeper 集群

六、安装配置 Kafaka 集群

七、安装配置 Debezium-Connector-MySQL 插件

        在 node2 上实验以下步调。
1. 创建插件目录

  1. mkdir $KAFKA_HOME/plugins
复制代码
2. 解压文件到插件目录

  1. cd ~
  2. # debezium-connector-mysql
  3. unzip debezium-debezium-connector-mysql-2.4.2.zip -d $KAFKA_HOME/plugins/
复制代码
3. 配置 Kafka Connector

(1)配置属性文件

  1. # 先备份
  2. cp $KAFKA_HOME/config/connect-distributed.properties $KAFKA_HOME/config/connect-distributed.properties.bak
  3. # 编辑 connect-distributed.properties 文件
  4. vim $KAFKA_HOME/config/connect-distributed.properties
复制代码
        内容如下:
  1. bootstrap.servers=node2:9092,node3:9092,node4:9092
  2. group.id=connect-cluster
  3. key.converter=org.apache.kafka.connect.json.JsonConverter
  4. value.converter=org.apache.kafka.connect.json.JsonConverter
  5. key.converter.schemas.enable=false
  6. value.converter.schemas.enable=false
  7. offset.storage.topic=connect-offsets
  8. offset.storage.replication.factor=3
  9. offset.storage.partitions=3
  10. config.storage.topic=connect-configs
  11. config.storage.replication.factor=3
  12. status.storage.topic=connect-status
  13. status.storage.replication.factor=3
  14. status.storage.partitions=3
  15. offset.flush.interval.ms=10000
  16. plugin.path=/root/kafka_2.13-3.7.0/plugins
复制代码
(2)分发到其它节点

  1. scp $KAFKA_HOME/config/connect-distributed.properties node3:$KAFKA_HOME/config/
  2. scp $KAFKA_HOME/config/connect-distributed.properties node4:$KAFKA_HOME/config/
  3. scp -r $KAFKA_HOME/plugins node3:$KAFKA_HOME/
  4. scp -r $KAFKA_HOME/plugins node4:$KAFKA_HOME/
复制代码
(3)以 distributed 方式启动 Kafka connect

  1. connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties 
  2. # 确认日志是否有 ERROR
  3. grep ERROR ~/kafka_2.13-3.7.0/logs/connectDistributed.out
复制代码
(4)确认 connector 插件和主动生成的 topic

        查看毗连器插件:
  1. curl -X GET http://node2:8083/connector-plugins | jq
复制代码
        从输出中可以看到,Kafka connect 已经识别到了 MySqlConnector source 插件:
  1. [root@vvml-yz-hbase-test~]#curl -X GET http://node2:8083/connector-plugins | jq
  2.   % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
  3.                                  Dload  Upload   Total   Spent    Left  Speed
  4. 100   403  100   403    0     0   3820      0 --:--:-- --:--:-- --:--:--  3838
  5. [
  6.   {
  7.     "class": "io.debezium.connector.mysql.MySqlConnector",
  8.     "type": "source",
  9.     "version": "2.4.2.Final"
  10.   },
  11.   {
  12.     "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
  13.     "type": "source",
  14.     "version": "3.7.0"
  15.   },
  16.   {
  17.     "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
  18.     "type": "source",
  19.     "version": "3.7.0"
  20.   },
  21.   {
  22.     "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
  23.     "type": "source",
  24.     "version": "3.7.0"
  25.   }
  26. ]
  27. [root@vvml-yz-hbase-test~]#
复制代码
        查看 topic:
  1. kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092
复制代码
        从输出中可以看到,Kafka connect 启动时主动创建了 connect-configs、connect-offsets、connect-status 三个 topic:
  1. [root@vvml-yz-hbase-test~]#kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092
  2. __consumer_offsets
  3. connect-configs
  4. connect-offsets
  5. connect-status
  6. [root@vvml-yz-hbase-test~]#
复制代码
4. 创建 source connector

(1)Debezium 三个须要的配置说明

        Debezium 是一个众所周知的用于读取息争析 MySQL Binlog 的工具。它将 KafkaConnect 作为一个毗连器进行集成,并对 Kafka 主题进行每一次更改。


  • 只记录后状态
        默认情况下,Debezium 会向 Kafka 发出每个操作的前状态和后状态的每条记录,这很难被 ClickHouse Kafka 表解析。别的,在实验删除操作的情况下(Clickhouse 同样无法解析),它会创建 tombstone 记录,即具有 Null 值的记录。下表展示了这个行为。
操作

操作前

操作后

附加记录

Create

Null

新纪录

-

Update

更新前的记录

更新后的记录

-

Delete

删除前的记录

Null

墓碑记录

        在 Debezium 配置中利用 ExtractNewRecod 转换器来处理此问题。由于有了这个选项,Debezium 只为创建/更新操作保留 after 状态,而忽略 before 状态。但缺点是,它删除了包罗先前状态的 Delete 记录和墓碑记录,换句话说就是不再捕获删除操作。紧接着说明怎样办理这个问题。
  1. "transforms": "unwrap",
  2. "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
复制代码


  • 重写删除变乱
        要捕获删除操作,必须添加如下所示的重写配置:
  1. "transforms.unwrap.delete.handling.mode":"rewrite"
复制代码
        Debezium 利用此配置添加字段 __deleted,对于 delete 操作为 true,对于其他操作为 false。因此,删除将包罗从前的状态以及 __deleted:true 字段。


  • 处理非主键更新
        在提供上述配置的情况下,更新记录(主键除外的每一列)会发出一个具有新状态的简单记录。通常在关系数据库体系中,更新后的记录会更换前一个记录,但在 ClickHouse 不可。出于性能考虑,ClickHouse 将行级更新变为多版本插入。在本示例中,MySQL 中的 test.t1 表以 id 列为主键,假如更新了 remark 列,在 ClikHouse 中,最终会得到重复的记录,这意味着 id 相同,但 remark 差别!
        荣幸的是有办法应付这种情况。默认情况下,Debezium 会创建一个删除记录和一个创建记录,用于更新主键。因此,假如源更新 id,它会发出一个带有前一个 id 的删除记录和一个带有新 id 的创建记录。带有 __deleted=ture 字段的前一个记录将更换 CH 中的 stall 记录。然后,可以在视图中过滤暗示删除的记录。可以利用以下选项将此行为扩展到其他列:
  1. "message.key.columns": "test.t1:id;test.t1:remark;test.t1:createtime"
复制代码
        注意:
        通过更改毗连器的键列,Debezium 将这些列用作主键,而不是源表的默认主键。因此,与数据库的一条记录相关的差别操作可能最终会出现在 Kafka 中的其他分区。由于记录在差别分区中失去次序,除非确保 ClickHouse 次序键和 Debezium 消息键相同,否则可能会导致 Clikchouse 中的数据不划一。
        经验法则如下:

  • 根据想要的表结构来设计分区键和排序键。
  • 提取分区和排序键的泉源,假设它们是在物化过程中计算的。
  • 合并所有这些列。
  • 将步调 3 的结果定义为 Debezium 毗连器配置中的 message.column.keys。
  • 检查 Clickhouse 排序键是否包罗所有这些列。假如没有则添加它们。
        现在,通过将上述所有选项和常用选项放在一起,将拥有一个功能齐全的 Debezium 配置,能够处理 ClickHouse 所需的任何更改。
(2)创建源 mysql 配置文件

  1. # 编辑文件
  2. vim $KAFKA_HOME/plugins/source-mysql.json
复制代码
        内容如下:
  1. {
  2.  "name": "mysql-source-connector",
  3.  "config": {
  4.      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  5.      "database.hostname": "172.18.16.156",
  6.      "database.port": "3307",
  7.      "database.user": "dba",
  8.      "database.password": "123456",
  9.      "database.server.id": "1563307",
  10.      "database.server.name": "dbserver1",
  11.      "database.include.list": "test",
  12.      "table.include.list": "test.t1",
  13.      "topic.prefix": "mysql-clickhouse-test",
  14.      "schema.history.internal.kafka.bootstrap.servers": "node2:9092,node3:9092,node4:9092",
  15.      "schema.history.internal.kafka.topic": "schemahistory.mysql-clickhouse-test",
  16.      "message.key.columns": "test.t1:id;test.t1:remark;test.t1:createtime",
  17.      "transforms":"unwrap",
  18.      "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  19.      "transforms.unwrap.delete.handling.mode": "rewrite"
  20.      }
  21.  }
复制代码
(3)创建 mysql source connector

  1. # 创建 connector
  2. curl -X POST -H 'Content-Type: application/json' -i 'http://node2:8083/connectors' -d @"/root/kafka_2.13-3.7.0/plugins/source-mysql.json"; echo
  3. # 查看 connector 状态
  4. curl -X GET http://node2:8083/connectors/mysql-source-connector/status | jq
  5. # 查看 topic
  6. kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092
复制代码
        从输出中可以看到,mysql-source-connector 状态为 RUNNING,并主动创建了三个 topic:
  1. [root@vvml-yz-hbase-test~]#curl -X POST -H 'Content-Type: application/json' -i 'http://node2:8083/connectors' -d @"/root/kafka_2.13-3.7.0/plugins/source-mysql.json"; echo
  2. HTTP/1.1 201 Created
  3. Date: Thu, 25 Apr 2024 03:47:26 GMT
  4. Location: http://node2:8083/connectors/mysql-source-connector
  5. Content-Type: application/json
  6. Content-Length: 818
  7. Server: Jetty(9.4.53.v20231009)
  8. {"name":"mysql-source-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"172.18.16.156","database.port":"3307","database.user":"dba","database.password":"123456","database.server.id":"1563307","database.server.name":"dbserver1","database.include.list":"test","table.include.list":"test.t1","topic.prefix":"mysql-clickhouse-test","schema.history.internal.kafka.bootstrap.servers":"node2:9092,node3:9092,node4:9092","schema.history.internal.kafka.topic":"schemahistory.mysql-clickhouse-test","message.key.columns":"test.t1:id;test.t1:remark;test.t1:createtime","transforms":"unwrap","transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState","transforms.unwrap.delete.handling.mode":"rewrite","name":"mysql-source-connector"},"tasks":[],"type":"source"}
  9. [root@vvml-yz-hbase-test~]#curl -X GET http://node2:8083/connectors/mysql-source-connector/status | jq
  10.   % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
  11.                                  Dload  Upload   Total   Spent    Left  Speed
  12. 100   182  100   182    0     0  24045      0 --:--:-- --:--:-- --:--:-- 26000
  13. {
  14.   "name": "mysql-source-connector",
  15.   "connector": {
  16.     "state": "RUNNING",
  17.     "worker_id": "172.18.4.188:8083"
  18.   },
  19.   "tasks": [
  20.     {
  21.       "id": 0,
  22.       "state": "RUNNING",
  23.       "worker_id": "172.18.4.188:8083"
  24.     }
  25.   ],
  26.   "type": "source"
  27. }
  28. [root@vvml-yz-hbase-test~]#kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092
  29. __consumer_offsets
  30. connect-configs
  31. connect-offsets
  32. connect-status
  33. mysql-clickhouse-test
  34. mysql-clickhouse-test.test.t1
  35. schemahistory.mysql-clickhouse-test
  36. [root@vvml-yz-hbase-test~]#
复制代码
八、在 ClickHouse 中创建库表、物化视图和视图

        ClickHouse 可以利用 Kafka 表引擎将 Kafka 记录放入一个表中。需要定义三个对象:Kafka 表、主表和斲丧者物化视图。
1. 建库

  1. create database db2 on cluster cluster_2S_2R;
复制代码
2. 创建 Kafka 表

  1. CREATE TABLE db2.kafka_t1 on cluster cluster_2S_2R
  2. (
  3.     `id` Int64,
  4.     `remark` Nullable(String),
  5.     `createtime` String,
  6.     `__deleted` String
  7. )
  8. ENGINE = Kafka('node2:9092,node3:9092,node4:9092', 'mysql-clickhouse-test.test.t1', 'clickhouse', 'JSONEachRow');
复制代码
3. 创建主表

        主表具有源结构和 __deleted 字段。这里利用的是 ReplicatedReplacingMergeTree,因为需要用已删除或更新的记录更换 stall 记录。
  1. -- 创建本地表
  2. CREATE TABLE db2.stream_t1 on cluster cluster_2S_2R
  3. (
  4.     `id` Int64,
  5.     `remark` Nullable(String),
  6.     `createtime` timestamp,
  7.     `__deleted` String
  8. )
  9. ENGINE = ReplicatedReplacingMergeTree(
  10.     '/clickhouse/tables/{shard}/db2/t1',
  11.     '{replica}'
  12. )
  13. ORDER BY (id, createtime)
  14. SETTINGS index_granularity = 8192;
  15. -- 创建分布式表,以源表的主键 id 作为分片键,保证同一 id 的数据落在同一分片上
  16. create table db2.t1_replica_all on cluster cluster_2S_2R
  17. as db2.stream_t1
  18. engine = Distributed(cluster_2S_2R, db2, stream_t1, id);
复制代码
4. 创建斲丧者物化视图

        在创建物化视图前,先停止MySQL从库的复制。从库停止复制,不影响主库的正常利用,也就不会影响业务。此时从库的数据处于静止状态,不会产生变化,这使得获取存量数据变得十拿九稳。然后创建物化视图时会主动将数据写入 db2.t1_replica_all 对应的本地表中。之后在 ClickHouse 集群中的任一实例上,都能从物化视图中查询到划一的 MySQL 存量数据。
  1. -- MySQL 从库停止复制
  2. stop slave;
复制代码
        Kafka 表的每一条记录只读取一次,因为它的斲丧者组会改变偏移量,不能读取两次。因此,需要定义一个主表,并通过物化视图将每个 Kafka 表记录具化到它:
  1. -- 注意时间戳的处理
  2. CREATE MATERIALIZED VIEW db2.consumer_t1 on cluster cluster_2S_2R
  3. TO db2.t1_replica_all
  4. (
  5.     `id` Int64,
  6.     `remark` Nullable(String),
  7.     `createtime` timestamp,
  8.     `__deleted` String
  9. ) AS
  10. SELECT id, remark, addHours(toDateTime(substring(createtime,1,length(createtime)-1)),8) createtime, __deleted FROM db2.kafka_t1;
复制代码
5. 创建视图

        末了需要过滤每个被删除的记录,并拥有最新的记录,以防差别的记录具有相同的排序键。可以定义一个简单的视图来隐式完成这项工作:
  1. CREATE VIEW db2.t1 on cluster cluster_2S_2R
  2. (
  3.     `id` Int64,
  4.     `remark` Nullable(String),
  5.     `createtime` String,
  6.     `__deleted` String
  7. ) AS
  8. SELECT *
  9. FROM db2.consumer_t1
  10. FINAL
  11. WHERE __deleted = 'false';
复制代码
6. 验证

        从 clickhouse 视图查询存量数据:
  1. vvml-yz-hbase-test.172.18.4.126 :) select * from db2.t1;
  2. SELECT *
  3. FROM db2.t1
  4. Query id: 2a51fd5e-6b4f-4b78-b522-62b7be32535b
  5. ┌─id─┬─remark───────┬─createtime──────────┬─__deleted─┐
  6. │  2 │ 第二行:row2 │ 2024-04-25 11:51:07 │ false     │
  7. └────┴──────────────┴─────────────────────┴───────────┘
  8. ┌─id─┬─remark───────┬─createtime──────────┬─__deleted─┐
  9. │  1 │ 第一行:row1 │ 2024-04-25 11:51:07 │ false     │
  10. │  3 │ 第三行:row3 │ 2024-04-25 11:51:07 │ false     │
  11. └────┴──────────────┴─────────────────────┴───────────┘
  12. 3 rows in set. Elapsed: 0.007 sec. 
  13. vvml-yz-hbase-test.172.18.4.126 :) 
复制代码
        可以看到,存量数据已经与 MySQL 同步。
  1. -- MySQL 主库修改数据
  2. insert into test.t1 (remark) values ('第四行:row4');
  3. update test.t1 set remark = '第五行:row5' where id = 4;
  4. delete from test.t1 where id =1;
  5. insert into test.t1 (remark) values ('第六行:row6');
  6.  
  7. -- MySQL 从库启动复制
  8. start slave;
复制代码
        此时 MySQL 的数据如下:
  1. mysql> select * from test.t1;
  2. +----+------------------+---------------------+
  3. | id | remark           | createtime          |
  4. +----+------------------+---------------------+
  5. |  2 | 第二行:row2     | 2024-04-25 11:51:07 |
  6. |  3 | 第三行:row3     | 2024-04-25 11:51:07 |
  7. |  4 | 第五行:row5     | 2024-04-25 11:56:29 |
  8. |  5 | 第六行:row6     | 2024-04-25 11:56:29 |
  9. +----+------------------+---------------------+
  10. 4 rows in set (0.00 sec)
复制代码
        从 clickhouse 视图查询增量数据:
  1. vvml-yz-hbase-test.172.18.4.126 :) select * from db2.t1;
  2. SELECT *
  3. FROM db2.t1
  4. Query id: b34bb37b-091b-490e-b55b-a0e9eedf5573
  5. ┌─id─┬─remark───────┬─createtime──────────┬─__deleted─┐
  6. │  2 │ 第二行:row2 │ 2024-04-25 11:51:07 │ false     │
  7. └────┴──────────────┴─────────────────────┴───────────┘
  8. ┌─id─┬─remark───────┬─createtime──────────┬─__deleted─┐
  9. │  4 │ 第五行:row5 │ 2024-04-25 11:56:29 │ false     │
  10. └────┴──────────────┴─────────────────────┴───────────┘
  11. ┌─id─┬─remark───────┬─createtime──────────┬─__deleted─┐
  12. │  3 │ 第三行:row3 │ 2024-04-25 11:51:07 │ false     │
  13. └────┴──────────────┴─────────────────────┴───────────┘
  14. ┌─id─┬─remark───────┬─createtime──────────┬─__deleted─┐
  15. │  5 │ 第六行:row6 │ 2024-04-25 11:56:29 │ false     │
  16. └────┴──────────────┴─────────────────────┴───────────┘
  17. 4 rows in set. Elapsed: 0.008 sec. 
  18. vvml-yz-hbase-test.172.18.4.126 :) 
复制代码
        可以看到,增量数据已经与 MySQL 同步,现在从 ClickHouse 视图查询的数据与 MySQL 划一。
        查看 Kafka 斲丧:
  1. kafka-consumer-groups.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --describe --group clickhouse
复制代码
        输出如下:
  1. [root@vvml-yz-hbase-test~]#kafka-consumer-groups.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --describe --group clickhouse
  2. GROUP           TOPIC                         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                                                  HOST            CLIENT-ID
  3. clickhouse      mysql-clickhouse-test.test.t1 0          8               8               0               ClickHouse-vvml-yz-hbase-test.172.18.4.126-db2-kafka_t1-26e6aa8e-1f08-4491-8af7-f1822f1a7e94 /172.18.4.126   ClickHouse-vvml-yz-hbase-test.172.18.4.126-db2-kafka_t1
  4. [root@vvml-yz-hbase-test~]#
复制代码
        可以看到,末了被斲丧的消息偏移量是8,MySQL 的存量、增量数据都已经通过 Kafka 消息同步到了 ClickHouse。
参考:



  • Apply CDC from MySQL to ClickHouse
  • New Record State Extraction
  • 基于 HBase & Phoenix 构建实时数仓(5)—— 用 Kafka Connect 做实时数据同步
  • Greenplum 实时数据仓库实践(5)——实时数据同步

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

天津储鑫盛钢材现货供应商

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表