java整合Canal实现数据库监听(附完整的踩坑总结)

打印 上一主题 下一主题

主题 1002|帖子 1002|积分 3006

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
1. 预备工作

1.1. MySQL开启binlog日志

  1. [mysqld]
  2. log-bin=mysql-bin # 开启 binlog
  3. binlog-format=ROW # 选择 ROW 模式
  4. server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
复制代码
修改完成后输入: show variables like 'log_bin';查看binlog是否生效
  1. show variables like 'log_bin';
复制代码


1.2. 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限

  1. -- 创建一个用户名密码均为canal的账户
  2. CREATE USER canal IDENTIFIED BY 'canal';  
  3. -- 授予用户'canal'在所有数据库和表上进行SELECT查询以及复制从库和复制客户端的权限,并且允许该用户从任何主机连接到数据库服务器。
  4. GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
  5. -- 刷新权限
  6. FLUSH PRIVILEGES;
复制代码

2. 正式开始

2.1. 采用doker部署canal-server

2.1.1. 使用docker拉取最新版canal服务端

  1. docker pull canal/canal-server:latest
复制代码
2.1.2. 使用docker run命令创建容器并挂载设置文件所在的数据卷

  1. # -v 本地的instance.properties:容器的instance.properties将容器的instance.properties配置文件挂载到宿主机,方便后续变更
  2. docker run \
  3. --name mycanal \
  4. -p 11111:11111 \
  5. -v /tmp/canal/conf2/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
  6. -d \
  7. canal/canal-server
复制代码
2.2. 修改设置文件

进入数据卷挂载的目录新建一个instance.properties文件 修改设置如下
  1. ## mysql serverId 不能和mysql相同
  2. canal.instance.mysql.slaveId = 1234  
  3. #position info,需要改成自己的数据库信息
  4. canal.instance.master.address = 127.0.0.1:3306
  5. canal.instance.master.journal.name =
  6. canal.instance.master.position =
  7. canal.instance.master.timestamp =
  8. #canal.instance.standby.address =
  9. #canal.instance.standby.journal.name =
  10. #canal.instance.standby.position =
  11. #canal.instance.standby.timestamp =
  12. #username/password,需要改成自己的数据库信息 此处采用刚刚创建的用户
  13. canal.instance.dbUsername = canal   
  14. canal.instance.dbPassword = canal
  15. canal.instance.defaultDatabaseName =
  16. canal.instance.connectionCharset = UTF-8
  17. #需要改成同步的数据库表规则
  18. canal.instance.filter.regex = .\*\\\\..\*
复制代码
  设置参考 此处引用csdn博客
  mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\)
常见例子:
  

  • 所有表:.   or  .\..*
  • canal schema下所有表: canal\..*
  • canal下的以canal打头的表:canal\.canal.*
  • canal schema下的一张表:canal.test1
  • 多个规则组合使用:canal\..*,mysql.test1,mysql.test2 (逗号分隔)
    注意:此过滤条件只针对row模式的数据有用(ps. mixed/statement由于不解析sql,以是无法准确提取tableName举行过滤)
  修改完成后重启 canal容器即可。
  1. docker restart  mycanal;
复制代码
至此,canal服务端就算设置完成
3. java示例

3.1. 导入相关maven依靠

  1. <dependency>
  2.     <groupId>com.alibaba.otter</groupId>
  3.     <artifactId>canal.client</artifactId>
  4.     <version>1.1.0</version>
  5. </dependency>
