MySQL数据归档小工具推荐及优化--mysql_archiver

打印 上一主题 下一主题

主题 502|帖子 502|积分 1506

一.主要概述

MySQL数据库归档历史数据主要可以分为三种方式:一.创建编写SP、设置Event;二.通过dump导入导出;三.通过pt-archiver工具举行归档。第一种方式每每受限于同实例要求,每每被各人舍弃。第二种,性能相对较好,但是归档表较多时运维也是比力头疼的事。所以许多DBA每每接纳第三种方式--pt-archiver。
pt-archiver是Percona-Toolkit工具集中的一个组件,是一个主要用于对MySQL表数据举行归档和清除的工具。它可以将数据归档到另一张表大概是一个文件中。pt-archiver在清除表数据的过程中并不会影响OLTP事务的查询性能。对于数据的归档,它可以归档到另一台服务器上的另一张表,也可归档到一个文件中。
有一个网友通过Python开辟了将pt-archiver打包成了一个小工具—mysql_archiver,它基本实现了归档作业的设置化部署,使用起来挺不错。"MySQL_archiver基本上实现了数据归档的自动运转,统一的归档任务调理管理、自动监控和预警、自动生成报表。在一定程度上节省了生产力,提高了运维效率。"
github地址:GitHub - dbarun/mysql_archiver: MySQL数据归档小工具

二.pt-archiver主要参数

参数默认值参数表明
--check-slave-lag指定主从复制延迟大于选项'--max-lag'指定的值之后暂停归档利用。默认情况下,工具会检查全部的从库,但该选项只作用于指定的从库(通过DSN连接方式)。
--check-interval1s假如同时指定了选项'--check-slave-lag',则该选项指定的时间为工具发现主从复制延迟时暂停的时间。每举行利用100行时举行一次检查。
--[no]check-charsetyes指定检查确保数据库连接时字符集和表字符集相同。
--commit-each指定按每次获取和归档的行数举行提交,该选项会禁用选项'--txn-size'。 在每次获取表数据并举行归档之后,在获取下一次数据和选项'--sleep'指定的休眠时间之前,举行事务提交和刷新选项'--file'指定的文件,通过选项'--limit'控制事务的大小。
--host,-h指定连接的数据库IP地址。
--port,-P指定连接的数据库Port端口。
--user,-u指定连接的数据库用户。
--password,-p指定连接的数据库用户密码。
--socket,-S指定使用SOCKET文件连接。
--databases,-d指定连接的数据库
--source指定需要举行归档利用的表,该选项是必须指定的选项,使用DSN方式表示。
--dest指定要归档到的目的端表,使用DSN方式表示。 假如该选项没有指定的话,则默认与选项'--source'指定源端表为相同表。
--where指定通过WHERE条件语句指定需要归档的数据,该选项是必须指定的选项。不需要加上'WHERE'关键字,假如确实不需要WHERE条件举行限制,则指定'--where 1=1'。
--file指定表数据需要归档到的文件。使用类似MySQL DATE_FORMAT()格式化命名方式。 文件内容与MySQL中SELECT INTO OUTFILE语句使用相同的格式,文件命名选项如下所示: ' %Y:年,4位数(Year, numeric, four digits) %m:月,2位数(Month, numeric (01..12)) %d:日,2位数(Day of the month, numeric (01..31)) %H:小时(Hour (00..23)) %i:分钟(Minutes, numeric (00..59)) %s:秒(Seconds (00..59)) %D:数据库名(Database name) %t:表名(Table name) 例如:--file '/var/log/archive/%Y-%m-%d-%D.%t' '
--output-format指定选项'--file'文件内容输出的格式。 默认不指定该选项是以制表符举行字段的分隔符,假如指定该选项,则使用','(逗号)作为字段分隔符,使用'"'(双引号)将字段括起。用法示例:'--output-format=dump'。
--limit1指定每条语句获取表和归档表的行数。
--max-lag1s指定答应主从复制延迟时长的最大值,单位秒。假如在每次获取行数据之后主从延迟高出指定的值,则归档利用将暂停执行,暂停休眠时间为选项'--check-interval'指定的值。待休眠时间结束之后再次检查主从延迟时长,检查方法是通过从库查询的'Seconds_Behind_Master'值来确定。假如主从复制延迟一直大于该参数指定值大概从库制止复制,则利用将一直等候直到从库重新启动并且延迟小于该参数指定值.
--no-delete指定不删除已被归档的表数据。
--progress指定每多少行打印进度信息,打印当前时间,已用时间以及多少行举行归档。
--purge指定执行的清除利用而不是归档利用。答应忽略选项'--dest'和'--file'举行利用,假如只是清除利用可以结合选项'--primary-key-only'会更高效。
--replace指定写入选项'--dest'指定目的端表时改写INSERT语句为REPLACE语句。
--retries1指定归档利用碰到死锁或超时的重试次数。当重试次数高出该选项指定的值时,工具将报错退出。
--run-time指定工具归档利用在退出之前需要运行的时间。答应的时间后缀名为s=秒,m=分,h=小时,d=天,假如没指定,默以为s。
--[no]safe-auto-incrementyes指定不使用自增列(AUTO_INCREMENT)最大值对应的行举行归档。 该选项在举行归档清除时会额外添加一条WHERE子句以防止工具删除单列升序字段具有的具有AUTO_INCREMENT属性最大值的数据行,为了在数据库重启之后还能使用到AUTO_INCREMENT对应的值,但这会引起无法归档或清除字段对应最大值的行。
--sleep指定工具在通过SELECT语句获取归档数据需要休眠的时间,默认值是不举行休眠。在休眠之前事务并不会提交,并且选项'--file'指定的文件不会被刷新。假如指定选项'--commit-each',则在休眠之前会举行事务提交和文件刷新。
--statistics指定工具网络并打印利用的时间统计信息。
--txn-size1指定每个事务处置惩罚的行数。假如是0则禁用事务功能。
--why-quit指定工具打印当非因完成归档行数退出的原因。 在执行一个自动归档任务时该选项与选项'--run-time'一起使用非常方便,这样可以确定归档任务是否在指定的时间内完成。假如同时指定了选项'--statistics',则会打印全部退出的原因。
--skip-foreign-key-checks指定使用语句SET FOREIGN_KEY_CHECKS = 0禁用外键检查。

