hedfs和hive数据迁移后校验脚本

打印 上一主题 下一主题

主题 1032|帖子 1032|积分 3096

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
先谈论校验方法,本人腾讯云大数据工程师。
1、hdfs的校验

这个通常就是distcp校验,hdfs通过distcp迁移到另一个集群,怎么校验你的对不对。
有人会说,默认会有校验CRC校验。我们关闭了,为什么关闭?全量迁移,如果当前表再写数据,开自动校验就会失败。数据量大(PB级)迁移流程是先迁移全量,后面在定时补近来几天增量,再找个时间点,进行业务割接

那么怎么知道你迁移的hdfs是否有题目呢?
2个文件,一个是脚本,一个是必要校验的目录
data_checksum.py

  1. # -*- coding: utf-8 -*-
  2. # @Time    : 2025/1/16 22:52
  3. # @Author  : fly-wlx
  4. # @Email   : xxx@163.com
  5. # @File    : data_compare.py
  6. # @Software: PyCharm
  7. import subprocess
  8. #output_file = 'data_checksum_result.txt'
  9. def load_file_paths_from_conf(conf_file):
  10.     file_list = []
  11.     with open(conf_file, 'r') as file:
  12.         lines = file.readlines()
  13.         for line in lines:
  14.             path = line.strip()
  15.             if path and not path.startswith('#'):  # 跳过空行和注释
  16.                 full_path = f"{path}"
  17.                 file_list.append(full_path)
  18.     return file_list
  19. #def write_sizes_to_file(filepath,source_namenode,source_checksum,target_namenode,target_checksum,status, output_file):
  20. #    with open(output_file, 'w') as file:
  21. #file.write(f"{source_namenode}/{filepath},{source_checksum},{target_namenode}/{filepath},{target_checksum},{status}\n")
  22. def write_sizes_to_file(source_path, src_info, destination_path, target_info, status,output_file):
  23.     with open(output_file, 'a') as file:
  24.          file.write(f"{source_path},{src_info},{destination_path}, {target_info}, {status}\n")
  25. def run_hadoop_command(command):
  26.     """运行 Hadoop 命令并返回输出"""
  27.     try:
  28.         result = subprocess.check_output(command, shell=True, text=True)
  29.         return result.strip()
  30.     except subprocess.CalledProcessError as e:
  31.         print(f"Command failed: {e}")
  32.         return None
  33. def get_hdfs_count(hdfs_filepath):
  34.     """获取 HDFS 路径的文件和目录统计信息"""
  35.     command = f"hadoop fs -count {hdfs_filepath}"
  36.     output = run_hadoop_command(command)
  37.     if output:
  38.         parts = output.split()
  39.         if len(parts) >= 3:
  40.             dir_count, file_count, content_size = parts[-3:]
  41.             return dir_count, file_count, content_size
  42.     return None, None, None
  43. def get_hdfs_size(hdfs_filepath):
  44.     """获取 HDFS 路径的总文件大小"""
  45.     command = f"hadoop fs -du -s {hdfs_filepath}"
  46.     output = run_hadoop_command(command)
  47.     if output:
  48.         parts = output.split()
  49.         if len(parts) >= 1:
  50.             return parts[0]
  51.     return None
  52. def validate_hdfs_data(source_namenode, target_namenode,filepath):
  53.     output_file = 'data_checksum_result.txt'
  54.     source_path=f"{source_namenode}/{filepath}"
  55.     destination_path = f"{target_namenode}/{filepath}"
  56.     """校验 HDFS 源路径和目标路径的数据一致性"""
  57.     print("Fetching source path statistics...")
  58.     src_dir_count, src_file_count, src_content_size = get_hdfs_count(source_path)
  59.     src_total_size = get_hdfs_size(source_path)
  60.     print("Fetching destination path statistics...")
  61.     dest_dir_count, dest_file_count, dest_content_size = get_hdfs_count(destination_path)
  62.     dest_total_size = get_hdfs_size(destination_path)
  63.     src_info={}
  64.     src_info["src_dir_count"] = src_dir_count
  65.     src_info["src_file_count"] = src_file_count
  66.     #src_info["src_content_size"] = src_content_size
  67.     src_info["src_total_size"] = src_total_size
  68.     target_info = {}
  69.     target_info["src_dir_count"] = dest_dir_count
  70.     target_info["src_file_count"] = dest_file_count
  71.     #target_info["src_content_size"] = dest_content_size
  72.     target_info["src_total_size"] = dest_total_size
  73.     print("\nValidation Results:")
  74.     if (src_dir_count == dest_dir_count and
  75.         src_file_count == dest_file_count and
  76.        # src_content_size == dest_content_size and
  77.         src_total_size == dest_total_size):
  78.         print("✅ Source and destination paths are consistent!")
  79.         write_sizes_to_file(source_path, src_info, destination_path,target_info, 0,
  80.                             output_file)
  81.     else:
  82.         print("❌ Source and destination paths are inconsistent!")
  83.         write_sizes_to_file(source_path, src_info, destination_path, target_info, 1,
  84.                             output_file)
  85.         #print(f"Source: DIR_COUNT={src_dir_count}, FILE_COUNT={src_file_count}, CONTENT_SIZE={src_content_size}, TOTAL_SIZE={src_total_size}")
  86.         #print(f"Destination: DIR_COUNT={dest_dir_count}, FILE_COUNT={dest_file_count}, CONTENT_SIZE={dest_content_size}, TOTAL_SIZE={dest_total_size}")
  87. # 设置源路径和目标路径
  88. #source_path = "hdfs://namenode1:8020/"
  89. #destination_path = "hdfs://namenode2:8020/path/to/destination"
  90. # 定义源和目标集群的 namenode 地址
  91. source_namenode = "hdfs://10.xx.xx.6:8020"
  92. target_namenode= "hdfs://10.xx.xx.106:4007"
  93. def main():
  94.     # 配置文件路径和输出文件路径
  95.     conf_file = 'distcp_paths.conf'
  96.     # 定义源和目标集群的 namenode 地址
  97.     # 设置源路径和目标路径
  98.     #source_namenode = "hdfs://source-namenode:8020"
  99.     #target_namenode = "hdfs://target-namenode:8020"
  100.     # 文件列表
  101.     file_paths = load_file_paths_from_conf(conf_file)
  102.     # 对每个目录进行校验
  103.     for filepath in file_paths:
  104.         validate_hdfs_data(source_namenode, target_namenode, filepath)
  105.    
  106. if __name__ == "__main__":
  107.     main()
  108. # 执行校验
  109. #validate_hdfs_data(source_path, destination_path)
