大数据-266 实时数仓 - Canal 对接 Kafka 客户端测试

打印 上一主题 下一主题

主题 870|帖子 870|积分 2610

点一下关注吧!!!非常感谢!!持续更新!!!

Java篇开始了!



  • MyBatis 更新完毕
  • 目前开始更新 Spring,一起深入浅出!
目前已经更新到了:



  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(已更完)
  • Kylin(已更完)
  • Elasticsearch(已更完)
  • DataX(已更完)
  • Tez(已更完)
  • 数据挖掘(已更完)
  • Prometheus(已更完)
  • Grafana(已更完)
  • 离线数仓(已更完)
  • 实时数仓(正在更新…)
章节内容



  • Canal 摆设安装
  • 启动服务 常见问题办理

Canal 简介

Canal 是阿里巴巴开源的 MySQL binlog 增量订阅与消耗平台。它模拟 MySQL 的主从复制机制,通过分析 MySQL 的二进制日记(binlog),实现数据库变动的数据捕获(CDC, Change Data Capture)。


  • 数据同步:支持将数据库的变动数据同步到其他数据源或消息系统,如 Kafka、RocketMQ、Elasticsearch 等。
  • 实时性:基于 binlog 的分析和订阅,可以或许实现毫秒级的数据变动捕获。
  • 分布式架构:支持集群摆设,满足高可用性和高吞吐量需求。
  • 多种数据支持:除 MySQL 外,还支持 MariaDB 和部分兼容 MySQL 协议的数据库。
Kafka 简介

Kafka 是一个分布式消息系统,用于高吞吐量、低耽误的数据流处置惩罚。它支持消息持久化、订阅消耗、流处置惩罚等功能,常用于日记采集、事故流处置惩罚、大数据分析和消息队列场景。


  • Producer(生产者):负责将数据写入 Kafka。
  • Consumer(消耗者):从 Kafka 中读取数据。
  • Broker:Kafka 集群中的服务器,负责存储和分发消息。
  • Topic:消息的分类存储单位。
  • Partition:将数据分区存储,以实现并行处置惩罚和负载均衡。
Canal 与 Kafka 的集成原理

Canal 和 Kafka 通常共同使用,用于构建高效的数据同步管道,实现数据库变动到消息队列的实时推送。流程如下:


  • 数据源捕获:Canal 监听 MySQL 的 binlog 数据变动事故。
  • 数据分析:Canal 将 binlog 数据分析为 JSON 格式或其他布局化数据。
  • 消息推送:Canal 将分析后的数据发送到 Kafka 的指定 Topic 中。
  • 消息消耗与处置惩罚:Kafka Consumer 消耗数据,并进一步分发给其他服务或存储,如 Hadoop、Elasticsearch、Redis 等。
使用场景

数据同步与分发



  • 实现多种异构系统之间的数据一致性。
  • 比如将 MySQL 数据变动同步到 Elasticsearch,实现实时搜索引擎。
日记分析与监控

将数据库操纵事故推送到 Kafka,供日记分析或实时监控系统使用。
实时数据流处置惩罚

数据经过 Kafka 进入 Flink、Spark Streaming 等流处置惩罚框架,满足复杂的数据处置惩罚需求。
缓存革新

数据库更新后,推送变动消息到 Kafka,再由消耗者更新 Redis 缓存,提高一致性和访问性能。
注意事项与优化

数据一致性保障

确保 binlog 与业务日记一致,以制止遗漏或重复消耗。
分区与负载均衡

使用 Kafka 的分区机制分配差别的表或业务流量,提升并行消耗能力。
消息格式优化

选择扁平化 JSON 格式传输数据,便于消耗端分析处置惩罚。
容错与规复机制

在 Canal 和 Kafka 之间配置重试机制,制止临时网络故障导致数据丢失。
安全性

配置 Canal 访问 MySQL 的最小权限账户,只授予 REPLICATION SLAVE 和 REPLICATION CLIENT 权限。
环境要求



  • MySQL
  • Canal 采集 binlog
  • 在 Kafka 做验证
新建主题

  1. # 新建主题
  2. kafka-topics.sh --zookeeper h123.wzk.icu:2181 --create --replication-factor 3 --partitions 1 --topic dwshow
复制代码
查看主题

  1. # 查看主题
  2. kafka-topics.sh --zookeeper h123.wzk.icu:2181 --list
复制代码
执行结果如下图所示:

启动生产者

  1. # 启动生产者
  2. kafka-console-producer.sh --broker-list h123.wzk.icu:9092 --topic dwshow
复制代码
启动消耗者

  1. # 启动消费者
  2. kafka-console-consumer.sh --bootstrap-server h123.wzk.icu:9092 --topic dwshow --from-beginning
复制代码
操纵数据

