怎样利用Flink连接openGauss数据库(flink-cdc-connector)

打印 上一主题 下一主题

主题 676|帖子 676|积分 2028

什么是flink-cdc-connector

本文会在最后附上代码修改的git patch
起首简单介绍下flink,Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。Flink以数据并行和管道方式执行恣意流数据程序,Flink的流水线运行时体系可以执行批处理和流处理程序。别的,Flink的运行时自己也支持迭代算法的执行。Flink提供高吞吐量、低延迟的流数据引擎以及对事故-时间处理和状态管理的支持。Flink应用程序在发生气器故障时具有容错本领,并且支持exactly-once语义。程序可以用Java、Scala、Python和SQL等语言编写,并主动编译和优化到在集群或云环境中运行的数据流程序。
常见的场景是,flink将从事故流中获取到的数据举行计算并写入外部的存储体系中以待进一步查询分析。涉及到openGauss这类事件数据库的常见应用场景是,flink使命监听事件数据库(比如MySQL,postgreSQL)的变革数据,实时写入到分析型数据库中以便分析查询可以高效利用分析型数据库性能的同时,也获得数据的实时性。
上面的场景中提到了“变革数据”,那么假如应用怎样捕捉数据库的变革数据呢?这里要引入一个概念叫做change data capture(CDC)。开源项目debezium实现了大量主流数据库的CDC,虽然没有openGauss的对应实现,不外得益于openGauss对postgreSQL良好的兼容性,我们可以在postgreSQL实现的基础举行少量修改。但是我们有了CDC后,怎样把CDC接入flink呢?这就需要用到另一个项目,也是这篇文章将重点介绍的flink-cdc-connectors。
flink-cdc-connectors这个项目包罗多个flink source connector,即把事件数据库作为flink source的connector,而且他依靠了debezium并做了些许修改,我们可以直接在flink-cdc-connectors修改兼容openGauss而不再需要修改debezium了。利用flink-cdc-connectors,就可以通过写SQL来让flink监听openGauss的变革数据,并实时写入ElasticSearch中加快分析查询。
看起来统统都已停当,但是……
测试利用postgres-connector连接openGauss

参考flink-cdc-connectors的postgres测试文档,起首我们尝试直接利用openGauss来更换掉此中的postgres,看看是否会碰到题目。 测试环境的根本信息是:
  1. openGauss 5.0
  2. Flink 1.17.0
  3. flink-sql-connector-elasticsearch7 3.0.1-1.17
  4. flink-sql-connector-mysql-cdc 2.4-SNAPSHOT (build from branch release-2.4)
  5. flink-sql-connector-postgres-cdc 2.4-SNAPSHOT (build from branch release-2.4)
复制代码
想在本地完成测试,需要对测试文档中flink client建表语句举行修改:
  1. CREATE TABLE shipments (
  2.    shipment_id INT,
  3.    order_id INT,
  4.    origin STRING,
  5.    destination STRING,
  6.    is_arrived BOOLEAN,
  7.    PRIMARY KEY (shipment_id) NOT ENFORCED
  8. ) WITH (
  9.    'connector' = 'postgres-cdc',
  10.    'hostname' = 'localhost',
  11.    'port' = 'YOUR_PORT',
  12.    'username' = 'YOUR_USERNAME',
  13.    'password' = 'YOUR_PASSWORD',
  14.    'database-name' = 'postgres',
  15.    'schema-name' = 'public',
  16.    'table-name' = 'shipments',
  17.    'slot.name' = 'your_slot_name',
  18.    'decoding.plugin.name' = 'pgoutput',
  19.    'debezium.publication.autocreate.mode' = 'filtered'
  20. );
复制代码
重要是增加了slot.name,修改decoding.plugin.name插件为pgoutput,参考Debezium connector for PostgreSQL中debezium的建议,修改publication.autocreate.mode为filtered避免权限题目。
为了使flink可以连接到openGauss,也需要修改openGauss的配置
  1. gs_guc set -D ${PGDATA} -c "wal_level=logical"
  2. gs_guc set -D ${PGDATA} -c "listen_addresses='*'"
  3. gs_guc set -D ${PGDATA} -c "password_encryption_type=1"
