PostgreSQL主从流复制状态监控和自动故障转移的轻量级实现 ...

打印 上一主题 下一主题

主题 1938|帖子 1938|积分 5814

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
如何实现PostgreSQL的高可用,之前研究过repmgr以及pg_auto_failover,起作用都是起到主节点故障时,实现“自动故障转移”的目的。
但是repmgr以及pg_auto_failover得缺点是对数据库侵入过多,必要在监控的数据库内部进行一系列的设置操作,同时必要启动第三方服务实现节点的可用性监控,这又引入了额外的不确定性因素。
如果是单纯地为了故障转移,repmgr以及pg_auto_failover都显得过于极重,于是快速尝试了一个轻量级的实现方案
实现思绪

紧张思绪如下:
  1. 1,自动故障转移:该方式在正常情况下,作为一个服务持续监控主从复制状态
  2. while 1:
  3.         轮训PostgreSQL流复制的状态:
  4.         if 如果主节点不可达:
  5.             if 判断从节点是否可达:
  6.             if 从节点状态是是in_recovery:
  7.                         promote 从节点,从节点接管读写服务
  8.             
  9.        
  10. 2,手动故障转移:如果是主从节点都正常的情况下,为了演练灾备切换,人工故障转移
  11. if 判断从节点是否可达:
  12.     if 从节点状态是是in_recovery:
  13.             promote 从节点,从节点接管读写服务
  14. #把原始主节点作为从节点加入集群中运行
  15. if 如果主节点不可达:
  16.         if 从节点状态是不是in_recovery:
  17.                 1,关闭原始主库
  18.                 2,尝试修复时间线
  19.                 3,尝试以standby的模式启动主库
复制代码
优点

快速尝试了一个轻量级的实现方案,优点如下:
1,不引入或者依靠于任何第三方中间件,可以实现多组流复制集群的监控和故障转移。
2,不对现有PostgreSQL的复制环境做任何修改和侵入,以“第三方”的视角来监控现有PostgreSQL流复制,可以自行实现PostgreSQL流复制状态监控和故障转移。
3,支持在主节点故障时实现自动故障转移,或者手动故障转移视线灾备切换演练。
4,简单轻量,可以自定义故障转移的模式以及自定义日记等信息,通知信息等 详细实现