复制代码
3.2. 创建官方示例代码

  1. import java.net.InetSocketAddress;
  2. import java.util.List;
  3. import com.alibaba.otter.canal.client.CanalConnectors;
  4. import com.alibaba.otter.canal.client.CanalConnector;
  5. import com.alibaba.otter.canal.common.utils.AddressUtils;
  6. import com.alibaba.otter.canal.protocol.Message;
  7. import com.alibaba.otter.canal.protocol.CanalEntry.Column;
  8. import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
  9. import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
  10. import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
  11. import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
  12. import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
  13. public class SimpleCanalClientExample {
  14. public static void main(String args[]) {
  15.     // 创建链接
  16.     CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
  17.                                                                                         11111), "example", "", "");
  18.     int batchSize = 1000;
  19.     int emptyCount = 0;
  20.     try {
  21.         connector.connect();
  22.         connector.subscribe(".*\\..*");
  23.         connector.rollback();
  24.         int totalEmptyCount = 120;
  25.         while (emptyCount < totalEmptyCount) {
  26.             Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
  27.             long batchId = message.getId();
  28.             int size = message.getEntries().size();
  29.             if (batchId == -1 || size == 0) {
  30.                 emptyCount++;
  31.                 System.out.println("empty count : " + emptyCount);
  32.                 try {
  33.                     Thread.sleep(1000);
  34.                 } catch (InterruptedException e) {
  35.                 }
  36.             } else {
  37.                 emptyCount = 0;
  38.                 // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
  39.                 printEntry(message.getEntries());
  40.             }
  41.             connector.ack(batchId); // 提交确认
  42.             // connector.rollback(batchId); // 处理失败, 回滚数据
  43.         }
  44.         System.out.println("empty too many times, exit");
  45.     } finally {
  46.         connector.disconnect();
  47.     }
  48. }
  49. private static void printEntry(List<Entry> entrys) {
  50.     for (Entry entry : entrys) {
  51.         if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
  52.             continue;
  53.         }
  54.         RowChange rowChage = null;
  55.         try {
  56.             rowChage = RowChange.parseFrom(entry.getStoreValue());
  57.         } catch (Exception e) {
  58.             throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
  59.                                        e);
  60.         }
  61.         EventType eventType = rowChage.getEventType();
  62.         System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
  63.                                          entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
  64.                                          entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
  65.                                          eventType));
  66.         for (RowData rowData : rowChage.getRowDatasList()) {
  67.             if (eventType == EventType.DELETE) {
  68.                 printColumn(rowData.getBeforeColumnsList());
  69.             } else if (eventType == EventType.INSERT) {
  70.                 printColumn(rowData.getAfterColumnsList());
  71.             } else {
  72.                 System.out.println("-------&gt; before");
  73.                 printColumn(rowData.getBeforeColumnsList());
  74.                 System.out.println("-------&gt; after");
  75.                 printColumn(rowData.getAfterColumnsList());
  76.             }
  77.         }
  78.     }
  79. }
  80. private static void printColumn(List<Column> columns) {
  81.     for (Column column : columns) {
  82.         System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
  83.     }
  84. }
  85.    
  86.   
  87. }
复制代码
3.3. 运行Client

首先启动Canal Server
启动Canal Client后,可以从控制台从看到雷同消息:
  1. empty count : 1
  2. empty count : 2
  3. empty count : 3
  4. empty count : 4
复制代码
此时代表当前数据库无变更数据
改改数据库,如修改或添加数据时,会出现如下提示:
  1. ================&gt; binlog[binlog.000002:7402] , name[mydb,asd] , eventType : UPDATE
  2. -------&gt; before
  3. 123 : 123    update=false
  4. -------&gt; after
  5. 123 : 1233    update=true
复制代码
出现如上消息代表数据库被改变 至此,恭喜你成功设置.

4. 踩坑总结

4.1. java客户端无法连接到canal的服务端

如下图所示:


需要检查下地址的设置,确认能ping通


如果可以ping通但是仍然无法连接,查看是否是捏造机的防火墙是否打开,11111端口可能被拦截。
4.2. java客户端连接到了canal但是修改数据库无日志打印

4.2.1. 缘故原由1: canal无法连接到MySQL


  • 首先检查数据库中是否有设置文件设置的账户和暗码,如本文中的 账号:canal 暗码:canal
  • 如果是MySQL 8.0以上,需要修改身份验证插件
  1. ## 自MySQL 8.0.3开始,身份验证插件默认使用caching_sha2_password
  2. ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'password';
复制代码

  • 如果是在docker中部署的MySQL,需要注意此时的设置文件 canal.instance.master.address = 127.0.0.1:3306 的地址需要是docker的地址,通过 ifconfig查看


4.2.2. 缘故原由2: MySQL的binlog日志未开启,此处参考第一步开启

4.2.3. 缘故原由3: MySQL的binlog日志的binlog-format非ROW模式,修改即可

4.3. 其他缘故原由,手动查看canal的日志,排查题目

输入 docker ps 查看正在运行的容器
  1. [root@localhost ~]# docker ps
  2. CONTAINER ID   IMAGE                COMMAND                   CREATED          STATUS          PORTS                                                                           NAMES
  3. b4b3328b1a08   canal/canal-server   "/alidata/bin/main.s…"   55 minutes ago   Up 50 minutes   9100/tcp, 11110/tcp, 11112/tcp, 0.0.0.0:11111->11111/tcp, :::11111->11111/tcp   mycanal2
  4. c34cdcfea74e   mysql                "docker-entrypoint.s…"   2 hours ago      Up 2 hours      0.0.0.0:3306->3306/tcp, :::3306->3306/tcp, 33060/tcp                            mymysql
  5. 460bf1f5daaf   canal/canal-server   "/alidata/bin/main.s…"   4 hours ago      Up 4 hours      9100/tcp, 11110-11112/tcp                                                       zealous_hermann
复制代码
找到canal的容器id 输入 docker exec -it [容器id] /bin/bash 进入容器内部
进入 /canal-server/logs/example目录输入 cat example.log 查看日志根据具体内容排查
比方:


参考文档:
java整合canal 实现数据同步_java 融合canal-client-CSDN博客
canal官方wiki

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

惊落一身雪

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表