复制代码
给pg_hba.conf里增加(我这里的用户是gaussdb,根据实际情况修改):
  1. host    replication     gaussdb        0.0.0.0/0            sha256
  2. host    all     all        0.0.0.0/0           sha256
复制代码
openGauss的初始用户无法远程连接,必须新创建一个用户(我这里用户是gaussdb):
  1. create user gaussdb with login password "YOUR_PASSWORD";
  2. grant all privileges to gaussdb;
复制代码
为了方便测试,flink-sql-connector从源码的release-2.4分支编译,拷贝到flink的lib目录下后,按照文档操作,假如openGauss可以完全兼容postgres的话,我们可以得到和文档一样的结果,然而没有那么顺遂。
题目定位

起首碰到的题目是版本判断小于postgres 9.4。对于这个题目,我们可以通过删掉版本查抄的代码来绕过:
  1. // flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java
  2.     private static void validateServerVersion(Statement statement) throws SQLException {
  3.         DatabaseMetaData metaData = statement.getConnection().getMetaData();
  4.         int majorVersion = metaData.getDatabaseMajorVersion();
  5.         int minorVersion = metaData.getDatabaseMinorVersion();
  6.         if (majorVersion < 9 || (majorVersion == 9 && minorVersion < 4)) {
  7.             // 删掉这个版本检查
  8.             // throw new SQLException("Cannot connect to a version of Postgres lower than 9.4");
  9.         }
  10.     }
复制代码
思量到postgres和openGauss的差异,我们把jdbc的依靠也一并修改掉避免难以预料的错误: flink-connector-postgres-cdc/pom.xml
  1. <dependency>
  2.      <groupId>org.opengauss</groupId>
  3.      <artifactId>opengauss-jdbc</artifactId>
  4.      <version>5.0.0</version>
  5. </dependency>
复制代码
同时修改flink-sql-connector-postgres-cdc/pom.xml里的shade插件增加:
  1. <include>org.opengauss:*</include>
复制代码
执行mvn clean package -DskipTests重新打包后,将postgres connector的jar包拷贝到flink的lib目录下重新测试,这次出现了新的题目,flink job的日记报错显示connector在与openGauss建立连接后没有收到openGauss返回的任何网络包:
  1. (e29effdca01963c9b8c2fa1e2d1c9888_605b35e407e90cda15ad084365733fdd_0_0) switched from RUNNING to FAILED with failure cause:
  2. io.debezium.jdbc.JdbcConnectionException: [127.0.0.1:47500/ocalhost/127.0.0.1:5432] socket is not closed; Urgent packet sent to backend successfully; An I/O error occured while sending to the backend.detail:EOF Exception;
  3.         at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initPublication(PostgresReplicationConnection.java:251) ~[flink-sql-connector-postgres-cdc-2.4-SNAPSHOT.jar:2.4-SNAPSHOT]
  4.         at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationSlot(PostgresReplicationConnection.java:434) ~[flink-sql-connector-postgres-cdc-2.4-SNAPSHOT.jar:2.4-SNAPSHOT]
  5.         at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:137) ~[flink-sql-connector-postgres-cdc-2.4-SNAPSHOT.jar:2.4-SNAPSHOT]
  6.         at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:133) ~[flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar:2.4-SNAPSHOT]
  7.         at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:760) ~[flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar:2.4-SNAPSHOT]
  8.         at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:192) ~[flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar:2.4-SNAPSHOT]
  9.         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
  10.         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
  11.         at java.lang.Thread.run(Thread.java:829) [?:?]
  12. Caused by: org.postgresql.util.PSQLException: [127.0.0.1:47500/ocalhost/127.0.0.1:5432] socket is not closed; Urgent packet sent to backend successfully; An I/O error occured while sending to the backend.detail:EOF Exception;
  13.         at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:369) ~[postgresql.jar:?]
  14.         at org.postgresql.jdbc.PgStatement.runQueryExecutor(PgStatement.java:570) ~[postgresql.jar:?]
  15.         at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:547) ~[postgresql.jar:?]
  16.         at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:405) ~[postgresql.jar:?]
  17.         at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:347) ~[postgresql.jar:?]
  18.         at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:333) ~[postgresql.jar:?]
  19.         at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:310) ~[postgresql.jar:?]
  20.         at org.postgresql.jdbc.PgStatement.executeQuery(PgStatement.java:250) ~[postgresql.jar:?]
  21.         at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initPublication(PostgresReplicationConnection.java:182) ~[flink-sql-connector-postgres-cdc-2.4-SNAPSHOT.jar:2.4-SNAPSHOT]
  22.         ... 8 more
  23. Caused by: java.io.EOFException: EOF Exception
  24.         at org.postgresql.core.PGStream.receiveChar(PGStream.java:309) ~[postgresql.jar:?]
  25.         at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2401) ~[postgresql.jar:?]
  26.         at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:341) ~[postgresql.jar:?]
  27.         at org.postgresql.jdbc.PgStatement.runQueryExecutor(PgStatement.java:570) ~[postgresql.jar:?]
  28.         at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:547) ~[postgresql.jar:?]
  29.         at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:405) ~[postgresql.jar:?]
  30.         at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:347) ~[postgresql.jar:?]
  31.         at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:333) ~[postgresql.jar:?]
  32.         at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:310) ~[postgresql.jar:?]
  33.         at org.postgresql.jdbc.PgStatement.executeQuery(PgStatement.java:250) ~[postgresql.jar:?]
  34.         at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initPublication(PostgresReplicationConnection.java:182) ~[flink-sql-connector-postgres-cdc-2.4-SNAPSHOT.jar:2.4-SNAPSHOT]
  35.         ... 8 more
