今日学习之debezium同步sqlserver数据库数据到kafka

打印 上一主题 下一主题

主题 906|帖子 906|积分 2718

Debezium先容

  1. Debezium是一个开源的分布式平台,用于捕获数据库的更改事件,并将这些事件转换为可观察的流。它可以连接到各种不同类型的数据库,包括MySQL、PostgreSQL、MongoDB等,捕获数据库中的数据更改,并将这些更改转发到消息代理系统(如Kafka)中,以便其他应用程序可以实时地消费这些更改事件。
  2. Debezium的主要特点包括:
  3. 1.实时数据捕获:Debezium能够实时地捕获数据库中的更改事件,包括插入、更新和删除操作,以便其他应用程序可以实时地获取这些更改。
  4. 2.可观察的流:捕获的数据库更改事件会被转换为可观察的流,这意味着其他应用程序可以轻松地订阅和消费这些事件,以便实时地获取数据库中的更改。
  5. 3.支持多种数据库:Debezium支持连接到各种不同类型的数据库,包括MySQL、PostgreSQL、MongoDB等,使得它非常灵活和通用。
  6. 4.可扩展性:由于Debezium是一个分布式平台,它具有良好的可扩展性,可以处理大规模的数据流,并确保高可用性和容错性。
  7. 5.Debezium 运行在 Kafka Connect 之上,这使它能够充分利用 Kafka 的分布式架构
复制代码
Debezium部署方式

  1. 1. kafka connect 通过kafka connect来部署debezium
  2. 2. debezium server 通过部署debezium服务器
  3. 3. embedded engine 将debezium connect作为嵌入示引擎 嵌入应用程序中如java应用中
复制代码
本文概括

  1. 本文使用常用的方式通过kafka connect来部署debezium,从监听sqlserver到生产消息入kafka队列
复制代码
准备工具:



  • kafka
  • sqlserver2019
  • debezium-connector-sqlserver
kafka集群和sqlserver2019本文不做部署分析,只针对debezium分析
详细步骤

一、 下载 debezium-connector-sqlserver
进入debezium官网下载connector组件 传送门
二、在kafka目录新建一个 connectors 目录

将下载的debezium connector解压放在该目录下

进入kafka config目录并编辑 connect-distributed.properties文件

在最下面到场plugin.path=刚刚新建的connectors目录
  1. # List of comma-separated URIs the REST API will listen on. The supported protocols are HTTP and HTTPS.
  2. # Specify hostname as 0.0.0.0 to bind to all interfaces.
  3. # Leave hostname empty to bind to default interface.
  4. # Examples of legal listener lists: HTTP://myhost:8083,HTTPS://myhost:8084"
  5. listeners=HTTP://:8083
  6. # The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
  7. # If not set, it uses the value for "listeners" if configured.
  8. #rest.advertised.host.name=
  9. #rest.advertised.port=
  10. #rest.advertised.listener=
  11. rest.port=18083
  12. # Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
  13. # (connectors, converters, transformations). The list should consist of top level directories that include
  14. # any combination of:
  15. # a) directories immediately containing jars with plugins and their dependencies
  16. # b) uber-jars with plugins and their dependencies
  17. # c) directories immediately containing the package directory structure of classes of plugins and their dependencies
  18. # Examples:
  19. # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
  20. #plugin.path=
  21. # 保存connectors的路径fka_2.12-3.5.1
  22. plugin.path=/opt/kafka/kafka_2.12-3.5.1/connectors
复制代码
首先启动kafka实行命令:
  1. [root@test config]# /opt/kafka/kafka_2.12-3.5.1/bin/kafka-server-start.sh /opt/kafka/kafka_2.12-3.5.1/config/server.properties
复制代码
启动分布式模式
  1. ../bin/connect-distributed.sh  /opt/kafka/kafka_2.12-3.5.1/config/connect-distributed.properties
复制代码
实行命令: curl localhost:8083 初出现下面则体现成功

sqlserver开启CDC脚本(可根据场景更改)
  1. if exists(select 1 from sys.databases where name='test' and is_cdc_enabled=0)
  2. begin
  3. exec sys.sp_cdc_enable_db--开启数据库CDC
  4. END
  5. go
  6. ALTER DATABASE crmnew ADD FILEGROUP CDC; --为该库添加名为CDC的⽂件组
  7. ALTER DATABASE crmnew
  8. ADD FILE
  9. (
  10. NAME='CDC',
  11. FILENAME ='D:\DB_data\CDC.ndf' --[日志目录]请按照实际情况指定
  12. )
  13. TO FILEGROUP CDC;
  14. go
  15. IF EXISTS(SELECT 1 FROM sys.tables WHERE name='aaaa' AND is_tracked_by_cdc = 0)
  16. BEGIN
  17.     EXEC sys.sp_cdc_enable_table
  18.         @source_schema = 'dbo', -- source_schema
  19.         @source_name = 'aaaa', -- table_name
  20.         @capture_instance = NULL, -- capture_instance
  21.         @role_name = NULL, -- role_name
  22.         @index_name = NULL, -- index_name
  23.         @filegroup_name = 'CDC' -- filegroup_name
  24. END;
  25. go
  26. EXEC sys.sp_cdc_change_job  
  27. @job_type = 'cleanup'
  28. ,@retention = 14400     --更改行将在更改表中保留的分钟数
  29. ,@threshold = 5000     --清除时可以使用一条语句删除的删除项的最大数量
  30. -- 重启作业
  31. go
  32. EXEC sys.sp_cdc_start_job @job_type = N'cleanup';
复制代码
接下来创建一个 Source 连接器,此前先要设定好这个连接器的相关设置,请求接口:localhost:8083/connectors/ json内容如下:
  1. {
  2.     "name": "sqlserver-connector3",
  3.     "config": {
  4.         "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
  5.         "database.hostname": "localhost",
  6.         "database.port": "1433",
  7.         "database.user": "sa",
  8.         "database.password": "123",
  9.         "database.dbname": "crmnew",
  10.         "table.include.list": "dbo.aaaa",
  11.         "database.server.name": "fullfillment",
  12.         "database.history.kafka.bootstrap.servers": "localhost:9092",
  13.         "database.history.kafka.topic": "fullfillment.dbo.aaaa"
  14.     }
  15. }
复制代码

我们监听kafka topic
  1. ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic fullfillment.dbo.aaaa
复制代码

在sqlserver新增一条数据:

至此完成监听

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

郭卫东

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表