PostgreSQL10 逻辑复制实战:构建高可用数据同步架构!

打印 上一主题 下一主题

主题 1887|帖子 1887|积分 5661

PostgreSQL10 逻辑复制实战:打造高可用数据同步架构!

概述

PostgreSQL 10 引入了逻辑复制(Logical Replication),为数据库高可用和数据同步提供了更机动的选择。PostgreSQL 复制机制主要分为物理复制和逻辑复制两种:物理复制(又称流复制/物理块复制)在实例级别同步数据,而逻辑复制则支持更精细的复制粒度。逻辑复制通过逻辑解码插件剖析 WAL 日记,提取 DML 语句并在订阅端执行,从而实现表级别的数据同步,适用于分库分表、实时数据同步、异构数据同步等高可用场景。
具备特性

逻辑复制过程中使用限定:
  1. 1. 不支持复制DDL。
  2. 2. 不支持复制序列、索引。
  3. 3. 不支持双向复制。
  4. 4. 发布节点和订阅节点表的模式名、表名必须一致,订阅节点允许表有额外字段。
复制代码
逻辑复制与物理复制区别:
  1. 1. 物理复制不能垮操作系统(Linux-Windows),而逻辑复制可以。
  2. 2. 无法在不同的PG版本之间进行物理复制(例如10-12),逻辑复制可以支持。因此PostgreSQL大版本升级可以使用逻辑复制。
  3. 3. 物理复制是实例级别的复制,而逻辑复制可以基于对象级别(具体到某个表)。
  4. 4. 物理复制备库只能读,逻辑复制的备库可以写入。
复制代码
应用场景

可基于表级别复制,是一种粒度可细的复制,主要用在以下场景
  1. 1. 满足业务上需求,实现某些指定表数据同步。
  2. 2. PostgreSQL 跨版本数据同步。
  3. 3. PostgreSQL 大版本升级。
复制代码
具体流程

逻辑复制的流程图

PostgreSQL数据库逻辑复制使用发布者/订阅者模子,使用订阅复制槽技术,可并行的传输WAL日记,通过在订阅端回放WAL日记中的逻辑条目,保持复制表的数据同步,订阅端通过逻辑解码对数据进行REDO。
PUBLICATION对象
  1. CREATE PUBLICATION 名称
  2. [ FOR TABLE [ ONLY ] 表名 [ * ] [, ...]
  3. | FOR ALL TABLES ]
  4. [ WITH ( publication_parameter [= 值] [, ... ] ) ]
复制代码
参数说明
  1. FOR TABLE:表示要复制的表,可以通过’,’定义多个表。
  2. FOR ALL TABLES:表示数据库的所有表都要复制。
  3. WITH:表的DML操作行为,忽略表示全部DML操作。
复制代码
一个PUBLICATION对象可以注册一个或多个表,也可以选择DML操纵进行复制,一个表同时也可以被多个PUBLICATION注册。
SUBSCRIPTION对象
  1. CREATE SUBSCRIPTION subscription_name
  2. CONNECTION 'conninfo'
  3. PUBLICATION publication_name [, ...]
  4. [ WITH ( subscription_parameter [= 值] [, ... ] ) ]
复制代码
参数说明
  1. CONNECTION:连接master节点的字符串信息。eg. 'host=ip port=5432 user=xxx dbname=xxx'
  2. PUBLICATION:对应发布端的PUBLICATION对象
  3. WITH:表示DML操作,忽略表示全部DML操作
复制代码
SUBSCRIPTION对象是逻辑复制过程汇总,由订阅节点创建的对象,用于连接发布节点的PUBLICATION对象。
逻辑解码

逻辑解码是使用一个输出插件将 Postgres 的预写日记 (WAL) 转换为可读格式。逻辑解码过程如图:

