十念 发表于 2025-3-24 06:03:47

利用Debezium、Kafka实现Elasticsearch数据同步

数据同步方案

1、同步双写:写入数据库的时候也写入Es


[*]优点:实时性高
[*]缺点:耦合度高,代码中包含大量写入代码,应用服务必要等候写入结果,否则不能保证数据同等性
2、异步双写:写入数据库的同时写入MQ,ES订阅MQ(通过一个单独的服务去订阅MQ然后写入Es)


[*]优点:实现了与Es的异步,应用服务性能开销小
[*]缺点:耦合度高,只是把写入Es的步骤换成了MQ,能保证一定的数据同等性
3、基于SQL抽取:通过定时器扫描时间戳timestamp监听数据库的变革,将操作写入Es


[*]优点:与应用服务耦合低,能保证数据特定时间同等性
[*]缺点:实时性差,轮询数据库对数据库压力大
4、基于日志同步:利用cannal、debezium监听数据库binlog或wal(postgres事务日志)写入MQ,Es从MQ消费数据


[*]优点:耦合低,实时性好,能保证数据最终同等性
[*]缺点:中心件多
5、基于MQ同步:所有服务向MQ发送消息,数据库、Redis、Es消费消息,事件驱动架构


[*]优点:耦合低、实时性高、易于扩展
[*]缺点:架构复杂、最终同等性难以保障(如Es消费了数据但数据库没有消费数据)
基于日志实现Elasticsearch数据同步

https://i-blog.csdnimg.cn/direct/f6af2557d3c448aea54657b791b4466e.png#pic_center
Java应用内存占用太大,以下流程必要6G以上设置(4G勉强能用),假如你利用meilisearchRust写的搜刮引擎,4G就够了
Debezium是什么

Debezium是用于捕获变动数据的开源工具,开可以相应数据库所有插入、更新、删除的操作,是对 Kafka - Connector 功能在数据库 CDC 场景下的一种专业化扩展,能够监控各种数据库(如 MySQL、PostgreSQL、Oracle 等)的事务日志(如 MySQL 的 Binlog、PostgreSQL 的 WAL)。Debezium 将数据库中的数据更改事件(比方插入、更新、删除操作)转换为事件流消息,这些消息可以被发送到消息队列(通常是 Kafka)中


[*]Mysql日志:binlog(Binary Log)二进制日志文件
[*]Postgres日志:WAL(Write Ahead Log)预写式日志
其他类似工具:


[*]Canal:基于Mysql数据库增量日志解析
[*]RisingWave:流处理惩罚数据库
[*]Flink:流处理惩罚框架
CDC(Change Data Capture)

CDC(变动数据捕获)
CDC方式:


[*]时间戳(Timestamp - based CDC):在数据库表中添加时间戳timestamp来记录每行数据的最后更新时间,进行数据抽取时通过比较与上一次抽取的时间来确定哪些数据发生了变革

[*]优点:简单、对数据库性能影响小
[*]缺点:数据同等性风险高、无法获取历史变动

[*]快照(Snapshot - based CDC):在特定时间点对整个数据库表或部分数据进行一次完整的复制,后续通过时间点的快照确定数据的变革

[*]优点:能够捕获特定时间的完整数据状态
[*]缺点:资源消耗大、实时性差

[*]触发器(Trigger - based CDC):在数据库表上创建触发器,当发生插入、更新、删除操作时,触发器被激活,将变动数据记录到专门的CDC表或消息队列中

[*]优点:实时性好、细粒度控制
[*]缺点:对数据库性能影响大、维护本钱高

[*]日志(Log - based CDC): 通过读取数据库的事务日志(如 MySQL 的 Binlog、PostgreSQL 的 WAL)来捕获数据变动

[*]优点:实时性好、准确性高、对数据库性能影响小
[*]缺点:依赖于日志、必要合理设置日志计谋

Debezium能做什么



[*]1、数据同步与复制:如必要将数据库中的数据同步到Elasticsearch,通过捕获数据源的变动事件,将事件发送到Kadka,然后再目标数据库端消费这些事件并应用变动,从而保持多个数据库的数据同等性
[*]2、数据多副本维护:实现分布式容灾进步数据可用性,通过Debezium 可以将主数据中央的数据库变动及时同步到备份数据中央的数据库副本
[*]3、实时数据管道:作为数据管道的数据源,将数据进行洗濯(如,验证数据格式)、转换(如,将数据转换为得当分析的格式)、聚合(如统计订单总额)等操作
[*]4、ETL(Extract、Transform、Load):传统数据提取、转换、加载通常是定时进行的,Debezium 提供了实时提取数据变动的功能,它可以让数据堆栈更快地获取最新的业务数据,从而提供更及时的数据分析和决策支持
[*]5、触发业务流程:支持事件驱动的架构,数据库的变动事件可以作为触发其他业务流程的信号
[*]6、体系解耦:传统数据同步必要代码操作,Debezium通过MQ实现解耦
Debezium架构



