ToB企服应用市场:ToB评测及商务社交产业平台
标题:
使用flink-sql同步表数据的记录
[打印本页]
作者:
乌市泽哥
时间:
昨天 07:48
标题:
使用flink-sql同步表数据的记录
作为一个单纯的数分小白,总是在理想有一个完善的同步工具。在简单折腾之后就能把那些烦人的运营数据实时拉取过来,供我的分析使用。
这个工具要有以下有点:
1.不能对源业务数据库造成压力,不然拉挂的第二天就得上吐槽大会
2.我可以选择本身想要的表,别搞什么梭哈整库同步。本来业务运营都不乐意了,这么梭哈,只会让业务同砚更近绷紧神经。
3.简单设置,最好别写许多多少代码。我懂的不多,也不敢瞎搞。
想了一圈,我看中了flink这个浓眉大眼的家伙。。。。
第一步:环境搭建
首先确保你的机器安装了java环境。
/opt/flink-1.18.1/bin# java -version
openjdk version "17.0.13" 2024-10-15
OpenJDK Runtime Environment (build 17.0.13+11-Debian-2deb12u1)
OpenJDK 64-Bit Server VM (build 17.0.13+11-Debian-2deb12u1, mixed mode, sharing)
复制代码
然后去网上下载一个flink代码包:Apache Download Mirrors
https://www.apache.org/dyn/closer.lua/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz
解压之后,看看flink-1.18.1/conf/flink-conf.yaml 中有没有想改的,要不就改个这行方便访问web页面:
historyserver.web.address: 1x.1x.xx.xx
复制代码
直接启动
./bin/start-cluster.sh
复制代码
访问IP:8081就能看到web页面了。
制止命令:
./bin/stop-cluster.sh
复制代码
跑个demo试试水
/opt/flink-1.18.1# ./bin/flink run examples/streaming/WordCount.jar
Executing example with default input data.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 51b6303d317d0ddba2db34692e33318e
Program execution finished
Job with JobID 51b6303d317d0ddba2db34692e33318e has finished.
Job Runtime: 486 ms
复制代码
查看实验效果:
/opt/flink-1.18.1# tail log/flink-*-taskexecutor-*.out
(nymph,1)
(in,3)
(thy,1)
(orisons,1)
(be,4)
(all,2)
(my,1)
(sins,1)
(remember,1)
(d,4)
复制代码
好的跑起来问题不大。接下来思绪就是如何使用flink-cdc的本领,将mysql表的数据抽取同步到postgres。
1.mysql要开启log_bin
相干设置:
这是开启了的标志
show variables like '%log_bin%';
+---------------------------------+--------------------------------+
| Variable_name | Value |
+---------------------------------+--------------------------------+
| log_bin | ON |
| log_bin_basename | /var/lib/mysql/mysql-bin |
| log_bin_index | /var/lib/mysql/mysql-bin.index |
| log_bin_trust_function_creators | OFF |
| log_bin_use_v1_row_events | OFF |
| sql_log_bin | ON |
+---------------------------------+--------------------------------+
6 rows in set (0.06 sec)
复制代码
详细设置:
[mysqld]
# By default we only accept connections from localhost
bind-address = 0.0.0.0
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0
character-set-server=utf8mb4
#设置日志三种格式:STATEMENT、ROW、MIXED 。
binlog_format = ROW
#设置日志路径,注意路经需要mysql用户有权限写,这里可以写绝对路径,也可以直接写mysql-bin(后者默认就是在/var/lib/mysql目录下)
log-bin=mysql-bin
#设置binlog清理时间
expire_logs_days = 7
#binlog每个日志文件大小
max_binlog_size = 100m
#binlog缓存大小
binlog_cache_size = 4m
#最大binlog缓存大小
max_binlog_cache_size = 512m
#配置serverid
server-id=1
复制代码
2.postgresql 的详细设置
# 更改wal日志方式为logical(方式有:minimal、replica 、logical )
wal_level = logical
# 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots
max_replication_slots = 20
# 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样
max_wal_senders = 20
# 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s,0表示禁用)
wal_sender_timeout = 180s
复制代码
数据库改完了都要重启一下才能使用。如果生产里没有这个设置,抱歉搞不了。
3.使用flink-sql
**********************注意********************************************
使用flink-sql同步mysql和postgresql的前置条件,在lib目录下放置flink-sql-connector-postgres-cdc-2.4.1.jar 和flink-sql-connector-mysql-cdc-2.4.1.jar 两个jar包。
下载地点:Maven Repository: com.ververica (mvnrepository.com)
前置条件:在mysql中创建一个products表,随便插入点数据。postgresql中也创建一个同名同结构的表。
在flink-sql中实验
CREATE TABLE products ( id INT, name STRING, description STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = 'xxxx', 'database-name' = 'test_feng', 'table-name' = 'products' , 'scan.startup.mode' = 'latest-offset' );
复制代码
CREATE TABLE products_sink (
id INT,
name STRING,
description STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://localhost:5432/test_feng',
'table-name' = 'products',
'username' = 'postgres',
'password' = 'xxxxx',
'sink.buffer-flush.max-rows' = '1000',
'sink.buffer-flush.interval' = '2s'
);
复制代码
INSERT INTO products_sink
SELECT id, name, decription
FROM products;
复制代码
实验完这三个,在web界面中能看到一个正在运行的使命。同时你查看postgres中的表应该已经同步了一部分数据。
只要这个job不制止工作。数据两张表就会实时同步了。。。。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4