当 Postgres 数据库表中的一行发生更改时,该更改会记载在 WAL 中。如果启用了逻辑解码,则该更改的记载将传递给输出插件。输出插件将记载从 WAL 格式更改为插件的格式(例如 JSON 对象)。然后重新格式化的更改通过复制槽退出 Postgres。末了是消费者。消费者是您选择的任何连接到 Postgres 并接收逻辑解码输出的应用程序。
pgout插件解码wal后效果:
  1. testdb=# select * from pg_logical_slot_get_changes('test', pg_current_wal_lsn(), 10);
  2.     lsn     |  xid   |                                                 data                                                
  3. ------------+--------+------------------------------------------------------------------------------------------------------
  4. 0/3DAE5178 | 377183 | BEGIN 377183
  5. 0/3DAE5178 | 377183 | table public.users: UPDATE: id[character varying]:'4' name[character varying]:'anna' age[integer]:21
  6. 0/3DAE5348 | 377183 | COMMIT 377183
  7. 0/3DAE65F0 | 377184 | BEGIN 377184
  8. 0/3DAE65F0 | 377184 | table public.users: INSERT: id[character varying]:'5' name[character varying]:'5' age[integer]:55
  9. 0/3DAE6728 | 377184 | COMMIT 377184
  10. (6 rows)
复制代码
wal2json 插件可解码为json格式。
  1. https://docs.microsoft.com/zh-cn/azure/postgresql/concepts-logical
复制代码
使用示例

测试构建PostgreSQL10的逻辑复制环境。
角色数据库操纵系统版本和数据库版本复制用户发布节点:172.168.98.107testdbCentos 7/ PostgreSQL 10replication订阅节点:172.168.98.115testdbCentos 7/ PostgreSQL 10replication 1、首先必要在发布角色节点设置 postgresql.conf 相干参数。
  1. wal_level = logical
复制代码
2、配置主和复制节点的pg_hba.conf文件配置replication用户连接不受限
  1. host replication all 0.0.0.0/0 trust
复制代码
3、主库和复制库上都创建replication角色并具有复制权限
  1. [root@localhost ~]# su postgres
  2. bash-4.2$ psql
  3. postgres=# CREATE ROLE replication WITH replication PASSWORD '123456' LOGIN;
复制代码
4、主库和复制库上都创建测试库和测试表
  1. postgres=# CREATE USER testdb WITH ENCRYPTED PASSWORD 'testdb!123';
  2. postgres=# CREATE DATABASE testdb OWNER testdb;
复制代码
执行\c下令切换至testdb数据库
  1. postgres=# \c testdb testdb
  2. You are now connected to database "testdb" as user "testdb".
  3. testdb=>
复制代码
继续执行创建测试表
  1. CREATE TABLE users (
  2.         id varchar(10) NOT NULL,
  3.         name varchar(35) NOT NULL,
  4.         age integer
  5. ) ;
  6. ALTER TABLE public.users ADD CONSTRAINT users_id_pkey PRIMARY KEY ("id");
复制代码
主库插入测试数据
  1. INSERT INTO public.users (id, name, age) VALUES ('1', 'zhangsan', 20);
  2. INSERT INTO public.users (id, name, age) VALUES ('2', 'lisi', 30);
  3. INSERT INTO public.users (id, name, age) VALUES ('3', 'wangwu', 21);
复制代码
5、主库和复制库上都给replication用户授权数据库权限
  1. testdb=> GRANT SELECT ON ALL tables IN SCHEMA PUBLIC TO replication;
复制代码
6、在主库上创建发布并指定users表
  1. testdb=> CREATE PUBLICATION testpub FOR TABLE users;
  2. CREATE PUBLICATION
  3. testdb=> \dRp
  4.                     List of publications
  5.   Name   | Owner  | All tables | Inserts | Updates | Deletes
  6. ---------+--------+------------+---------+---------+---------
  7. testpub | testdb | f          | t       | t       | t
  8. (1 row)
  9. testdb=>
复制代码
7、在复制库上创建订阅
创建订阅,指定连接到主库上的发布。使用superuser来创建订阅,通过下令\c切换至postgres用户。
  1. testdb=> \c testdb postgres
  2. You are now connected to database "testdb" as user "postgres".
  3. testdb=# CREATE SUBSCRIPTION testsub CONNECTION 'host=172.168.98.107 port=5432 dbname=testdb user=replication password=123456' PUBLICATION testpub;
  4. NOTICE:  created replication slot "testsub" on publisher
  5. CREATE SUBSCRIPTION
