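The main changes are adding slot.name and switching decoding.plugin.name to pgoutput. Following the recommendation in the Debezium documentation ("Debezium connector for PostgreSQL"), publication.autocreate.mode is also set to filtered to avoid permission problems.
For Flink to be able to connect to openGauss, the openGauss configuration has to change as well: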
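The three settings above can be collected in one place before they are written into a Flink SQL table definition. The following is only a minimal sketch: it assumes the Flink SQL `postgres-cdc` connector, where Debezium properties are passed with a `debezium.` prefix, and the host, database, schema, table, and slot names are hypothetical placeholders rather than values from this article.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: assemble the connector options discussed above into a WITH clause. */
class OpenGaussCdcOptions {

    /** Builds the option map; slotName is chosen by the caller. */
    static Map<String, String> build(String slotName) {
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("connector", "postgres-cdc");
        // Hypothetical connection details, replace with your own.
        opts.put("hostname", "127.0.0.1");
        opts.put("port", "5432");
        opts.put("username", "gaussdb");
        opts.put("password", "YOUR_PASSWORD");
        opts.put("database-name", "postgres");
        opts.put("schema-name", "public");
        opts.put("table-name", "orders");
        // The three settings named in the text.
        opts.put("slot.name", slotName);
        opts.put("decoding.plugin.name", "pgoutput");
        // Assumption: Debezium properties use the "debezium." prefix in Flink SQL.
        opts.put("debezium.publication.autocreate.mode", "filtered");
        return opts;
    }

    /** Renders the options as a Flink SQL WITH (...) clause. */
    static String toWithClause(Map<String, String> opts) {
        StringBuilder sb = new StringBuilder("WITH (\n");
        int i = 0;
        for (Map.Entry<String, String> e : opts.entrySet()) {
            sb.append("  '").append(e.getKey()).append("' = '")
              .append(e.getValue()).append("'");
            if (++i < opts.size()) {
                sb.append(",");
            }
            sb.append("\n");
        }
        return sb.append(")").toString();
    }

    public static void main(String[] args) {
        System.out.println(toWithClause(build("flink_slot")));
    }
}
```

The printed clause can be appended to a `CREATE TABLE` statement; the slot name should be unique per job.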
gs_guc set -D ${PGDATA} -c "wal_level=logical"
gs_guc set -D ${PGDATA} -c "listen_addresses='*'"
gs_guc set -D ${PGDATA} -c "password_encryption_type=1"
Add the following to pg_hba.conf (my user here is gaussdb; adjust to your setup):
host replication gaussdb 0.0.0.0/0 sha256
host all all 0.0.0.0/0 sha256
The initial openGauss user cannot connect remotely, so a new user has to be created (mine is gaussdb):
create user gaussdb with login password "YOUR_PASSWORD";
(e29effdca01963c9b8c2fa1e2d1c9888_605b35e407e90cda15ad084365733fdd_0_0) switched from RUNNING to FAILED with failure cause:
io.debezium.jdbc.JdbcConnectionException: [127.0.0.1:47500/ocalhost/127.0.0.1:5432] socket is not closed; Urgent packet sent to backend successfully; An I/O error occured while sending to the backend.detail:EOF Exception;
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initPublication(PostgresReplicationConnection.java:251) ~[flink-sql-connector-postgres-cdc-2.4-SNAPSHOT.jar:2.4-SNAPSHOT]
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationSlot(PostgresReplicationConnection.java:434) ~[flink-sql-connector-postgres-cdc-2.4-SNAPSHOT.jar:2.4-SNAPSHOT]
at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:137) ~[flink-sql-connector-postgres-cdc-2.4-SNAPSHOT.jar:2.4-SNAPSHOT]
at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:133) ~[flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar:2.4-SNAPSHOT]
at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:760) ~[flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar:2.4-SNAPSHOT]
at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:192) ~[flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar:2.4-SNAPSHOT]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: org.postgresql.util.PSQLException: [127.0.0.1:47500/ocalhost/127.0.0.1:5432] socket is not closed; Urgent packet sent to backend successfully; An I/O error occured while sending to the backend.detail:EOF Exception;
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:369) ~[postgresql.jar:?]
at org.postgresql.jdbc.PgStatement.runQueryExecutor(PgStatement.java:570) ~[postgresql.jar:?]
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:547) ~[postgresql.jar:?]
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:405) ~[postgresql.jar:?]
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:347) ~[postgresql.jar:?]
at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:333) ~[postgresql.jar:?]
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:310) ~[postgresql.jar:?]
at org.postgresql.jdbc.PgStatement.executeQuery(PgStatement.java:250) ~[postgresql.jar:?]
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initPublication(PostgresReplicationConnection.java:182) ~[flink-sql-connector-postgres-cdc-2.4-SNAPSHOT.jar:2.4-SNAPSHOT]
... 8 more
Caused by: java.io.EOFException: EOF Exception
at org.postgresql.core.PGStream.receiveChar(PGStream.java:309) ~[postgresql.jar:?]
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2401) ~[postgresql.jar:?]
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:341) ~[postgresql.jar:?]
at org.postgresql.jdbc.PgStatement.runQueryExecutor(PgStatement.java:570) ~[postgresql.jar:?]
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:547) ~[postgresql.jar:?]
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:405) ~[postgresql.jar:?]
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:347) ~[postgresql.jar:?]
at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:333) ~[postgresql.jar:?]
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:310) ~[postgresql.jar:?]
at org.postgresql.jdbc.PgStatement.executeQuery(PgStatement.java:250) ~[postgresql.jar:?]
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initPublication(PostgresReplicationConnection.java:182) ~[flink-sql-connector-postgres-cdc-2.4-SNAPSHOT.jar:2.4-SNAPSHOT]
gs_guc set -D ${PGDATA} -c "log_statement = 'all'"
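This makes the openGauss log record every SQL statement it receives. Running the test flow again shows that openGauss really did not receive the request, so we keep looking at the Flink side.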
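In PostgresReplicationConnection.initPublication, the failing call is stmt.executeQuery(selectPublication), which sends a SQL query. It fails because openGauss never receives the query, which in turn triggers a client-side timeout. With the logging configured above, the openGauss log shows that some queries from the Flink client's IP did arrive and were handled correctly, for example select * from pg_replication_slots where slot_name = ? and database = ? and plugin = ?. So how do these correctly received queries differ from the one in the exception stack? Reading the code further and comparing it with the log shows that the correctly received queries are all issued from the PostgresConnection class, over Debezium's JdbcConnection, whereas the failing query is issued by PostgresReplicationConnection.initPublication over a connection that was constructed with the replication configuration. What happens if the failing query is sent over the JdbcConnection as well?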
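To make the difference between the two kinds of connection concrete, the sketch below builds the JDBC properties for a regular connection and for a replication connection without opening either. It assumes the PostgreSQL JDBC driver's property names `replication` and `preferQueryMode`; whether the openGauss driver treats them identically is not confirmed here, and the class and method names are illustrative only.

```java
import java.util.Properties;

/** Sketch: properties for a regular vs. a replication JDBC connection. */
class ConnectionPropsSketch {

    /** Properties for an ordinary SQL connection. */
    static Properties regular(String user, String password) {
        Properties p = new Properties();
        p.setProperty("user", user);
        p.setProperty("password", password);
        return p;
    }

    /** Same credentials, plus the replication settings (assumed pgjdbc names). */
    static Properties replication(String user, String password) {
        Properties p = regular(user, password);
        // Puts the session into logical replication mode.
        p.setProperty("replication", "database");
        // The replication protocol only supports the simple query mode.
        p.setProperty("preferQueryMode", "simple");
        return p;
    }

    public static void main(String[] args) {
        System.out.println("regular:     " + regular("gaussdb", "YOUR_PASSWORD"));
        System.out.println("replication: " + replication("gaussdb", "YOUR_PASSWORD"));
    }
}
```

Only the second property set asks the server for a replication session, which is the kind of connection the failing query in initPublication was sent over.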
To try this, we need to modify the PostgresReplicationConnection constructor and add a field:
commit 3d8924b37ed3d25764788b63ada1f48e00a97c4e
Author: WangYuxuan <wang.yuxuan.xinyu@foxmail.com>
Date:   Wed Aug 23 16:23:13 2023 +0800

    example commit

diff --git a/flink-connector-postgres-cdc/pom.xml b/flink-connector-postgres-cdc/pom.xml
index a9886dc8..d7637615 100644
--- a/flink-connector-postgres-cdc/pom.xml
+++ b/flink-connector-postgres-cdc/pom.xml
@@ -67,12 +67,18 @@ under the License.
             </exclusions>
         </dependency>
 
+        <!--
         <dependency>
-            <!-- fix CVE-2022-26520 https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-26520 -->
             <groupId>org.postgresql</groupId>
             <artifactId>postgresql</artifactId>
             <version>42.5.1</version>
         </dependency>
+        -->
+        <dependency>
+            <groupId>org.opengauss</groupId>
+            <artifactId>opengauss-jdbc</artifactId>
+            <version>5.0.0</version>
+        </dependency>
 
         <!-- test dependencies on Debezium -->
 
@@ -168,4 +174,4 @@ under the License.
 
     </dependencies>
 
-</project>
\ No newline at end of file
+</project>
diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java
index 3d3fc996..e30443a8 100644
--- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java
+++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java
@@ -67,7 +67,7 @@ public class PostgresQueryUtils {
         // NOTE: it requires ANALYZE or VACUUM to be run first in PostgreSQL.
         final String query =
                 String.format(
-                        "SELECT reltuples::bigint FROM pg_class WHERE oid = to_regclass('%s')",
+                        "SELECT reltuples::bigint FROM pg_class WHERE oid = '%s'::regclass",
                         tableId.toString());
 
         return jdbc.queryAndMap(
diff --git a/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java b/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java
index e728f3b4..1c21dfd6 100644
--- a/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java
+++ b/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java
@@ -636,7 +636,8 @@ public class PostgresConnection extends JdbcConnection {
         int majorVersion = metaData.getDatabaseMajorVersion();
         int minorVersion = metaData.getDatabaseMinorVersion();
         if (majorVersion < 9 || (majorVersion == 9 && minorVersion < 4)) {
-            throw new SQLException("Cannot connect to a version of Postgres lower than 9.4");
+            // throw new SQLException("Cannot connect to a version of Postgres lower than 9.4");
+            LOGGER.warn("wyx");
         }
     }
 
diff --git a/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java
index 364cb59b..eeb6a4e8 100644
--- a/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java
+++ b/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java
@@ -74,6 +74,7 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep
     private final MessageDecoder messageDecoder;
     private final TypeRegistry typeRegistry;
     private final Properties streamParams;
+    private final PostgresConnection jdbcConnection;
 
     private Lsn defaultStartingPos;
     private SlotCreationResult slotCreationInfo;
@@ -136,6 +137,7 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep
         this.statusUpdateInterval = statusUpdateInterval;
         this.messageDecoder =
                 plugin.messageDecoder(new MessageDecoderContext(config, schema), jdbcConnection);
+        this.jdbcConnection = jdbcConnection;
         this.typeRegistry = typeRegistry;
         this.streamParams = streamParams;
         this.slotCreationInfo = null;
@@ -171,7 +173,7 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep
             LOGGER.info("Initializing PgOutput logical decoder publication");
             try {
                 // Unless the autocommit is disabled the SELECT publication query will stay running
-                Connection conn = pgConnection();
+                Connection conn = jdbcConnection.connection();
                 conn.setAutoCommit(false);
 
                 String selectPublication =
@@ -255,9 +257,7 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep
 
     private Set<TableId> determineCapturedTables() throws Exception {
         Set<TableId> allTableIds =
-                this.connect()
-                        .readTableNames(
-                                pgConnection().getCatalog(), null, null, new String[] {"TABLE"});
+                this.jdbcConnection.readTableNames(null, null, null, new String[] {"TABLE"});
 
         Set<TableId> capturedTables = new HashSet<>();
 
diff --git a/flink-sql-connector-postgres-cdc/pom.xml b/flink-sql-connector-postgres-cdc/pom.xml
index 8beb8bc6..c8974e0e 100644
--- a/flink-sql-connector-postgres-cdc/pom.xml
+++ b/flink-sql-connector-postgres-cdc/pom.xml
@@ -65,6 +65,7 @@ under the License.
                                     <include>com.fasterxml.*:*</include>
                                     <!-- Include fixed version 30.1.1-jre-14.0 of flink shaded guava -->
                                     <include>org.apache.flink:flink-shaded-guava</include>
+                                    <include>org.opengauss:*</include>
                                 </includes>
                             </artifactSet>
                             <filters>
@@ -107,4 +108,4 @@ under the License.
             </plugin>
         </plugins>
     </build>
-</project>
\ No newline at end of file
+</project>