详细实现如下,目前在两台EC2上测试环境上,一台腾讯云,一台阿里云上搭建PostgreSQL流复制。经测试:可以一键快速实现主从切换,或者一连故障转移(A节点切换到B节点,B节点切换到A节点,必要修改主从毗连信息)
  1. import psycopg2
  2. import time
  3. import paramiko
  4. # 连接测试
  5. def conn_to_postgres(db_config):
  6.     try:
  7.         conn = psycopg2.connect(**db_config)
  8.         conn.close()
  9.         return True
  10.     except Exception as e:
  11.         print(f"Error connecting to master: {e}")
  12.         return False
  13. # 判断节点身份
  14. def is_postgresql_recovery(db_config):
  15.     try:
  16.         with psycopg2.connect(**db_config) as conn:
  17.             with conn.cursor() as cur:
  18.                 cur.execute("SELECT pg_is_in_recovery();")
  19.                 in_recovery = cur.fetchone()[0]
  20.         return in_recovery
  21.     except Exception as e:
  22.         print(f"Error connecting to master: {e}")
  23.         return False
  24. # 探测节点是否可达
  25. def is_postgresql_reachable(db_config,retry_times):
  26.     # 这里仅仅判断主节点是否可连通
  27.     # 判断节点是否可达的时候,作为仲裁方,可以增加其他判断,比如当前节点是否能ping的通DNS服务器等,标明当前节点的判断是没有问题的
  28.     while retry_times > 0:
  29.         if not conn_to_postgres(db_config):
  30.             print('the postgres cannot reachable,retrying......')
  31.             time.sleep(10)
  32.             retry_times = retry_times -1
  33.         else:
  34.             return True
  35.     else:
  36.         return False
  37. def ssh_conn(host, username, password, port):
  38.     ssh = paramiko.SSHClient()
  39.     ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
  40.     try:
  41.         ssh.connect(hostname=host, port=port, username=username, password=password)
  42.         return ssh
  43.     except Exception as e:
  44.         return None, str(e)
  45. def create_replication_slot_if_not_exists(db_config, slot_name):
  46.     try:
  47.         with psycopg2.connect(**db_config) as conn:
  48.             with conn.cursor() as cur:
  49.                 cur.execute("SELECT slot_name FROM pg_replication_slots WHERE slot_name = '{0}';".format(slot_name))
  50.                 var_slot_name = cur.fetchall()
  51.                 if not var_slot_name:
  52.                     cur.execute("SELECT * FROM pg_create_physical_replication_slot('{0}');".format(slot_name))
  53.                     print(f"Replication slot '{slot_name}' created.")
  54.                 else:
  55.                     print(f"Replication slot '{slot_name}' already exists.")
  56.     except Exception as e:
  57.         print(f"Error connecting to master: {e}")
  58.         return False
  59. def promote_standby_to_primary(db_config):
  60.     try:
  61.         with psycopg2.connect(**db_config) as conn:
  62.             conn.autocommit = True
  63.             with conn.cursor() as cur:
  64.                 cur.execute("SELECT pg_promote(true);")
  65.         print("{0} Standby promoted to primary successfully.".format(db_config['host']))
  66.     except Exception as e:
  67.         print("{0} Standby promoted to primary failed : {1}".format(db_config[0],e))
  68. # 时间线修复
  69. def run_pg_rewind(primary_node, standby_node):
  70.     # 新的主服务器的连接信息(pg_rewind需要这些信息)
  71.     primary_db_config = "host={0} port={1} user={2} password={3}".format(primary_node['db_config']['host'],
  72.                                                                          primary_node['db_config']['port'],
  73.                                                                          primary_node['db_config']['user'],
  74.                                                                          primary_node['db_config']['password'])
  75.     # 构建pg_rewind命令
  76.     pg_rewind_cmd = ("sudo -u postgres {0}/pg_rewind --target-pgdata={1} --source-server='host={2} port={3} user={4} dbname='postgres' password={5}'"
  77.                      .format(standby_node['pg_base_dir'],
  78.                              standby_node['pg_data_dir'],
  79.                              primary_node['db_config']['host'],
  80.                              primary_node['db_config']['port'],
  81.                              primary_node['db_config']['user'],
  82.                              primary_node['db_config']['password'] ))
  83.     ssh = ssh_conn(host=standby_node['host'], username=standby_node['user_name'], port=standby_node['port'], password=standby_node['password'])
  84.     try:
  85.         stdin, stdout, stderr = ssh.exec_command('sudo systemctl stop postgresql9000')
  86.         output = stdout.read().decode('utf-8')
  87.         error = stderr.read().decode('utf-8')
  88.         if stderr.channel.recv_exit_status() != 0:
  89.             raise Exception(f"Failed to stop postgresql9000")
  90.             print(error)
  91.         print('######begin to rewind standby node')
  92.         print(pg_rewind_cmd)
  93.         stdin, stdout, stderr = ssh.exec_command(pg_rewind_cmd)
  94.         output = stdout.read().decode('utf-8')
  95.         error = stderr.read().decode('utf-8')
  96.         if stderr.channel.recv_exit_status() != 0:
  97.             print('standby node pg_rewind failed')
  98.             raise Exception(f"{0} Failed to rewind commend".format(standby_node['db_config']['host']))
  99.         print('######standby node pg_rewind successfully')
  100.         return True
  101.     except Exception as err:
  102.         print('pg_rewind failed ' + str(err))
  103.         return False
  104.     finally:
  105.         ssh.close()
  106. def startup_as_replica(primary_node,standby_node):
  107.     standby_auto_conf_path = '{0}/postgresql.auto.conf'.format(standby_node['pg_data_dir'])
  108.     standby_signal_path = '{0}/standby.signal'.format(standby_node['pg_data_dir'])
  109.     ssh = ssh_conn(host=standby_node['host'], username=standby_node['user_name'], port=standby_node['port'], password=standby_node['password'])
  110.     # 要写入postgresql.auto.conf的内容(模版)
  111.     auto_conf_content = """
  112.         primary_conninfo = 'user={2} password=''{3}'' channel_binding=prefer host={0} port={1} sslmode=prefer sslcompression=0 sslcertmode=allow sslsni=1 ssl_min_protocol_version=TLSv1.2 gssencmode=disable krbsrvname=postgres gssdelegation=0 target_session_attrs=any load_balance_hosts=disable'
  113.         primary_slot_name = '{4}'
  114.         """.format(primary_node['db_config']['host'],
  115.                    primary_node['db_config']['port'],
  116.                    primary_node['repl_user'],
  117.                    primary_node['repl_password'],
  118.                    primary_node['slot_name']).lstrip()
  119.     # 要创建的standby.signal文件(空文件即可,表示该节点为备用节点)
  120.     try:
  121.         stdin, stdout, stderr = ssh.exec_command('sudo systemctl stop postgresql9000')
  122.         output = stdout.read().decode('utf-8')
  123.         error = stderr.read().decode('utf-8')
  124.         if stderr.channel.recv_exit_status() != 0:
  125.             raise Exception(f"Failed to stop postgresql9000")
  126.             print(error)
  127.         try:
  128.             # 创建standby.signal文件(如果尚不存在)
  129.             print('###### {0} touch {1}'.format(standby_node['host'],standby_signal_path))
  130.             stdin, stdout, stderr = ssh.exec_command(f'sudo -u postgres touch {standby_signal_path}')
  131.             output = stdout.read().decode('utf-8')
  132.             error = stderr.read().decode('utf-8')
  133.             # 检查命令是否成功执行
  134.             if stderr.channel.recv_exit_status() != 0:
  135.                 raise Exception(f"Failed to create {standby_signal_path}: {stderr.read().decode()}")
  136.             print('###### {0} touch {1}'.format(standby_node['host'], standby_auto_conf_path))
  137.             stdin, stdout, stderr = ssh.exec_command(f'sudo -u postgres touch {standby_auto_conf_path}')
  138.             stdin, stdout, stderr = ssh.exec_command('''sudo  echo "{0}"  > {1}'''.format(auto_conf_content,standby_auto_conf_path))
  139.             output = stdout.read().decode('utf-8')
  140.             error = stderr.read().decode('utf-8')
  141.             # 检查命令是否成功执行
  142.             if stderr.channel.recv_exit_status() != 0:
  143.                 print(error)
  144.                 raise Exception(f"Failed to create {standby_signal_path}: {stderr.read().decode()}")
  145.         except Exception as err:
  146.             print(err)
  147.         stdin, stdout, stderr = ssh.exec_command('sudo systemctl restart postgresql9000')
  148.         output = stdout.read().decode('utf-8')
  149.         error = stderr.read().decode('utf-8')
  150.         # 检查命令是否成功执行
  151.         if stderr.channel.recv_exit_status() != 0:
  152.             print(error)
  153.             raise Exception(f"{0} restart postgresql failed".format(standby_node['host']))
  154.     finally:
  155.         ssh.close()
  156. # 人工故障转移:在主节点可达的情况,把主节点作为从节点运行,可以是灾备切换的预演
  157. def execute_failover(primary_node,standby_node):
  158.     print('promote 从节点为主节点')
  159.     # 判断standby节点是否可达,如果不可达,重试5次
  160.     if is_postgresql_reachable(standby_node['db_config'],retry_times=5):
  161.         # 判断standby节点是否处于recovery模式
  162.         if is_postgresql_recovery(standby_node['db_config']):
  163.             # 创建复制槽,为从节点复制做准备
  164.             create_replication_slot_if_not_exists(standby_node['db_config'], standby_node['slot_name'])
  165.             # promote standby节点
  166.             promote_standby_to_primary(standby_node['db_config'])
  167.         else:
  168.             print('当前节点非recovery模式,不可执行promote')
  169.     # 将旧的主节点作为从节点加入新的主节点复制
  170.     # 判断原主节点是否可达
  171.     print('将原始主节点,重新以从节点身份加入复制集群')
  172.     if is_postgresql_reachable(primary_node['db_config'],retry_times=5):
  173.         # 执行pg_rewind修复原始主节点的时间线,请注意:此时从节点提升为主节点,主节点已经变为从节点,所以需要注意节点身份的变化和参数的变化
  174.         pg_rewind_result = run_pg_rewind(primary_node=standby_node,standby_node=primary_node)
  175.         if pg_rewind_result:
  176.             # 注意此时主从节点身份已经变换,所以这里参数也变化,原来的从节点已经成为主节点,所以注意参数的交换
  177.             startup_as_replica(primary_node=standby_node, standby_node=primary_node)
  178. # 自动检测和故障转移
  179. def auto_failover(primary_node,standby_node):
  180.     while True:
  181.         if not is_postgresql_reachable(primary_node['db_config'], retry_times=5):
  182.             # 告警通知,说明当前配置的主节点故障,即将开始主从切换
  183.             execute_failover(primary_node,standby_node)
  184.             # 执行完切换之后,进行写日志,邮件/短信告警通知等
  185.             # 因为主从身份发生了变化,不再对当前的主从配置进行监控,应该第一时间人工介入:1,确认原始主节点是否正常,2,原始主节点是否以从节点的身份加入集群
  186.             exit(0)
  187.         # 定期检查间隔
  188.         time.sleep(15)
  189. if __name__ == "__main__":
  190.    
  191.     # 数据库连接配置
  192.     primary_node  = {
  193.         'host':'***.***.***.***',                                #主机名
  194.         'user_name':"root",                                      #系统用户名
  195.         'password': "******",                                    #系统密码
  196.         'port':22,                                               #ssh端口号
  197.         'pg_base_dir':'/usr/local/pgsql16/server/bin',           #PostgreSQL Server路径
  198.         'pg_data_dir':'/usr/local/pgsql16/pg9000/data',          #PostgreSQL 数据路径
  199.         'repl_user':'replica_user',                              #流复制用户名
  200.         'repl_password':'******',                                #流复制用户密码
  201.         'slot_name': 'pgstandby_slave01',                        #流复制slot名字
  202.         'db_config': {                                           #数据库配置
  203.             'host': '***.***.***.***',
  204.             'user': 'postgres',
  205.             'password': '******',
  206.             'dbname': 'postgres',
  207.             'port': '******'
  208.         },
  209.     }
  210.     standby_node1 = {
  211.         'host':'***.***.***.***',
  212.         'user_name':"root",
  213.         'password': "******",
  214.         'port':22,
  215.         'pg_base_dir':'/usr/local/pgsql16/server/bin',
  216.         'pg_data_dir':'/usr/local/pgsql16/pg9000/data',
  217.         'repl_user':'replica_user',
  218.         'repl_password':'******',
  219.         'slot_name': 'pgstandby_slave01',
  220.         'db_config': {
  221.             'host': '***.***.***.***',
  222.             'user': 'postgres',
  223.             'password': '******',
  224.             'dbname': 'postgres',
  225.             'port': '******'
  226.         },
  227.     }
  228.     # 手动故障转移,主从节点以及复制均正常的情况下,实现参数里的主从节点交换身份
  229.     execute_failover(primary_node=primary_node, standby_node=standby_node1)
  230.     # 单独的时间线修复操作
  231.     #pg_rewind_result = run_pg_rewind(primary_node=standby_node1, standby_node=primary_node)
  232.     # 单独的讲一个节点作为从节点运行
  233.     # 注意此时主从节点身份已经变换,所以这里参数也变化,原来的从节点已经成为主节点,所以注意参数的交换
  234.     # startup_as_replica(primary_node=standby_node1, standby_node=primary_node)
复制代码
 
待改进

1,用户名密码均已明文方式写在主机信息中
2,pg_rewind的时候必要重启服务器,这里把PostgreSQL的服务写死了,笔者的测试环境PostgreSQL服务名是postgresql9000
3,如果是自动切换模式,自动故障转移后,尚未修改主从设置信息,此时节点的身份信息还是切换前的
4,判定节点是否正常的逻辑中,仅仅通过是否可以毗连来实现,如果监控服务自己无法与被监控节点通信,可能会出现误判的环境

 

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

我可以不吃啊

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表