复制代码
创建订阅时可指定已经存在的slot
  1. CREATE SUBSCRIPTION testsub CONNECTION 'host=172.168.98.107 port=5432 dbname=testdb user=replication password=123456' PUBLICATION testpub WITH (slot_name=test, create_slot=false);
复制代码
复制库上面检察订阅环境
  1. testdb=# \dRs
  2.            List of subscriptions
  3.   Name   |  Owner   | Enabled | Publication
  4. ---------+----------+---------+-------------
  5. testsub | postgres | t       | {testpub}
  6. (1 row)
复制代码
创建乐成之后数据会自动复制过来。
  1. testdb=# SELECT * FROM users;
  2. id |   name   | age
  3. ----+----------+-----
  4. 1  | zhangsan |  20
  5. 2  | lisi     |  30
  6. 3  | wangwu   |  21
  7. (3 rows)
复制代码
8、测试增删改
  1. -- 主库插入记录
  2. testdb=> INSERT INTO users VALUES('4','anna', 17);
  3. INSERT 0 1
  4. -- 从库查询,记录'anna'已插入。
  5. testdb=> select * from users;
  6. id |   name   | age
  7. ----+----------+-----
  8. 1  | zhangsan |  20
  9. 2  | lisi     |  30
  10. 3  | wangwu   |  21
  11. 4  | anna     |  17
  12. (4 rows)
  13. -- 主库修改'anna'
  14. testdb=> update users set age=18 where id='4';
  15. -- 从库查询'anna'age已经同步修改...
  16. -- 主库删除'anna'
  17. testdb=> delete from users where id='4';
  18. -- 从库查询'anna'这行数据同步删除...
复制代码
常用下令

检察当前数据库已有发布
  1. SELECT * FROM pg_publication;
复制代码
检察当前数据库已有订阅
  1. SELECT * FROM pg_subscription;
复制代码
删除发布
  1. DROP PUBLICATION testpub;
复制代码
删除订阅
  1. DROP SUBSCRIPTION testsub;
复制代码
禁用订阅
  1. ALTER SUBSCRIPTION testsub disable;
复制代码
启动订阅
  1. ALTER SUBSCRIPTION testsub enable;
复制代码
如果逻辑复制操纵中一张表缺少主键,就必要执行这条语句,代表使用整行作为标识
  1. ALTER TABLE table REPLICA IDENTITY FULL;
复制代码
解除复制槽与订阅的关联
  1. ALTER SUBSCRIPTION testsub disable;
  2. ALTER SUBSCRIPTION testsub SET (slot_name = NONE);DROP SUBSCRIPTION testsub;
复制代码
显示当前服务的所有复制连接(发布端执行)
  1. SELECT * FROM pg_stat_replication;
复制代码
显示订阅者的状态信息
  1. SELECT * FROM pg_stat_subscription;
复制代码
显示所有复制槽(发布端执行)
  1. SELECT * FROM pg_replication_slots;
复制代码
创建复制槽
  1. select pg_create_logical_replication_slot('test','test_decoding');
复制代码
创建复制槽
  1. SELECT pg_create_logical_replication_slot('sub_iuser', 'pgoutput');
复制代码
在这个示例中:


  • ‘sub_iuser’ 是要创建的复制槽的名称。
  • ‘pgoutput’ 是指定的输出插件名称,它用于将逻辑复制的 WAL 记载转换为适合于逻辑复制的格式。
执行上述 SQL 下令后,将创建名为 sub_iuser 的逻辑复制槽。
请确保在创建复制槽之前,已经启用了逻辑复制,而且已经将逻辑复制参数配置为允许创建复制槽。
删除复制槽
  1. SELECT * FROM pg_drop_replication_slot('test');
复制代码
脚本实践