复制代码
而openGauss的日记(pg_logs)同样显示openGauss在建立连接后没有收到任何网络包:
  1. walsender thread started
  2. No message received from standby for maximum time
  3. walsender thread shut down
复制代码
双方都在等候对方发送请求,但双方都没发送任何请求,这是为什么呢?
仔细观察flink的非常栈,可以看到此时是flink发送了请求但是没有比及openGauss的返回,而openGauss显示没有收到请求。为了进一步排查openGauss究竟是否收到请求,我们修改配置
  1. gs_guc set -D ${PGDATA} -c "log_statement = 'all'"
复制代码
使得openGauss的日记会输出所有的SQL请求语句。再次跑测试流程,可以看到openGauss确实没有收到请求,所以我们继承观察flink这边。
看代码PostgresReplicationConnection.initPublication中,堕落的代码是stmt.executeQuery(selectPublication)表示发送一个SQL请求,堕落的缘故起因是openGauss没有收到请求进而触发客户端超时。通过上面的配置我们可以在openGauss日记中看到flink的客户端ip发来了一些请求并精确处理,比方select * from pg_replication_slots where slot_name = ? and database = ? and plugin = ?这样的sql请求,那这些精确吸收到的请求和非常栈中的请求有何不同呢?进一步阅读代码和对比日记我们可以发现,精确吸收到的请求都是PostgresConnection类中发出的,此中利用的连接是debezium中的JdbcConnection,而错误的请求是PostgresReplicationConnection.initPublication发出的,利用的连接在构造时传入了replication的配置,假如我们在堕落的地方同样利用JdbcConnection来发送请求会怎样呢?
为此,我们需要修改PostgresReplicationConnection的构造函数,增加字段
  1. this.jdbcConnection = jdbcConnection;
复制代码
这个jdbcConnection的生命周期是在调用构造函数的类中维护的,所以这里我们不需要close等管理他的生命周期,仅作赋值和直接利用即可。
然后在PostgresReplicationConnection.initPublication中修改获取连接的方式为:
  1. Connection conn = jdbcConnection.connection();
复制代码
这样下面执行的SQL请求会通过JdbcConnection发送了。
重新测试,已经可以精确的在ES中查询到openGauss里的数据并实时更新了。
更进一步

