使用flink-sql同步表数据的记录

打印 上一主题 下一主题

主题 901|帖子 901|积分 2703

作为一个单纯的数分小白,总是在理想有一个完善的同步工具。在简单折腾之后就能把那些烦人的运营数据实时拉取过来,供我的分析使用。
这个工具要有以下有点:
1.不能对源业务数据库造成压力,不然拉挂的第二天就得上吐槽大会
2.我可以选择本身想要的表,别搞什么梭哈整库同步。本来业务运营都不乐意了,这么梭哈,只会让业务同砚更近绷紧神经。
3.简单设置,最好别写许多多少代码。我懂的不多,也不敢瞎搞。
想了一圈,我看中了flink这个浓眉大眼的家伙。。。。
第一步:环境搭建
首先确保你的机器安装了java环境。
  1. /opt/flink-1.18.1/bin# java -version
  2. openjdk version "17.0.13" 2024-10-15
  3. OpenJDK Runtime Environment (build 17.0.13+11-Debian-2deb12u1)
  4. OpenJDK 64-Bit Server VM (build 17.0.13+11-Debian-2deb12u1, mixed mode, sharing)
复制代码
然后去网上下载一个flink代码包:Apache Download Mirrors
https://www.apache.org/dyn/closer.lua/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz
解压之后,看看flink-1.18.1/conf/flink-conf.yaml 中有没有想改的,要不就改个这行方便访问web页面:
  1. historyserver.web.address: 1x.1x.xx.xx
复制代码
直接启动
  1. ./bin/start-cluster.sh
复制代码
访问IP:8081就能看到web页面了。

制止命令:
  1. ./bin/stop-cluster.sh
复制代码
跑个demo试试水
  1. /opt/flink-1.18.1# ./bin/flink run examples/streaming/WordCount.jar
  2. Executing example with default input data.
  3. Use --input to specify file input.
  4. Printing result to stdout. Use --output to specify output path.
  5. Job has been submitted with JobID 51b6303d317d0ddba2db34692e33318e
  6. Program execution finished
  7. Job with JobID 51b6303d317d0ddba2db34692e33318e has finished.
  8. Job Runtime: 486 ms
复制代码
查看实验效果: 
  1. /opt/flink-1.18.1# tail log/flink-*-taskexecutor-*.out
  2. (nymph,1)
  3. (in,3)
  4. (thy,1)
  5. (orisons,1)
  6. (be,4)
  7. (all,2)
  8. (my,1)
  9. (sins,1)
  10. (remember,1)
  11. (d,4)
复制代码
好的跑起来问题不大。接下来思绪就是如何使用flink-cdc的本领,将mysql表的数据抽取同步到postgres。
1.mysql要开启log_bin 
相干设置:
这是开启了的标志
  1. show variables like '%log_bin%';
  2. +---------------------------------+--------------------------------+
  3. | Variable_name                   | Value                          |
  4. +---------------------------------+--------------------------------+
  5. | log_bin                         | ON                             |
  6. | log_bin_basename                | /var/lib/mysql/mysql-bin       |
  7. | log_bin_index                   | /var/lib/mysql/mysql-bin.index |
  8. | log_bin_trust_function_creators | OFF                            |
  9. | log_bin_use_v1_row_events       | OFF                            |
  10. | sql_log_bin                     | ON                             |
  11. +---------------------------------+--------------------------------+
  12. 6 rows in set (0.06 sec)
复制代码
详细设置:
  1. [mysqld]
  2. # By default we only accept connections from localhost
  3. bind-address    = 0.0.0.0
  4. # Disabling symbolic-links is recommended to prevent assorted security risks
  5. symbolic-links=0
  6. character-set-server=utf8mb4
  7. #设置日志三种格式:STATEMENT、ROW、MIXED 。
  8. binlog_format = ROW
  9. #设置日志路径,注意路经需要mysql用户有权限写,这里可以写绝对路径,也可以直接写mysql-bin(后者默认就是在/var/lib/mysql目录下)
  10. log-bin=mysql-bin
  11. #设置binlog清理时间
  12. expire_logs_days = 7
  13. #binlog每个日志文件大小
  14. max_binlog_size = 100m
  15. #binlog缓存大小
  16. binlog_cache_size = 4m
  17. #最大binlog缓存大小
  18. max_binlog_cache_size = 512m
  19. #配置serverid
  20. server-id=1
复制代码
2.postgresql 的详细设置
  1. # 更改wal日志方式为logical(方式有:minimal、replica 、logical  )
  2. wal_level = logical  
  3. # 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots
  4. max_replication_slots = 20
  5. # 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样
  6. max_wal_senders = 20     
  7. # 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s,0表示禁用)
  8. wal_sender_timeout = 180s
复制代码
 数据库改完了都要重启一下才能使用。如果生产里没有这个设置,抱歉搞不了。
3.使用flink-sql
**********************注意********************************************
使用flink-sql同步mysql和postgresql的前置条件,在lib目录下放置flink-sql-connector-postgres-cdc-2.4.1.jar 和flink-sql-connector-mysql-cdc-2.4.1.jar 两个jar包。
下载地点:Maven Repository: com.ververica (mvnrepository.com)

前置条件:在mysql中创建一个products表,随便插入点数据。postgresql中也创建一个同名同结构的表。
在flink-sql中实验
  1. CREATE TABLE products (    id INT,    name STRING,    description STRING,    PRIMARY KEY (id) NOT ENFORCED  ) WITH (    'connector' = 'mysql-cdc',    'hostname' = 'localhost',    'port' = '3306',    'username' = 'root',    'password' = 'xxxx',    'database-name' = 'test_feng',    'table-name' = 'products' , 'scan.startup.mode' = 'latest-offset' );
复制代码
 
  1. CREATE TABLE products_sink (
  2.     id INT,
  3.     name STRING,
  4.     description STRING,
  5.     PRIMARY KEY (id) NOT ENFORCED
  6. ) WITH (
  7.     'connector' = 'jdbc',
  8.     'url' = 'jdbc:postgresql://localhost:5432/test_feng',
  9.     'table-name' = 'products',
  10.     'username' = 'postgres',
  11.     'password' = 'xxxxx',
  12.     'sink.buffer-flush.max-rows' = '1000',
  13.     'sink.buffer-flush.interval' = '2s'
  14. );
复制代码
  1. INSERT INTO products_sink
  2. SELECT id, name, decription
  3. FROM products;
复制代码
实验完这三个,在web界面中能看到一个正在运行的使命。同时你查看postgres中的表应该已经同步了一部分数据。 

只要这个job不制止工作。数据两张表就会实时同步了。。。。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

乌市泽哥

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表