Flink实战 | PostgresCDC整库同步Kafka最佳实践

打印 上一主题 下一主题

主题 854|帖子 854|积分 2562


Flink实战 | PostgresCDC整库同步Kafka最佳实践


1. 环境预备

   本篇内容不涉及组件的安装(Flink,Kafka,Dinky)
利用的组件版本如下
  

  • Flink 1.18.1
  • kafka2.8.2(假如你的flink也是flink1.18.X建议利用 kafka2.8.X以上版本,详见本文1.5.2)
  • Dinky 1.1.0
  • PostgreSql12.5
  1.1 Docker中部署Dinky

1.2 安装Flink环境

1.3 安装PostgreSql环境

1.3.1 创建pg容器



  • 拉取镜像
  1. docker pull  postgres:12.5
复制代码


  • 创建并启动 PostgreSQL 容器设置密码和端口
  1. docker run -d -p 6432:5432 --name postgres-12.5 -e POSTGRES_PASSWORD=postgres postgres:12.5
复制代码


  • 查看容器是否创建成功:
  1. docker ps | grep postgres-12.5
复制代码


  • 进入容器:
  1. docker exec -it postgres-12.5  bash
复制代码
1.3.2 安装安装decoderbufs插件

否则会报错如下,详见本文3.1:
   Caused by: org.postgresql.util.PSQLException: ERROR: could not access file “decoderbufs”: No such file or directory
  可以参考debezium/postgres-decoderbufs的README.md举行安装,不过也有一些坑,详细如下。
1.3.2.1 docker更换apt镜像源和PG镜像源



  • 更换apt镜像源
    建议中科大的源(试了其他源都有些许问题)
  1. tee /etc/apt/sources.list <<-'EOF'
  2. deb http://mirrors.ustc.edu.cn/debian buster main non-free
  3. deb http://mirrors.ustc.edu.cn/debian buster-updates main non-free
  4. deb http://mirrors.ustc.edu.cn/debian-security/ buster/updates main non-free
  5. EOF
