Flink CDC 监听 Postgresql表的变化

打印 上一主题 下一主题

主题 558|帖子 558|积分 1674

前言

最近看文章说如何把Postgresql的数据同步给别的数据源,可以利用它的WAL,具体怎么操作没有说,我自己找到一篇文章https://www.cnblogs.com/xiongmozhou/p/14817641.html 可以利用Flink CDC。 我自己正好前段时间也看过Flink,把这个知识串起来也很有意义,于是开始动手试了一下,期间也遇到些困难,也尝试解决了,有些原理不是很清晰,记录下来,后面看能不能解决。
Postgresql配置

我们使用上篇文章搭建的Postgresql数据库,要让Postgresql支持同步给其它数据源,一个最关键的配置是更改wal日志方式为logical, 这个配置在postgresql.conf, 而我们docker里面的postgresql.conf这个配置又在哪个目录呢? 网上找到了答案:https://stackoverflow.com/questions/30848670/how-to-customize-the-configuration-file-of-the-official-postgresql-docker-image
进入psql后,使用如下命令
  1. SHOW config_file;
复制代码
得到如下的结果
/var/lib/postgresql/data/postgresql.conf
得到路径后, 我打算像平时一样用vi去修改,发现不行,这个postgresql的Image并没有安装vim。
如何修改呢,继续网上找答案 https://stackoverflow.com/questions/30848670/how-to-customize-the-configuration-file-of-the-official-postgresql-docker-image
方法很多,我们用个简单的,使用sed命令来修改
  1. sed -i -e"s/^#wal_level = replica.*$/wal_level = logical/" /var/lib/postgresql/data/postgresql.conf
复制代码
就是查找到“#wal_level = replica“,把它替换为“wal_level = logical”
修改后需要重启postgresql,执行如下命令
  1. su - postgres -c "PGDATA=$PGDATA /usr/lib/postgresql/15/bin/pg_ctl -w restart"
复制代码
执行后会退出docker,需要重新进入
新建用户和授予权限参考https://www.cnblogs.com/xiongmozhou/p/14817641.html
注意文档中使用CREATE USER user它建的用户是user,我用的这个用户名是不成功的,提示语法错误
感觉是把user当作保留命令参数了,用户名改为user1可以成功。
使用flink-connector-postgres-cdc

我们参考官方文档https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html#connector-options
首先在已有的Flink项目中加入如下的pom
  1.         <dependency>
  2.             <groupId>com.ververica</groupId>
  3.             <artifactId>flink-connector-postgres-cdc</artifactId>
  4.             <version>2.3.0</version>
  5.             <scope>provided</scope>
  6.         </dependency>
复制代码
这里代码参考文档
  1.         SourceFunction postgreSQLSource = PostgreSQLSource.<String>builder()
  2.                 .hostname("localhost")
  3.                 .port(5432)
  4.                 .database("postgres") // set captured database
  5.                 .tableList("postgres.market_price") // set captured table
  6.                 .username("user1")
  7.                 .password("pwd")
  8.                 .decodingPluginName("pgoutput")
  9.                 .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
  10.                 .build();
  11.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  12.         env
  13.         .addSource(postgreSQLSource)
  14.         .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
  15.         env.execute("Print Postgres Snapshot + WAL");
复制代码
有一点需要注意,官方文档中没有.decodingPluginName("pgoutput"),使用默认的decoderbufs,运行程序会提示
“PSQLException: ERROR: could not access file "decoderbufs": No such file or directory”, 修改成pgoutput,才能成功。 这里应该是要安装插件decoderbufs在Postgresql里面。这里暂时留下这个疑问,后面还有wal2json,看怎么把wal的值转成json格式显示出来。
程序运行起来后我们往表里插入和删除数据,可以在控制台中打印出变化来。
这里直接贴图

这里也有个疑问,我对表操作了三次,结果控制台打印出超过3条的信息,这里应该和是否commit有关
暂时也没有细究。
程序运行后,我们可以使用这个命令查看这个slot,
SELECT * FROM pg_replication_slots;

如果我们直接修改配置,比如把pgoutput改为别的,会提示slot flink已经存在,我们需要在postgresql里面把它先删除掉。
总结

总体上这个流程是打通了,但是对于里面的细节没有深入,比如flink怎么消费,里面的记录怎么显示出来,它里面实现的原理是什么,都需要花时间去研究,先开个头在这里。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

圆咕噜咕噜

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表