Flink CDC(SQL Client)毗连 MySQL 数据库教程

打印 上一主题 下一主题

主题 1777|帖子 1777|积分 5331

Flink CDC(SQL Client)毗连 MySQL 数据库教程

这篇文章将指导怎样使用 Flink CDC 毗连到 MySQL 数据库,并捕获数据变动。我们将逐步完成以下操作:
1. 检查 Binlog 是否启用

首先,您须要确保 MySQL 的 Binlog 功能已经启用,由于 Flink CDC 依赖于 Binlog 来捕获数据变动。
  1. -- 检查 Binlog 是否启用
  2. SHOW VARIABLES LIKE 'log_bin';
复制代码
如果 log_bin 返回的不是 ON 大概一个文件名,那么 Binlog 没有启用。
2. 检查 Binlog 格式

  1. -- 检查 Binlog 格式
  2. SHOW VARIABLES LIKE 'binlog_format';
复制代码
确保 binlog_format 是 ROW。如果不是,您须要修改配置以启用 Binlog 并设置正确的格式。
3. 开启 Binlog 并配置相关参数

如果 log_bin 的值为 OFF,这意味着 MySQL 的二进制日志(Binlog)功能没有开启。以下是开启 Binlog 并配置相关参数的步骤:
3.1 编辑 MySQL 配置文件

找到 MySQL 的配置文件 /etc/my.cn/。
在 [mysqld] 部门添加或修改以下配置:
  1. [mysqld]
  2. log_bin = mysql-bin
  3. binlog_format = ROW
  4. server_id = 1
复制代码


  • log_bin 设置 Binlog 的日志文件名前缀。
  • binlog_format 设置为 ROW,这是 Flink CDC 所需的格式。
  • server_id 设置为一个唯一的整数,用于标识 MySQL 服务器。
3.2 重启 MySQL 服务

生存配置文件后,重启 MySQL 服务以使更改生效。重启命令取决于您的操作体系:


  • 对于 Linux/Unix:
    1. sudo systemctl restart mysql
    复制代码
    大概
    1. sudo service mysql restart
    复制代码
4. 创建 CDC 用户

创建一个具有恰当权限的 MySQL 用户,以便 Flink CDC 可以毗连到 MySQL 数据库并监控数据变化:
  1. CREATE USER 'flinkcdc'@'%' IDENTIFIED BY 'FlinkCDC_123456';
  2. GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkcdc' IDENTIFIED BY 'FlinkCDC_123456';
  3. FLUSH PRIVILEGES;
复制代码
5. 创建 MySQL 表和插入示例数据

以下是一个名为 users 的表,包含 id(主键)、name(姓名)和 age(年事)三个字段的创建语句:
  1. -- 创建数据库(如果不存在)
  2. CREATE DATABASE IF NOT EXISTS cdc;
  3. USE cdc;
  4. -- 创建表
  5. CREATE TABLE users (
  6.   id INT AUTO_INCREMENT PRIMARY KEY,
  7.   name VARCHAR(50) NOT NULL,
  8.   age INT NOT NULL
  9. ) ENGINE=InnoDB;
  10. -- 插入示例数据
  11. INSERT INTO users (name, age) VALUES ('Alice', 30);
  12. INSERT INTO users (name, age) VALUES ('Bob', 25);
  13. INSERT INTO users (name, age) VALUES ('Charlie', 35);
  14. INSERT INTO users (name, age) VALUES ('David', 40);
  15. INSERT INTO users (name, age) VALUES ('Eve', 22);
复制代码
6. 下载 Flink CDC JAR 包

下载 Flink CDC JAR 包并放到 Flink 安装目录下的 lib 目录中。您可以从 Maven 中央堆栈下载 flink-sql-connector-mysql-cdc 2.3.0 版本:
Flink SQL Connector for MySQL CDC
请留意,官网提示下载的谁人包缺少一些依赖在执行时会报错,须要下载 20M 以上的 JAR 包。
  1. [ERROR] Could not execute SQL statement. Reason:
  2. java.lang.ClassNotFoundException: com.ververica.cdc.debezium.utils.ResolvedSchemaUtils
复制代码

7. 启动 Flink 集群

须要先启动 Flink 集群,否则背面会提示无法毗连:[ERROR] Could not execute SQL statement. Reason:java.net.ConnectException: 拒绝毗连
  1. start-cluster.sh
复制代码
8. 在 Flink SQL Client 中执行

打开 sql-client.sh 执行以下命令:
  1. CREATE TABLE mysql_source (
  2.   id INT NOT NULL,
  3.   name STRING,
  4.   age INT,
  5.   PRIMARY KEY (id) NOT ENFORCED
  6. ) WITH (
  7.     'connector' = 'mysql-cdc',
  8.     'hostname' = '192.168.56.152',
  9.     'port' = '3306',
  10.     'username' = 'flinkcdc',
  11.     'password' = 'FlinkCDC_123456',
  12.     'database-name' = 'cdc',
  13.     'table-name' = 'users'
  14. );
  15. SELECT * FROM mysql_source;
复制代码

通过以上步骤,您应该能够成功使用 Flink CDC 毗连到 MySQL 数据库并捕获数据变动。如果在执行过程中遇到任何标题,请检查配置和网络设置,确保所有服务正常运行。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

南飓风

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表