概要
基于Canal二次开发实现MySQL到Kafka数据同步的完整指南
团体架构流程
MySQL Master → Canal Server(伪装Slave) → Canal Parser → Canal MQ Producer → Kafka → 下游消费者
- 支持通过页面配置监听MySQL的指定库表。
- 剖析binlog并推送到Kafka。
- 不同项目消费Kafka实现数据同步。
技能名词解释
Canal Server 监听MySQL,剖析binlog,管理实例
Canal Client 消费剖析后的数据,支持自界说处置惩罚
Canal Admin 集群管理后台(需二次开发增强配置)
Canal Adapter 数据适配层(可扩展输出到Kafka)
技能细节
一、Canal核心原理与架构
- Canal定位:阿里巴巴开源的MySQL binlog增量订阅&消费组件,
- 核心能力:伪装MySQL Slave抓取主库binlog,剖析binlog为结构化数据(RowChange)、支持将数据推送到MQ(Kafka/RocketMQ)
- 核心模块
- Canal Server 监听MySQL,解析binlog,管理实例
- Canal Client 消费解析后的数据,支持自定义处理
- Canal Admin 集群管理后台(需二次开发增强配置)
- Canal Adapter 数据适配层(可扩展输出到Kafka)
复制代码- MySQL Master → Canal Server(伪装Slave) → Canal Parser → Canal MQ Producer → Kafka → 下游消费者
复制代码 二、环境准备
6. 底子依赖
- MySQL:开启binlog(log-bin=mysql-bin, binlog_format=ROW)
- Java 8+:Canal基于Java开发
- Kafka集群:建议2.0+版本
- Zookeeper:Canal Server依赖ZK做集群协调
复制代码- git clone https://github.com/alibaba/canal.git
- #建议使用1.1.6稳定版
- git checkout canal-1.1.6
复制代码 三、快速验证Canal底子功能
3. 当地启动Canal Server
- 步骤1:修改conf/example/instance.properties
- #MySQL连接配置
- canal.instance.master.address=127.0.0.1:3306
- canal.instance.dbUsername=canal
- canal.instance.dbPassword=canal
- #监听所有库表
- canal.instance.filter.regex=.*\\..*
- 步骤2:启动Server
- sh bin/startup.sh
复制代码- SimpleCanalClient
- CanalConnector connector = CanalConnectors.newSingleConnector(
- new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
- connector.connect();
- connector.subscribe(".*\\..*"); // 订阅所有表
- while (true) {
- Message message = connector.getWithoutAck(100); // 批量获取
- for (CanalEntry.Entry entry : message.getEntries()) {
- CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
- // 解析出变更的行数据
- System.out.println(rowChange);
- }
- connector.ack(message.getId()); // 确认消费
- }
复制代码 四、二次开发:页面化配置+推送到Kafka
- 动态配置:通过Web界面管理监听规则(库、表、过滤条件)
- Kafka集成:自动将数据按规则发送到指定Topic
- 多用户隔离:不同项目消费不同Topic
- 架构改造方案
- +-------------------+ +-----------------+
- | Admin Web界面 | | MySQL配置存储 |
- +-------------------+ +-----------------+
- | 读写配置 |
- v v
- +------------+ +------------------+ +------------------+
- | MySQL主库 | --> | Canal Server集群 | --> | Kafka消息队列 |
- +------------+ +------------------+ +------------------+
- ^ |
- | 动态加载配置 | 按规则分发Topic
- +-----------------------+
复制代码
- 关键开发步调
3.1 扩展Canal Admin实现配置管理
目的:将配置从文件迁移到数据库,支持页面增删改查
代码示例:
- // 自定义配置DAO(伪代码)
- public interface InstanceConfigDAO {
- List<InstanceConfig> listAll();
- InstanceConfig getByDestination(String destination);
- void save(InstanceConfig config);
- }
- // 重写InstanceConfigMonitor
- public class DBInstanceConfigMonitor extends AbstractCanalLifeCycle
- implements InstanceConfigMonitor {
- @Override
- public InstanceConfig getInstanceConfig(String destination) {
- return instanceConfigDAO.getByDestination(destination);
- }
- }
复制代码 3.2 开发Web管理界面
技能选型:
- 前端:Vue + Element UI
- 后端:Spring Boot(集成Canal Admin API)
- 功能点:
- 库表监听规则配置(支持正则表达式)
- Kafka Topic映射(如:db1.table1 → topic_db1_table1)
- 监控看板(同步延迟、消息堆积)
复制代码 3.3 改造Canal Kafka Producer
目的:根据配置动态选择Topic
代码示例:
- // 自定义Topic路由策略
- public class DynamicTopicProducer extends KafkaFlatMessageProducer {
- @Override
- protected String getTopic(CanalEntry.Entry entry) {
- String schema = entry.getHeader().getSchemaName();
- String table = entry.getHeader().getTableName();
- // 从配置中心获取schema.table对应的topic
- return configService.getTopic(schema, table);
- }
- }
- // 修改canal.properties启用自定义Producer
- canal.mq.producerClass=com.yourcompany.DynamicTopicProducer
复制代码 3.4 配置Kafka序列化
修改MQ配置:
- # kafka生产端配置
- kafka.bootstrap.servers=127.0.0.1:9092
- kafka.acks=all
- kafka.compression.type=snappy
- kafka.value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
复制代码 五、摆设与验证
- [MySQL] ←→ [Canal Server集群] ←→ [Kafka集群] ←→ [业务消费者]
- ↑
- [配置管理后台]
复制代码
- 验证流程
在管理界面添加规则:监听db1.order表 → 推送到topic_orders
实行MySQL插入:
- INSERT INTO db1.order VALUES (1001, 'test');
复制代码 查抄Kafka消息:
- kafka-console-consumer.sh --topic topic_orders --bootstrap-server localhost:9092
复制代码 预期输出:
- {
- "data": [{"id":1001, "name":"test"}],
- "type": "INSERT",
- "database": "db1",
- "table": "order"
- }
复制代码 更加详情代码和配置细节
六、注意事项
MySQL权限:Canal账号需
SELECT, REPLICATION SLAVE, REPLICATION CLIENT
Kafka分区策略:发起按主键哈希分区保证顺序性
监控诉警:重点关注:
Canal Server内存堆积(canal.memory.buffer.size)
Kafka生产耽误(canal.mq.send.thread.size)
数据一致性:
启用ACOM机制(canal.instance.global.mode=spring)
消费者需处置惩罚幂等(通过binlog的gtid或主键)
七、扩展优化发起
DDL同步:捕获ALTER TABLE事故并广播到全部消费者
数据过滤:在Canal Server层增加EventFilter拦截无关变更
分库分表合并:通过Canal Adapter将多个分表数据聚合到同一Topic
监控集成:对接Prometheus + Grafana展示同步耽误曲线
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |