目录
1.背景
2.设计思路
3.使用测试
1.背景
如今很多当局项目都在往信创上面迁移,及时处理这块就会涉及到信创的数据库,本文就是通过FlinkCDC来对人大金仓(kingbase)数据库进行处理。
FlinkCDC自己不支持对Kingbase数据库的CDC,但是Kingbase底层是基于Postgresql数据开发,FlinkCDC已经支持了Postgresql的CDC,以是基于 Postgresql的CDC进行修改。
2.设计思路
获取 flink-cdc-connectors 项目
拷贝一份 flink-connector-postgres-cdc 重命名 flink-connector-kb-cdc 进行修改
获取 debezium-v1.9.8.Final 项目
拷贝一份 debezium-connector-postgres 重命名 debezium-connector-kb 进行修改
具体修改哪些这里不详细介绍,最后附上jar包
3.使用测试
本方法是基于逻辑复制的方式实现
PostgreSQL 如今不支持DDL分析,只能分析DML(INSERT、UPDATE、DELETE).
3.1 在Kingbase数据库设置
3.1.1 建测试表
- CREATE TABLE "public"."test_001" (
- "a" int8 NOT NULL,
- "b" timestamp,
- PRIMARY KEY ("a")
- )
- ;
复制代码 3.1.2 设置 wal_level
*实行语句SHOW wal_level,检察返回值是否为'logical',否则进行修改wal日志方式wal_level = logical
3.1.3 创建复制槽
- -- 创建逻辑复制槽
- SELECT * FROM pg_create_logical_replication_slot('replica', 'decoderbufs');
- -- 逻辑解码
- SELECT * FROM pg_logical_slot_peek_changes('replica', NULL, 4096, 'debug-mode', '1');
- 目前测试不支持pgoutput,decoderbufs,wal2json
复制代码 3.1.4 设置发布
- -- 设置发布为true
- update pg_publication set puballtables=true where pubname is not null;
- -- 把所有表进行发布
- CREATE PUBLICATION dbz_publication FOR ALL TABLES;
- -- 查询哪些表已经发布
- select * from pg_publication_tables;
复制代码 3.1.5 更改复制标识
- -- 更改复制标识包含更新和删除之前值(目的是为了确保表 test_001 在实时同步过程中能够正确地捕获并同步更新和删除的数据变化。如果不执行这两条语句,那么 t_user 表的复制标识可能默认为 NOTHING,这可能导致实时同步时丢失更新和删除的数据行信息,从而影响同步的准确性)
- ALTER TABLE test_001 REPLICA IDENTITY FULL;
- -- 查看复制标识(为f标识说明设置成功,f(表示 full),否则为 n(表示 nothing),即复制标识未设置)
- select relreplident from pg_class where relname='test_001';
复制代码 3.2 在原来的FlinkCdc项目标pom.xml中添加
- <!-- kingbasees驱动 -->
- <dependency>
- <groupId>cn.com.kingbase</groupId>
- <artifa
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |