从零搭建:Canal实时数据管道打通MySQL与Elasticsearch

打印 上一主题 下一主题

主题 981|帖子 981|积分 2943

     Canal实时同步Mysql   Binlog至   Elasticsearch  



  
一. 环境准备

   

  • 操纵体系:Linux(Ubuntu 20.04)
  • Java 环境:JDK 8+(发起 OpenJDK 11)
  • MySQL:已启用 Binlog(ROW 模式),并创建 Canal 用户
  • Elasticsearch:已摆设(版本 7.x 或 8.x)
  • Canal 二进制包:从 Canal Release 下载 canal.deployer-1.1.8.tar.gz 和 canal.adapter-1.1.8.tar.gz
  1.环境检查



  • 检查Mysql是否开启BinLog
  1. #root账号执行
  2. SHOW VARIABLES LIKE 'log_bin';
  3. SHOW VARIABLES LIKE 'binlog_format';
复制代码
输出如下证实已经打开:

创建 Canal 用户并授权:
  1. #创建用户
  2. CREATE USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'Password@123';
  3. # 给新创建账户赋予从库权限
  4. GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
  5. # 刷新权限
  6. FLUSH PRIVILEGES;
复制代码
假如没打开BinLog可以通过如下方法打开:


  • 开启Mysql Binlog
修改my.cnf文件,加入如下内容:
  1. log_bin=mysql-bin
  2. binlog_format=ROW
  3. binlog_expire_logs_seconds=172800
  4. expire_logs_days=2
复制代码
  log_bin:启用二进制日志,日志文件会以 mysql-bin 为前缀,并依次天生日志文件(例如:mysql-bin.000001,mysql-bin.000002 等)。
  binlog_format:设置使用的二进制日志格式,在 MySQL 8.0 版本中,binlog_format 的默认值已经变为 ROW。以是,即使你在配置文件中没有明确设置 binlog_format,MySQL 会默认使用 ROW 作为二进制日志格式。在较早的 MySQL 版本中默认值是 STATEMENT。
  binlog_expire_logs_seconds=172800 和 expire_logs_days=2:这些配置设置了二进制日志的逾期时间(默认环境下,MySQL 会生存二进制日志,直到它们逾期或达到日志文件数的限定)。在这种环境下,日志会在 2 天后逾期。
  配置好后重启Mysql:
  1. systemctl restart mysqld.service
复制代码


  • Java环境检查
  1. echo $JAVA_HOME