[*]source connector:负责把数据库中的记录发送到Kafka
[*]sink connector:负责把Kafka Topic中的数据发送到其他体系如Elasticsearch
https://i-blog.csdnimg.cn/direct/8b921bd348654f8c8af33f5a0b91c24f.png


[*]事件流处理惩罚(Event Streaming):将从数据库捕获的变动事件发送到 Apache Kafka 消息队列

[*]事件格式与主题(Topics):Debezium 发送到 Kafka 的事件具有同一的格式,包含了关于数据库变动的具体信息,如数据库名称、表名称、操作范例(插入、更新、删除)、变动前后的数据等

[*]消费者(Consumers):消费者可以是各种应用程序,它们从 Kafka 主题中订阅和吸取事件,然后进行相应的处理惩罚

[*]消费者组(Consumer Groups):为了进步消费的可靠性和可扩展性,Kafka 支持消费者组的概念

1、Docker部署Elasticsearch、Kibana

见:https://juejin.cn/post/7429666925842399258
上一篇已经创建了网络,检察docker network ls
2、Docker部署Kafka

见:https://juejin.cn/post/7429667424926023699
3、Docker部署postgres

见:https://juejin.cn/post/7409913195660410890
监控Postgres设置

修改设置文件
   Postgres挂载的data目录包含设置文件
vim /var/lib/postgresql/data/postgresql.conf
   假如你要监控Mysql,见以下监控Mysql设置
# 在启动时加载插件
shared_preload_libraries = 'pgoutput'
# REPLICATION
# 更改wal日志方式为logical(minimal、replica 、logical)
wal_level = logical
# 更改wal发送最大进程数(默认值为10),这个值和solts设置一样
max_wal_senders = 4
# 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots
max_replication_slots = 4
重启
docker restart postgres
监控Mysql设置

Mysql设置开启biblog:vim /etc/my.cnf

server-id=1
# 日志前缀
log-bin=mysql-bin
# 日志格式
binlog_format=row


[*]binlog_format:

[*]statement(SBR):每一条修改数据的sql都会记录到binlog中

[*]优点:日志文件小、可直接看到sql易于理解
[*]缺点:主从复制环境可能会导致数据差别等、复杂sql在从库可能执行失败

[*]row(RBR):不记录sql的上下文信息,仅记录每一行的数据变革

[*]优点:在主从复制环境可以保证数据同等性、可以具体记录修改前和修改后的值、不关心sql,不受sql复杂操作影响
[*]缺点:日志文件大、阅读比SBR难

[*]mixed(MBR):将以上两种混合利用,一样平常的复制利用statement模式生存的binlog,对于可能出现问题的操作利用row模式生存binlog

[*]优点:团结了SBR、RBR的优点
[*]缺点:复杂性增加、阅读更复杂、设置难度高


Postgres数据库设置用户和表

查询,返回logical则成功
SHOW wal_level
创建用户
CREATE USER user1 WITH PASSWORD 'user123456';
授权复制流权限
ALTER ROLE user1 replication;
授权登录
   我们的数据库名为postgres
GRANT CONNECT ON DATABASE postgres to user1;
授权所有权,包罗但不限于SELECT(查询)、INSERT(插入)、UPDATE(更新)、DELETE(删除)、TRUNCATE(截断表)
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO user1;
设置发布表
update pg_publication set puballtables=true where pubname is not null;
将所有表发布
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
查询发布表
select * from pg_publication_tables;
查询复制标识
   默以为d


[*]d(default):默认计谋,根据表的主键(假如存在)来辨认行,对于插入、更新、删除通过主键来定位行(所有表都应该创建自增id作为主键)
[*]full:利用表中所有列来辨认行(假如没有创建自增id作为主键,利用full计谋)
[*]nothing:逻辑复制不会利用任何行标识跟踪变革,在更新、删除时可能出现问题
select relreplident from pg_class where relname='user';
更改复制标识(可选,假如某一个表没有设置主键,请设置为full)
ALTER TABLE user REPLICA IDENTITY FULL;
4、Docker部署Debezium

由于debezium默认提供了jdbc、mongodb、mysql、oracle、postgres、soanner、sqlserver、vitess、db2的毗连器,我们必要额外安装Elasticsearch Connector 插件
先创建一个插件目录
mkdir -p /opt/debezium/kafak
挂载后会更换原来的插件目录,以是必要先运行debezium,然后将我们必要的kafka-connector插件拷贝出来
docker run -d \
--name debezium \
-p 8083:8083 \
--network pub-network \
--restart always \
-e BOOTSTRAP_SERVERS=broker:9092 \
-e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=connect_configs \
-e OFFSET_STORAGE_TOPIC=connect_offsets \
-e STATUS_STORAGE_TOPIC=connect_statuses \
debezium/connect
拷贝所有插件到宿主机
docker cp debezium:/kafka/connect/ /opt/debezium/kafak/
将kafka-connect-elasticsearch下载解压后上传到/opt/debezium/kafak/connect:https://www.confluent.io/hub/confluentinc/kafka-connect-elasticsearch
删除容器
docker rm -f debezium
然后启动(必要将kafka中产生的三个topic删掉)
   注意GROUP_ID不能重复,假如设置Debezium时404大概率是kakfa中存在相同ID