使用脚本发布订阅相干数据库和相干表
逻辑复制发布脚本

  1. #!/bin/sh# DB暗码PASSWORD=PG_PWD# 查询所有数据库database_name_result=$(PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "SELECT d.datname AS database_name FROM pg_database d JOIN pg_user u ON d.datdba = u.usesysid WHERE u.usename = 'iuser';")if [ -z "$database_name_result" ]; then    # 查询效果为空    exit 1fiwhile IFS= read -r database_name; do    database_name=$(echo "$database_name" | sed 's/ //g')    # 删除大概存在的发布、订阅关系    PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "DROP PUBLICATION IF EXISTS pub_$database_name;"    PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "DROP SUBSCRIPTION IF EXISTS sub_$database_name;"    # 重新创建发布关系    table_names=""    # 主备必要排除表(例如排除日记表)    table_names=$(PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "        SELECT string_agg(table_name, ',') AS table_names            FROM information_schema.tables        WHERE table_schema = 'public'            AND table_type = 'BASE TABLE'            AND table_name NOT LIKE 't_log%'            AND table_name != 't_xxx';")    # 获取要发布的表清单信息    PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "CREATE PUBLICATION pub_$database_name FOR TABLE $table_names;"    PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "SELECT * FROM pg_publication;
  2. "    echo "PUBLICATION $database_name done."echo "PUBLICATION done."
复制代码
逻辑复制订阅脚本

留意:必要传参主库服务器ip和主库数据库暗码。
  1. #!/bin/sh# 主机MASTER_HOST=$1# 主机数据库暗码MASTER_PASSWORD=$2# 当前备机旧暗码PASSWORD=PG_PWD# 查询iuser用户下的所有数据库database_name_result=$(PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "SELECT d.datname AS database_name FROM pg_database d JOIN pg_user u ON d.datdba = u.usesysid WHERE u.usename = 'iuser';")if [ -z "$database_name_result" ]; then# 查询效果为空    exit 1fi# 遍历每个数据库while IFS= read -r database_name; do    database_name=$(echo "$database_name" | sed 's/ //g')    # 删除存在的发布、订阅关系    PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "DROP PUBLICATION IF EXISTS pub_$database_name;"    PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "DROP SUBSCRIPTION IF EXISTS sub_$database_name;"    # 实验连接数据库    echo "实验连接主机数据库[$database_name]..."    PGPASSWORD=$MASTER_PASSWORD psql -h $MASTER_HOST -p 5432 -U iuser -d $database_name -c "SELECT 1;"    if [ $? -eq 0 ]; then        echo "主机数据库连接[$database_name]乐成..."    else        echo "主机数据库连接[$database_name]失败..."        continue    fi    # 清空表数据    # 主备必要排除表(例如排除日记表)    table_name_result=$(PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "        SELECT table_name            FROM information_schema.tables        WHERE table_schema = 'public'            AND table_type = 'BASE TABLE'            AND table_name NOT LIKE 't_log%'            AND table_name != 't_xxx';")    while IFS= read -r table_name; do        echo "Processing table: $table_name"        # 清空表数据        PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "TRUNCATE TABLE $table_name;"    # 重新创建订阅关系    PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "    CREATE SUBSCRIPTION sub_$database_name    CONNECTION 'host=$MASTER_HOST port=5432 user=iuser password=$MASTER_PASSWORD dbname=$database_name'    PUBLICATION pub_$database_name;"    # 查询订阅环境    echo "SELECT SUBSCRIPTION"    PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "SELECT * FROM pg_subscription;
  2. "    echo "SUBSCRIPTION $database_name done."done <<< "$database_name_result"echo "SUBSCRIPTION done."
