阿里 Canal 实时同步 MySQL 增量数据至 ClickHouse 数据库

打印 上一主题 下一主题

主题 651|帖子 651|积分 1955

重要实现思路


  • 1、在clickhouse中创建MySQL引擎表。
  • 2、根据MySQL引擎表的信息创建目标表。
  • 3、实现canal实时增量同步MySQL数据到clickhouse。
MySQL 的准备

修改设置文件开启 Binlog
  1. [root@hadoop100 module]$ sudo vim /etc/my.cnf
  2. server-id=1
  3. log-bin=mysql-bin
  4. binlog_format=row
  5. binlog-do-db=test
复制代码

  • 注意:binlog-do-db 根据本身的情况进行修改,指定具体要同步的数据库,如果不设置则表现所有数据库均开启 Binlog。
MySQL中需要同步的库表
  1. MySQL [testck]> show databases;
  2. +--------------------+
  3. | Database           |
  4. +--------------------+
  5. | information_schema |
  6. | innodb             |
  7. | mysql              |
  8. | performance_schema |
  9. | sys                |
  10. | testck             |
  11. | tmp                |
  12. +--------------------+
  13. 11 rows in set (0.00 sec)
  14. MySQL [testck]> use testck;
  15. Reading table information for completion of table and column names
  16. You can turn off this feature to get a quicker startup with -A
  17. Database changed
  18. MySQL [testck]> show tables;
  19. +------------------+
  20. | Tables_in_testck |
  21. +------------------+
  22. | t_organization   |
  23. | t_user           |
  24. +------------------+
  25. 2 rows in set (0.00 sec)
  26. MySQL [testck]>
复制代码

  • testck库下的所有表
clickhouse 的准备


  • canal实时同步MySQL的数据需要在clickhouse中提前建好库表
在clickhouse建MySQL引擎的库


  • 把MySQL某个库中的所有表结构信息映射到clickhouse中,这样在clickhouse中就可以远程操作MySQL。
  1. cnnxpredn02 :) CREATE DATABASE t_tmp ENGINE = MySQL('ip:3306', 'testck', 'user', 'pass');
  2. CREATE DATABASE t_tmp
  3. ENGINE = MySQL('ip:3306', 'testck', 'user', 'pass')
  4. Query id: 9f6d7179-3c97-47c2-93ea-abc0c6ca873b
  5. Ok.
  6. 0 rows in set. Elapsed: 0.013 sec.
  7. cnnxpredn02 :) show databases;
  8. SHOW DATABASES
  9. Query id: 5b5baaf5-86f5-46c1-ac34-2618c34462f1
  10. ┌─name────┐
  11. │ default │
  12. │ system  │
  13. │ t_tmp   │
  14. └─────────┘
  15. 3 rows in set. Elapsed: 0.009 sec.
  16. cnnxpredn02 :)
复制代码
创建clickhouse中的库
  1. cnnxpredn02 :) create database testck;
  2. CREATE DATABASE testck
  3. Query id: 397261c0-a8f0-48c6-b4f6-71d121b975b8
  4. Ok.
  5. 0 rows in set. Elapsed: 0.004 sec.
  6. cnnxvopredn02 :) show databases;
  7. SHOW DATABASES
  8. Query id: f467f4bb-99fa-4322-a3c1-e33be74b6e81
  9. ┌─name────┐
  10. │ default │
  11. │ system  │
  12. │ t_tmp   │
  13. │ testck  │
  14. └─────────┘
  15. 4 rows in set. Elapsed: 0.002 sec.
  16. cnnxvpredn02 :)
复制代码
根据创建的MySQL引擎表创建clickhouse目标表结构
  1. cnnxvpredn02 :) create table testck.t_organization as t_tmp.t_organization;
  2. :-] create table testck.t_user as t_tmp.t_user;
  3. CREATE TABLE testck.t_organization AS t_tmp.t_organization
  4. Query id: f942cf5d-701f-4dbd-9ffd-de2206eec851
  5. Ok.
  6. 0 rows in set. Elapsed: 0.006 sec.
  7. CREATE TABLE testck.t_user AS t_tmp.t_user
  8. Query id: ef7dddd7-64b2-4dbc-88be-a105b49aff57
  9. Ok.
  10. 0 rows in set. Elapsed: 0.004 sec.
  11. cnnxvopredn02 :)
复制代码
Canal 的下载和安

下载并解压 Jar 包

  1. mkdir /opt/module/canal
  2. tar -zxvf canal.deployer-1.1.2.tar.gz -C /opt/module/canal