复制代码
2.新建测试库和表

  1. CREATE DATABASE IF NOT EXISTS canal default charset utf8 COLLATE utf8_general_ci;
  2. CREATE TABLE `test_user` (
  3.   `id` bigint unsigned NOT NULL AUTO_INCREMENT,
  4.   `name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '姓名',
  5.   `sex` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '性别',
  6.   `tel` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '电话',
  7.   PRIMARY KEY (`id`) USING BTREE
  8. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
复制代码
3.新建Es索引

  1. curl -X PUT "http://<your es IP>:9200/test_user" -H 'Content-Type: application/json' -u <es账号>:<es 密码> -d'
  2. {
  3.   "mappings": {
  4.     "properties": {
  5.       "id": {
  6.         "type": "long"
  7.       },
  8.       "title": {
  9.         "type": "text"
  10.       },
  11.       "sex": {
  12.         "type": "text"
  13.       },
  14.       "tel": {
  15.         "type": "text"
  16.       }
  17.     }
  18.   }
  19. }
  20. '
复制代码
二.摆设 Canal Server

2.1 解压安装包

  1. # 创建目录
  2. mkdir -p /opt/canal/server /opt/canal/adapter
  3. # 解压 Server
  4. tar -zxvf canal.deployer-1.1.8.tar.gz -C /opt/canal/server
  5. # 解压 Adapter
  6. tar -zxvf canal.adapter-1.1.8.tar.gz -C /opt/canal/adapter
复制代码
2.2 配置 Canal Server

修改配置文件 /opt/canal/server/conf/canal.properties:
  1. # tcp bind ip
  2. canal.ip =127.0.0.1
  3. # register ip to zookeeper
  4. canal.register.ip =
  5. canal.port = 11111
  6. canal.metrics.pull.port = 11112
  7. # 目标实例名称(默认 example)
  8. canal.destinations = example
  9. # 持久化模式(默认内存,可选 H2/MySQL)
  10. canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
复制代码
这里主要修改canal.ip其他保持默认即可。
修改实例配置 /opt/canal/server/conf/example/instance.properties:
  1. #被同步的mysql地址,填写自己的IP地址
  2. canal.instance.master.address=127.0.0.1:3306
  3. #第一步创建的数据库从库权限账号/密码
  4. canal.instance.dbUsername=canal
  5. canal.instance.dbPassword=Password@123
  6. #数据库连接编码
  7. canal.instance.connectionCharset = UTF-8
  8. #Binlog 过滤规则(监控所有库表)
  9. canal.instance.filter.regex=.*\\..*
  10. #指定了 Canal 消费者(比如 MQ 客户端)读取和写入消息的目标主题,保持默认即可
  11. canal.mq.topic=example
复制代码
2.3 启动 Canal Server

  1. cd /opt/canal/server/bin
  2. ./startup.sh
  3. # 查看日志
  4. tail -f /opt/canal/server/logs/canal/canal.log
  5. tail -f /opt/canal/server/logs/example/example.log
复制代码



可以看到日志没有明显报错,且进程已经启动,则表现Canal Server已经启动成功。

三. 摆设 Canal Adapter(同步到 Elasticsearch)

3.1 配置 Adapter

修改配置文件 /opt/canal/adapter/conf/application.yml:
  1. server:
  2.   port: 8081
  3. spring:
  4.   jackson:
  5.     date-format: yyyy-MM-dd HH:mm:ss
  6.     time-zone: GMT+8
  7.     default-property-inclusion: non_null
  8.    
  9. canal.conf:
  10.   mode: tcp # 客户端的模式,可选tcp kafka rocketMQ
  11.   flatMessage: true # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效
  12.   zookeeperHosts:    # 对应集群模式下的zk地址
  13.   syncBatchSize: 1000 # 每次同步的批数量
  14.   retries: 0 # 重试次数, -1为无限重试
  15.   timeout: # 同步超时时间, 单位毫秒
  16.   accessKey:
  17.   secretKey:
  18.   consumerProperties:
  19.     # canal tcp consumer
  20.     canal.tcp.server.host: 127.0.0.1:11111 #配置canal-server的地址
  21.     canal.tcp.zookeeper.hosts:
  22.     canal.tcp.batch.size: 500
  23.     canal.tcp.username:
  24.     canal.tcp.password:
  25.   srcDataSources: # 源数据库配置
  26.     defaultDScanal是测试数据库
  27.       url: jdbc:mysql://<yourIP>:3306/canal?useUnicode=true&useSSL=true #数据库连接,canal是测试用的数据库
  28.       username: root #数据库账号
  29.       password: Pass@1234 #数据库密码
  30.   canalAdapters: # 适配器列表
  31.   - instance: example # canal实例名,和上述Server的配置一样
  32.     groups: # 分组列表
  33.     - groupId: g1 # 分组id, 如果是MQ模式将用到该值
  34.       outerAdapters:
  35.       - name: logger # 日志打印适配器
  36.       - name: es8 # ES同步适配器根据自己的es版本来
  37.         hosts: <your IP>:9200 # ES连接地址
  38.         properties:
  39.           mode: rest # 模式可选transport端口(9300) 或者 rest端口(9200)
  40.           security.auth: elastic:123456 #  连接es的用户和密码,仅rest模式使用
  41.           cluster.name: elasticsearch # ES集群名称
复制代码
怎样获取es集群名称,命令输出的cluster_name就是上面须要配置的集群名字:
  1. curl -u elastic:<esPass> -X GET "http://<es IP>:9200/_cluster/health?pretty"
复制代码

3.2 配置数据映射

创建 Elasticsearch 映射文件 /opt/canal/adapter/conf/es8/mytest_user.yml:
  1. dataSourceKey: defaultDS # 源数据源的key, 对应上面application配置的srcDataSources中的值
  2. destination: example  # canal的instance或者MQ的topic
  3. groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
  4. esMapping:
  5.   _index: test_user # es 的索引名称
  6.   _id: _id  # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
  7.   sql: "SELECT
  8.          tb.id AS _id,
  9.          tb.name,
  10.          tb.sex,
  11.          tb.tel
  12.         FROM
  13.          test_user us"        # sql映射
  14.   etlCondition: "where p.id>={}"   #etl的条件参数
  15.   commitBatch: 3000   # 提交批大小
复制代码
3.3 启动 Adapter

  1. cd /opt/canal/adapter/bin
  2. ./startup.sh
  3. #查看日志
  4. tail -f /opt/canal/adapter/logs/adapter/adapter.log
复制代码
会输出很多数据库变动的日志:


4. 验证同步

4.1 插入测试数据到 MySQL

  1. #执行sql
  2. INSERT INTO test_user (name, sex, tel) VALUES ('Paco', '男', '123456789');
复制代码


4.2 查询 Elasticsearch

  1. curl -u elastic:<esPass> -X GET "http://<esIP>:9200/test_user/_search?pretty"
复制代码
也可以在工具上查看,这边是Eage插件:


至此,即可验证可同步成功。我们可以修改数据测试,看是否能同步。


然后我们测试修改Es的数据:


可以发现数据库并没有变,至此Canal单向实时同步Mysql Binlog至Elasticsearch就配置完成了。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

风雨同行

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表