基于Canal二次开发实现MySQL到Kafka数据同步

打印 上一主题 下一主题

主题 854|帖子 854|积分 2562

概要

基于Canal二次开发实现MySQL到Kafka数据同步的完整指南
团体架构流程

MySQL Master → Canal Server(伪装Slave) → Canal Parser → Canal MQ Producer → Kafka → 下游消费者


  • 支持通过页面配置监听MySQL的指定库表。
  • 剖析binlog并推送到Kafka。
  • 不同项目消费Kafka实现数据同步。
技能名词解释

Canal Server 监听MySQL,剖析binlog,管理实例
Canal Client 消费剖析后的数据,支持自界说处置惩罚
Canal Admin 集群管理后台(需二次开发增强配置)
Canal Adapter 数据适配层(可扩展输出到Kafka)
技能细节

一、Canal核心原理与架构

  • Canal定位:阿里巴巴开源的MySQL binlog增量订阅&消费组件,
  • 核心能力:伪装MySQL Slave抓取主库binlog,剖析binlog为结构化数据(RowChange)、支持将数据推送到MQ(Kafka/RocketMQ)
  • 核心模块
  1. Canal Server        监听MySQL,解析binlog,管理实例
  2. Canal Client        消费解析后的数据,支持自定义处理
  3. Canal Admin        集群管理后台(需二次开发增强配置)
  4. Canal Adapter        数据适配层(可扩展输出到Kafka)
复制代码

  • 数据流转流程
  1. MySQL Master → Canal Server(伪装Slave) → Canal Parser → Canal MQ Producer → Kafka → 下游消费者
复制代码
二、环境准备
6. 底子依赖
  1. MySQL:开启binlog(log-bin=mysql-bin, binlog_format=ROW)
  2. Java 8+:Canal基于Java开发
  3. Kafka集群:建议2.0+版本
  4. Zookeeper:Canal Server依赖ZK做集群协调
复制代码

  • 下载Canal源码
  1. git clone https://github.com/alibaba/canal.git
  2. #建议使用1.1.6稳定版
  3. git checkout canal-1.1.6
复制代码
三、快速验证Canal底子功能
3. 当地启动Canal Server
  1. 步骤1:修改conf/example/instance.properties
  2. #MySQL连接配置
  3. canal.instance.master.address=127.0.0.1:3306
  4. canal.instance.dbUsername=canal
  5. canal.instance.dbPassword=canal
  6. #监听所有库表
  7. canal.instance.filter.regex=.*\\..*
  8. 步骤2:启动Server
  9. sh bin/startup.sh
