ToB企服应用市场:ToB评测及商务社交产业平台
标题:
基于Canal二次开发实现MySQL到Kafka数据同步
[打印本页]
作者:
我爱普洱茶
时间:
2025-2-18 04:47
标题:
基于Canal二次开发实现MySQL到Kafka数据同步
概要
基于Canal二次开发实现MySQL到Kafka数据同步的完整指南
团体架构流程
MySQL Master → Canal Server(伪装Slave) → Canal Parser → Canal MQ Producer → Kafka → 下游消费者
支持通过页面配置监听MySQL的指定库表。
剖析binlog并推送到Kafka。
不同项目消费Kafka实现数据同步。
技能名词解释
Canal Server 监听MySQL,剖析binlog,管理实例
Canal Client 消费剖析后的数据,支持自界说处置惩罚
Canal Admin 集群管理后台(需二次开发增强配置)
Canal Adapter 数据适配层(可扩展输出到Kafka)
技能细节
一、Canal核心原理与架构
Canal定位:阿里巴巴开源的MySQL binlog增量订阅&消费组件,
核心能力:伪装MySQL Slave抓取主库binlog,剖析binlog为结构化数据(RowChange)、支持将数据推送到MQ(Kafka/RocketMQ)
核心模块
Canal Server 监听MySQL,解析binlog,管理实例
Canal Client 消费解析后的数据,支持自定义处理
Canal Admin 集群管理后台(需二次开发增强配置)
Canal Adapter 数据适配层(可扩展输出到Kafka)
复制代码
数据流转流程
MySQL Master → Canal Server(伪装Slave) → Canal Parser → Canal MQ Producer → Kafka → 下游消费者
复制代码
二、环境准备
6. 底子依赖
MySQL:开启binlog(log-bin=mysql-bin, binlog_format=ROW)
Java 8+:Canal基于Java开发
Kafka集群:建议2.0+版本
Zookeeper:Canal Server依赖ZK做集群协调
复制代码
下载Canal源码
git clone https://github.com/alibaba/canal.git
#建议使用1.1.6稳定版
git checkout canal-1.1.6
复制代码
三、快速验证Canal底子功能
3. 当地启动Canal Server
步骤1:修改conf/example/instance.properties
#MySQL连接配置
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
#监听所有库表
canal.instance.filter.regex=.*\\..*
步骤2:启动Server
sh bin/startup.sh
复制代码
使用Canal Client消费数据
SimpleCanalClient
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
connector.connect();
connector.subscribe(".*\\..*"); // 订阅所有表
while (true) {
Message message = connector.getWithoutAck(100); // 批量获取
for (CanalEntry.Entry entry : message.getEntries()) {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
// 解析出变更的行数据
System.out.println(rowChange);
}
connector.ack(message.getId()); // 确认消费
}
复制代码
四、二次开发:页面化配置+推送到Kafka
动态配置:通过Web界面管理监听规则(库、表、过滤条件)
Kafka集成:自动将数据按规则发送到指定Topic
多用户隔离:不同项目消费不同Topic
架构改造方案
+-------------------+ +-----------------+
| Admin Web界面 | | MySQL配置存储 |
+-------------------+ +-----------------+
| 读写配置 |
v v
+------------+ +------------------+ +------------------+
| MySQL主库 | --> | Canal Server集群 | --> | Kafka消息队列 |
+------------+ +------------------+ +------------------+
^ |
| 动态加载配置 | 按规则分发Topic
+-----------------------+
复制代码
关键开发步调
3.1 扩展Canal Admin实现配置管理
目的:将配置从文件迁移到数据库,支持页面增删改查
代码示例:
// 自定义配置DAO(伪代码)
public interface InstanceConfigDAO {
List<InstanceConfig> listAll();
InstanceConfig getByDestination(String destination);
void save(InstanceConfig config);
}
// 重写InstanceConfigMonitor
public class DBInstanceConfigMonitor extends AbstractCanalLifeCycle
implements InstanceConfigMonitor {
@Override
public InstanceConfig getInstanceConfig(String destination) {
return instanceConfigDAO.getByDestination(destination);
}
}
复制代码
3.2 开发Web管理界面
技能选型:
前端:Vue + Element UI
后端:Spring Boot(集成Canal Admin API)
功能点:
库表监听规则配置(支持正则表达式)
Kafka Topic映射(如:db1.table1 → topic_db1_table1)
监控看板(同步延迟、消息堆积)
复制代码
3.3 改造Canal Kafka Producer
目的:根据配置动态选择Topic
代码示例:
// 自定义Topic路由策略
public class DynamicTopicProducer extends KafkaFlatMessageProducer {
@Override
protected String getTopic(CanalEntry.Entry entry) {
String schema = entry.getHeader().getSchemaName();
String table = entry.getHeader().getTableName();
// 从配置中心获取schema.table对应的topic
return configService.getTopic(schema, table);
}
}
// 修改canal.properties启用自定义Producer
canal.mq.producerClass=com.yourcompany.DynamicTopicProducer
复制代码
3.4 配置Kafka序列化
修改MQ配置:
# kafka生产端配置
kafka.bootstrap.servers=127.0.0.1:9092
kafka.acks=all
kafka.compression.type=snappy
kafka.value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
复制代码
五、摆设与验证
摆设拓扑
[MySQL] ←→ [Canal Server集群] ←→ [Kafka集群] ←→ [业务消费者]
↑
[配置管理后台]
复制代码
验证流程
在管理界面添加规则:监听db1.order表 → 推送到topic_orders
实行MySQL插入:
INSERT INTO db1.order VALUES (1001, 'test');
复制代码
查抄Kafka消息:
kafka-console-consumer.sh --topic topic_orders --bootstrap-server localhost:9092
复制代码
预期输出:
{
"data": [{"id":1001, "name":"test"}],
"type": "INSERT",
"database": "db1",
"table": "order"
}
复制代码
更加详情代码和配置细节
六、注意事项
MySQL权限:Canal账号需
SELECT, REPLICATION SLAVE, REPLICATION CLIENT
Kafka分区策略:发起按主键哈希分区保证顺序性
监控诉警:重点关注:
Canal Server内存堆积(canal.memory.buffer.size)
Kafka生产耽误(canal.mq.send.thread.size)
数据一致性:
启用ACOM机制(canal.instance.global.mode=spring)
消费者需处置惩罚幂等(通过binlog的gtid或主键)
七、扩展优化发起
DDL同步:捕获ALTER TABLE事故并广播到全部消费者
数据过滤:在Canal Server层增加EventFilter拦截无关变更
分库分表合并:通过Canal Adapter将多个分表数据聚合到同一Topic
监控集成:对接Prometheus + Grafana展示同步耽误曲线
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4