Title: Real-time synchronization from Postgres to Hologres with Debezium + Kafka Connect

Author: 飞不高    Date: 2025-3-20 15:45
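Real-time data synchronization from PostgreSQL to Hologres built on Debezium and Kafka is a highly reliable and highly scalable solution. The detailed implementation steps follow.
1. Architecture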

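Debezium: captures change data (CDC) from PostgreSQL and publishes the change events to Kafka.
Kafka: acts as the message queue that buffers and distributes the change data.
Kafka consumer: consumes the change data from Kafka and writes it to Hologres (in this guide, a Kafka Connect sink connector plays this role).
Hologres: the target database that stores the synchronized data.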
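The flow above can be illustrated with a minimal Python sketch of what the consuming side has to do with each change event: upsert on create/update, remove on delete. This is not the sink connector's implementation, only an in-memory model; the function name `apply_change_event` and the dict standing in for the target table are assumptions made for illustration. The `op` codes (`c`, `r`, `u`, `d`) and the `before`/`after` fields follow the Debezium event envelope shown later in this post.

```python
from typing import Any, Dict, Optional

Row = Dict[str, Any]


def apply_change_event(table: Dict[Any, Row], event: Optional[dict], key_field: str = "id") -> None:
    """Apply one Debezium-style change event to an in-memory 'table' keyed by primary key."""
    if event is None:
        # Tombstone record (null value): nothing to apply.
        return
    # Tolerate events with or without the {"schema": ..., "payload": ...} wrapper.
    payload = event.get("payload", event)
    op = payload["op"]
    if op in ("c", "r", "u"):
        # create, snapshot read, update: upsert the 'after' row image
        row = payload["after"]
        table[row[key_field]] = row
    elif op == "d":
        # delete: the key is taken from the 'before' row image
        table.pop(payload["before"][key_field], None)
    else:
        raise ValueError(f"unsupported op: {op!r}")


if __name__ == "__main__":
    target: Dict[Any, Row] = {}
    apply_change_event(target, {"payload": {"op": "c", "before": None, "after": {"id": 18, "platform": "11v-xl"}}})
    apply_change_event(target, {"payload": {"op": "u", "before": None, "after": {"id": 18, "platform": "6v-xl"}}})
    print(target)
    apply_change_event(target, {"payload": {"op": "d", "before": {"id": 18}, "after": None}})
    print(target)
```

Keying the target by primary key is what makes replaying the same event harmless, which is the same idea behind the upsert mode configured for the sink later on.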
2. Environment preparation

2.1 Component versions

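PostgreSQL: 10 or later (logical decoding exists since 9.4, but the built-in pgoutput plugin used in this guide requires PostgreSQL 10+).
Debezium: 2.2 or later (the configuration below uses topic.prefix, introduced in 2.0, and the Debezium JDBC sink connector, available from 2.2; the commands below download 2.2.0.Final).
Kafka: 2.8 or later.
Hologres: compatible with the PostgreSQL protocol and reachable over JDBC.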
2.2 Configure PostgreSQL

Enable logical replication (a change to wal_level only takes effect after PostgreSQL is restarted):
    ALTER SYSTEM SET wal_level = logical;
Create the replication slot:
    -- Create the replication slot
    SELECT pg_create_logical_replication_slot('debezium_slot', 'pgoutput');
    -- List the replication slots
    SELECT * FROM pg_replication_slots;
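Create the publication:
    -- Create the publication (the connector can also create it automatically, depending on its configuration).
    -- Run only one of the two statements below; the publication name must be unique.
    -- Option A: publish every table
    CREATE PUBLICATION debezium_pub FOR ALL TABLES;
    -- Option B: publish only selected tables
    CREATE PUBLICATION debezium_pub FOR TABLE my_table1, my_table2;
    -- Drop the publication
    DROP PUBLICATION IF EXISTS debezium_pub;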
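When the table list is maintained in configuration rather than typed by hand, the publication statement can be generated. The sketch below is only an illustration: the helper names `quote_ident` and `create_publication_sql` are hypothetical, and falling back to the `public` schema for unqualified names is an assumption of this example, not PostgreSQL behavior you should rely on (PostgreSQL resolves unqualified names through `search_path`).

```python
from typing import Iterable


def quote_ident(name: str) -> str:
    """Quote a PostgreSQL identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def create_publication_sql(publication: str, tables: Iterable[str] = ()) -> str:
    """Build CREATE PUBLICATION for the given tables, or FOR ALL TABLES when none are given."""
    tables = list(tables)
    if not tables:
        return f"CREATE PUBLICATION {quote_ident(publication)} FOR ALL TABLES;"
    qualified = []
    for table in tables:
        schema, _, name = table.partition(".")
        if not name:
            # No schema in the input: this sketch assumes "public".
            schema, name = "public", schema
        qualified.append(f"{quote_ident(schema)}.{quote_ident(name)}")
    return f"CREATE PUBLICATION {quote_ident(publication)} FOR TABLE {', '.join(qualified)};"


if __name__ == "__main__":
    print(create_publication_sql("debezium_pub"))
    print(create_publication_sql("debezium_pub", ["public.data_info", "public.data_source"]))
```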
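Make sure the PostgreSQL user has replication privileges:
    -- The author recommends connecting as a superuser; otherwise the role needs at least REPLICATION and LOGIN.
    CREATE ROLE debezium_user WITH REPLICATION LOGIN PASSWORD 'your_password';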
3. Configure Debezium

3.1 Download Debezium

Download the Debezium PostgreSQL connector plugin archive.
Download location: https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/
    wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.2.0.Final/debezium-connector-postgres-2.2.0.Final-plugin.tar.gz
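3.2 Add the plugin to Kafka Connect

Copying the plugin tar.gz manually
Extract the downloaded tar.gz file into a directory that is on the Kafka Connect plugin path (plugin.path). This guide uses /usr/share/java/kafka-connect/ throughout:
    # Create the plugin directory if it does not exist
    sudo mkdir -p /usr/share/java/kafka-connect/
    # Extract the plugin archive (here assumed to have been saved under /etc/kafka-connect/)
    tar -zxvf /etc/kafka-connect/debezium-connector-postgres-2.2.0.Final-plugin.tar.gz -C /usr/share/java/kafka-connect/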
Mounting the plugin on Kubernetes
If Kafka Connect runs on Kubernetes, the plugin directory can be mounted in the workload manifest:
    # Pod-level volumes
    volumes:
      - emptyDir: {}
        name: plugins-volume
      - emptyDir: {}
        name: config-volume
    # Container-level mounts
    volumeMounts:
      - name: plugins-volume
        mountPath: /usr/share/java/kafka-connect/
      - name: config-volume
        mountPath: /etc/kafka-connect/
YAML for the Kafka Connect Service and StatefulSet objects:
    apiVersion: v1
    kind: Service
    metadata:
      name: prod-kafka-connect
      namespace: prod-core-jobs-shuguan
    spec:
      ports:
        - port: 8083
          targetPort: 8083
          protocol: TCP
          name: http
      selector:
        app: prod-kafka-connect
      type: ClusterIP
    ---
    apiVersion: apps/v1
    kind: StatefulSet
    metadata:
      name: prod-kafka-connect
      namespace: prod-core-jobs-shuguan
    spec:
      serviceName: "prod-kafka-connect"
      replicas: 1
      selector:
        matchLabels:
          app: prod-kafka-connect
      template:
        metadata:
          labels:
            app: prod-kafka-connect
        spec:
          volumes:
            - emptyDir: {}
              name: plugins-volume
            - emptyDir: {}
              name: config-volume
          initContainers:
            - name: extract-plugins
              image: pkg.geely.com/docker/busybox:1.31   # init container that downloads and unpacks the Debezium plugin
              command:
                - sh
                - -c
                - |
                  wget https://pkg.geely.com/artifactory/IMD-pypi-dev-hz/resource/uploadTemp/debezium-connector-postgres-2.2.0.Final-plugin.tar.gz -O /etc/kafka-connect/debezium-connector-postgres-2.2.0.Final-plugin.tar.gz
                  tar -zxvf /etc/kafka-connect/debezium-connector-postgres-2.2.0.Final-plugin.tar.gz -C /usr/share/java/kafka-connect/
              volumeMounts:
                - name: plugins-volume
                  mountPath: /usr/share/java/kafka-connect/
                - name: config-volume
                  mountPath: /etc/kafka-connect/
          containers:
            - name: prod-kafka-connect
              image: pkg.geely.com/docker/confluentinc/cp-kafka-connect:7.2.0
              ports:
                - containerPort: 8083
                  name: http
              env:
                - name: CONNECT_BOOTSTRAP_SERVERS
                  value: "prod-kafka:9092"  # replace with your Kafka cluster address
                - name: CONNECT_GROUP_ID
                  value: "connect-cluster"
                - name: CONNECT_KEY_CONVERTER
                  value: "org.apache.kafka.connect.storage.StringConverter"
                - name: CONNECT_VALUE_CONVERTER
                  value: "org.apache.kafka.connect.storage.StringConverter"
                - name: CONNECT_CONFIG_STORAGE_TOPIC
                  value: "connect-configs"
                - name: CONNECT_OFFSET_STORAGE_TOPIC
                  value: "connect-offsets"
                - name: CONNECT_STATUS_STORAGE_TOPIC
                  value: "connect-statuses"
                - name: CONNECT_REST_ADVERTISED_HOST_NAME
                  valueFrom:
                    fieldRef:
                      fieldPath: status.podIP
                - name: CONNECT_REST_ADVERTISED_PORT
                  value: "8083"
              volumeMounts:
                - name: plugins-volume
                  mountPath: /usr/share/java/kafka-connect/
                - name: config-volume
                  mountPath: /etc/kafka-connect/
              readinessProbe:
                httpGet:
                  path: /connectors
                  port: http
                initialDelaySeconds: 30
                periodSeconds: 10
              livenessProbe:
                httpGet:
                  path: /connectors
                  port: http
                initialDelaySeconds: 300
                periodSeconds: 10
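3.3 Configure the Debezium connectors

Two connectors are added to Kafka Connect. The first, postgres-source, watches PostgreSQL for data changes and sends them to Kafka as messages. The second, hologres-sink, consumes those messages from Kafka and writes them to the Hologres database: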
    # Add the source connector:
    curl --location --request POST 'http://kafka-connect-address:8083/connectors' \
    --header 'Content-Type: application/json' \
    --data-raw '{
      "name": "postgres-source",
      "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": true,
        "value.converter.schemas.enable": true,
        "tasks.max": "3",
        "database.hostname": "prod-postgresql",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "123456",
        "database.dbname": "db_dmp",
        "database.server.name": "prod-shuguan-dmp",
        "schema.history.internal.kafka.bootstrap.servers": "prod-kafka:9092",
        "schema.history.internal.kafka.topic": "pgsql-sync-hologres",
        "table.include.list": "public.data_info,public.data_source",
        "primary.key.fields": "id",
        "slot.name": "debezium_slot",
        "plugin.name": "pgoutput",
        "publication.name": "debezium_pub",
        "publication.autocreate.mode": "filtered",
        "topic.prefix": "datatrans"
      }
    }'
    # Response:
    {
        "name": "postgres-source",
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable": "true",
            "value.converter.schemas.enable": "true",
            "tasks.max": "3",
            "database.hostname": "prod-postgresql",
            "database.port": "5432",
            "database.user": "postgres",
            "database.password": "123456",
            "database.dbname": "db_dmp",
            "database.server.name": "prod-shuguan-dmp",
            "schema.history.internal.kafka.bootstrap.servers": "prod-kafka:9092",
            "schema.history.internal.kafka.topic": "pgsql-sync-hologres",
            "table.include.list": "public.data_info,public.data_source",
            "primary.key.fields": "id",
            "slot.name": "debezium_slot",
            "plugin.name": "pgoutput",
            "publication.name": "debezium_pub",
            "publication.autocreate.mode": "filtered",
            "topic.prefix": "datatrans",
            "name": "postgres-source"
        },
        "tasks": [],
        "type": "source"
    }
    # Add the sink connector.
    # POST /connectors takes the {"name": ..., "config": {...}} wrapper used here;
    # PUT /connectors/{name}/config expects only the bare config map.
    curl --location --request POST 'http://kafka-connect-address:8083/connectors' \
    --header 'Content-Type: application/json' \
    --data-raw '{
        "name": "hologres-sink",
        "config": {
            "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable": true,
            "value.converter.schemas.enable": true,
            "schemas.enable": false,
            "tasks.max": "1",
            "connection.url": "jdbc:postgresql://holo-cn-vpc-st.hologres.ops.auto.gee-cloud.com:80/db_dmp?reWriteBatchedInserts=true",
            "connection.username": "admin",
            "connection.password": "123456",
            "insert.mode": "upsert",
            "topics": "datatrans.public.data_info,datatrans.public.data_source",
            "schema.history.internal.kafka.bootstrap.servers": "prod-kafka:9092",
            "schema.history.internal.kafka.topic": "pgsql-sync-hologres",
            "delete.enabled": "true",
            "primary.key.fields": "id",
            "primary.key.mode": "record_key",
            "schema.evolution": "basic",
            "database.time_zone": "UTC",
            "auto.create": "true",
            "auto.evolve": "true"
        }
    }'
    # Response:
    {
        "name": "hologres-sink",
        "config": {
            "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable": "true",
            "value.converter.schemas.enable": "true",
            "schemas.enable": "false",
            "tasks.max": "1",
            "connection.url": "jdbc:postgresql://holo-cn-vpc-st.hologres.ops.auto.gee-cloud.com:80/db_dmp?reWriteBatchedInserts=true",
            "connection.username": "admin",
            "connection.password": "123456",
            "insert.mode": "upsert",
            "topics": "datatrans.public.data_info,datatrans.public.data_source",
            "schema.history.internal.kafka.bootstrap.servers": "prod-kafka:9092",
            "schema.history.internal.kafka.topic": "pgsql-sync-hologres",
            "delete.enabled": "true",
            "primary.key.fields": "id",
            "primary.key.mode": "record_key",
            "schema.evolution": "basic",
            "database.time_zone": "UTC",
            "auto.create": "false",
            "auto.evolve": "false",
            "name": "hologres-sink"
        },
        "tasks": [
            {
                "connector": "hologres-sink",
                "task": 0
            }
        ],
        "type": "sink"
    }
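The two configurations are linked by topic names: the Debezium PostgreSQL connector writes each captured table to a topic named `<topic.prefix>.<schema>.<table>`, and the sink's `topics` property has to list exactly those names. The sketch below checks that relationship before the connectors are submitted. The function names are hypothetical, and it assumes `table.include.list` holds literal `schema.table` names (the property also accepts regular expressions, which this sketch does not expand).

```python
from typing import List


def debezium_topics(topic_prefix: str, table_include_list: str) -> List[str]:
    """Derive the per-table topic names: <topic.prefix>.<schema>.<table>."""
    tables = [t.strip() for t in table_include_list.split(",") if t.strip()]
    return [f"{topic_prefix}.{table}" for table in tables]


def missing_sink_topics(source_config: dict, sink_config: dict) -> List[str]:
    """Return topics the source will produce that the sink is not subscribed to."""
    produced = debezium_topics(source_config["topic.prefix"], source_config["table.include.list"])
    subscribed = {t.strip() for t in sink_config["topics"].split(",")}
    return [t for t in produced if t not in subscribed]


if __name__ == "__main__":
    source = {
        "topic.prefix": "datatrans",
        "table.include.list": "public.data_info,public.data_source",
    }
    sink = {"topics": "datatrans.public.data_info,datatrans.public.data_source"}
    print(debezium_topics(source["topic.prefix"], source["table.include.list"]))
    print(missing_sink_topics(source, sink))  # an empty list means every captured table has a consumer
```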
Delete a connector:
    curl --location --request DELETE 'http://kafka-connect-address:8083/connectors/postgres-source'
Check the connector status:
    curl --location --request GET 'http://kafka-connect-address:8083/connectors/postgres-source/status'
    # Response:
    {
        "name": "postgres-source",
        "connector": {
            "state": "RUNNING",
            "worker_id": "10.244.0.116:8083"
        },
        "tasks": [
            {
                "id": 0,
                "state": "RUNNING",
                "worker_id": "10.244.0.116:8083"
            }
        ],
        "type": "source"
    }
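A status response like the one above is easy to check automatically, since a connector can report RUNNING while one of its tasks has failed. The following sketch lists every component that is not in the RUNNING state; the function name `failed_components` is an assumption for illustration, and the input is the parsed JSON of the `/status` response.

```python
import json
from typing import List


def failed_components(status: dict) -> List[str]:
    """List the connector and tasks whose state is not RUNNING."""
    problems = []
    connector_state = status["connector"]["state"]
    if connector_state != "RUNNING":
        problems.append(f"connector:{connector_state}")
    for task in status.get("tasks", []):
        if task["state"] != "RUNNING":
            problems.append(f"task-{task['id']}:{task['state']}")
    return problems


if __name__ == "__main__":
    sample = json.loads("""
    {
        "name": "postgres-source",
        "connector": {"state": "RUNNING", "worker_id": "10.244.0.116:8083"},
        "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "10.244.0.116:8083"}],
        "type": "source"
    }
    """)
    print(failed_components(sample))  # empty when everything is RUNNING
```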
View the connector configuration:
    curl --location --request GET 'http://kafka-connect-address:8083/connectors/postgres-source/config'
    # Response:
    {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "publication.autocreate.mode": "filtered",
        "database.user": "postgres",
        "database.dbname": "db_dmp",
        "slot.name": "debezium_slot",
        "tasks.max": "3",
        "publication.name": "debezium_pub",
        "database.server.name": "prod-shuguan-dmp",
        "schema.history.internal.kafka.bootstrap.servers": "prod-kafka:9092",
        "database.port": "5432",
        "plugin.name": "pgoutput",
        "topic.prefix": "datatrans",
        "schema.history.internal.kafka.topic": "pgsql-sync-hologres",
        "database.hostname": "prod-postgresql",
        "database.password": "123456",
        "name": "postgres-source",
        "table.include.list": "public.data_info,public.data_source"
    }
List the connectors:
    curl --location --request GET 'http://kafka-connect-address:8083/connectors'
    # Response:
    [
        "postgres-source",
        "hologres-sink"
    ]
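Other related endpoints
    # REST endpoints supported by Kafka Connect
    GET /connectors  # Return the list of active connectors
    POST /connectors  # Create a new connector; the request body is a JSON object with a string "name" field and a "config" object holding the connector configuration
    GET /connectors/{name}  # Get information about a specific connector
    GET /connectors/{name}/config  # Get the configuration of a specific connector
    PUT /connectors/{name}/config  # Update the configuration of a specific connector (the body is the bare config map; the connector is created if it does not exist)
    GET /connectors/{name}/status  # Get the connector's current state (running, failed, paused, ...), the worker it is assigned to, the error message if it failed, and the state of all its tasks
    GET /connectors/{name}/tasks  # Get the list of tasks currently running for the connector
    GET /connectors/{name}/tasks/{taskid}/status  # Get a task's current state (running, failed, paused, ...), the worker it is assigned to, and the error message if it failed
    PUT /connectors/{name}/pause  # Pause the connector and its tasks; message processing stops until the connector is resumed
    PUT /connectors/{name}/resume  # Resume a paused connector (does nothing if the connector is not paused)
    POST /connectors/{name}/restart  # Restart the connector (typically after a failure)
    POST /connectors/{name}/tasks/{taskId}/restart  # Restart a single task (typically after a failure)
    DELETE /connectors/{name}  # Delete the connector, stopping all its tasks and removing its configuration
    # Kafka Connect also provides REST endpoints for information about connector plugins:
    GET /connector-plugins  # Return the connector plugins installed in the Kafka Connect cluster. Only the worker that handles the request is checked, so results can be inconsistent, especially during a rolling upgrade that adds new connector jars
    PUT /connector-plugins/{connector-type}/config/validate  # Validate the supplied configuration values against the configuration definition. Each setting is validated, and the response carries suggested values and error messages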
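The endpoints listed above can be wrapped in a few lines of standard-library Python. This is a sketch rather than a complete client: `connect_request`, `send`, and the base URL are illustrative names, authentication and error handling are left out, and only the HTTP methods and paths come from the list above.

```python
import json
import urllib.request
from typing import Any, Optional


def connect_request(base_url: str, method: str, path: str, body: Optional[dict] = None) -> urllib.request.Request:
    """Build (but do not send) a request against the Kafka Connect REST API."""
    data = json.dumps(body).encode("utf-8") if body is not None else None
    headers = {"Content-Type": "application/json"} if body is not None else {}
    return urllib.request.Request(base_url.rstrip("/") + path, data=data, headers=headers, method=method)


def send(request: urllib.request.Request, timeout: float = 10.0) -> Any:
    """Send the request and decode the JSON response; returns None for an empty body."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        raw = response.read()
    return json.loads(raw) if raw else None


if __name__ == "__main__":
    base = "http://kafka-connect-address:8083"  # placeholder address, as used in the curl examples
    pause = connect_request(base, "PUT", "/connectors/postgres-source/pause")
    restart_task = connect_request(base, "POST", "/connectors/postgres-source/tasks/0/restart")
    print(pause.get_method(), pause.full_url)
    print(restart_task.get_method(), restart_task.full_url)
    # send(pause) would perform the call against a reachable Kafka Connect worker.
```

Keeping request construction separate from sending makes the URL and method logic checkable without a running cluster.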
4. Testing and verification

4.1 Insert

Insert a row in PostgreSQL:
    INSERT INTO public.data_info(
        platform, data_type, data_name, dir_name, bag_exists, clip_exists, create_time, latest_update_time, deleted)
    VALUES ('11v-xl', 'camera', 'aroundBack', 'cam_around_back', 1, 1, now(), now(), 0);
View the messages on the Kafka topic:
    I have no name!@prod-kafka-0:/$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic datatrans.public.data_info
  2. {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional"
:false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":null,"after":{"id":18,"platform":"11v-xl","data_type":"camera","data_name":"aroundBack","dir_name":"cam_around_back","bag_exists":1,"clip_exists":1,"create_time":1741311929781870,"latest_update_time":1741311929781870,"deleted":0},"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741311929783,"snapshot":"false","db":"db_dmp","sequence":"["12057353312","12057353472"]","ts_us":1741311929783308,"ts_ns":1741311929783308000,"schema":"public","table":"data_info","txId":3433486,"lsn":12057353472,"xmin":null},"transaction":null,"op":"c","ts_ms":1741311929873,"ts_us":1741311929873166,"ts_ns":1741311929873166000}}  
Check that the data has been synchronized to Hologres:
    SELECT * FROM public.data_info ORDER BY id DESC;
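The change event printed in 4.1 is easier to read once it is reduced to its payload. The sketch below pulls out the operation, table, and row images, and converts `io.debezium.time.MicroTimestamp` values (microseconds since the Unix epoch) into datetimes. The function names are hypothetical, and interpreting the value as UTC is an assumption: for a `timestamp without time zone` column the number carries no zone information.

```python
import json
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def micro_timestamp_to_datetime(value: int) -> datetime:
    """Convert microseconds since the epoch to a datetime (UTC assumed)."""
    return EPOCH + timedelta(microseconds=value)


def summarize_event(message: str) -> dict:
    """Reduce a Debezium JSON message (with schema wrapper) to the fields of interest."""
    payload = json.loads(message)["payload"]
    source = payload["source"]
    return {
        "op": payload["op"],
        "table": f"{source['schema']}.{source['table']}",
        "lsn": source["lsn"],
        "before": payload["before"],
        "after": payload["after"],
    }


if __name__ == "__main__":
    # Trimmed-down version of the insert event from 4.1 (schema section omitted).
    message = json.dumps({
        "schema": {},
        "payload": {
            "before": None,
            "after": {"id": 18, "platform": "11v-xl", "create_time": 1741311929781870},
            "source": {"schema": "public", "table": "data_info", "lsn": 12057353472},
            "op": "c",
        },
    })
    summary = summarize_event(message)
    print(summary["op"], summary["table"], summary["lsn"])
    print(micro_timestamp_to_datetime(summary["after"]["create_time"]).isoformat())
```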
4.2 Update

Update the row in PostgreSQL:
    UPDATE public.data_info SET platform = '6v-xl', data_name = 'aroundBack_xl' WHERE platform = '11v-xl';
View the messages on the Kafka topic:
    I have no name!@prod-kafka-0:/$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic datatrans.public.data_info
  2. {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional"
:false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":null,"after":{"id":18,"platform":"6v-xl","data_type":"camera","data_name":"aroundBack_xl","dir_name":"cam_around_back","bag_exists":1,"clip_exists":1,"create_time":1741311929781870,"latest_update_time":1741311929781870,"deleted":0},"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741312464134,"snapshot":"false","db":"db_dmp","sequence":"["12065166032","12065166088"]","ts_us":1741312464134145,"ts_ns":1741312464134145000,"schema":"public","table":"data_info","txId":3434153,"lsn":12065166088,"xmin":null},"transaction":null,"op":"u","ts_ms":1741312464229,"ts_us":1741312464229054,"ts_ns":1741312464229054000}}
Check the data in Hologres:
    SELECT * FROM public.data_info ORDER BY id DESC;
4.3 Delete

Delete the row in PostgreSQL:
    DELETE FROM public.data_info WHERE platform = '6v-xl';
View the messages on the Kafka topic:
    I have no name!@prod-kafka-0:/$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic datatrans.public.data_info
  2. {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional"
:false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":{"id":18,"platform":"","data_type":"","data_name":"","dir_name":null,"bag_exists":null,"clip_exists":null,"create_time":null,"latest_update_time":null,"deleted":0},"after":null,"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741312717303,"snapshot":"false","db":"db_dmp","sequence":"["12068950904","12068950960"]","ts_us":1741312717303137,"ts_ns":1741312717303137000,"schema":"public","table":"data_info","txId":3434449,"lsn":12068950960,"xmin":null},"transaction":null,"op":"d","ts_ms":1741312717748,"ts_us":1741312717748938,"ts_ns":1741312717748938000}}
  3. null
Check the data in Hologres:
    SELECT * FROM public.data_info ORDER BY id DESC;
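Two details of the delete output in 4.3 matter to anything that reads the topic directly. First, the delete event is followed by a line containing only `null`: by default Debezium emits a tombstone record (same key, null value) after a delete. Second, the `before` image carries only the primary key with real content, which is what PostgreSQL logs under the default replica identity. The sketch below handles both; the function names are illustrative, and the input is assumed to be one message value per line, as printed by the console consumer.

```python
import json
from typing import Any, Iterable, Iterator, List, Optional


def parse_consumer_lines(lines: Iterable[str]) -> Iterator[Optional[dict]]:
    """Yield one parsed value per non-empty line; a literal 'null' line yields None (tombstone)."""
    for line in lines:
        text = line.strip()
        if not text:
            continue
        yield json.loads(text)  # json.loads("null") returns None


def deleted_keys(lines: Iterable[str], key_field: str = "id") -> List[Any]:
    """Collect the primary keys of rows removed by delete events."""
    keys = []
    for value in parse_consumer_lines(lines):
        if value is None:
            continue  # tombstone: no payload to inspect
        payload = value["payload"]
        if payload["op"] == "d":
            keys.append(payload["before"][key_field])
    return keys


if __name__ == "__main__":
    output = [
        '{"payload": {"op": "d", "before": {"id": 18, "platform": ""}, "after": null}}',
        "null",
    ]
    print(deleted_keys(output))
```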
4.4 Batch insert

Insert several rows in PostgreSQL:
    INSERT INTO public.data_info(
        platform, data_type, data_name, dir_name, bag_exists, clip_exists, create_time, latest_update_time, deleted)
    VALUES ('11v-xl', 'camera0', 'aroundBack', 'cam_around_back', 1, 1, now(), now(), 0),
           ('11v-xl', 'camera1', 'aroundBack', 'cam_around_back', 1, 1, now(), now(), 0),
           ('11v-xl', 'camera2', 'aroundBack', 'cam_around_back', 1, 1, now(), now(), 0),
           ('11v-xl', 'camera3', 'aroundBack', 'cam_around_back', 1, 1, now(), now(), 0),
           ('11v-xl', 'camera5', 'aroundBack', 'cam_around_back', 1, 1, now(), now(), 0);
View the messages on the Kafka topic:
    I have no name!@prod-kafka-0:/$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic datatrans.public.data_info
  2. {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional"
:false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":null,"after":{"id":26,"platform":"11v-xl","data_type":"camera0","data_name":"aroundBack","dir_name":"cam_around_back","bag_exists":1,"clip_exists":1,"create_time":1741313539693111,"latest_update_time":1741313539693111,"deleted":0},"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741313539694,"snapshot":"false","db":"db_dmp","sequence":"["12072782920","12072782920"]","ts_us":1741313539694079,"ts_ns":1741313539694079000,"schema":"public","table":"data_info","txId":3435484,"lsn":12072782920,"xmin":null},"transaction":null,"op":"c","ts_ms":1741313540003,"ts_us":1741313540003647,"ts_ns":1741313540003647000}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":null,"after":{"id":27,"platform":"11v-xl","data_type":"camera1","data_name":"aroundBack","dir_name":"cam_around_back","bag_exists":1,"clip_exists":1,"create_time":1741313539693111,"latest_update_time":1741313539693111,"deleted":0},"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741313539694,"snapshot":"false","db":"db_dmp","sequence":"[\"12072782920\",\"12072783208\"]","ts_us":1741313539694079,"ts_ns":1741313539694079000,"schema":"public","table":"data_info","txId":3435484,"lsn":12072783208,"xmin":null},"transaction":null,"op":"c","ts_ms":1741313540005,"ts_us":1741313540005062,"ts_ns":1741313540005062000}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":null,"after":{"id":29,"platform":"11v-xl","data_type":"camera3","data_name":"aroundBack","dir_name":"cam_around_back","bag_exists":1,"clip_exists":1,"create_time":1741313539693111,"latest_update_time":1741313539693111,"deleted":0},"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741313539694,"snapshot":"false","db":"db_dmp","sequence":"[\"12072782920\",\"12072783784\"]","ts_us":1741313539694079,"ts_ns":1741313539694079000,"schema":"public","table":"data_info","txId":3435484,"lsn":12072783784,"xmin":null},"transaction":null,"op":"c","ts_ms":1741313540006,"ts_us":1741313540006726,"ts_ns":1741313540006726000}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":null,"after":{"id":28,"platform":"11v-xl","data_type":"camera2","data_name":"aroundBack","dir_name":"cam_around_back","bag_exists":1,"clip_exists":1,"create_time":1741313539693111,"latest_update_time":1741313539693111,"deleted":0},"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741313539694,"snapshot":"false","db":"db_dmp","sequence":"[\"12072782920\",\"12072783496\"]","ts_us":1741313539694079,"ts_ns":1741313539694079000,"schema":"public","table":"data_info","txId":3435484,"lsn":12072783496,"xmin":null},"transaction":null,"op":"c","ts_ms":1741313540005,"ts_us":1741313540005913,"ts_ns":1741313540005913000}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":null,"after":{"id":30,"platform":"11v-xl","data_type":"camera5","data_name":"aroundBack","dir_name":"cam_around_back","bag_exists":1,"clip_exists":1,"create_time":1741313539693111,"latest_update_time":1741313539693111,"deleted":0},"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741313539694,"snapshot":"false","db":"db_dmp","sequence":"[\"12072782920\",\"12072784072\"]","ts_us":1741313539694079,"ts_ns":1741313539694079000,"schema":"public","table":"data_info","txId":3435484,"lsn":12072784072,"xmin":null},"transaction":null,"op":"c","ts_ms":1741313540007,"ts_us":1741313540007475,"ts_ns":1741313540007475000}}
Check the data in Hologres:
SELECT * FROM public.data_info ORDER BY id DESC;
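In the events above, `create_time` and `latest_update_time` carry the `io.debezium.time.MicroTimestamp` logical type, an int64 count of microseconds since the Unix epoch with no time zone attached, so they look different from the timestamp values returned by the query. The following is a minimal Python sketch of the conversion. The helper name `micro_to_datetime` is hypothetical, and reading the value as UTC is an assumption.

```python
import json
from datetime import datetime, timedelta, timezone

# Reference point for the conversion; treating the value as UTC is an assumption.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def micro_to_datetime(micros):
    """Convert microseconds since the Unix epoch into an aware UTC datetime."""
    if micros is None:  # the event schema marks both timestamp fields as optional
        return None
    # Integer arithmetic via timedelta keeps full microsecond precision.
    return EPOCH + timedelta(microseconds=micros)


# Trimmed copy of the "after" struct from the id=27 insert event.
after = json.loads(
    '{"id": 27, "create_time": 1741313539693111, '
    '"latest_update_time": 1741313539693111}'
)

created = micro_to_datetime(after["create_time"])
print(created.isoformat())
```

For the sample value this prints `2025-03-07T02:12:19.693111+00:00`. Adding a `timedelta` rather than dividing by one million avoids floating-point rounding in the last digits.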
4.3 Batch update
Update the data in PostgreSQL:
UPDATE public.data_info SET platform = '6v-xl', data_name = 'aroundBack_xl' WHERE platform = '11v-xl';
View the Kafka topic messages:
I have no name!@prod-kafka-0:/$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic datatrans.public.data_info
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":null,"after":{"id":26,"platform":"6v-xl","data_type":"camera0","data_name":"aroundBack_xl","dir_name":"cam_around_back","bag_exists":1,"clip_exists":1,"create_time":1741313539693111,"latest_update_time":1741313539693111,"deleted":0},"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741313734647,"snapshot":"false","db":"db_dmp","sequence":"[\"12073766384\",\"12073766440\"]","ts_us":1741313734647557,"ts_ns":1741313734647557000,"schema":"public","table":"data_info","txId":3435710,"lsn":12073766440,"xmin":null},"transaction":null,"op":"u","ts_ms":1741313734826,"ts_us":1741313734826063,"ts_ns":1741313734826063000}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":null,"after":{"id":27,"platform":"6v-xl","data_type":"camera1","data_name":"aroundBack_xl","dir_name":"cam_around_back","bag_exists":1,"clip_exists":1,"create_time":1741313539693111,"latest_update_time":1741313539693111,"deleted":0},"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741313734647,"snapshot":"false","db":"db_dmp","sequence":"[\"12073766384\",\"12073775280\"]","ts_us":1741313734647557,"ts_ns":1741313734647557000,"schema":"public","table":"data_info","txId":3435710,"lsn":12073775280,"xmin":null},"transaction":null,"op":"u","ts_ms":1741313734827,"ts_us":1741313734827700,"ts_ns":1741313734827700000}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":null,"after":{"id":29,"platform":"6v-xl","data_type":"camera3","data_name":"aroundBack_xl","dir_name":"cam_around_back","bag_exists":1,"clip_exists":1,"create_time":1741313539693111,"latest_update_time":1741313539693111,"deleted":0},"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741313734647,"snapshot":"false","db":"db_dmp","sequence":"[\"12073766384\",\"12073775872\"]","ts_us":1741313734647557,"ts_ns":1741313734647557000,"schema":"public","table":"data_info","txId":3435710,"lsn":12073775872,"xmin":null},"transaction":null,"op":"u","ts_ms":1741313734829,"ts_us":1741313734829449,"ts_ns":1741313734829449000}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":null,"after":{"id":28,"platform":"6v-xl","data_type":"camera2","data_name":"aroundBack_xl","dir_name":"cam_around_back","bag_exists":1,"clip_exists":1,"create_time":1741313539693111,"latest_update_time":1741313539693111,"deleted":0},"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741313734647,"snapshot":"false","db":"db_dmp","sequence":"[\"12073766384\",\"12073775576\"]","ts_us":1741313734647557,"ts_ns":1741313734647557000,"schema":"public","table":"data_info","txId":3435710,"lsn":12073775576,"xmin":null},"transaction":null,"op":"u","ts_ms":1741313734828,"ts_us":1741313734828522,"ts_ns":1741313734828522000}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":null,"after":{"id":30,"platform":"6v-xl","data_type":"camera5","data_name":"aroundBack_xl","dir_name":"cam_around_back","bag_exists":1,"clip_exists":1,"create_time":1741313539693111,"latest_update_time":1741313539693111,"deleted":0},"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741313734647,"snapshot":"false","db":"db_dmp","sequence":"[\"12073766384\",\"12073776168\"]","ts_us":1741313734647557,"ts_ns":1741313734647557000,"schema":"public","table":"data_info","txId":3435710,"lsn":12073776168,"xmin":null},"transaction":null,"op":"u","ts_ms":1741313734830,"ts_us":1741313734830236,"ts_ns":1741313734830236000}}
Check the data in Hologres:
SELECT * FROM public.data_info ORDER BY id DESC;
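The update events above arrive with `"before": null` and the full new row in `"after"`, so one way for a consumer to apply them to Hologres is an upsert keyed on `id`. The Python sketch below illustrates that mapping from the `op` field to a SQL statement. It is not the sink used in this article: the function name `event_to_sql` is hypothetical, `id` being the primary key of the Hologres table is an assumption, and the `%s` placeholders assume a psycopg2-style PostgreSQL driver. Conversion of the microsecond timestamp fields is left out.

```python
import json

KEY_COLUMN = "id"  # assumption: "id" is the primary key on both sides


def event_to_sql(message, key=KEY_COLUMN):
    """Map one Debezium change event (JSON string) to (sql, params), or None."""
    if not message:  # empty tombstone record that may follow a delete
        return None
    payload = json.loads(message)["payload"]
    source = payload["source"]
    # Identifiers come from the connector's own metadata, not from user input.
    table = f'{source["schema"]}.{source["table"]}'
    op = payload["op"]
    if op in ("c", "u", "r"):  # create, update, snapshot read
        row = payload["after"]
        cols = list(row)
        placeholders = ", ".join(["%s"] * len(cols))
        updates = ", ".join(f"{c} = EXCLUDED.{c}" for c in cols if c != key)
        action = f"DO UPDATE SET {updates}" if updates else "DO NOTHING"
        sql = (
            f"INSERT INTO {table} ({', '.join(cols)}) VALUES ({placeholders}) "
            f"ON CONFLICT ({key}) {action}"
        )
        return sql, [row[c] for c in cols]
    if op == "d":  # delete: the key is read from the "before" image
        return f"DELETE FROM {table} WHERE {key} = %s", [payload["before"][key]]
    return None  # other operations are ignored in this sketch


# Trimmed version of the id=26 update event (schema block and most columns omitted).
update_event = json.dumps({"payload": {
    "before": None,
    "after": {"id": 26, "platform": "6v-xl", "data_name": "aroundBack_xl"},
    "source": {"schema": "public", "table": "data_info"},
    "op": "u",
}})

sql, params = event_to_sql(update_event)
print(sql)
print(params)
```

Because inserts and updates both become the same upsert, replaying an event after a consumer restart leaves the target row unchanged, which suits Kafka's at-least-once delivery.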
4.4 Batch delete
Delete the data in PostgreSQL:
DELETE FROM public.data_info WHERE platform = '6v-xl';
View the Kafka topic messages:
I have no name!@prod-kafka-0:/$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic datatrans.public.data_info
  2. {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional"
:false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":{"id":26,"platform":"","data_type":"","data_name":"","dir_name":null,"bag_exists":null,"clip_exists":null,"create_time":null,"latest_update_time":null,"deleted":0},"after":null,"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741313871896,"snapshot":"false","db":"db_dmp","sequence":"["12074168712","12074168768"]","ts_us":1741313871896885,"ts_ns":1741313871896885000,"schema":"public","table":"data_info","txId":3435860,"lsn":12074168768,"xmin":null},"transaction":null,"op":"d","ts_ms":1741313871946,"ts_us":1741313871946325,"ts_ns":1741313871946325000}}
  3. null
  4. {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional"
:false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":{"id":27,"platform":"","data_type":"","data_name":"","dir_name":null,"bag_exists":null,"clip_exists":null,"create_time":null,"latest_update_time":null,"deleted":0},"after":null,"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741313871896,"snapshot":"false","db":"db_dmp","sequence":"["12074168712","12074168840"]","ts_us":1741313871896885,"ts_ns":1741313871896885000,"schema":"public","table":"data_info","txId":3435860,"lsn":12074168840,"xmin":null},"transaction":null,"op":"d","ts_ms":1741313871947,"ts_us":1741313871947402,"ts_ns":1741313871947402000}}
  5. null
  6. {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional"
:false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":{"id":29,"platform":"","data_type":"","data_name":"","dir_name":null,"bag_exists":null,"clip_exists":null,"create_time":null,"latest_update_time":null,"deleted":0},"after":null,"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741313871896,"snapshot":"false","db":"db_dmp","sequence":"["12074168712","12074168984"]","ts_us":1741313871896885,"ts_ns":1741313871896885000,"schema":"public","table":"data_info","txId":3435860,"lsn":12074168984,"xmin":null},"transaction":null,"op":"d","ts_ms":1741313871948,"ts_us":1741313871948261,"ts_ns":1741313871948261000}}
  7. null
  8. {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional"
:false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":{"id":28,"platform":"","data_type":"","data_name":"","dir_name":null,"bag_exists":null,"clip_exists":null,"create_time":null,"latest_update_time":null,"deleted":0},"after":null,"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741313871896,"snapshot":"false","db":"db_dmp","sequence":"["12074168712","12074168912"]","ts_us":1741313871896885,"ts_ns":1741313871896885000,"schema":"public","table":"data_info","txId":3435860,"lsn":12074168912,"xmin":null},"transaction":null,"op":"d","ts_ms":1741313871947,"ts_us":1741313871947868,"ts_ns":1741313871947868000}}
  9. null
  10. {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional
":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":{"id":30,"platform":"","data_type":"","data_name":"","dir_name":null,"bag_exists":null,"clip_exists":null,"create_time":null,"latest_update_time":null,"deleted":0},"after":null,"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741313871896,"snapshot":"false","db":"db_dmp","sequence":"["12074168712","12074169056"]","ts_us":1741313871896885,"ts_ns":1741313871896885000,"schema":"public","table":"data_info","txId":3435860,"lsn":12074169056,"xmin":null},"transaction":null,"op":"d","ts_ms":1741313871948,"ts_us":1741313871948688,"ts_ns":1741313871948688000}}
  11. null
Check the data in Hologres:
  SELECT * FROM public.data_info ORDER BY id DESC;
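In the consumer output above, every `"op":"d"` event is followed by a `null` line. That `null` is a tombstone record, which Debezium emits after each delete by default (connector option `tombstones.on.delete`). In the delete events themselves only `before.id` carries real data, which is what one would expect with the default replica identity (an assumption here, as the table DDL is not shown in this section). Below is a minimal sketch of how a hand-written consumer might tell these records apart. The function name `classify` and the returned tuples are illustrative only; they are not part of Debezium or of the sink used in this article.

```python
import json
from typing import Optional, Tuple

def classify(raw_value: Optional[str]) -> Tuple[str, Optional[int]]:
    """Return (kind, id) for one Kafka record value as printed by the console consumer."""
    # A tombstone has no value at all; the console consumer prints it as "null".
    if raw_value is None or raw_value.strip() == "null":
        return ("tombstone", None)
    payload = json.loads(raw_value)["payload"]
    op = payload["op"]
    if op == "d":
        # Deletes carry the old row in "before"; "after" is null.
        return ("delete", payload["before"]["id"])
    if op in ("c", "u", "r"):
        # Inserts, updates and snapshot reads carry the new row in "after".
        return ("upsert", payload["after"]["id"])
    return ("other", None)

# Trimmed-down versions of the records shown above (schema part omitted).
delete_event = json.dumps({"payload": {"before": {"id": 26}, "after": None, "op": "d"}})
update_event = json.dumps({"payload": {"before": None, "after": {"id": 30}, "op": "u"}})

for value in (delete_event, "null", update_event):
    print(classify(value))
```

Note that the `sequence` field as rendered on this page appears to have lost its quote escaping, so the raw lines above would likely need that restored before `json.loads` accepts them.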
4.4 Transactions
In PostgreSQL, insert rows, update them, and then roll back the update, all within one transaction:
  BEGIN;
  INSERT INTO public.data_info (
      platform, data_type, data_name, dir_name, bag_exists, clip_exists, create_time, latest_update_time, deleted)
  VALUES
      ('11v-xl', 'camera0', 'aroundBack', 'cam_around_back', 1, 1, now(), now(), 0),
      ('11v-xl', 'camera1', 'aroundBack', 'cam_around_back', 1, 1, now(), now(), 0),
      ('11v-xl', 'camera2', 'aroundBack', 'cam_around_back', 1, 1, now(), now(), 0);
  SAVEPOINT my_savepoint;
  UPDATE public.data_info SET platform = '6v-xl', data_name = 'aroundBack_xl' WHERE platform = '11v-xl';
  ROLLBACK TO my_savepoint;
  COMMIT;
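The effect of `ROLLBACK TO my_savepoint` can be reproduced locally without a PostgreSQL instance. The sketch below uses Python's built-in `sqlite3` module as a stand-in. This is an assumption made purely for illustration: SQLite is not PostgreSQL, the table is cut down to three columns, and nothing here involves Debezium. The savepoint behaviour it demonstrates is the same, though: the UPDATE is undone while the INSERT is committed.

```python
import sqlite3

# isolation_level=None puts the connection in autocommit mode,
# so BEGIN / SAVEPOINT / COMMIT are issued explicitly, as in the SQL above.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute(
    "CREATE TABLE data_info ("
    "id INTEGER PRIMARY KEY, platform TEXT, data_type TEXT, data_name TEXT)"
)

conn.execute("BEGIN")
conn.executemany(
    "INSERT INTO data_info (platform, data_type, data_name) VALUES (?, ?, ?)",
    [
        ("11v-xl", "camera0", "aroundBack"),
        ("11v-xl", "camera1", "aroundBack"),
        ("11v-xl", "camera2", "aroundBack"),
    ],
)
conn.execute("SAVEPOINT my_savepoint")
conn.execute(
    "UPDATE data_info SET platform = '6v-xl', data_name = 'aroundBack_xl' "
    "WHERE platform = '11v-xl'"
)
conn.execute("ROLLBACK TO my_savepoint")  # undoes only the UPDATE
conn.execute("COMMIT")                    # the three inserted rows are committed

rows = conn.execute(
    "SELECT platform, data_type, data_name FROM data_info ORDER BY id"
).fetchall()
for row in rows:
    print(row)
```

All three rows still show `11v-xl` and `aroundBack`, which is the state the downstream table should end up in as well.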
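View the messages on the Kafka topic. Only the INSERT shows up (one `"op":"c"` event per inserted row); the rolled-back UPDATE produces no events: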
  1. I have no name!@prod-kafka-0:/$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic datatrans.public.data_info
  2. {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional"
:false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":null,"after":{"id":37,"platform":"11v-xl","data_type":"camera0","data_name":"aroundBack","dir_name":"cam_around_back","bag_exists":1,"clip_exists":1,"create_time":1741327869321334,"latest_update_time":1741327869321334,"deleted":0},"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741327869322,"snapshot":"false","db":"db_dmp","sequence":"["12141193520","12141193624"]","ts_us":1741327869322708,"ts_ns":1741327869322708000,"schema":"public","table":"data_info","txId":3453096,"lsn":12141193624,"xmin":null},"transaction":null,"op":"c","ts_ms":1741327869719,"ts_us":1741327869719110,"ts_ns":1741327869719110000}}
  3. {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional"
:false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":null,"after":{"id":38,"platform":"11v-xl","data_type":"camera1","data_name":"aroundBack","dir_name":"cam_around_back","bag_exists":1,"clip_exists":1,"create_time":1741327869321334,"latest_update_time":1741327869321334,"deleted":0},"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741327869322,"snapshot":"false","db":"db_dmp","sequence":"["12141193520","12141204296"]","ts_us":1741327869322708,"ts_ns":1741327869322708000,"schema":"public","table":"data_info","txId":3453096,"lsn":12141204296,"xmin":null},"transaction":null,"op":"c","ts_ms":1741327869720,"ts_us":1741327869720542,"ts_ns":1741327869720542000}}
  4. {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional"
:false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":null,"after":{"id":39,"platform":"11v-xl","data_type":"camera2","data_name":"aroundBack","dir_name":"cam_around_back","bag_exists":1,"clip_exists":1,"create_time":1741327869321334,"latest_update_time":1741327869321334,"deleted":0},"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741327869322,"snapshot":"false","db":"db_dmp","sequence":"["12141193520","12141204584"]","ts_us":1741327869322708,"ts_ns":1741327869322708000,"schema":"public","table":"data_info","txId":3453096,"lsn":12141204584,"xmin":null},"transaction":null,"op":"c","ts_ms":1741327869721,"ts_us":1741327869721271,"ts_ns":1741327869721271000}}
Check the data in Hologres; the rows keep the values they had before the UPDATE statement:
  SELECT * FROM public.data_info ORDER BY id DESC;
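All three events above carry the same `source.txId` (3453096) and `"op":"c"`, which is one way to confirm that they come from the single committed transaction and that no `u` event leaked out of the rolled-back UPDATE. A small sketch of that check follows, using only the `op`, `txId` and `id` values copied from the messages above. The function name `summarise` is hypothetical.

```python
from collections import Counter
from typing import Dict, Iterable

def summarise(events: Iterable[Dict]) -> Counter:
    """Count events per (txId, op) pair."""
    return Counter((e["source"]["txId"], e["op"]) for e in events)

# Payloads reduced to the fields needed for the check.
events = [
    {"op": "c", "after": {"id": 37}, "source": {"txId": 3453096}},
    {"op": "c", "after": {"id": 38}, "source": {"txId": 3453096}},
    {"op": "c", "after": {"id": 39}, "source": {"txId": 3453096}},
]

summary = summarise(events)
print(summary)
# Counter({(3453096, 'c'): 3})
```

A `(txId, 'u')` entry appearing in such a summary would indicate that an update had actually been committed upstream.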
5. Summary


