作为一个单纯的数分小白,总是在理想有一个完善的同步工具。在简单折腾之后就能把那些烦人的运营数据实时拉取过来,供我的分析使用。
这个工具要有以下有点:
1.不能对源业务数据库造成压力,不然拉挂的第二天就得上吐槽大会
2.我可以选择本身想要的表,别搞什么梭哈整库同步。本来业务运营都不乐意了,这么梭哈,只会让业务同砚更近绷紧神经。
3.简单设置,最好别写许多多少代码。我懂的不多,也不敢瞎搞。
想了一圈,我看中了flink这个浓眉大眼的家伙。。。。
第一步:环境搭建
首先确保你的机器安装了java环境。
- /opt/flink-1.18.1/bin# java -version
- openjdk version "17.0.13" 2024-10-15
- OpenJDK Runtime Environment (build 17.0.13+11-Debian-2deb12u1)
- OpenJDK 64-Bit Server VM (build 17.0.13+11-Debian-2deb12u1, mixed mode, sharing)
复制代码 然后去网上下载一个flink代码包:Apache Download Mirrorshttps://www.apache.org/dyn/closer.lua/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz
解压之后,看看flink-1.18.1/conf/flink-conf.yaml 中有没有想改的,要不就改个这行方便访问web页面:
- historyserver.web.address: 1x.1x.xx.xx
复制代码 直接启动
访问IP:8081就能看到web页面了。
制止命令:
跑个demo试试水
- /opt/flink-1.18.1# ./bin/flink run examples/streaming/WordCount.jar
- Executing example with default input data.
- Use --input to specify file input.
- Printing result to stdout. Use --output to specify output path.
- Job has been submitted with JobID 51b6303d317d0ddba2db34692e33318e
- Program execution finished
- Job with JobID 51b6303d317d0ddba2db34692e33318e has finished.
- Job Runtime: 486 ms
复制代码 查看实验效果:
- /opt/flink-1.18.1# tail log/flink-*-taskexecutor-*.out
- (nymph,1)
- (in,3)
- (thy,1)
- (orisons,1)
- (be,4)
- (all,2)
- (my,1)
- (sins,1)
- (remember,1)
- (d,4)
复制代码 好的跑起来问题不大。接下来思绪就是如何使用flink-cdc的本领,将mysql表的数据抽取同步到postgres。
1.mysql要开启log_bin
相干设置:
这是开启了的标志
- show variables like '%log_bin%';
- +---------------------------------+--------------------------------+
- | Variable_name | Value |
- +---------------------------------+--------------------------------+
- | log_bin | ON |
- | log_bin_basename | /var/lib/mysql/mysql-bin |
- | log_bin_index | /var/lib/mysql/mysql-bin.index |
- | log_bin_trust_function_creators | OFF |
- | log_bin_use_v1_row_events | OFF |
- | sql_log_bin | ON |
- +---------------------------------+--------------------------------+
- 6 rows in set (0.06 sec)
复制代码 详细设置:
- [mysqld]
- # By default we only accept connections from localhost
- bind-address = 0.0.0.0
- # Disabling symbolic-links is recommended to prevent assorted security risks
- symbolic-links=0
- character-set-server=utf8mb4
- #设置日志三种格式:STATEMENT、ROW、MIXED 。
- binlog_format = ROW
- #设置日志路径,注意路经需要mysql用户有权限写,这里可以写绝对路径,也可以直接写mysql-bin(后者默认就是在/var/lib/mysql目录下)
- log-bin=mysql-bin
- #设置binlog清理时间
- expire_logs_days = 7
- #binlog每个日志文件大小
- max_binlog_size = 100m
- #binlog缓存大小
- binlog_cache_size = 4m
- #最大binlog缓存大小
- max_binlog_cache_size = 512m
- #配置serverid
- server-id=1
复制代码 2.postgresql 的详细设置
- # 更改wal日志方式为logical(方式有:minimal、replica 、logical )
- wal_level = logical
- # 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots
- max_replication_slots = 20
- # 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样
- max_wal_senders = 20
- # 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s,0表示禁用)
- wal_sender_timeout = 180s
复制代码 数据库改完了都要重启一下才能使用。如果生产里没有这个设置,抱歉搞不了。
3.使用flink-sql
**********************注意********************************************
使用flink-sql同步mysql和postgresql的前置条件,在lib目录下放置flink-sql-connector-postgres-cdc-2.4.1.jar 和flink-sql-connector-mysql-cdc-2.4.1.jar 两个jar包。
下载地点:Maven Repository: com.ververica (mvnrepository.com)
前置条件:在mysql中创建一个products表,随便插入点数据。postgresql中也创建一个同名同结构的表。
在flink-sql中实验
- CREATE TABLE products ( id INT, name STRING, description STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = 'xxxx', 'database-name' = 'test_feng', 'table-name' = 'products' , 'scan.startup.mode' = 'latest-offset' );
复制代码
- CREATE TABLE products_sink (
- id INT,
- name STRING,
- description STRING,
- PRIMARY KEY (id) NOT ENFORCED
- ) WITH (
- 'connector' = 'jdbc',
- 'url' = 'jdbc:postgresql://localhost:5432/test_feng',
- 'table-name' = 'products',
- 'username' = 'postgres',
- 'password' = 'xxxxx',
- 'sink.buffer-flush.max-rows' = '1000',
- 'sink.buffer-flush.interval' = '2s'
- );
复制代码- INSERT INTO products_sink
- SELECT id, name, decription
- FROM products;
复制代码 实验完这三个,在web界面中能看到一个正在运行的使命。同时你查看postgres中的表应该已经同步了一部分数据。
只要这个job不制止工作。数据两张表就会实时同步了。。。。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |