Flink in Action | Postgres CDC Whole-Database Sync to Kafka: Best Practices
https://i-blog.csdnimg.cn/img_convert/a194b21d9b99953444172c4d1ffbb02b.png
https://i-blog.csdnimg.cn/img_convert/588a8ac6ed66cfadfd2631f7938673cc.png
1. Environment Preparation
This article does not cover installing the components themselves (Flink, Kafka, Dinky).
The component versions used are as follows:
[*]Flink 1.18.1
[*]Kafka 2.8.2 (if your Flink is also 1.18.x, Kafka 2.8.x or later is recommended; see section 3.2 of this article)
[*]Dinky 1.1.0
[*]PostgreSQL 12.5
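As a quick sanity check, the versions can be confirmed from the command line (a minimal sketch; the script locations and $FLINK_HOME depend on your installation):
```sh
# Print the versions of the main components (paths are illustrative)
$FLINK_HOME/bin/flink --version   # expect 1.18.1
kafka-topics.sh --version         # expect 2.8.2
psql --version                    # PostgreSQL client version
```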
1.1 Deploying Dinky in Docker
1.2 Installing the Flink Environment
1.3 Installing the PostgreSQL Environment
1.3.1 Creating the PG Container
[*]Pull the image:
```sh
docker pull postgres:12.5
```
[*]Create and start the PostgreSQL container, setting the password and port:
```sh
docker run -d -p 6432:5432 --name postgres-12.5 -e POSTGRES_PASSWORD=postgres postgres:12.5
```
[*]Check that the container was created successfully:
```sh
docker ps | grep postgres-12.5
```
[*]Enter the container:
```sh
docker exec -it postgres-12.5 bash
```
1.3.2 Installing the decoderbufs Plugin
Without it, you will hit the following error (see section 3.1 of this article):
```
Caused by: org.postgresql.util.PSQLException: ERROR: could not access file "decoderbufs": No such file or directory
```
You can follow the README.md of debezium/postgres-decoderbufs for the installation, but there are a few pitfalls, detailed below.
1.3.2.1 Switching the apt and PG Mirror Sources inside Docker
[*]Switch the apt mirror
The USTC mirror is recommended (the other mirrors I tried all had minor issues):
```sh
tee /etc/apt/sources.list <<-'EOF'
deb http://mirrors.ustc.edu.cn/debian buster main non-free
deb http://mirrors.ustc.edu.cn/debian buster-updates main non-free
deb http://mirrors.ustc.edu.cn/debian-security/ buster/updates main non-free
EOF
```
[*]Switch the PG apt source
[*]Install lsb-release:
```sh
apt-get install -y lsb-release
```
- Add the [Huawei PG mirror](https://mirrors.huaweicloud.com/mirrorDetail/5ee08ea8cdd8d3491d77c4af?mirrorName=postgresql&catalog=tool) repository configuration:
```sh
sh -c 'echo "deb https://mirrors.huaweicloud.com/postgresql/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list'
```
- Import the mirror source's signing key:
```sh
wget --quiet -O - https://mirrors.huaweicloud.com/postgresql/repos/apt/ACCC4CF8.asc | apt-key add -
```
1.3.2.2 Installing Dependencies
```sh
# Core build utilities
apt-get update && apt-get install -f -y software-properties-common build-essential pkg-config git
# PostGIS dependencies
apt-get install -f -y libproj-dev liblwgeom-dev
# protobuf-c dependency
apt-get install -y libprotobuf-c-dev
# Install the ProtoBuf C compiler
apt-get install -y protobuf-c-compiler
```
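Before building, it is worth confirming that the protobuf-c toolchain resolved correctly (a quick sanity check, assuming the packages above installed cleanly):
```sh
# Both commands should print a version rather than "command not found"
protoc-c --version
pkg-config --modversion libprotobuf-c
```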
1.3.2.3 Building and Installing postgres-decoderbufs
[*]Pull the source code:
```sh
git clone https://github.com/debezium/postgres-decoderbufs.git
cd postgres-decoderbufs
```
[*]Build and install:
```sh
# Put pg_config of PG 12 on the PATH so the build targets the right installation
export PATH=/usr/lib/postgresql/12/bin:$PATH
make
make install
```
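Before restarting PostgreSQL, you can verify that make install put the shared library where the server will look for it (a small check; the path is resolved through the pg_config exported above):
```sh
# The logical decoding plugin must land in PostgreSQL's package library directory
ls "$(pg_config --pkglibdir)/decoderbufs.so"
```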
1.3.3 Enabling the Plugin and Logical Replication in PG
[*]Edit the postgresql.conf configuration file:
```sh
cat << EOF >> /var/lib/postgresql/data/postgresql.conf
# Change the WAL level to logical (options: minimal, replica, logical)
wal_level = logical
# Raise the maximum number of replication slots (default 10); Flink CDC uses one slot per table by default
max_replication_slots = 20
# Raise the maximum number of WAL sender processes (default 10); keep it in line with the slots setting above
max_wal_senders = 20
# Drop replication connections inactive longer than this (default 60s, 0 disables); a somewhat larger value is safer
wal_sender_timeout = 180s
# MODULES
shared_preload_libraries = 'decoderbufs'
EOF
```
[*]Restart the container:
```sh
docker restart postgres-12.5
```
[*]Connect to the database:
```sh
psql -h localhost -p 6432 -U postgres
```
[*]If the query returns logical, the change took effect (a fuller smoke test of the decoderbufs plugin follows after this list):
```sql
SHOW wal_level;
```
[*]Create some tables:
```sql
-- Create database test_db
CREATE DATABASE test_db;
-- List the databases
\l
-- Connect to the newly created database test_db
\c test_db
-- The table creation statements are not shown here
```
[*]Publish the tables:
```sql
-- Set puballtables to true for any existing publication
update pg_publication set puballtables=true where pubname is not null;
-- Publish all tables
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- Check which tables have been published
select * from pg_publication_tables;
```
[*]Change the tables' replica identity so that updates and deletes carry the old row values:
```sql
-- Change the replica identity so updates and deletes include the previous row values; the default
-- (DEFAULT, primary-key columns only) can lose old-row information during real-time sync
-- Alter a single table
ALTER TABLE table1 REPLICA IDENTITY FULL;
-- Alter all tables under a schema
DO $$
DECLARE
  rec RECORD;
  schema_name TEXT := 'public'; -- replace with your schema name
BEGIN
  -- Generate and execute a dynamic ALTER TABLE command for each table
  FOR rec IN
    SELECT tablename
    FROM pg_tables
    WHERE schemaname = schema_name
  LOOP
    EXECUTE format('ALTER TABLE %I.%I REPLICA IDENTITY FULL;', schema_name, rec.tablename);
    RAISE NOTICE 'Altered table: %.%', schema_name, rec.tablename;
  END LOOP;
END $$;
-- Check the replica identity: f means FULL (the setting succeeded);
-- d = default, n = nothing, i = index all mean FULL is not set
-- Query a single table
select relreplident from pg_class where relname='table1';
-- Query all tables under a schema
SELECT relname, relreplident FROM pg_class WHERE relnamespace = 'public'::regnamespace;
```
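At this point the whole chain can be smoke-tested from the shell: confirm the changed parameters are live, then let decoderbufs create and drop a throwaway logical replication slot, which is essentially what Flink CDC does at startup. A minimal sketch against the container as mapped above (port 6432, user postgres; the slot name is arbitrary):
```sh
# Confirm the postgresql.conf changes took effect
psql -h localhost -p 6432 -U postgres -c "SHOW max_replication_slots;"     # expect: 20
psql -h localhost -p 6432 -U postgres -c "SHOW shared_preload_libraries;"  # expect: decoderbufs

# If decoderbufs is installed correctly, slot creation with it succeeds
psql -h localhost -p 6432 -U postgres -d test_db \
  -c "SELECT slot_name FROM pg_create_logical_replication_slot('decoderbufs_smoke', 'decoderbufs');"
psql -h localhost -p 6432 -U postgres -d test_db \
  -c "SELECT pg_drop_replication_slot('decoderbufs_smoke');"
```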
2. Development in Practice
2.1 Developing FlinkSQL in Dinky
[*]Upload the jar packages as described in the CDC whole-database sync overview
[*]Create a FlinkSQL task in Dinky and write the CDC whole-database sync SQL
[*]Here I follow "MySQLCDC whole database to Kafka | Dinky" and do not set the sink.topic parameter, so each table's change log is written to a topic named after its database and table.
```sql
ADD CUSTOMJAR 'rs:/flink/lib/flink-sql-connector-kafka-3.2.0-1.18.jar';
ADD CUSTOMJAR 'rs:/flink/lib/flink-sql-connector-postgres-cdc-3.2.0.jar';
EXECUTE CDCSOURCE cdc_m3ods_kafka_all WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'your_pg_Hostname',
  'port' = 'your_pg_port',
  'username' = 'postgres',
  'password' = 'postgres',
  'database-name' = 'test_db',
  'schema-name' = 'your_pg_schema',
  'checkpoint' = '10000',
  'scan.startup.mode' = 'initial',
  'parallelism' = '1',
  'sink.connector'='datastream-kafka',
  'sink.brokers'='broker1:9092,broker2:9092,broker3:9092',
  'sink.properties.transaction.timeout.ms'='600000'
)
```
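One note on 'sink.properties.transaction.timeout.ms': Kafka brokers cap producer transaction timeouts through transaction.max.timeout.ms (900000 ms by default), and producers requesting more are rejected, so the 600000 above stays under the default cap. The broker-side value can be inspected roughly like this (a sketch; assumes broker id 0 and a Kafka version whose kafka-configs.sh supports --all):
```sh
# Show the effective broker configuration and filter for the transaction timeout cap
kafka-configs.sh --bootstrap-server broker1:9092 \
  --entity-type brokers --entity-name 0 --describe --all | grep transaction.max.timeout.ms
```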
2.2 Checking the Dinky Operations UI
https://i-blog.csdnimg.cn/img_convert/f4059b9f5ed7aac3d5a5628510a66d63.png
2.3 Checking the Flink WebUI
https://i-blog.csdnimg.cn/img_convert/c16cd4cea0ca5bb19a7b304c124ed61c.png
2.4 Checking the Kafka Messages
```sh
kafka-topics.sh --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --list
kafka-console-consumer.sh --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --from-beginning --topic your_topic
```
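If jq is available, piping the consumer output through it makes the JSON easier to read (an optional convenience; assumes jq is installed on the host):
```sh
# Pretty-print the first few change-log messages and exit
kafka-console-consumer.sh --bootstrap-server broker1:9092,broker2:9092,broker3:9092 \
  --from-beginning --topic your_topic --max-messages 5 | jq .
```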
https://i-blog.csdnimg.cn/img_convert/1a933bbd30d6bf75a71f8b7cc4ad64c2.png
The message payload looks roughly like this (the op field follows Debezium conventions: c = create/insert, u = update, d = delete, r = snapshot read):
```json
{
  "before": null,
  "after": {
    "col1": "1",
    "col2": "1"
  },
  "op": "c",
  "source": {
    "db": "default_namespace",
    "table": "table1"
  }
}
```
3. Common Errors
3.1 Missing decoderbufs Plugin
```
Caused by: org.postgresql.util.PSQLException: ERROR: could not access file "decoderbufs": No such file or directory
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2725) ~
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2412) ~
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:371) ~
    at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:502) ~
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:419) ~
    at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:341) ~
    at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:326) ~
    at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:302) ~
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:297) ~
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationSlot(PostgresReplicationConnection.java:459) ~
```
Fix: install the decoderbufs plugin (see section 1.3.2).
3.2 Missing allTopicNames() Method
https://i-blog.csdnimg.cn/img_convert/7a0f88dc0eff67430270ac280371aba0.png
Fix: upgrade Kafka to 2.8.2 or later.
[*]Searching the 2.7 source for allTopicNames:
https://i-blog.csdnimg.cn/img_convert/3427a6078b393c0082c8619103000fde.png
[*]Searching the 2.8.2 source for allTopicNames:
https://i-blog.csdnimg.cn/img_convert/ff71f7ca9c37d33c14bdefb41c2aea1b.png
Tip: starting with Apache Kafka 3.0.0, Java 8 is deprecated (slated for removal in Kafka 4.0)
https://i-blog.csdnimg.cn/img_convert/ce6397379358943699b5f11f978487ba.png
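To confirm which Kafka version the command-line clients are actually running before and after the upgrade (a quick check; both scripts ship with the Kafka distribution):
```sh
# Print the client version in use
kafka-topics.sh --version
# Optionally confirm the brokers respond and list the API versions they support
kafka-broker-api-versions.sh --bootstrap-server broker1:9092
```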
3.3 Missing Dinky Jar
https://i-blog.csdnimg.cn/img_convert/6e26f0f4d594efb2b1193388c352aac1.png
Fix: copy dinky-app-1.18-1.1.0-jar-with-dependencies.jar from dinky/jar/ into flink/lib.
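A minimal sketch of the fix, assuming Dinky and Flink are installed under /opt (adjust the paths to your layout), followed by a restart of the Flink cluster so the jar is picked up:
```sh
# Make the Dinky app jar visible to the Flink runtime
cp /opt/dinky/jar/dinky-app-1.18-1.1.0-jar-with-dependencies.jar /opt/flink/lib/
```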