复制代码
修改 canal.properties 的设置
  1. #################################################
  2. #########               common argument         #############
  3. #################################################
  4. # tcp bind ip
  5. canal.ip =
  6. # register ip to zookeeper
  7. canal.register.ip =
  8. canal.port = 11111
  9. canal.metrics.pull.port = 11112
  10. # canal instance user/passwd
  11. # canal.user = canal
  12. # canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
  13. # canal admin config
  14. #canal.admin.manager = 127.0.0.1:8089
  15. canal.admin.port = 11110
  16. canal.admin.user = admin
  17. canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
  18. # admin auto register
  19. #canal.admin.register.auto = true
  20. #canal.admin.register.cluster =
  21. #canal.admin.register.name =
  22. canal.zkServers =
  23. # flush data to zk
  24. canal.zookeeper.flush.period = 1000
  25. canal.withoutNetty = false
  26. # tcp, kafka, rocketMQ, rabbitMQ
  27. canal.serverMode = tcp
  28. # flush meta cursor/parse position to file
  29. canal.file.data.dir = ${canal.conf.dir}
  30. canal.file.flush.period = 1000
  31. ## memory store RingBuffer size, should be Math.pow(2,n)
  32. ......
复制代码

  • 说明:这个文件是canal的基本通用设置,canal端口号默认就是 11111,修改canal的输出model,默认tcp,改为输出到 kafka多实例设置如果创建多个实例,通过前面canal架构,我们可以知道,一个canal服务中可以有多个instance,conf/下的每一个example即是一个实例,每个实例下面都有独立的设置文件。默认只有一个实例example,如果需要多个实例处理不同的 MySQL 数据的话,直接拷贝出多个 example,并对其重新命名,命名和设置文件中指定的名称划一,然后修改
    canal.properties 中的 canal.destinations=实例 1,实例 2,实例 3。
  1. #################################################
  2. ######### destinations #############
  3. #################################################
  4. canal.destinations = example
复制代码
修改 instance.properties

设置 MySQL 服务器地点
  1. #################################################
  2. ## mysql serverId , v1.0.26+ will autoGen
  3. canal.instance.mysql.slaveId=20
  4. # enable gtid use true/false
  5. canal.instance.gtidon=false
  6. # position info
  7. canal.instance.master.address=hadoop100:3306
复制代码
设置连接 MySQL 的用户名和密码,默认就是我们前面授权的 canal
  1. # username/password
  2. canal.instance.dbUsername=canal
  3. canal.instance.dbPassword=canal
  4. canal.instance.connectionCharset = UTF-8
  5. canal.instance.defaultDatabaseName =test
  6. # enable druid Decrypt database password
  7. canal.instance.enableDruid=false
复制代码
启动服务
  1. bin/startup.sh
复制代码

  • 如果看到如下日记则启动乐成
  1. [root@cnnxpredn02 canal]# cat canal.log
  2. 2022-03-07 08:47:21.349 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
  3. 2022-03-07 08:47:21.398 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
  4. 2022-03-07 08:47:21.415 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
  5. 2022-03-07 08:47:21.475 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server):11111]
  6. 2022-03-07 08:47:23.137 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
复制代码
canal-adapter 的下载和安装

下载并解


  • 下载canal-adapter: https://github.com/alibaba/canal/releases
  • 注意:canal-adapter解压后是分散的,我们在指定解压目次的时候需要将 canal-adapter 指定上
    mkdir canal-adapter
    tar -zxvf canal.adapter-1.1.5.tar.gz -C /../canal-adapter
在application.yml中设置MySQL和clickhouse的连接信息
  1. ...
  2. #  srcDataSources:
  3.     defaultDS:
  4.       url: jdbc:mysql://ip:3306/database?useUnicode=true
  5.       username: user
  6.       password: pass
  7.   canalAdapters:
  8.   - instance: test # canal instance Name or mq topic name
  9.     groups:
  10.     - groupId: g1
  11.       outerAdapters:
  12.       - name: logger
  13.       - name: rdb
  14.         key: mysql1
  15.         properties:
  16.           jdbc.driverClassName: ru.yandex.clickhouse.ClickHouseDriver
  17.           jdbc.url: jdbc:clickhouse://ip:port/database
  18.           jdbc.username: user
  19.           jdbc.password: pass
  20. #      - name: rdb
  21. ...
复制代码
在/opt/soft/canal-adapter/conf/rdb/mytest_user.yml中设置需要同步的库表信息


  • canal可以同步整个库的所有表和单表的表,设置都在这里进行设置,如果需要同步多张表的数据,就在rdb下创建多个雷同的设置文件,文件名可以用表名来命名。
  1.                 Mirror schema synchronize config
  2.     dataSourceKey: defaultDS
  3.     destination: test
  4.     groupId: g1
  5.     outerAdapterKey: mysql1
  6.     concurrent: true
  7.     dbMapping:
  8.     mirrorDb: true
  9.     database: testck
复制代码

  • 注:我这里是同步testck库的所有表
启动canal-adapter服务
  1. bin/startup.sh
复制代码

  • 在/.../canal-adapter/logs/adapter下查看日记
本文由博客一文多发平台 OpenWrite 发布!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

拉不拉稀肚拉稀

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表