此时 MySQL 数据表如有变化,会将 row 范例的 log 写进 Kafka,详细格式为 JSON。
INSERT 操纵

  1. {
  2.     "data": [
  3.         {
  4.             "id": "6",
  5.             "payMethod": "meituan",
  6.             "payName": "美团支付",
  7.             "description": "美团支付",
  8.             "payOrder": "0",
  9.             "online": "-1"
  10.         }
  11.     ],
  12.     "database": "dwshow",
  13.     "es": 1604461572000,
  14.     "id": 6,
  15.     "isDdl": false,
  16.     "mysqlType": {
  17.         "id": "int(11)",
  18.         "payMethod": "varchar(20)",
  19.         "payName": "varchar(255)",
  20.         "description": "varchar(255)",
  21.         "payOrder": "int(11)",
  22.         "online": "tinyint(4)"
  23.     },
  24.     "old": null,
  25.     "pkNames": null,
  26.     "sql": "",
  27.     "sqlType": {
  28.         "id": 4,
  29.         "payMethod": 12,
  30.         "payName": 12,
  31.         "description": 12,
  32.         "payOrder": 4,
  33.         "online": -6
  34.     },
  35.     "table": "wzk_payments",
  36.     "ts": 1604461572297,
  37.     "type": "INSERT"
  38. }
复制代码
UPDATE 操纵

  1. {
  2.   "data": [
  3.     {
  4.       "productId": "115908",
  5.       "productName": "索尼 xxx10",
  6.       "shopId": "100365",
  7.       "price": "300.0",
  8.       "isSale": "1",
  9.       "status": "0",
  10.       "categoryId": "10395",
  11.       "createTime": "2020-07-12 13:22:22",
  12.       "modifyTime": "2020-09-27 02:51:16"
  13.     }
  14.   ],
  15.   "database": "dwshow",
  16.   "es": 1601189476000,
  17.   "id": 456,
  18.   "isDdl": false,
  19.   "mysqlType": {
  20.     "productId": "bigint(11)",
  21.     "productName": "varchar(200)",
  22.     "shopId": "bigint(11)",
  23.     "price": "decimal(11,2)",
  24.     "isSale": "tinyint(4)",
  25.     "status": "tinyint(4)",
  26.     "categoryId": "int(11)",
  27.     "createTime": "varchar(25)",
  28.     "modifyTime": "datetime"
  29.   },
  30.   "old": [
  31.     {
  32.       "price": "597.80",
  33.       "modifyTime": "2020-07-12 13:22:22"
  34.     }
  35.   ],
  36.   "pkNames": null,
  37.   "sql": "",
  38.   "sqlType": {
  39.     "productId": -5,
  40.     "productName": 12,
  41.     "shopId": -5,
  42.     "price": 3,
  43.     "isSale": -6,
  44.     "status": -6,
  45.     "categoryId": 4,
  46.     "createTime": 12,
  47.     "modifyTime": 93
  48.   },
  49.   "table": "wzk_product_info",
  50.   "ts": 1601189477116,
  51.   "type": "UPDATE"
  52. }
复制代码
DELETE 操纵

  1. {
  2.   "data": [
  3.     {
  4.       "productId": "115908",
  5.       "productName": "索尼 xxx10",
  6.       "shopId": "100365",
  7.       "price": "300.0",
  8.       "isSale": "1",
  9.       "status": "0",
  10.       "categoryId": "10395",
  11.       "createTime": "2020-07-12 13:22:22",
  12.       "modifyTime": "2020-09-27 02:51:16"
  13.     }
  14.   ],
  15.   "database": "dwshow",
  16.   "es": 1601189576000,
  17.   "id": 457,
  18.   "isDdl": false,
  19.   "mysqlType": {
  20.     "productId": "bigint(11)",
  21.     "productName": "varchar(200)",
  22.     "shopId": "bigint(11)",
  23.     "price": "decimal(11,2)",
  24.     "isSale": "tinyint(4)",
  25.     "status": "tinyint(4)",
  26.     "categoryId": "int(11)",
  27.     "createTime": "varchar(25)",
  28.     "modifyTime": "datetime"
  29.   },
  30.   "old": null,
  31.   "pkNames": null,
  32.   "sql": "",
  33.   "sqlType": {
  34.     "productId": -5,
  35.     "productName": 12,
  36.     "shopId": -5,
  37.     "price": 3,
  38.     "isSale": -6,
  39.     "status": -6,
  40.     "categoryId": 4,
  41.     "createTime": 12,
  42.     "modifyTime": 93
  43.   },
  44.   "table": "wzk_product_info",
  45.   "ts": 1601189576594,
  46.   "type": "DELETE"
  47. }
复制代码
上面的 JSON 格式表明如下:


  • data:最新的数据,为json数组。
  • 如果是插入则体现最新插入的数据;
  • 如果是更新,则体现更新后的最新数据;
  • 如果是删除,则体现被删除的数据
  • database:数据库名称
  • es:事故时间,13位的时间戳
  • id:事故操纵的序列号,1,2,3…
  • isDdl:是否是DDL操纵
  • mysqlType:字段范例
  • old:旧数据
  • pkNames:主键名称
  • sql:SQL语句
  • sqlType:是经过canal转换处置惩罚的,比如unsigned int会被转化为Long,unsigned long会被转换为BigDecimal
  • table:表名
  • ts:日记时间
  • type:操纵范例,比如DELETE,UPDATE,INSERT
当我们操纵数据库的时间,可以看到Kafka 中写入了大量的数据(这里我是消耗者在监听):


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

宝塔山

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表