ToB企服应用市场:ToB评测及商务社交产业平台

标题: 基于Canal二次开发实现MySQL到Kafka数据同步 [打印本页]

作者: 我爱普洱茶    时间: 2025-2-18 04:47
标题: 基于Canal二次开发实现MySQL到Kafka数据同步
概要

基于Canal二次开发实现MySQL到Kafka数据同步的完整指南
团体架构流程

MySQL Master → Canal Server(伪装Slave) → Canal Parser → Canal MQ Producer → Kafka → 下游消费者

技能名词解释

Canal Server 监听MySQL,剖析binlog,管理实例
Canal Client 消费剖析后的数据,支持自界说处置惩罚
Canal Admin 集群管理后台(需二次开发增强配置)
Canal Adapter 数据适配层(可扩展输出到Kafka)
技能细节

一、Canal核心原理与架构
  1. Canal Server        监听MySQL,解析binlog,管理实例
  2. Canal Client        消费解析后的数据,支持自定义处理
  3. Canal Admin        集群管理后台(需二次开发增强配置)
  4. Canal Adapter        数据适配层(可扩展输出到Kafka)
复制代码
  1. MySQL Master → Canal Server(伪装Slave) → Canal Parser → Canal MQ Producer → Kafka → 下游消费者
复制代码
二、环境准备
6. 底子依赖
  1. MySQL:开启binlog(log-bin=mysql-bin, binlog_format=ROW)
  2. Java 8+:Canal基于Java开发
  3. Kafka集群:建议2.0+版本
  4. Zookeeper:Canal Server依赖ZK做集群协调
复制代码
  1. git clone https://github.com/alibaba/canal.git
  2. #建议使用1.1.6稳定版
  3. git checkout canal-1.1.6
复制代码
三、快速验证Canal底子功能
3. 当地启动Canal Server
  1. 步骤1:修改conf/example/instance.properties
  2. #MySQL连接配置
  3. canal.instance.master.address=127.0.0.1:3306
  4. canal.instance.dbUsername=canal
  5. canal.instance.dbPassword=canal
  6. #监听所有库表
  7. canal.instance.filter.regex=.*\\..*
  8. 步骤2:启动Server
  9. sh bin/startup.sh
复制代码
  1. SimpleCanalClient
  2. CanalConnector connector = CanalConnectors.newSingleConnector(
  3.     new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
  4. connector.connect();
  5. connector.subscribe(".*\\..*");  // 订阅所有表
  6. while (true) {
  7.     Message message = connector.getWithoutAck(100);  // 批量获取
  8.     for (CanalEntry.Entry entry : message.getEntries()) {
  9.         CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
  10.         // 解析出变更的行数据
  11.         System.out.println(rowChange);
  12.     }
  13.     connector.ack(message.getId());  // 确认消费
  14. }
复制代码
四、二次开发:页面化配置+推送到Kafka
  1.                   +-------------------+      +-----------------+
  2.                   |   Admin Web界面    |      |    MySQL配置存储 |
  3.                   +-------------------+      +-----------------+
  4.                             | 读写配置               |
  5.                             v                       v
  6. +------------+     +------------------+     +------------------+
  7. | MySQL主库  | --> | Canal Server集群 | --> | Kafka消息队列     |
  8. +------------+     +------------------+     +------------------+
  9.                             ^                       |
  10.                             | 动态加载配置           | 按规则分发Topic
  11.                             +-----------------------+
复制代码
  1. // 自定义配置DAO(伪代码)
  2. public interface InstanceConfigDAO {
  3.     List<InstanceConfig> listAll();
  4.     InstanceConfig getByDestination(String destination);
  5.     void save(InstanceConfig config);
  6. }
  7. // 重写InstanceConfigMonitor
  8. public class DBInstanceConfigMonitor extends AbstractCanalLifeCycle
  9.     implements InstanceConfigMonitor {
  10.     @Override
  11.     public InstanceConfig getInstanceConfig(String destination) {
  12.         return instanceConfigDAO.getByDestination(destination);
  13.     }
  14. }
复制代码
3.2 开发Web管理界面
技能选型:
  1. 前端:Vue + Element UI
  2. 后端:Spring Boot(集成Canal Admin API)
  3. 功能点:
  4. 库表监听规则配置(支持正则表达式)
  5. Kafka Topic映射(如:db1.table1 → topic_db1_table1)
  6. 监控看板(同步延迟、消息堆积)
复制代码
3.3 改造Canal Kafka Producer
目的:根据配置动态选择Topic
代码示例:
  1. // 自定义Topic路由策略
  2. public class DynamicTopicProducer extends KafkaFlatMessageProducer {
  3.     @Override
  4.     protected String getTopic(CanalEntry.Entry entry) {
  5.         String schema = entry.getHeader().getSchemaName();
  6.         String table = entry.getHeader().getTableName();
  7.         // 从配置中心获取schema.table对应的topic
  8.         return configService.getTopic(schema, table);
  9.     }
  10. }
  11. // 修改canal.properties启用自定义Producer
  12. canal.mq.producerClass=com.yourcompany.DynamicTopicProducer
复制代码
3.4 配置Kafka序列化
修改MQ配置:
  1. # kafka生产端配置
  2. kafka.bootstrap.servers=127.0.0.1:9092
  3. kafka.acks=all
  4. kafka.compression.type=snappy
  5. kafka.value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
复制代码
五、摆设与验证
  1. [MySQL] ←→ [Canal Server集群] ←→ [Kafka集群] ←→ [业务消费者]
  2.                 ↑
  3.           [配置管理后台]
复制代码
实行MySQL插入:
  1. INSERT INTO db1.order VALUES (1001, 'test');
复制代码
查抄Kafka消息:
  1. kafka-console-consumer.sh --topic topic_orders --bootstrap-server localhost:9092
复制代码
预期输出:
  1. {
  2.   "data": [{"id":1001, "name":"test"}],
  3.   "type": "INSERT",
  4.   "database": "db1",
  5.   "table": "order"
  6. }
复制代码
更加详情代码和配置细节
六、注意事项
MySQL权限:Canal账号需
SELECT, REPLICATION SLAVE, REPLICATION CLIENT
Kafka分区策略:发起按主键哈希分区保证顺序性
监控诉警:重点关注:
Canal Server内存堆积(canal.memory.buffer.size)
Kafka生产耽误(canal.mq.send.thread.size)
数据一致性:
启用ACOM机制(canal.instance.global.mode=spring)
消费者需处置惩罚幂等(通过binlog的gtid或主键)
七、扩展优化发起
DDL同步:捕获ALTER TABLE事故并广播到全部消费者
数据过滤:在Canal Server层增加EventFilter拦截无关变更
分库分表合并:通过Canal Adapter将多个分表数据聚合到同一Topic
监控集成:对接Prometheus + Grafana展示同步耽误曲线

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4