复制代码
逻辑复制停止脚本

  1. #!/bin/sh
  2. # 本机密码
  3. PASSWORD=PG_PWD
  4. # 查询iuser用户下的所有数据库
  5. database_name_result=$(PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "
  6. SELECT d.datname AS database_name FROM pg_database d JOIN pg_user u ON d.datdba = u.usesysid WHERE u.usename = 'iuser';")
  7. # 检查查询结果是否为空
  8. if [ -z "$database_name_result" ]; then
  9.     # 查询结果为空
  10.     exit 1
  11. fi
  12. # 遍历每个数据库
  13. while IFS= read -r database_name; do
  14.     database_name=$(echo "$database_name" | sed 's/ //g')
  15.     echo "DROP PUBLICATION&SUBSCRIPTION"
  16.     # 删除之前可能存在的发布、订阅关系
  17.     PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "DROP PUBLICATION IF EXISTS pub_$database_name;"
  18.     PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "DROP SUBSCRIPTION IF EXISTS sub_$database_name;"
  19.     echo "DROP $database_name PUBLICATION&SUBSCRIPTION done."
  20. done <<< "$database_name_result"
  21. echo "PUBLICATION done."
复制代码
延长测试

100w测试
  1. delete from users;
  2. insert into users select generate_series(1,1000000), 'anna', 18;
  3. delete from users;
  4. > Affected rows: 1000000
  5. > 时间: 2.024s
复制代码
删除100w行必要时间2s,观测延长
  1. select * from users limit 1;  
  2. \watch 1  
  3. 开始时间
  4. 2022年02月18日 星期五 18时18分03秒 (每 1s)
  5. id | name | age
  6. ----+------+-----
  7. 1  | anna |  18
  8. (1 行记录)
  9. 结束时间  
  10. 2022年02月18日 星期五 18时18分25秒 (每 1s)
  11. id | name | age
  12. ----+------+-----
  13. (0 行记录)
复制代码
延长20s左右
10w测试
  1. insert into users select generate_series(1,100000), 'anna', 18;
  2. delete from users;
  3. > Affected rows: 100000
  4. > 时间: 0.229s
复制代码
删除10w行必要时间0.2s,观测延长
  1. select * from users limit 1;  
  2. \watch 1  
  3. 开始时间
  4. 2022年02月18日 星期五 18时29分20秒 (每 1s)
  5. id | name | age
  6. ----+------+-----
  7. 1  | anna |  18
  8. (1 行记录)
  9. 结束时间  
  10. 2022年02月18日 星期五 18时29分22秒 (每 1s)
  11. id | name | age
  12. ----+------+-----
  13. (0 行记录)
复制代码
延长2s左右,取决于一次性事务巨细。
相干题目

逻辑复制配置双向复制WAL循环

  1. -- 正向发布
  2. CREATE TABLE t(a SERIAL, b CHAR);
  3. create publication testpub1 FOR table t;
  4. -- 正向订阅
  5. CREATE TABLE t(a SERIAL, b CHAR);
  6. create subscription testsub1 connection 'host=172.168.98.107 port=5432 dbname=testdb user=replication' publication testpub1;
  7. -- 反向发布
  8. create publication testpub2 FOR table t;
  9. -- 反正订阅
  10. create subscription testsub2 connection 'host=172.168.98.115 port=5432 dbname=testdb user=replication' publication testpub2;
复制代码
至此已经创建了一个双向循环复制,如图所示。

此时我在发布端插入一条数据就会出现环绕现像。此时就会出现循环。不绝的从A复制到B,再从B复制到A,直到把数据库搞崩。双向复制必要使用不同的表来实现。使用同样的表会产生WAL循环。
相干链接

  1. 1. PostgreSQL10官网文档
  2. https://www.postgresql.org/docs/10/index.html
  3. 2. PostgreSQL10逻辑特性
  4. https://www.postgresql.org/docs/10/logical-replication.html
  5. 3. 逻辑解码
  6. https://docs.microsoft.com/zh-cn/azure/postgresql/concepts-logical
复制代码
总结

逻辑复制是PostgreSQL10引入的重要特性,为数据库提供了更机动的同步方式。在高可用架构中,逻辑复制可用于数据同步、灾备切换、实时分析等场景,提升数据库的可扩展性和业务连续性。合理规划依然可以打造稳定高效的高可用架构。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

南飓风

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表