复制代码


  • 更换PG的apt源

    • 安装 lsb-release
    1. [/code]
    2. [/list] apt-get install -y lsb-release
    3.   - 添加[华为PG镜像站](https://mirrors.huaweicloud.com/mirrorDetail/5ee08ea8cdd8d3491d77c4af?mirrorName=postgresql&catalog=tool)地点配置: shell
    4. sh -c ‘echo “deb https://mirrors.huaweicloud.com/postgresql/repos/apt $(lsb_release -cs)-pgdg main” > /etc/apt/sources.list.d/pgdg.list’
    5.   - 导入镜像源署名key: shell
    6. wget --quiet -O - https://mirrors.huaweicloud.com/postgresql/repos/apt/ACCC4CF8.asc | apt-key add -
    7. ```
    8. [size=1]1.3.2.2 安装依赖[/size]
    9. [code]# Core build utilities
    10. apt-get update && apt-get install -f -y software-properties-common build-essential pkg-config git
    11. # PostGIS依赖
    12. apt-get install -f -y libproj-dev liblwgeom-dev
    13. # Protobuf-c依赖
    14. apt-get install -y libprotobuf-c-dev
    15. # 安装 ProtoBuf C 编译器
    16. apt-get install -y protobuf-c-compiler
    复制代码
    1.3.2.3 构建和安装postgres-decoderbufs



    • 拉取源码
    1. ```sh
    2. git clone https://github.com/debezium/postgres-decoderbufs.git
    3. cd postgres-decoderbufs
    复制代码


    • 构建和安装
    1. export PATH=/usr/lib/postgresql/12/bin:$PATH
    2. make
    3. make install
    复制代码
    1.3.3 PG中启用插件和逻辑复制



    • 编辑postgresql.conf配置文件:
    1. cat << EOF >> /var/lib/postgresql/data/postgresql.conf
    2. # 更改wal日志方式为logical(方式有:minimal、replica 、logical  )
    3. wal_level = logical  
    4. # 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots
    5. max_replication_slots = 20
    6. # 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样
    7. max_wal_senders = 20
    8. # 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s,0表示禁用)
    9. wal_sender_timeout = 180s
    10. # MODULES
    11. shared_preload_libraries = 'decoderbufs'
    12. EOF
    复制代码


    • 重启容器:
    1. docker restart postgres-12.5
    复制代码


    • 连接数据库
    1. psql -h localhost -p 6432 -U postgres
    复制代码


    • 假如查询返回logical体现修改成功:
    1. SHOW wal_level;
    复制代码


    • 建一些表
    1. -- 创建数据库 test_db
    2. CREATE DATABASE test_db;
    3. -- 列出数据库
    4. \l
    5. -- 连接到新创建的数据库 test_db
    6. \c test_db
    7. -- 建表语句不做演示
    复制代码


    • 发布表
    1. -- 设置发布为true
    2. update pg_publication set puballtables=true where pubname is not null;
    3. -- 把所有表进行发布
    4. CREATE PUBLICATION dbz_publication FOR ALL TABLES;
    5. -- 查询哪些表已经发布
    6. select * from pg_publication_tables;
    复制代码


    • 更改表的复制标识包罗更新和删除的值:
    1. -- 更改复制标识包含更新和删除之前值,默认为NOTHING,这可能导致实时同步时丢失更新和删除的数据行信息,从而影响同步的准确性)
    2. --修改单表
    3. ALTER TABLE table1 REPLICA IDENTITY FULL;
    4. --修改schema下的所有表
    5. DO $$
    6. DECLARE
    7.     rec RECORD;
    8.     schema_name TEXT := 'public'; -- 请用你的模式名替换
    9. BEGIN
    10.     -- 为每个表生成动态ALTER TABLE命令并执行
    11.     FOR rec IN
    12.         SELECT tablename
    13.         FROM pg_tables
    14.         WHERE schemaname = schema_name
    15.     LOOP
    16.         EXECUTE format('ALTER TABLE %I.%I REPLICA IDENTITY FULL;', schema_name, rec.tablename);
    17.         RAISE NOTICE 'Altered table: %.%', schema_name, rec.tablename;
    18.     END LOOP;
    19. END $$;
    20. -- 查看复制标识(为f标识说明设置成功,f(表示 full),否则为 n(表示 nothing),即复制标识未设置)
    21. --查询单表
    22. select relreplident from pg_class where relname='table1';
    23. --查询schema下的所有表
    24. SELECT relname, relreplident FROM pg_class WHERE relnamespace = 'public'::regnamespace;
    复制代码
    2. 开辟实战

    2.1 在Dinky中开辟FlinkSql



    • 根据CDC整库同步概述,上传好jar包
    • 在Dinky中创建flinksql任务,编写CDC整库同步sql
    • 我这里参考MySQLCDC 整库到 Kafka | Dinky,不指定 sink.topic 参数,所有 Change Log 会被写入对应库表名的 topic。
    1. ADD CUSTOMJAR 'rs:/flink/lib/flink-sql-connector-kafka-3.2.0-1.18.jar';
    2. ADD CUSTOMJAR 'rs:/flink/lib/flink-sql-connector-postgres-cdc-3.2.0.jar';
    3. EXECUTE CDCSOURCE cdc_m3ods_kafka_all WITH (
    4.     'connector' = 'postgres-cdc',
    5.     'hostname' = 'your_pg_Hostname',
    6.     'port' = 'your_pg_port',
    7.     'username' = 'postgres',
    8.     'password' = 'postgres',
    9.     'database-name' = 'test_db',
    10.     'schema-name' = 'your_pg_schema',
    11.     'checkpoint' = '10000',
    12.     'scan.startup.mode' = 'initial',
    13.     'parallelism' = '1',
    14.     'sink.connector'='datastream-kafka',
    15.     'sink.brokers'='broker1:9092,broker2:9092,broker3:9092',
    16.     'sink.properties.transaction.timeout.ms'='600000'
    17. )
    复制代码
    2.2 查看Dinky运维界面


    2.3 查看Flink WebUI


    2.3 查看kafka消息

    1. kafka-topics.sh --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --list
    2. kafka-console-consumer.sh --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --from-beginning --topic your_topic
    复制代码
    可以看到消息内容大抵如下
    1. {
    2.   "before": null,
    3.   "after": {
    4.     "col1": "1",
    5.     "col2": "1"
    6.   },
    7.   "op": "c",
    8.   "source": {
    9.     "db": "default_namespace",
    10.     "table": "table1"
    11.   }
    12. }
    复制代码
    3. 错误集锦

    3.1 缺少decoderbufs插件

    1. Caused by: org.postgresql.util.PSQLException: ERROR: could not access file "decoderbufs": No such file or directory
    2.     at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2725) ~[flink-sql-connector-postgres-cdc-3.2.0.jar:3.2.0]
    3.     at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2412) ~[flink-sql-connector-postgres-cdc-3.2.0.jar:3.2.0]
    4.     at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:371) ~[flink-sql-connector-postgres-cdc-3.2.0.jar:3.2.0]
    5.     at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:502) ~[flink-sql-connector-postgres-cdc-3.2.0.jar:3.2.0]
    6.     at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:419) ~[flink-sql-connector-postgres-cdc-3.2.0.jar:3.2.0]
    7.     at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:341) ~[flink-sql-connector-postgres-cdc-3.2.0.jar:3.2.0]
    8.     at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:326) ~[flink-sql-connector-postgres-cdc-3.2.0.jar:3.2.0]
    9.     at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:302) ~[flink-sql-connector-postgres-cdc-3.2.0.jar:3.2.0]
    10.     at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:297) ~[flink-sql-connector-postgres-cdc-3.2.0.jar:3.2.0]
    11.     at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationSlot(PostgresReplicationConnection.java:459) ~
    复制代码
    办理:安装decoderbufs插件
    3.2 没有allTopicNames()方法


    办理:更换kafka版本至2.8.2+


    • 2.7源码查询allTopicNames结果

    • 2.8.2源码查询allTopicNames结果

       Tip:Apache Kafka3.0.0开始不再支持java8
      

    3.3 Dinky包缺失


    办理 :将dinky/jar/下的dinky-app-1.18-1.1.0-jar-with-dependencies.jar包放入flink/lib下

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

去皮卡多

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表