复制代码
distcp_paths.conf

  1. /apps/hive/warehouse/xx.db/dws_ixx_features
  2. /apps/hive/warehouse/xx.db/dwd_xx_df
复制代码
用法

直接python3 data_checksum.py(必要改为自己的)
他会实时打印对比结果,并且将结果生成到一个文件中(data_checksum_result.txt)


2、hive文件内容比对

终极客户要的是任务的数据对得上,而不是管你迁移怎么样,所以验证任务的方式:双方同时跑同多个Hive任务流的任务,检察表数据内容是否同等。(因为跑出来的hdfs的文件大小由于mapreduce原因,肯定是不同等的,校验实际数据同等就行了)
方法是先对比表字段,然后对比count数,然后将每行拼起来对比md5
涉及3个文件,单检测脚本,批量入口脚本,必要批量检测的表文件

check_script.sh

  1. #!/bin/bash
  2. #owner:clark.shi
  3. #date:2025/1/22
  4. #背景:用于hive从源端任务和目标端任务,两边跑完结果表的内容校验(因为mapreduce和小文件不同,所以要用数据内容校验)
  5. #     --用trino(presto)会更好,因为可以跨集群使用,目前客户因为资源情况没装,此为使用hive引擎,将数据放到本地进行比对
  6. #输入:源端表,目标表,分区名,分区值
  7. #$0是脚本本身,最低从1开始
  8. #限制脚本运行内存大小,30gb
  9. #ulimit -v 30485760
  10. #---注意,要保证,2个表的字段顺序是一样的(md5是根据顺序拼接的)
  11. echo "================"
  12. echo "注意"
  13. echo "要保证,2个表的字段顺序是一样的(md5是根据顺序拼接的)"
  14. echo "要保证,这2个表是存在的"
  15. echo "要保证,双端是可以互相访问"
  16. echo "要保证,2个hive集群的MD5算法相同"
  17. echo "禁止表,一个分区数据量超过本地磁盘,此脚本会写入本地磁盘(双端数据),对比后删除"
  18. echo "注意,如果分区字段是数字不用加引号,如果是字符串需要加引号,搜partition_value,这里分区是int如20250122是没有引号"
  19. echo "================"
  20. a_table=$1
  21. b_table=$2
  22. partition_column=$3
  23. partition_value=$4
  24. if [ $# -ne 4 ]; then
  25.     echo "错误:必须输入 4 个参数,源端表,目标表,分区名,分区值"
  26.     exit 1
  27. fi
  28. #------------函数
  29. check_value() {
  30.     # 第一个参数是布尔值,第二个参数是要 echo 的内容
  31.     local value=$1
  32.     local message=$2
  33.    
  34.     # 检查第一个参数的值
  35.     if [ "$value" == "false" ]; then
  36.         echo "校验失败:$message" >> rs.txt
  37.         exit
  38.     fi
  39. }
  40. #-----------函数结束
  41. echo "需要对比表的数据内容是$a_table和$b_table--,需要对比分区$partition_column是$partition_value--"
  42. sleep 2
  43. echo "===============开始校验============="
  44. #todo改成自己的,kerbers互信认证(也可以用ldap)
  45. `kinit -kt /root/s_xx_tbds.keytab s_xx_tbds@TBDS-V12X10CS`
  46. #校验字段类型
  47. echo "1.开始校验字段类型"
  48.        
  49. #todo这里要改成自己的
  50.   beeline -u "jdbc:hive2://10.xx.xx.4:10001/XXdatabase;principal=hive/tbds-10-xx-xx-4.hadooppdt.xxjin.srv@TBDS-V12X10CS;transportMode=http;httpPath=cliservice" -e "DESCRIBE $b_table" > 1_a_column.txt
  51.   beeline -u "jdbc:hive2://10.xx.xx.104:7001/XXdatabase;principal=hadoop/10.xx.xx.104@TBDS-09T7KXLE" -e "DESCRIBE $a_table" > 1_b_column.txt
  52.   if diff 1_a_column.txt 1_b_column.txt > /dev/null; then
  53.     echo "表结构一致"
  54.   else
  55.     echo "表结构不一致"
  56.     check_value false "$a_table和$b_table字段类型不一致"
  57.   fi
  58. echo "------------1.表字段,校验完毕,通过-------------"
  59. #校验count数
  60. echo "2.开始count校验"
  61.   beeline -u "jdbc:hive2://10.xx.xx.4:10001/XXdatabase;principal=hive/tbds-10-xx-xx-4.hadooppdt.xxjin.srv@TBDS-V12X10CS;transportMode=http;httpPath=cliservice" -e "select count(*) from $b_table where $partition_column=$partition_value" > 2_a_count.txt
  62.     beeline -u "jdbc:hive2://10.xx.xx.104:7001/XXdatabase;principal=hadoop/10.xx.xx.104@TBDS-09T7KXLE" -e "select count(*) from $a_table where $partition_column=$partition_value" > 2_b_count.txt
  63.   if diff 2_a_count.txt 2_b_count.txt > /dev/null; then
  64.     echo "数据行一致"
  65.   else
  66.     echo "数据行不一致"
  67.     check_value false "$a_table和$b_table的数据行不一致"
  68.   fi
  69. echo "------------2.数据行,校验完毕,通过-------------"
  70. #拼接每一行的值,作为唯一值,创建2个临时表
  71. echo "3.生成每条数据唯一标识"
  72.   #1.获取表列名
  73.   #使用awk,去除第一行字段名,,删除#字号以及他后面的内容(一般是分区的描述),根据分隔符|取第一列数据,去掉空的行
  74.   beeline -u "jdbc:hive2://10.xx.xx.104:7001/XXdatabase;principal=hadoop/10.xx.xx.104@TBDS-09T7KXLE" --outputformat=dsv -e "DESCRIBE $a_table" |awk 'NR > 1' |awk '!/^#/ {print} /^#/ {exit}'|awk 'BEGIN {FS="|"} {print $1}'|awk 'NF > 0' > 3_table_field_name.txt
  75.   #2.拼接表列名,生成md5的表 (第一步已经检测过双方的表结构了,这里用同一个拼接字段即可)
  76.   # 使用 while 循环逐行读取文件内容
  77.   name_fields=""
  78.   while IFS= read -r line; do
  79.     if [ -z "$name_fields" ]; then
  80.       name_fields="$line"
  81.     else
  82.       name_fields="$name_fields,$line"
  83.     fi
  84.   done < "3_table_field_name.txt"
  85.   echo "$name_fields"
  86.   #将每行数据进行拼接,并且生成含一个字段的md5表
  87.   md5_sql="SELECT distinct(MD5(CONCAT($name_fields))) AS md5_value "
  88.   a_md5_sql="$md5_sql from (select * from dim_user_profile_df where $partition_column=$partition_value  limit 100)a;"
  89.   b_md5_sql="$md5_sql from $a_table where $partition_column=$partition_value;"
  90.   echo "a表的sql是:$a_md5_sql"
  91.   echo "b表的sql是:$b_md5_sql"
  92.   #源端是生产环境,这里做了特殊处理,源端就取100条(没使用order by rand(),客户主要是检测函数,order by 会占用他们集群资源)
  93.   beeline -u "jdbc:hive2://10.xx.xx.4:10001/XXdatabase;principal=hive/tbds-10-xx-xx-4.hadooppdt.xxjin.srv@TBDS-V12X10CS;transportMode=http;httpPath=cliservice" --outputformat=dsv -e "$a_md5_sql" > 4_a_md5_data.txt
  94.   beeline -u "jdbc:hive2://10.xx.xx.104:7001/XXdatabase;principal=hadoop/10.xx.xx.104@TBDS-09T7KXLE" --outputformat=dsv -e "$b_md5_sql" > 4_b_md5_data.txt
  95.   #3.(由于不是同集群,需要下载到本地,再进行导入--如果耗费资源时长太长,再导入到hive,否则直接shell脚本搞定)
  96.   # 设置large_file和small_file的路径
  97.   large_file="4_b_md5_data.txt"
  98.   small_file="4_a_md5_data.txt"
  99.   # 遍历small_file中的每一行
  100.   while IFS= read -r line; do
  101.       # 检查line是否存在于large_file中
  102.       if grep -qxF "$line" "$large_file"; then
  103.           # 如果line存在于large_file中,输出1
  104.           #echo "1"
  105.           a=1
  106.       else
  107.           # 如果line不存在于large_file中,输出2
  108.           echo "2"
  109.           check_value false "$a_table和$b_table抽样存在数据内容不一致"
  110.       fi
  111.   done < "$small_file"
  112.   echo echo "------------3.数据内容,校验完毕,通过-------------"
  113. #抽样核对md5(取数据时已抽样,否则数据太大容易跑挂生产环境)
复制代码

input_file.txt必要校验的表文件

源端表名,目标端表名,分区字段(写1级分区就可以),分区值
   ods_xxnfo_di ods_xxnfo_dii dt 20250106
  ods_asxx_log_di ods_asxx_log_dii dt 20250106
  ods_xxog_di ods_xxog_di dt 20250106
  dwd_xxx dwd_xxx dt 20250106
  run.sh

  1. #!/bin/bash
  2. # 设置文件路径
  3. input_file="input_file.txt"
  4. # 遍历文件中的每一行
  5. while IFS= read -r line; do
  6.     # 调用另一个脚本并传递当前行的参数
  7.     echo $line
  8.     ./check_script.sh $line
  9.     # 在每次执行完后间隔一小段时间,避免系统过载(可选)
  10.     sleep 1
  11. done < "$input_file"
复制代码

使用方法

sh run.sh(必要把check_scripe和run里的内容改成自己的哈)
他会把不通过的,生成一个rs.txt



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

用户云卷云舒

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表