三,安装部署mysql_archiver常见错误

1. 执行python db_archive_exec.py 127.0.0.1 db123 报错
  1. Traceback (most recent call last):
  2.   File "/data/XXXXXX/mysql_archiver-master/db_archive_exec.py", line 7, in <module>
  3.     import db_conn
  4.   File "/data/XXXXXX/mysql_archiver-master/db_conn.py", line 4, in <module> import MySQLdb ImportError: No module named MySQLdb
复制代码
 办理方案:
Step 1 底子情况
  1. rm -rf /etc/yum.repos.d/epel*
  2. yum install postgresql-devel python3-devel postgresql-devel python-devel
复制代码
Step 2 安装setuptools
  1. (1)    下载    setuptools-39.2.0.zip
  2. (2)    解压安装 python setup.py build
  3.                 python setup.py install
复制代码
step 3 安装pip
  1. (1)    下载 pip-1.5.4.tar.gz
  2. (2)    安装 python setup.py install
复制代码
step 4 安装MySQL-python
  1. yum install MySQL-python
复制代码

2.报错缺少pt-archiver工具
  1. sh: pt-archiver: command not found
复制代码
办理方案
下载percona-toolkit-3.2.1_x86_64.tar.gz,安装,并且设置个软连接。
例如:
  1. ln -s /data/XXXXXXXX/percona-toolkit-3.2.1/bin/pt-archiver /usr/local/bin/pt-archiver
复制代码

3.缺少文档
  1. <type 'exceptions.Exception'>
  2. [Errno 2] No such file or directory: u'/software/python_script/db_archive_XXXXdb_XXXtablename.log'
复制代码
办理方案:
创建文档: /software/python_script

4.字符集问题
  1. Character set mismatch: --source DSN uses utf8, table uses utf8mb4.  You can disable this check by specifying --no-check-charset.