broker为kafka容器名,假如监听地址设置为ip,这里也改成ip
docker run -d \
--name debezium \
-p 8083:8083 \
--network pub-network \
--restart always \
-e BOOTSTRAP_SERVERS=broker:9092 \
-v /opt/debezium/kafak/connect:/kafka/connect \
-e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=connect_configs \
-e OFFSET_STORAGE_TOPIC=connect_offsets \
-e STATUS_STORAGE_TOPIC=connect_statuses \
debezium/connect


[*]BOOTSTRAP_SERVERS:指定Kafka地址和端口
[*]GROUP_ID:指定消费者组id
[*]TOPIC_PREFIX:发送到Kafka的TOPIC前缀
[*]CONFIG_STORAGE_TOPIC:存储Debezium的设置信息到Kafka的Topic名,用于重启时加载设置
[*]OFFSET_STORAGE_TOPIC:存储消费者偏移量信息到Kafka的Topic名,用于记录消费者进度
[*]KEY_CONVERTER:指定键值转换器,利用Avro来格式化、序列化键值数据
[*]VALUE_CONVERTER:指定键值转换器,利用Avro来格式化、序列化键值数据
[*]CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL:指定Avro键值转换器的模式注册表地址,用于存储和管理Avro模式
[*]CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL:指定Avro键值转换器的模式注册表地址,用于存储和管理Avro模式
设置Debezium

db-connector.json这里创建一个source-connector,代表debezium通过kafka connect向kafka写入数据
   请修改数据库设置,最后两行设置监听所有表变革
{
        "name": "db-source-connector",
        "config": {
                "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
                "plugin.name": "pgoutput",
                "database.hostname": "postgres",
                "database.port": "5432",
                "database.user": "user1",
                "database.password": "user123456",
                "database.dbname": "postgres",
                "database.server.name": "postgres",
          "topic.prefix": "pg",
                "table.include.list": "public.*"
        }
}
设置毗连器(你也可以用Postman发POST请求到http://localhost:8083/connectors/,在body/raw中填写以上设置)
   在''中添加以上设置,更换localhost为ip地址,假如没相应或者访问不到请删除Kafka中自动创建的三个Topic
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ --data ''

相应包含以下内容则成功
"tasks": [],
"type": "source"
验证(默认发GET请求,用Postman GET:http://localhost:8083/connectors/是一样的)
curl -H "Accept:application/json" localhost:8083/connectors/

返回以下内容则成功,返回[]说明可以访问
["db-source-connector"]
插入数据到数据库则可以看到创建了Tpoic
https://i-blog.csdnimg.cn/direct/d5411f9a32d0449a9b98c6aac42d2179.png
删除毗连器

先检察设置了哪些毗连器
curl -H "Accept:application/json" http://localhost:8083/connectors/
删除指定毗连器
curl -X DELETE http://localhost:8083/connectors/your-connector-name
ES订阅Kafka获取数据

elasticsearch-connector.json这里代表debezium通过kafka connect从kafka导出数据到es
   我们将所有pg.public.user的topic写入es中,貌似只能指定topic,不能用正则表达式,不外在利用过程中大概率只会将商品、日志等数据同步到es
{
        "name": "elasticsearch-sink-connector",
        "config": {
                "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
                "topics": "pg.public.user",
                "connection.url": "es:9200",
                "type.name": "_doc",
                "key.ignore": true,
                "value.converter": "org.apache.kafka.connect.json.JsonConverter"
        }
}
设置毗连器(你也可以用Postman发POST请求到http://localhost:8083/connectors/,在body/raw中填写以上设置)
   在''中添加以上设置,更换localhost为ip地址,假如没相应或者访问不到请删除Kafka中自动创建的三个Topic
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ --data ''

验证(默认发GET请求,用Postman GET:http://localhost:8083/connectors/是一样的)
curl -H "Accept:application/json" localhost:8083/connectors/

在中Elasticsearch检察,利用kibana的控制台发送DSL请求
GET /_search
可以看到对user表的更改同步到了es中
https://i-blog.csdnimg.cn/direct/fd55825b199141ec86e5188cc8d8828a.png
假如必要docker compose,见:https://github.com/VCCICCV/MGR/tree/main/auth

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 利用Debezium、Kafka实现Elasticsearch数据同步