Flink in Practice | Best Practices for Syncing an Entire PostgreSQL Database to Kafka with Postgres CDC
1. Environment Preparation
This article does not cover installing the components themselves (Flink, Kafka, Dinky).
The component versions used are:
- Flink 1.18.1
- Kafka 2.8.2 (if your Flink is also 1.18.x, Kafka 2.8.x or later is recommended; see section 3.2 of this article)
- Dinky 1.1.0
- PostgreSQL 12.5
1.1 Deploying Dinky in Docker
1.2 Installing the Flink environment
1.3 Installing the PostgreSQL environment
1.3.1 Create the PostgreSQL container
- Pull the image:
```shell
docker pull postgres:12.5
```
- Create and start the PostgreSQL container, setting the password and port:
```shell
docker run -d -p 6432:5432 --name postgres-12.5 -e POSTGRES_PASSWORD=postgres postgres:12.5
```
- Verify that the container is running:
```shell
docker ps | grep postgres-12.5
```
- Enter the container:
```shell
docker exec -it postgres-12.5 bash
```
1.3.2 Install the decoderbufs plugin
Without it, the following error occurs (see section 3.1 of this article):
Caused by: org.postgresql.util.PSQLException: ERROR: could not access file "decoderbufs": No such file or directory
You can follow the README.md of debezium/postgres-decoderbufs to install it, but there are a few pitfalls, detailed below.
1.3.2.1 Replace the apt and PostgreSQL mirror sources in Docker
- Replace the apt mirror source
The USTC mirror is recommended (the other mirrors I tried all had minor issues):
```shell
tee /etc/apt/sources.list <<-'EOF'
deb http://mirrors.ustc.edu.cn/debian buster main non-free
deb http://mirrors.ustc.edu.cn/debian buster-updates main non-free
deb http://mirrors.ustc.edu.cn/debian-security/ buster/updates main non-free
EOF
```
- Replace the PostgreSQL apt source
  - Install lsb-release:
```shell
apt-get install -y lsb-release
```
  - Add the [Huawei PostgreSQL mirror](https://mirrors.huaweicloud.com/mirrorDetail/5ee08ea8cdd8d3491d77c4af?mirrorName=postgresql&catalog=tool) to the source configuration:
```shell
sh -c 'echo "deb https://mirrors.huaweicloud.com/postgresql/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list'
```
  - Import the mirror's signing key:
```shell
wget --quiet -O - https://mirrors.huaweicloud.com/postgresql/repos/apt/ACCC4CF8.asc | apt-key add -
```
1.3.2.2 Install dependencies
```shell
# Core build utilities
apt-get update && apt-get install -f -y software-properties-common build-essential pkg-config git
# PostGIS dependencies
apt-get install -f -y libproj-dev liblwgeom-dev
# Protobuf-c dependency
apt-get install -y libprotobuf-c-dev
# ProtoBuf C compiler
apt-get install -y protobuf-c-compiler
```
1.3.2.3 Build and install postgres-decoderbufs
```shell
git clone https://github.com/debezium/postgres-decoderbufs.git
cd postgres-decoderbufs
export PATH=/usr/lib/postgresql/12/bin:$PATH
make
make install
```
1.3.3 Enable the plugin and logical replication in PostgreSQL
- Append the following settings to postgresql.conf:
```shell
cat << EOF >> /var/lib/postgresql/data/postgresql.conf
# Change the WAL level to logical (possible values: minimal, replica, logical)
wal_level = logical
# Increase the maximum number of replication slots (default 10); Flink CDC uses one slot per table by default
max_replication_slots = 20
# Increase the maximum number of WAL sender processes (default 10); keep this in line with the slots setting above
max_wal_senders = 20
# Terminate replication connections inactive longer than this (default 60s, 0 disables); a larger value is reasonable here
wal_sender_timeout = 180s
# MODULES
shared_preload_libraries = 'decoderbufs'
EOF
```
- Restart the container:
```shell
docker restart postgres-12.5
```
- Connect to PostgreSQL:
```shell
psql -h localhost -p 6432 -U postgres
```
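After the restart, it is worth confirming from psql that the settings actually took effect before moving on. These are standard PostgreSQL `SHOW` commands; the expected values correspond to the configuration appended above:

```sql
-- Should return 'logical'
SHOW wal_level;
-- Should return 20
SHOW max_replication_slots;
-- Should return 20
SHOW max_wal_senders;
-- Should include 'decoderbufs'
SHOW shared_preload_libraries;
```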
- Create a test database and connect to it:
```sql
-- Create database test_db
CREATE DATABASE test_db;
-- List databases
\l
-- Connect to the newly created database test_db
\c test_db
-- Table creation statements are not shown here
```
- Publish the tables:
```sql
-- Publish all tables
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- Set the publication to cover all tables
update pg_publication set puballtables=true where pubname is not null;
-- Check which tables have been published
select * from pg_publication_tables;
```
- Change the replica identity so that the pre-change values are included for updates and deletes (the default is NOTHING, which can cause the previous values of updated and deleted rows to be lost during real-time sync, hurting accuracy):
```sql
-- Change a single table
ALTER TABLE table1 REPLICA IDENTITY FULL;
-- Change all tables in a schema
DO $$
DECLARE
    rec RECORD;
    schema_name TEXT := 'public'; -- replace with your schema name
BEGIN
    -- Generate and execute a dynamic ALTER TABLE command for each table
    FOR rec IN
        SELECT tablename
        FROM pg_tables
        WHERE schemaname = schema_name
    LOOP
        EXECUTE format('ALTER TABLE %I.%I REPLICA IDENTITY FULL;', schema_name, rec.tablename);
        RAISE NOTICE 'Altered table: %.%', schema_name, rec.tablename;
    END LOOP;
END $$;
-- Check the replica identity ('f' means FULL, i.e. set successfully; 'n' means NOTHING, i.e. not set)
-- Check a single table
select relreplident from pg_class where relname='table1';
-- Check all tables in a schema
SELECT relname, relreplident FROM pg_class WHERE relnamespace = 'public'::regnamespace;
2. Development in Practice
2.1 Developing FlinkSQL in Dinky
- Following the CDC whole-database sync overview, upload the required jars.
- Create a FlinkSQL task in Dinky and write the CDC whole-database sync SQL.
- I followed "MySQLCDC 整库到 Kafka | Dinky" here; when the sink.topic parameter is not specified, all change logs are written to topics named after the corresponding database tables.
```sql
ADD CUSTOMJAR 'rs:/flink/lib/flink-sql-connector-kafka-3.2.0-1.18.jar';
ADD CUSTOMJAR 'rs:/flink/lib/flink-sql-connector-postgres-cdc-3.2.0.jar';
EXECUTE CDCSOURCE cdc_m3ods_kafka_all WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'your_pg_Hostname',
  'port' = 'your_pg_port',
  'username' = 'postgres',
  'password' = 'postgres',
  'database-name' = 'test_db',
  'schema-name' = 'your_pg_schema',
  'checkpoint' = '10000',
  'scan.startup.mode' = 'initial',
  'parallelism' = '1',
  'sink.connector' = 'datastream-kafka',
  'sink.brokers' = 'broker1:9092,broker2:9092,broker3:9092',
  'sink.properties.transaction.timeout.ms' = '600000'
)
```
2.2 Check the Dinky operations UI
2.3 Check the Flink WebUI
2.4 Check the Kafka messages
```shell
kafka-topics.sh --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --list
kafka-console-consumer.sh --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --from-beginning --topic your_topic
```
The message content looks roughly like this:
```json
{
  "before": null,
  "after": {
    "col1": "1",
    "col2": "1"
  },
  "op": "c",
  "source": {
    "db": "default_namespace",
    "table": "table1"
  }
}
```
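Downstream consumers typically branch on the op field of such change events (c = create, u = update, d = delete). A minimal Python sketch of routing a message like the one above (the field names follow the sample message; the handler behavior is purely illustrative):

```python
import json

def apply_change(event_json: str) -> str:
    """Route a Debezium-style change event by its 'op' field and
    return a human-readable description of the change."""
    event = json.loads(event_json)
    table = event["source"]["table"]
    op = event["op"]
    if op == "c":    # create: 'after' holds the new row, 'before' is null
        return f"INSERT into {table}: {event['after']}"
    elif op == "u":  # update: 'before' is the old row, 'after' the new one
        return f"UPDATE {table}: {event['before']} -> {event['after']}"
    elif op == "d":  # delete: 'after' is null, 'before' holds the deleted row
        return f"DELETE from {table}: {event['before']}"
    return f"Unhandled op '{op}' for {table}"

msg = '{"before": null, "after": {"col1": "1", "col2": "1"}, "op": "c", "source": {"db": "default_namespace", "table": "table1"}}'
print(apply_change(msg))
```

Note that the before field for updates and deletes is only populated when REPLICA IDENTITY FULL is set, as configured in section 1.3.3.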
3. Error Collection
3.1 Missing decoderbufs plugin
```java
Caused by: org.postgresql.util.PSQLException: ERROR: could not access file "decoderbufs": No such file or directory
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2725) ~[flink-sql-connector-postgres-cdc-3.2.0.jar:3.2.0]
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2412) ~[flink-sql-connector-postgres-cdc-3.2.0.jar:3.2.0]
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:371) ~[flink-sql-connector-postgres-cdc-3.2.0.jar:3.2.0]
    at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:502) ~[flink-sql-connector-postgres-cdc-3.2.0.jar:3.2.0]
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:419) ~[flink-sql-connector-postgres-cdc-3.2.0.jar:3.2.0]
    at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:341) ~[flink-sql-connector-postgres-cdc-3.2.0.jar:3.2.0]
    at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:326) ~[flink-sql-connector-postgres-cdc-3.2.0.jar:3.2.0]
    at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:302) ~[flink-sql-connector-postgres-cdc-3.2.0.jar:3.2.0]
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:297) ~[flink-sql-connector-postgres-cdc-3.2.0.jar:3.2.0]
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationSlot(PostgresReplicationConnection.java:459) ~
```
Solution: install the decoderbufs plugin (see section 1.3.2 of this article).
3.2 Missing allTopicNames() method
Solution: upgrade Kafka to 2.8.2 or later.
- Result of searching the 2.7 source for allTopicNames
- Result of searching the 2.8.2 source for allTopicNames
Tip: Java 8 is no longer supported starting from Apache Kafka 3.0.0.
3.3 Missing Dinky jar
Solution: copy the dinky-app-1.18-1.1.0-jar-with-dependencies.jar from dinky/jar/ into flink/lib.