复制代码
办理方案:
修改db_archive_exec.py可执行文件,针对pt-archiver添加参数 --no-check-charset

 5.参数设置报错


  1. Usage: pt-archiver [OPTIONS] --source DSN --where WHERE
  2. Errors in command-line arguments:
  3.   * --txn-size and --commit-each are mutually exclusive.
  4. pt-archiver nibbles records from a MySQL table.  The --source and --dest
  5. arguments use DSN syntax; if COPY is yes, --dest defaults to the key's value
  6. from --source.  For more details, please use the --help option, or try 'perldoc
  7. /usr/local/bin/pt-archiver' for complete documentation.
复制代码


 办理方案:
pt-archiver去掉参数 --commit-each

 6.Cannot find encoding "utf8mb4"
报错信息:
  1. Cannot find encoding "utf8mb4" at /usr/local/bin/pt-archiver line 6711.
复制代码


 办理方案:
pt-archiver去掉参数 --bulk-insert --bulk-delete

 7.执行 db_archive_exec.py 报错
报错信息
  1. <type 'exceptions.Exception'>
  2. 'ascii' codec can't decode byte 0xe6 in position 0: ordinal not in range(128)
复制代码
办理方案
修改优化的代码部分,不要有中文,即剔除添加的中文,修改为英文即可。

8.执行db_archive_monitor.py报错
  1. Traceback (most recent call last):
  2.   File "/data/XXXXX/mysql_archiver-master/db_archive_monitor.py", line 5, in <module>
  3.     from db_log import db_archive_monitor_log
  4. ImportError: No module named db_log
复制代码
办理方案
转到目录db_log,执行如下脚本
  1. echo '' > __init__.py
复制代码
生成__init__.py文件
9. crontab执行python脚本问题
调试和手动执行,都没有问题,但是发现没有成功执行备份。
为了定位问题,我们可以将cron执行情况,打印初始处置惩罚。将crontab调解如下:
调解前:
  1. 34 11 * * * python /data/mysql_archiver/mysql_archiver-master/db_archive_exec.py 122.122.122.122 qqorderdb
复制代码
调解后
  1. 34 11 * * * python /data/mysql_archiver/mysql_archiver-master/db_archive_exec.py 122.122.122.122 qqorder >> /data/mysql_archiver/mysql_archiver-master/db_log/logs/crontab_archiver.log 2>&1
复制代码
查看执行log文件,报错信息如下:


  1. [laobao@qqorder]# tail -100 crontab_archiver.log
  2. sh: pt-archiver: command not found
  3. sh: pt-archiver: command not found
  4. sh: pt-archiver: command not found
  5. sh: pt-archiver: command not found
  6. sh: pt-archiver: command not found
复制代码


问题明白了,办理方案如下:
调解前:


调解后:




10.执行 db_archive_exec.py 大概碰到其他的报错
A.报错如下
  1. Can't locate Digest/MD5.pm in @INC (@INC contains: /usr/local/lib64/perl5 /usr/local/share/perl5 /usr/lib64/perl5/vendor_perl /usr/share/perl5/vendor_perl /usr/lib64/perl5 /usr/share/perl5 .) at /usr/local/bin/pt-archiver line 5410.
  2. BEGIN failed--compilation aborted at /usr/local/bin/pt-archiver line 5410.
复制代码
办理方案:
  1. yum -y install perl-Digest-MD5
复制代码
B.报错如下:
  1. Cannot connect to MySQL because the Perl DBI module is not installed or not found.  Run 'perl -MDBI' to see the directories that Perl searches for DBI.  If DBI is not installed, try:
  2.   Debian/Ubuntu  apt-get install libdbi-perl
  3.   RHEL/CentOS    yum install perl-DBI
  4.   OpenSolaris    pkg install pkg:/SUNWpmdbi
复制代码
办理方案:
  1. yum install perl-DBI
复制代码
C.报错如下:
  1. Cannot connect to MySQL because the Perl DBD::mysql module is not installed or not found.  Run 'perl -MDBD::mysql' to see the directories that Perl searches for DBD::mysql.  If DBD::mysql is not installed, try:
  2.   Debian/Ubuntu  apt-get install libdbd-mysql-perl
  3.   RHEL/CentOS    yum install perl-DBD-MySQL
  4.   OpenSolaris    pgk install pkg:/SUNWapu13dbd-mysql
