canal实现mysql数据同步
目次1、canal下载
2、mysql同步用户创建和授权
3、canal admin安装和启动
4、canal server安装和启动
5、canal数据同步
5.1、java 端集成监听canal 同步的mysql数据
5.2、kafka同步数据
6、java tcp同步只是此中一种方式,还可以通过kafka、rabbitmq等方式举行数据同步
1、canal下载
canal实现mysql数据同步可以直接安装canal server就可以了,但是为了方便管理(instance配置,canal server状态管理,集群等),必要安装canal admin,应用下载地址:Releases · alibaba/canal · GitHub
进入页面可以选择必要安装的版本
https://i-blog.csdnimg.cn/direct/b5a9279440864c2d900b480104db8d28.png
下载canal.deployer-1.1.8.tar.gz和canal.admin-1.1.8.tar.gz
2、mysql同步用户创建和授权
登录mysql
mysql -h 127.0.0.1 -P 3306 -u root -p
创建同步用户 repl 密码设为123456
CREATE USER 'repl'@'%' IDENTIFIED BY '123456';
给予同步权限
GRANT REPLICATION SLAVE ON *.* to 'repl'@'%' identified by '123456';
给予repl只读test库的权限,test库是用来同步数据的
GRANT SELECT ON test.* to 'repl'@'%' identified by '123456';
canal_manager是canal admin需要的,给予repl对该库的读写权限
GRANT ALL PRIVILEGES ON canal_manager.* to 'repl'@'%' identified by '123456';
mysql my.cnf配置文件增加主从配置master数据库的配置信息
#主数据主从配置 唯一id
server_id=1
#开启logbin
log-bin=mysql-bin
#写入模式 row
binlog-format=ROW
#需要同步的库
binlog-do-db=test
#忽略的数据库
replicate-ignore-db=mysql
replicate-ignore-db=sys
replicate-ignore-db=information_schema
replicate-ignore-db=performance_schema
https://i-blog.csdnimg.cn/direct/a8b7990ba16d4154ae41f2ae9b366040.png
在canal-admin解压文件的conf中有一个canal_manager.sql,导入到master数据库
https://i-blog.csdnimg.cn/direct/58b0eb4b24824cbaafa23ab179012b75.png
3、canal admin安装和启动
把canal.admin-1.1.8.tar.gz上传到linux
解压 tar -zvxf canal.admin-1.1.8.tar.gz
进入conf目次下,编辑application.yml配置文件。
server:
port: 8089
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
spring.datasource:
address: 127.0.0.1:3306
database: canal_manager
username: repl
password: 123456
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true
hikari:
maximum-pool-size: 30
minimum-idle: 1
canal:
adminUser: admin
adminPasswd: 123456
重点先容以下几个参数:
address:我们必要订阅(也就是mysql master服务器)mysql地点的服务器IP和数据库端口。
database:canal.admin web系统必须的几张表,必要在mysql master服务器上初始化conf/canal_manager.sql文件。
sername和password就是mysql master服务器创建的用于复制的用户和密码,也就是我们在canal server中配置的repl 和 123456。
driver-class-name:mysql的驱动,默认是MYSQL5的驱动,假如你的MYSQL是8的(我的就是),要将驱动改为com.mysql.cj.jdbc.Driver。
别的,还必要在mysql毗连后面加上allowPublicKeyRetrieval=true,否则启动时,有大概报错。
启动canal.admin
进入bin目次,实行如下下令,启动canal.admin:
./startup.sh 查看 admin 日志
2022-12-10 03:13:58.995 INFOo.s.jmx.export.annotation.AnnotationMBeanExporter -
Located MBean 'dataSource': registering with JMX server as MBean
2022-12-10 03:13:59.015 INFOorg.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8089"]
2022-12-10 03:13:59.038 INFOorg.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2022-12-10 03:13:59.214 INFOo.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8089 (http) with context path ''
2022-12-10 03:13:59.221 INFOcom.alibaba.otter.canal.admin.CanalAdminApplication - Started CanalAdminApplication in 14.281 seconds (JVM running for 15.894) 假如出现上述日志,阐明启动成功!
登录admin
通过http://127.0.0.1:8089/访问,默认密码:admin/123456。
留意,IP和密码必要改成你自己配置的。假如是在服务器上配置的,别忘记放开8089端口。
https://i-blog.csdnimg.cn/img_convert/c186d323a58d3fad2eab151916ab77ea.jpeg
输入用户名和密码之后,出现上述页面阐明配置成功!
假如必要修改密码,直接通过实行 select upper(sha1(unhex(sha1('1234567')))) 这个sql得到结果,然后复制到canal_manager库的canal_user表的password字段中就可以了,此中1234567是明文密码,实行上述sql会得到一个密码。
4、canal server安装和启动
把canal.deployer-1.1.8.tar.gz上传到linux
解压 tar -zvxf ccanal.deployer-1.1.8.tar.gz
进入conf目次下,编辑canal.properties配置文件。
留意,假如直接编辑canal.properties,大概无法启动,报如下错误:
https://i-blog.csdnimg.cn/direct/09adc7e2cfdb486bb258d7fe80800050.png
可以通过如下方式修改
mv canal.properties canal.properties_bak
cp canal_local.properties canal.properties
vim canal.properties canal.properties文件全部内容如下:
# register ip
canal.register.ip =
# canal admin configcanalAdmin 的链接、端口、用户名和MD5密码
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd =6F32482BAFC60F23B7736044CEFC1799166E5CDB
# admin auto register canal server启动后自动注入到canal admin管理模块
canal.admin.register.auto = true
canal.admin.register.cluster =
canal.admin.register.name =
一般只必要修改下面这3个
canal.admin.manager = 127.0.0.1:8089
canal.admin.user = admin
canal.admin.passwd =6F32482BAFC60F23B7736044CEFC1799166E5CDB
启动canal.server
进入bin目次,实行如下下令,启动canal.server:
./startup.sh 查看canal日志
https://i-blog.csdnimg.cn/direct/c32d4a6308a5491d8e5eb293dd87a553.png
启动后,canalAdmin的server管理模块,对应创建的canal server会动态识别到,状态变为启动
https://i-blog.csdnimg.cn/direct/38f0166d3152449e9428a66bc6a57664.png
5、canal数据同步
5.1、java 端集成监听canal 同步的mysql数据
1、引入依赖
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency> 2、编写测试代码
package com.hy.das.config;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;
@Component
public class CanalClient implements InitializingBean{
private final static int BATCH_SIZE = 1000;
@Override
public void afterPropertiesSet() throws Exception {
// 创建链接 此处的11111为tcp端口 在canal admin Server管理模块可以查看
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111),
"test", "", "");
try {
//打开连接
connector.connect();
//订阅数据库表,全部表
connector.subscribe(".*\\..*");
//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
connector.rollback();
while (true) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(BATCH_SIZE);
System.out.println(message.getEntries().size());
//获取批量ID
long batchId = message.getId();
//获取批量的数量
int size = message.getEntries().size();
//如果没有数据
if (batchId == -1 || size == 0) {
try {
//线程休眠2秒
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
System.out.println("----------------");
//如果有数据,处理数据
//遍历entries,单条解析
for (CanalEntry.Entry entry : message.getEntries()) {
//获取表名
String tableName = entry.getHeader().getTableName();
//获取类型
CanalEntry.EntryType entryType = entry.getEntryType();
//获取序列化后的数据
ByteString storeValue = entry.getStoreValue();
//判断entry类型是否为ROWDATA类型
if (CanalEntry.EntryType.ROWDATA.equals(entryType)){
//反序列化
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
//获取当前事件操作类型
CanalEntry.EventType eventType = rowChange.getEventType();
//获取数据集
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
//遍历
for (CanalEntry.RowData rowData : rowDatasList) {
//改变前数据
JSONObject jsonObjectBefore = new JSONObject();
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
for (CanalEntry.Column column : beforeColumnsList) {
jsonObjectBefore.put(column.getName(),column.getValue());
}
//改变后数据
JSONObject jsonObjectAfter = new JSONObject();
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
for (CanalEntry.Column column : afterColumnsList) {
jsonObjectAfter.put(column.getName(),column.getValue());
}
System.out.println("Table:"+tableName+",EventTpye:"+eventType+",Before:"+jsonObjectBefore+",After:"+jsonObjectAfter);
}
}else {
System.out.println("当前操作类型为:"+entryType);
}
}
}
//进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
connector.ack(batchId);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
connector.disconnect();
}
}
}
https://i-blog.csdnimg.cn/direct/4e3d23bc2e5b452dbe2cc0f132c1e1f8.png
newSingleConnector方法里面的test是一个instance实列,界说了必要同步的master库的信息(ip、端口、用户名、密码、binlog文件名称、同步位置、必要同步的库、不必要同步的库等)
在canal admin web管理界面的Instance 管理模块,点击新建Instance举行创建,新建页面的Instance名称就是test,这个可以任意填写,代码对应修改就行,所属集群/主机,因为我这里是单机摆设,直接选择自动注入的canal server就行,点击载入模板,获取配置初始信息,下图中标出的信息按照现实的修改填入就行,点击保存后,启动这个Instance。
https://i-blog.csdnimg.cn/direct/0eb509ffe35f4428987f4348245989b8.png
https://i-blog.csdnimg.cn/direct/9fe77661d57946ce9b790502ee38629a.png
3、启动服务,对test库的sys_user表举行数据更新,可以看到背景已经收到变更数据
https://i-blog.csdnimg.cn/direct/1230291bb0e942159e1a163dfd848d41.png
5.2、kafka同步数据
1:canal.properties配置文件增加如下配置
#数据变更发送到kafka
# 设置输出目标为 kafka
canal.serverMode = kafka
# Kafka 地址
canal.mq.servers= xx.xx.xx.xx:9092
# 投递失败的重试次数,默认0,改为2
canal.mq.retries = 2
# Kafka batch.size,即producer一个微批次的大小,默认16K,这里加倍
canal.mq.batchSize = 32768
# Kafka max.request.size,即一个请求的最大大小,默认1M,这里也加倍
canal.mq.maxRequestSize = 2097152
# Kafka linger.ms,即sender线程在检查微批次是否就绪时的超时,默认0ms,这里改为200ms
# 满足batch.size和linger.ms其中之一,就会发送消息
canal.mq.lingerMs = 200
# Kafka buffer.memory,缓存大小,默认32M
canal.mq.bufferMemory = 33554432
# 获取binlog数据的批次大小,默认50
canal.mq.canalBatchSize = 50
# 获取binlog数据的超时时间,默认200ms
canal.mq.canalGetTimeout = 200
# 是否将binlog转为JSON格式。如果为false,就是原生Protobuf格式
canal.mq.flatMessage = true
# 压缩类型,官方文档没有描述
canal.mq.compressionType = none
# Kafka acks,默认all,表示分区leader会等所有follower同步完才给producer发送ack
# 0表示不等待ack,1表示leader写入完毕之后直接ack
canal.mq.acks = all
# Kafka消息投递是否使用事务
# 主要针对flatMessage的异步发送和动态多topic消息投递进行事务控制来保持和Canal binlog位置的一致性
# flatMessage模式下建议开启
canal.mq.transaction = true
https://i-blog.csdnimg.cn/direct/00301d856f5c44bbb7a1464d3aececd0.png
2:在canal admin web界面修改instance mq配置,增加数据同步到kakfa的topic
https://i-blog.csdnimg.cn/direct/4a4c77d8fc4c45c59aaabb0ef1715c24.png
3:如上两步配置完成重启后,在kafka监听配置的topic就可以接收到数据了
6、java tcp同步只是此中一种方式,还可以通过kafka、rabbitmq等方式举行数据同步
留意上面必要提供对外访问的端口必要开通安全组,比如8089、11111等端口。
参考文章:
【CanalAdmin摆设文档】_canal-admin-CSDN博客
https://zhuanlan.zhihu.com/p/590705531
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]