复制代码

  • 使用Canal Client消费数据
  1. SimpleCanalClient
  2. CanalConnector connector = CanalConnectors.newSingleConnector(
  3.     new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
  4. connector.connect();
  5. connector.subscribe(".*\\..*");  // 订阅所有表
  6. while (true) {
  7.     Message message = connector.getWithoutAck(100);  // 批量获取
  8.     for (CanalEntry.Entry entry : message.getEntries()) {
  9.         CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
  10.         // 解析出变更的行数据
  11.         System.out.println(rowChange);
  12.     }
  13.     connector.ack(message.getId());  // 确认消费
  14. }
复制代码
四、二次开发:页面化配置+推送到Kafka

  • 动态配置:通过Web界面管理监听规则(库、表、过滤条件)
  • Kafka集成:自动将数据按规则发送到指定Topic
  • 多用户隔离:不同项目消费不同Topic
  • 架构改造方案
  1.                   +-------------------+      +-----------------+
  2.                   |   Admin Web界面    |      |    MySQL配置存储 |
  3.                   +-------------------+      +-----------------+
  4.                             | 读写配置               |
  5.                             v                       v
  6. +------------+     +------------------+     +------------------+
  7. | MySQL主库  | --> | Canal Server集群 | --> | Kafka消息队列     |
  8. +------------+     +------------------+     +------------------+
  9.                             ^                       |
  10.                             | 动态加载配置           | 按规则分发Topic
  11.                             +-----------------------+
复制代码

  • 关键开发步调
    3.1 扩展Canal Admin实现配置管理
    目的:将配置从文件迁移到数据库,支持页面增删改查
    代码示例:
  1. // 自定义配置DAO(伪代码)
  2. public interface InstanceConfigDAO {
  3.     List<InstanceConfig> listAll();
  4.     InstanceConfig getByDestination(String destination);
  5.     void save(InstanceConfig config);
  6. }
  7. // 重写InstanceConfigMonitor
  8. public class DBInstanceConfigMonitor extends AbstractCanalLifeCycle
  9.     implements InstanceConfigMonitor {
  10.     @Override
  11.     public InstanceConfig getInstanceConfig(String destination) {
  12.         return instanceConfigDAO.getByDestination(destination);
  13.     }
  14. }
复制代码
3.2 开发Web管理界面
技能选型:
  1. 前端:Vue + Element UI
  2. 后端:Spring Boot(集成Canal Admin API)
  3. 功能点:
  4. 库表监听规则配置(支持正则表达式)
  5. Kafka Topic映射(如:db1.table1 → topic_db1_table1)
  6. 监控看板(同步延迟、消息堆积)
复制代码
3.3 改造Canal Kafka Producer
目的:根据配置动态选择Topic
代码示例:
  1. // 自定义Topic路由策略
  2. public class DynamicTopicProducer extends KafkaFlatMessageProducer {
  3.     @Override
  4.     protected String getTopic(CanalEntry.Entry entry) {
  5.         String schema = entry.getHeader().getSchemaName();
  6.         String table = entry.getHeader().getTableName();
  7.         // 从配置中心获取schema.table对应的topic
  8.         return configService.getTopic(schema, table);
  9.     }
  10. }
  11. // 修改canal.properties启用自定义Producer
  12. canal.mq.producerClass=com.yourcompany.DynamicTopicProducer
复制代码
3.4 配置Kafka序列化
修改MQ配置:
  1. # kafka生产端配置
  2. kafka.bootstrap.servers=127.0.0.1:9092
  3. kafka.acks=all
  4. kafka.compression.type=snappy
  5. kafka.value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
复制代码
五、摆设与验证

  • 摆设拓扑
  1. [MySQL] ←→ [Canal Server集群] ←→ [Kafka集群] ←→ [业务消费者]
  2.                 ↑
  3.           [配置管理后台]
复制代码

  • 验证流程
    在管理界面添加规则:监听db1.order表 → 推送到topic_orders
实行MySQL插入:
  1. INSERT INTO db1.order VALUES (1001, 'test');
复制代码
查抄Kafka消息:
  1. kafka-console-consumer.sh --topic topic_orders --bootstrap-server localhost:9092
复制代码
预期输出:
  1. {
  2.   "data": [{"id":1001, "name":"test"}],
  3.   "type": "INSERT",
  4.   "database": "db1",
  5.   "table": "order"
  6. }
复制代码
更加详情代码和配置细节
六、注意事项
MySQL权限:Canal账号需
SELECT, REPLICATION SLAVE, REPLICATION CLIENT
Kafka分区策略:发起按主键哈希分区保证顺序性
监控诉警:重点关注:
Canal Server内存堆积(canal.memory.buffer.size)
Kafka生产耽误(canal.mq.send.thread.size)
数据一致性:
启用ACOM机制(canal.instance.global.mode=spring)
消费者需处置惩罚幂等(通过binlog的gtid或主键)
七、扩展优化发起
DDL同步:捕获ALTER TABLE事故并广播到全部消费者
数据过滤:在Canal Server层增加EventFilter拦截无关变更
分库分表合并:通过Canal Adapter将多个分表数据聚合到同一Topic
监控集成:对接Prometheus + Grafana展示同步耽误曲线

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

我爱普洱茶

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表