.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
env.execute("Print Postgres Snapshot + WAL");
复制代码
有一点需要注意,官方文档中没有.decodingPluginName("pgoutput"),使用默认的decoderbufs,运行程序会提示
“PSQLException: ERROR: could not access file "decoderbufs": No such file or directory”, 修改成pgoutput,才能成功。 这里应该是要安装插件decoderbufs在Postgresql里面。这里暂时留下这个疑问,后面还有wal2json,看怎么把wal的值转成json格式显示出来。
程序运行起来后我们往表里插入和删除数据,可以在控制台中打印出变化来。
这里直接贴图
这里也有个疑问,我对表操作了三次,结果控制台打印出超过3条的信息,这里应该和是否commit有关
暂时也没有细究。
程序运行后,我们可以使用这个命令查看这个slot,
SELECT * FROM pg_replication_slots;