为了支持参数'scan.incremental.snapshot.enabled' = 'true',需要修改flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java做sql语法兼容
  1.          // NOTE: it requires ANALYZE or VACUUM to be run first in PostgreSQL.
  2.          final String query =
  3.                  String.format(
  4. -                        "SELECT reltuples::bigint FROM pg_class WHERE oid = to_regclass('%s')",
  5. +                        "SELECT reltuples::bigint FROM pg_class WHERE oid = '%s'::regclass",
  6.                          tableId.toString());
复制代码
对于openGauss 3.0的版本,由于在Replicaion Connection建立后执行SQL命令,所以需要额外修改去掉pgConnection().getCatalog()避免发送sql请求
  1.      private Set<TableId> determineCapturedTables() throws Exception {
  2.          Set<TableId> allTableIds =
  3. -                this.connect()
  4. -                        .readTableNames(
  5. -                                pgConnection().getCatalog(), null, null, new String[] {"TABLE"});
  6. +                this.jdbcConnection.readTableNames(null, null, null, new String[] {"TABLE"});
复制代码
假如要将openGauss作为sink端,那么还需要修改flink-connector-jdbc项目的代码并自己重新打包 branch: v3.1 flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java
  1. @@ -71,14 +71,12 @@ public class PostgresDialect extends AbstractDialect {
  2.                          .collect(Collectors.joining(", "));
  3.          String updateClause =
  4.                  Arrays.stream(fieldNames)
  5. +                        .filter(s -> !uniqueColumns.contains(s))
  6.                          .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
  7.                          .collect(Collectors.joining(", "));
  8.          return Optional.of(
  9.                  getInsertIntoStatement(tableName, fieldNames)
  10. -                        + " ON CONFLICT ("
  11. -                        + uniqueColumns
  12. -                        + ")"
  13. -                        + " DO UPDATE SET "
  14. +                        + " ON DUPLICATE KEY UPDATE "
  15.                          + updateClause);
  16.      }
复制代码
假如要将flink-cdc-connectors中的jdbc更换为厂商提供的驱动jar包而非openGauss,无法从maven堆栈拉取,那么需要修改maven的依靠为:
  1. <dependency>
  2.     <groupId>xxx.xxx</groupId>
  3.     <artifactId>xxx</artifactId>
  4.     <version>your_version</version>
  5.     <scope>system</scope>
  6.     <systemPath>/path/to/jdbc.jar</systemPath>
  7. </dependency>
复制代码
但是这样shade插件不会将类打包进connector的jar里,所以我们还需要在拷贝jar包到flink的同时把jdbc的jar包也一并拷已往。
别的,堕落的Sql请求是为了创建replication slot和publication,假如我们提前手动创建好,也是可以避免这个题目而不用修改的代码
  1. select pg_create_logical_replication_slot('your_slot_name','pgoutput');
  2. create publication dbz_publication for all tables;
复制代码
总结

颠末对connector代码的修改,openGauss 3.0和5.0版本均可以作为flink的source端和sink端,融入flink的流式计算体系。
   branch: release-2.4
commit: 823f5a1e21009e2e0ba36cd44ec15353e3cbcd67
  1. commit 3d8924b37ed3d25764788b63ada1f48e00a97c4eAuthor: WangYuxuan <wang.yuxuan.xinyu@foxmail.com>Date:   Wed Aug 23 16:23:13 2023 +0800    example commitdiff --git a/flink-connector-postgres-cdc/pom.xml b/flink-connector-postgres-cdc/pom.xmlindex a9886dc8..d7637615 100644--- a/flink-connector-postgres-cdc/pom.xml+++ b/flink-connector-postgres-cdc/pom.xml@@ -67,12 +67,18 @@ under the License.             </exclusions>         </dependency> +                                <!--         <dependency>-            <!-- fix CVE-2022-26520 https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-26520  -->             <groupId>org.postgresql</groupId>             <artifactId>postgresql</artifactId>             <version>42.5.1</version>         </dependency>+                                -->+                                <dependency>+                                                <groupId>org.opengauss</groupId>+                                                <artifactId>opengauss-jdbc</artifactId>+                                                <version>5.0.0</version>+                                </dependency>          <!-- test dependencies on Debezium --> @@ -168,4 +174,4 @@ under the License.      </dependencies> -</project>\ No newline at end of file+</project>diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/PostgresQueryUtils.javaindex 3d3fc996..e30443a8 100644--- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java+++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java@@ -67,7 +67,7 @@ public class PostgresQueryUtils {         // NOTE: it requires ANALYZE or VACUUM to be run first in PostgreSQL.
  2.          final String query =
  3.                  String.format(
  4. -                        "SELECT reltuples::bigint FROM pg_class WHERE oid = to_regclass('%s')",
  5. +                        "SELECT reltuples::bigint FROM pg_class WHERE oid = '%s'::regclass",
  6.                          tableId.toString());
  7.           return jdbc.queryAndMap(diff --git a/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java b/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.javaindex e728f3b4..1c21dfd6 100644--- a/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java+++ b/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java@@ -636,7 +636,8 @@ public class PostgresConnection extends JdbcConnection {         int majorVersion = metaData.getDatabaseMajorVersion();         int minorVersion = metaData.getDatabaseMinorVersion();         if (majorVersion < 9 || (majorVersion == 9 && minorVersion < 4)) {-            throw new SQLException("Cannot connect to a version of Postgres lower than 9.4");+            // throw new SQLException("Cannot connect to a version of Postgres lower than 9.4");+            LOGGER.warn("wyx");         }     } diff --git a/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.javaindex 364cb59b..eeb6a4e8 100644--- a/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java+++ b/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java@@ -74,6 +74,7 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep     private final MessageDecoder messageDecoder;     private final TypeRegistry typeRegistry;     private final Properties streamParams;+    private final PostgresConnection jdbcConnection;      private Lsn defaultStartingPos;     private SlotCreationResult slotCreationInfo;@@ -136,6 +137,7 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep         this.statusUpdateInterval = statusUpdateInterval;         this.messageDecoder =                 plugin.messageDecoder(new MessageDecoderContext(config, schema), jdbcConnection);+        this.jdbcConnection = jdbcConnection;
  8.          this.typeRegistry = typeRegistry;         this.streamParams = streamParams;         this.slotCreationInfo = null;@@ -171,7 +173,7 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep             LOGGER.info("Initializing PgOutput logical decoder publication");             try {                 // Unless the autocommit is disabled the SELECT publication query will stay running-                Connection conn = pgConnection();+                Connection conn = jdbcConnection.connection();
  9.                  conn.setAutoCommit(false);                  String selectPublication =@@ -255,9 +257,7 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep      private Set<TableId> determineCapturedTables() throws Exception {
  10.          Set<TableId> allTableIds =
  11. -                this.connect()
  12. -                        .readTableNames(
  13. -                                pgConnection().getCatalog(), null, null, new String[] {"TABLE"});
  14. +                this.jdbcConnection.readTableNames(null, null, null, new String[] {"TABLE"});
  15.           Set<TableId> capturedTables = new HashSet<>(); diff --git a/flink-sql-connector-postgres-cdc/pom.xml b/flink-sql-connector-postgres-cdc/pom.xmlindex 8beb8bc6..c8974e0e 100644--- a/flink-sql-connector-postgres-cdc/pom.xml+++ b/flink-sql-connector-postgres-cdc/pom.xml@@ -65,6 +65,7 @@ under the License.                                     <include>com.fasterxml.*:*</include>                                     <!--  Include fixed version 30.1.1-jre-14.0 of flink shaded guava  -->                                     <include>org.apache.flink:flink-shaded-guava</include>+                                    <include>org.opengauss:*</include>
  16.                                  </includes>                             </artifactSet>                             <filters>@@ -107,4 +108,4 @@ under the License.             </plugin>         </plugins>     </build>-</project>\ No newline at end of file+</project>
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

大号在练葵花宝典

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表