复制代码
办理方案:
  1. yum install perl-DBD-MySQL
复制代码
四.功能优化 

为工具mysql_archiver添加只删除不备份的功能,即只删除源库上的数据,而不用迁移。
1.优化设置表db_archive_info的建表语句


  1. DROP TABLE IF EXISTS `db_archive_info`;
  2. CREATE TABLE `db_archive_info` (
  3.   `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '表id',
  4.   `server_source` varchar(640) DEFAULT NULL COMMENT '源服务器',
  5.   `port_source` varchar(64) DEFAULT NULL COMMENT '源服务器端口',
  6.   `user_source` varchar(64) DEFAULT NULL COMMENT '源数据库用户',
  7.   `password_source` varchar(128) DEFAULT NULL COMMENT '源数据库密码',
  8.   `db_source` varchar(64) DEFAULT NULL COMMENT '源数据库schema',
  9.   `table_source` varchar(128) DEFAULT NULL COMMENT '源数据库表',
  10.   <strong>`server_dest` </strong><strong>varchar(640) DEFAULT '' COMMENT '目标服务器',
  11.   `port_dest` varchar(64) DEFAULT '' COMMENT '目标服务器端口',
  12.   `user_dest` varchar(64) DEFAULT '' COMMENT '目标数据库用户',
  13.   `password_dest` varchar(128) DEFAULT '' COMMENT '目标数据库密码',
  14.   `db_dest` varchar(64) DEFAULT '' COMMENT '目标数据库schema',
  15.   `table_dest` varchar(128) DEFAULT '' COMMENT '目标数据库表'</strong><strong>,</strong>
  16.   `archive_condition` varchar(1000) DEFAULT NULL COMMENT '数据归档条件',
  17.   `last_archive_date` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '最近归档时间',
  18.   `last_archive_qty` int(10) DEFAULT NULL COMMENT '最近归档数量',
  19.   `state` varchar(255) DEFAULT 'A' COMMENT '数据行状态',
  20.   `user_created` varchar(128) DEFAULT 'SYS' COMMENT '数据行创建人',
  21.   `datetime_created` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '数据行创建时间',
  22.   `user_modified` varchar(128) DEFAULT 'SYS' COMMENT '数据行修改人',
  23.   `datetime_modified` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '数据行修改时间',
  24.   PRIMARY KEY (`id`)
  25. ) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8 COMMENT='数据库归档基础信息表';
复制代码



2.修改备份脚本



  1. #!/usr/bin/python
  2. # -*- coding: UTF-8 -*-
  3. import sys
  4. import os
  5. import time
  6. import db_conn
  7. # get db connection
  8. db = db_conn.db
  9. # use cursor
  10. cursor = db.cursor()
  11. # 获取命令行参数
  12. server_source = sys.argv[1]
  13. db_source = sys.argv[2]
  14. try:
  15.     # SQL 查询语句
  16.     sql = "select id, server_source, port_source, user_source, password_source, db_source, table_source," \
  17.           "server_dest, port_dest, user_dest, password_dest, db_dest, table_dest, archive_condition " \
  18.           "from db_archive_info " \
  19.           "where server_source = '%s' and db_source = '%s' " % (server_source, db_source)
  20.     # 执行SQL语句
  21.     cursor.execute(sql)
  22.     # 获取所有记录列表
  23.     results = cursor.fetchall()
  24.     for row in results:
  25.         id = row[0]
  26.         server_source = row[1]
  27.         port_source = row[2]
  28.         user_source = row[3]
  29.         password_source = row[4]
  30.         db_source = row[5]
  31.         table_source = row[6]
  32.         server_dest = row[7]
  33.         port_dest = row[8]
  34.         user_dest = row[9]
  35.         password_dest = row[10]
  36.         db_dest = row[11]
  37.         table_dest = row[12]
  38.         archive_condition = row[13]
  39.         # 归档开始时间
  40.         archive_starttime = time.strftime('%Y-%m-%d %H:%M:%S')
  41.         # 生成pt-archive命令
  42.         ## 兼容只删除不归档功能,
  43.         if server_dest != '' and port_dest != '' and port_dest != '':
  44.             archive_cmd = "/usr/local/bin/pt-archiver " \
  45.                           "--source h='%s',P='%s',u='%s',p='%s',D='%s',t='%s' " \
  46.                           "--dest h='%s',P='%s',u='%s',p='%s',D='%s',t='%s' " \
  47.                           "--no-check-charset --charset=utf8mb4 --where '%s' --progress 50000 --limit 10000 --txn-size 10000 " \
  48.                           "--statistics --purge " % \
  49.                           (server_source, port_source, user_source, password_source, db_source, table_source, \
  50.                            server_dest, port_dest, user_dest, password_dest, db_dest, table_dest, \
  51.                            archive_condition)
  52.         elif server_dest == '' and port_dest == '' and port_dest == '':
  53.              archive_cmd = "/usr/local/bin/pt-archiver " \
  54.                   "--source h='%s',P='%s',u='%s',p='%s',D='%s',t='%s' " \
  55.                   "--no-check-charset --charset=utf8mb4 --where '%s' --progress 50000 --limit 10000 --txn-size 10000 " \
  56.                   "--statistics --purge " % \
  57.                   (server_source, port_source, user_source, password_source, db_source, table_source,archive_condition)
  58.         # print archive_cmd
  59.         # make a copy of original stdout route
  60.         stdout_archive = sys.stdout
  61.         # define the log file that receives your log info
  62.         log_file = open("/software/python_script/db_archive_%s_%s.log" % (db_source, table_source), "w")
  63.         # redirect print output to log file
  64.         sys.stdout = log_file
  65.         # archive_cmd = os.popen(pt_archive)
  66.         with os.popen(archive_cmd) as c:
  67.             # with open("db_archive1.log", "r") as c:
  68.             archive_log = c.read()
  69.         print(archive_log)
  70.         # close log file
  71.         log_file.close()
  72.         # restore the output to initial pattern
  73.         sys.stdout = stdout_archive
  74.         # 定义归档相关变量
  75.         inserted_qty = 0
  76.         deleted_qty = 0
  77.         # 归档结束时间
  78.         archive_endtime = time.strftime('%Y-%m-%d %H:%M:%S')
  79.         with open("/software/python_script/db_archive_%s_%s.log" % (db_source, table_source), "r") as f:
  80.             for line in f:
  81.                 if 'INSERT' in line:
  82.                     i = line.index(" ")
  83.                     inserted_qty = line[i + 1:]
  84.                 elif 'DELETE' in line:
  85.                     i = line.index(" ")
  86.                     deleted_qty = line[i + 1:]
  87.         # 判断归档是否失败
  88.         if server_dest != '' and port_dest != '' and port_dest != '':
  89.             if inserted_qty == deleted_qty:
  90.                 archive_status = 'Y'
  91.                 archive_error = ''
  92.             else:
  93.                 archive_status = 'N'
  94.                 archive_error = 'inserted_qty and deleted_qty are not equal'
  95.         elif server_dest == '' and port_dest == '' and port_dest == '':
  96.              archive_status = 'Y'
  97.              archive_error = 'Delete Data OP'
  98.         # insert sql
  99.         sql_insert = "insert into db_archive_log(server_source, db_source, table_source, server_dest, " \
  100.                      "db_dest, table_dest, archive_qty, archive_cmd, archive_log, archive_start, archive_end, " \
  101.                      "archive_status, archive_error ) " \
  102.                      "values('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s')" % \
  103.                      (server_source, db_source, table_source, server_dest, \
  104.                       db_dest, table_dest, inserted_qty, db.escape_string(archive_cmd), archive_log, archive_starttime,
  105.                       archive_endtime, \
  106.                       archive_status, archive_error)
  107.         # exec sql
  108.         cursor.execute(sql_insert)
  109.         # exec commit
  110.         db.commit()
  111.         if archive_status == 'Y':
  112.             sql_update = "update db_archive_info " \
  113.                          "set datetime_modified = '%s', last_archive_date = '%s', last_archive_qty = %s " \
  114.                          "where id = %d" % \
  115.                          (archive_starttime, archive_endtime, inserted_qty, id)
  116.             cursor.execute(sql_update)
  117.             # exec commit
  118.             db.commit()
  119. except  Exception, e:
  120.         print str(Exception)
  121.         print str(e)
复制代码



3.阐明
修改db_archive_info的建表语句,将默认值null 调解为‘’,主要是我们通过 server_dest == '' and port_dest == '' and port_dest == '' 来判断是归档还是直接删除。
假如按照修改前的默认值NULL,这几列值不维护,则报错。
  1. DBI connect('None;host=None;port=None;charset=utf8mb4;mysql_read_default_group=client','None',...) failed: Unknown MySQL server host 'None' (2) at /usr/local/bin/pt-archiver line 2525.
复制代码
五.功能持续优化(异常)

5.1 代码优化

运行过程中,我们发现步伐不能捕获一些异常,例如当源表和目的表的表结构不一致时,步伐没有报错,异常没有记载到log表中。
还有就是try..except的异常也没有记载下来,针对这些异常情况,我们将代码调解优化如下:


  1. #!/usr/bin/python
  2. # -*- coding: UTF-8 -*-
  3. import sys
  4. import os
  5. import time
  6. import db_conn
  7. ### add 部署pt执行过程中的问题
  8. import subprocess  ###os.popen不能返回error,故需要使用这个方法
  9. #### 以下两行代码是为了解决错误:<type 'exceptions.Exception'>  'ascii' codec can't decode byte 0xe6 in position 0: ordinal not in range(128)
  10. reload(sys)
  11. sys.setdefaultencoding('utf8')
  12. ###
  13. # get db connection
  14. db = db_conn.db
  15. # use cursor
  16. cursor = db.cursor()
  17. # 获取命令行参数
  18. server_source = sys.argv[1]
  19. db_source = sys.argv[2]
  20. try:
  21.     # SQL 查询语句
  22.     sql = "select id, server_source, port_source, user_source, password_source, db_source, table_source," \
  23.           "server_dest, port_dest, user_dest, password_dest, db_dest, table_dest, archive_condition " \
  24.           "from db_archive_info " \
  25.           "where server_source = '%s' and db_source = '%s' " % (server_source, db_source)
  26.     # 执行SQL语句
  27.     cursor.execute(sql)
  28.     # 获取所有记录列表
  29.     results = cursor.fetchall()
  30.     for row in results:
  31.         id = row[0]
  32.         server_source = row[1]
  33.         port_source = row[2]
  34.         user_source = row[3]
  35.         password_source = row[4]
  36.         db_source = row[5]
  37.         table_source = row[6]
  38.         server_dest = row[7]
  39.         port_dest = row[8]
  40.         user_dest = row[9]
  41.         password_dest = row[10]
  42.         db_dest = row[11]
  43.         table_dest = row[12]
  44.         archive_condition = row[13]
  45.         # 归档开始时间
  46.         archive_starttime = time.strftime('%Y-%m-%d %H:%M:%S')
  47.         # 生成pt-archive命令
  48.         ## 兼容只删除不归档功能,
  49.         if server_dest != '' and port_dest != '' and port_dest != '':
  50.             archive_cmd = "/usr/local/bin/pt-archiver " \
  51.                           "--source h='%s',P='%s',u='%s',p='%s',D='%s',t='%s' " \
  52.                           "--dest h='%s',P='%s',u='%s',p='%s',D='%s',t='%s' " \
  53.                           "--no-check-charset --charset=utf8mb4 --where '%s' --progress 50000 --limit 10000 --txn-size 10000 " \
  54.                           "--statistics --purge " % \
  55.                           (server_source, port_source, user_source, password_source, db_source, table_source, \
  56.                            server_dest, port_dest, user_dest, password_dest, db_dest, table_dest, \
  57.                            archive_condition)
  58.         elif server_dest == '' and port_dest == '' and port_dest == '':
  59.              archive_cmd = "/usr/local/bin/pt-archiver " \
  60.                   "--source h='%s',P='%s',u='%s',p='%s',D='%s',t='%s' " \
  61.                   "--no-check-charset --charset=utf8mb4 --where '%s' --progress 50000 --limit 10000 --txn-size 10000 " \
  62.                   "--statistics --purge " % \
  63.                   (server_source, port_source, user_source, password_source, db_source, table_source,archive_condition)
  64.         # print archive_cmd
  65.         # make a copy of original stdout route
  66.         stdout_archive = sys.stdout
  67.         # define the log file that receives your log info
  68.         log_file = open("/software/python_script/db_archive_%s_%s.log" % (db_source, table_source), "w")
  69.         # redirect print output to log file
  70.         sys.stdout = log_file
  71.         #### os.popen 不能访问error,故用subprocess.Popen来替换。修改执行shell命令的方法。
  72.         #修改前
  73.         ##with os.popen(archive_cmd) as c:
  74.         ##    archive_log = c.read()
  75.         ## print(archive_log)
  76.         #修改后
  77.         sp = subprocess.Popen(archive_cmd,shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  78.         sp_out, sp_err = sp.communicate()
  79.         archive_log = sp_out + sp_err
  80.         print(archive_log)
  81.         ##### end
  82.         # close log file
  83.         log_file.close()
  84.         # restore the output to initial pattern
  85.         sys.stdout = stdout_archive
  86.         ### start 如果出现出现源表和目标表结构不一致的情况,存在数据库中
  87.         if 'The following columns exist in' in archive_log and 'but not' in archive_log :
  88.           archive_status = 'N'
  89.           archive_error = '源表和目标表数据结构不一致'
  90.           archive_starttime = time.strftime('%Y-%m-%d %H:%M:%S')
  91.           archive_endtime = time.strftime('%Y-%m-%d %H:%M:%S')
  92.           sql_insert = "insert into db_archive_log(server_source, db_source, table_source, server_dest, " \
  93.                      "db_dest, table_dest,archive_cmd, archive_log, archive_start, archive_end, " \
  94.                      "archive_status, archive_error ) " \
  95.                      "values('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s')" % \
  96.                      (server_source, db_source, table_source, server_dest, \
  97.                       db_dest, table_dest, cursor._connection.converter.escape(archive_cmd), archive_log, archive_starttime,
  98.                       archive_endtime, \
  99.                       archive_status, archive_error)
  100.           # exec sql
  101.           cursor.execute(sql_insert)
  102.           # exec commit
  103.           db.commit()
  104.           continue  ###跳出本循环,开始下一个表 ####其实认为这个continue 可有可无,因为不是不错。
  105.         ######  end  异常插入表结束
  106.         # 定义归档相关变量
  107.         inserted_qty = 0
  108.         deleted_qty = 0
  109.         # 归档结束时间
  110.         archive_endtime = time.strftime('%Y-%m-%d %H:%M:%S')
  111.         with open("/software/python_script/db_archive_%s_%s.log" % (db_source, table_source), "r") as f:
  112.             for line in f:
  113.                 if 'INSERT' in line:
  114.                     i = line.index(" ")
  115.                     inserted_qty = line[i + 1:]
  116.                 elif 'DELETE' in line:
  117.                     i = line.index(" ")
  118.                     deleted_qty = line[i + 1:]
  119.         # 判断归档是否失败
  120.         if server_dest != '' and port_dest != '' and port_dest != '':
  121.             if inserted_qty == deleted_qty:
  122.                 archive_status = 'Y'
  123.                 archive_error = ''
  124.             else:
  125.                 archive_status = 'N'
  126.                 archive_error = 'inserted_qty and deleted_qty are not equal'
  127.         elif server_dest == '' and port_dest == '' and port_dest == '':
  128.              archive_status = 'Y'
  129.              ###移除(不归档只删除,没有inserted_qty,,只有deleted_qty,因此赋值)
  130.              inserted_qty = deleted_qty
  131.              archive_error = 'Delete Data OP'
  132.         # insert sql
  133.         sql_insert = "insert into db_archive_log(server_source, db_source, table_source, server_dest, " \
  134.                      "db_dest, table_dest, archive_qty, archive_cmd, archive_log, archive_start, archive_end, " \
  135.                      "archive_status, archive_error ) " \
  136.                      "values('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s')" % \
  137.                      (server_source, db_source, table_source, server_dest, \
  138.                       db_dest, table_dest, inserted_qty, cursor._connection.converter.escape(archive_cmd), archive_log, archive_starttime,
  139.                       archive_endtime, \
  140.                       archive_status, archive_error)
  141.         # exec sql
  142.         cursor.execute(sql_insert)
  143.         # exec commit
  144.         db.commit()
  145.         if archive_status == 'Y':
  146.             sql_update = "update db_archive_info " \
  147.                          "set datetime_modified = '%s', last_archive_date = '%s', last_archive_qty = %s " \
  148.                          "where id = %d" % \
  149.                          (archive_starttime, archive_endtime, inserted_qty, id)
  150.             cursor.execute(sql_update)
  151.             # exec commit
  152.             db.commit()
  153. except  Exception, e:
  154.     ###修改前,print() 一定要放到最后,否则print()后面的代码都不执行了
  155.     #print str(Exception)
  156.     #print str(e)
  157.     #### start 部分错误信息没有记录在log文件中,try...catch 捕获到的错误在此记录,例如:ValueError: I/O operation on closed file。
  158.     archive_status = 'N'
  159.     archive_error = str(e)
  160.     ##print(archive_error)
  161.     archive_starttime = time.strftime('%Y-%m-%d %H:%M:%S')
  162.     archive_endtime = time.strftime('%Y-%m-%d %H:%M:%S')
  163.     sql_insert = "insert into db_archive_log(server_source, db_source,archive_start, archive_end,archive_status, archive_error ) " \
  164.                   "values('%s','%s','%s','%s','%s','%s')" % \
  165.                   (server_source, db_source, archive_starttime,archive_endtime,archive_status, archive_error)
  166.     cursor.execute(sql_insert)
  167.     # exec commit
  168.     db.commit()
  169.     print str(Exception)
  170.     print str(e)
  171.     #### end
复制代码


5.2 增补阐明

阐明1.
popen 也时引用了subprocess方法,但是 驱动shell命令,没有接受error,只有stdout,我们查看源码。如下:


  1. def popen(cmd, mode="r", buffering=-1):
  2.     if not isinstance(cmd, str):
  3.         raise TypeError("invalid cmd type (%s, expected string)" % type(cmd))
  4.     if mode not in ("r", "w"):
  5.         raise ValueError("invalid mode %r" % mode)
  6.     if buffering == 0 or buffering is None:
  7.         raise ValueError("popen() does not support unbuffered streams")
  8.     import subprocess, io
  9.     if mode == "r":
  10.         proc = subprocess.Popen(cmd,
  11.                                 shell=True,
  12.                                 stdout=subprocess.PIPE,
  13.                                 bufsize=buffering)
  14.         return _wrap_close(io.TextIOWrapper(proc.stdout), proc)
复制代码


阐明2.
假如源表和目的表,表结构不一致,pt-archiver 报错信息大概不完全相同。
例如:
  1. The following columns exist in --dest but not --source:XXXXXXXXXXXXXXXX
复制代码
  1. The following columns exist in --source but not --dest:XXXXXXXXXXXXXXXX
复制代码
同样的问题,大概报错不一样。
原因是大概是以源表区比力目的表,也大概是以目的表去反向比力源表。
六.主要参考资料

1.MySQL数据归档实战-Python和pt-archiver的强强结合
https://blog.csdn.net/n88lpo/article/details/78905528?utm_medium=distribute.pc_relevant.none-task-blog-baidulandingword-14&spm=1001.2101.3001.4242
2.MySQL数据归档小工具
https://github.com/dbarun/mysql_archiver#readme
3.在linux下安装MySQLdb及基本利用
https://www.cnblogs.com/blogsme/archive/2012/12/12/2814588.html
4.pt工具之pt-archiver
https://www.cnblogs.com/yhq1314/p/10601801.html
5.linux下pip的安装步骤及使用详解
https://www.jb51.net/article/118035.htm
6.Percona-Toolkit 之 pt-archiver 总结
https://www.cnblogs.com/dbabd/p/10721857.html

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

星球的眼睛

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表