离线数仓数据导出-hive数据同步到mysql

滴水恩情  金牌会员 | 2024-8-6 17:00:06 | 来自手机 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 550|帖子 550|积分 1650

为方便报表应用利用数据,需将ads各指标的统计结果导出到MySQL数据库中。
datax支持hive同步MySQL:仅仅支持hive存储的hdfs文件导出。以是reader选hdfs-reader,writer选mysql-writer。

null值 在hive和mysql里的存储格式不一样,必要告诉DataX应该如何转换。

MySQL建库建表

12.1.1 创建数据库
  1. CREATE DATABASE IF NOT EXISTS gmall_report DEFAULT CHARSET utf8 COLLATE utf8_general_ci;
复制代码
建mysql表的,
1字段个数要和hive中的ads层数据保持一致,
2字段类型要和hive对等替换,
3字段次序也要一致
每张表要有主键
1)各活动补贴率
dt activity_id activity_name 三个主键联合而成
  1. DROP TABLE IF EXISTS `ads_activity_stats`;
  2. CREATE TABLE `ads_activity_stats`  (
  3.   `dt` date NOT NULL COMMENT '统计日期',
  4.   `activity_id` varchar(16) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '活动ID',
  5.   `activity_name` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '活动名称',
  6.   `start_date` varchar(16) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '活动开始日期',
  7.   `reduce_rate` decimal(16, 2) NULL DEFAULT NULL COMMENT '补贴率',
  8.   PRIMARY KEY (`dt`, `activity_id`) USING BTREE
  9. ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '活动统计' ROW_FORMAT = Dynamic;
复制代码
数据导出

DataX配置文件天生脚本
方便起见,此处提供了DataX配置文件批量天生脚本,脚本内容及利用方式如下。
1)在~/bin目次下创建gen_export_config.py脚本
[atguigu@hadoop102 bin]$ vim ~/bin/gen_export_config.py
脚本内容如下
  1. # coding=utf-8
  2. import json
  3. import getopt
  4. import os
  5. import sys
  6. import MySQLdb
  7. #MySQL相关配置,需根据实际情况作出修改
  8. mysql_host = "hadoop102"
  9. mysql_port = "3306"
  10. mysql_user = "root"
  11. mysql_passwd = "000000"
  12. #HDFS NameNode相关配置,需根据实际情况作出修改
  13. hdfs_nn_host = "hadoop102"
  14. hdfs_nn_port = "8020"
  15. #生成配置文件的目标路径,可根据实际情况作出修改
  16. output_path = "/opt/module/datax/job/export"
  17. def get_connection():
  18.     return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)
  19. def get_mysql_meta(database, table):
  20.     connection = get_connection()
  21.     cursor = connection.cursor()
  22.     sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
  23.     cursor.execute(sql, [database, table])
  24.     fetchall = cursor.fetchall()
  25.     cursor.close()
  26.     connection.close()
  27.     return fetchall
  28. def get_mysql_columns(database, table):
  29.     return map(lambda x: x[0], get_mysql_meta(database, table))
  30. def generate_json(target_database, target_table):
  31.     job = {
  32.         "job": {
  33.             "setting": {
  34.                 "speed": {
  35.                     "channel": 3
  36.                 },
  37.                 "errorLimit": {
  38.                     "record": 0,
  39.                     "percentage": 0.02
  40.                 }
  41.             },
  42.             "content": [{
  43.                 "reader": {
  44.                     "name": "hdfsreader",
  45.                     "parameter": {
  46.                         "path": "${exportdir}",
  47.                         "defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
  48.                         "column": ["*"],
  49.                         "fileType": "text",
  50.                         "encoding": "UTF-8",
  51.                         "fieldDelimiter": "\t",
  52.                         "nullFormat": "\\N"
  53.                     }
  54.                 },
  55.                 "writer": {
  56.                     "name": "mysqlwriter",
  57.                     "parameter": {
  58.                         "writeMode": "replace",
  59.                         "username": mysql_user,
  60.                         "password": mysql_passwd,
  61.                         "column": get_mysql_columns(target_database, target_table),
  62.                         "connection": [
  63.                             {
  64.                                 "jdbcUrl":
  65.                                     "jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + target_database + "?useUnicode=true&characterEncoding=utf-8",
  66.                                 "table": [target_table]
  67.                             }
  68.                         ]
  69.                     }
  70.                 }
  71.             }]
  72.         }
  73.     }
  74.     if not os.path.exists(output_path):
  75.         os.makedirs(output_path)
  76.     with open(os.path.join(output_path, ".".join([target_database, target_table, "json"])), "w") as f:
  77.         json.dump(job, f)
  78. def main(args):
  79.     target_database = ""
  80.     target_table = ""
  81.     options, arguments = getopt.getopt(args, '-d:-t:', ['targetdb=', 'targettbl='])
  82.     for opt_name, opt_value in options:
  83.         if opt_name in ('-d', '--targetdb'):
  84.             target_database = opt_value
  85.         if opt_name in ('-t', '--targettbl'):
  86.             target_table = opt_value
  87.     generate_json(target_database, target_table)
  88. if __name__ == '__main__':
  89.     main(sys.argv[1:])
复制代码
在~/bin目次下创建gen_export_config.sh脚本
[atguigu@hadoop102 bin]$ vim ~/bin/gen_export_config.sh
脚本内容如下。
  1. #!/bin/bash
  2. python ~/bin/gen_export_config.py -d gmall_report -t ads_activity_stats
  3. python ~/bin/gen_export_config.py -d gmall_report -t ads_coupon_stats
  4. python ~/bin/gen_export_config.py -d gmall_report -t ads_new_buyer_stats
  5. python ~/bin/gen_export_config.py -d gmall_report -t ads_order_by_province
  6. python ~/bin/gen_export_config.py -d gmall_report -t ads_page_path
  7. python ~/bin/gen_export_config.py -d gmall_report -t ads_repeat_purchase_by_tm
  8. python ~/bin/gen_export_config.py -d gmall_report -t ads_sku_cart_num_top3_by_cate
  9. python ~/bin/gen_export_config.py -d gmall_report -t ads_trade_stats
  10. python ~/bin/gen_export_config.py -d gmall_report -t ads_trade_stats_by_cate
  11. python ~/bin/gen_export_config.py -d gmall_report -t ads_trade_stats_by_tm
  12. python ~/bin/gen_export_config.py -d gmall_report -t ads_traffic_stats_by_channel
  13. python ~/bin/gen_export_config.py -d gmall_report -t ads_user_action
  14. python ~/bin/gen_export_config.py -d gmall_report -t ads_user_change
  15. python ~/bin/gen_export_config.py -d gmall_report -t ads_user_retention
  16. python ~/bin/gen_export_config.py -d gmall_report -t ads_user_stats
复制代码
3)为gen_export_config.sh脚本增加执行权限
[atguigu@hadoop102 bin]$ chmod +x ~/bin/gen_export_config.sh
4)执行gen_export_config.sh脚本,天生配置文件
[atguigu@hadoop102 bin]$ gen_export_config.sh
5)观察天生的配置文件
  1. [atguigu@hadoop102 bin]$ ls /opt/module/datax/job/export/
复制代码
编写每日导出脚本
(1)在hadoop102的/home/atguigu/bin目次下创建hdfs_to_mysql.sh
[atguigu@hadoop102 bin]$ vim hdfs_to_mysql.sh
(2)编写如下内容
  1. #! /bin/bash
  2. DATAX_HOME=/opt/module/datax
  3. #DataX导出路径不允许存在空文件,该函数作用为清理空文件
  4. handle_export_path(){
  5.   target_file=$1
  6.   for i in `hadoop fs -ls -R $target_file | awk '{print $8}'`; do
  7.     hadoop fs -test -z $i
  8.     if [[ $? -eq 0 ]]; then
  9.       echo "$i文件大小为0,正在删除"
  10.       hadoop fs -rm -r -f $i
  11.     fi
  12.   done
  13. }
  14. #数据导出
  15. export_data() {
  16.   datax_config=$1
  17.   export_dir=$2
  18.   hadoop fs -test -e $export_dir
  19.   if [[ $? -eq 0 ]]
  20.   then
  21.     handle_export_path $export_dir
  22.     file_count=$(hadoop fs -ls $export_dir | wc -l)
  23.     if [ $file_count -gt 0 ]
  24.     then
  25.       set -e;
  26.       $DATAX_HOME/bin/datax.py -p"-Dexportdir=$export_dir" $datax_config
  27.       set +e;
  28.     else
  29.       echo "$export_dir 目录为空,跳过~"
  30.     fi
  31.   else
  32.     echo "路径 $export_dir 不存在,跳过~"
  33.   fi
  34. }
  35. case $1 in
  36.   "ads_new_buyer_stats")
  37.     export_data /opt/module/datax/job/export/gmall_report.ads_new_buyer_stats.json /warehouse/gmall/ads/ads_new_buyer_stats
  38.   ;;
  39.   "ads_order_by_province")
  40.     export_data /opt/module/datax/job/export/gmall_report.ads_order_by_province.json /warehouse/gmall/ads/ads_order_by_province
  41.   ;;
  42.   "ads_page_path")
  43.     export_data /opt/module/datax/job/export/gmall_report.ads_page_path.json /warehouse/gmall/ads/ads_page_path
  44.   ;;
  45.   "ads_repeat_purchase_by_tm")
  46.     export_data /opt/module/datax/job/export/gmall_report.ads_repeat_purchase_by_tm.json /warehouse/gmall/ads/ads_repeat_purchase_by_tm
  47.   ;;
  48.   "ads_trade_stats")
  49.     export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats.json /warehouse/gmall/ads/ads_trade_stats
  50.   ;;
  51.   "ads_trade_stats_by_cate")
  52.     export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats_by_cate.json /warehouse/gmall/ads/ads_trade_stats_by_cate
  53.   ;;
  54.   "ads_trade_stats_by_tm")
  55.     export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats_by_tm.json /warehouse/gmall/ads/ads_trade_stats_by_tm
  56.   ;;
  57.   "ads_traffic_stats_by_channel")
  58.     export_data /opt/module/datax/job/export/gmall_report.ads_traffic_stats_by_channel.json /warehouse/gmall/ads/ads_traffic_stats_by_channel
  59.   ;;
  60.   "ads_user_action")
  61.     export_data /opt/module/datax/job/export/gmall_report.ads_user_action.json /warehouse/gmall/ads/ads_user_action
  62.   ;;
  63.   "ads_user_change")
  64.     export_data /opt/module/datax/job/export/gmall_report.ads_user_change.json /warehouse/gmall/ads/ads_user_change
  65.   ;;
  66.   "ads_user_retention")
  67.     export_data /opt/module/datax/job/export/gmall_report.ads_user_retention.json /warehouse/gmall/ads/ads_user_retention
  68.   ;;
  69.   "ads_user_stats")
  70.     export_data /opt/module/datax/job/export/gmall_report.ads_user_stats.json /warehouse/gmall/ads/ads_user_stats
  71.   ;;
  72.   "ads_activity_stats")
  73.     export_data /opt/module/datax/job/export/gmall_report.ads_activity_stats.json /warehouse/gmall/ads/ads_activity_stats
  74.   ;;
  75.   "ads_coupon_stats")
  76.     export_data /opt/module/datax/job/export/gmall_report.ads_coupon_stats.json /warehouse/gmall/ads/ads_coupon_stats
  77.   ;;
  78.   "ads_sku_cart_num_top3_by_cate")
  79.     export_data /opt/module/datax/job/export/gmall_report.ads_sku_cart_num_top3_by_cate.json /warehouse/gmall/ads/ads_sku_cart_num_top3_by_cate
  80.   ;;
  81. "all")
  82.   export_data /opt/module/datax/job/export/gmall_report.ads_new_buyer_stats.json /warehouse/gmall/ads/ads_new_buyer_stats
  83.   export_data /opt/module/datax/job/export/gmall_report.ads_order_by_province.json /warehouse/gmall/ads/ads_order_by_province
  84.   export_data /opt/module/datax/job/export/gmall_report.ads_page_path.json /warehouse/gmall/ads/ads_page_path
  85.   export_data /opt/module/datax/job/export/gmall_report.ads_repeat_purchase_by_tm.json /warehouse/gmall/ads/ads_repeat_purchase_by_tm
  86.   export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats.json /warehouse/gmall/ads/ads_trade_stats
  87.   export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats_by_cate.json /warehouse/gmall/ads/ads_trade_stats_by_cate
  88.   export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats_by_tm.json /warehouse/gmall/ads/ads_trade_stats_by_tm
  89.   export_data /opt/module/datax/job/export/gmall_report.ads_traffic_stats_by_channel.json /warehouse/gmall/ads/ads_traffic_stats_by_channel
  90.   export_data /opt/module/datax/job/export/gmall_report.ads_user_action.json /warehouse/gmall/ads/ads_user_action
  91.   export_data /opt/module/datax/job/export/gmall_report.ads_user_change.json /warehouse/gmall/ads/ads_user_change
  92.   export_data /opt/module/datax/job/export/gmall_report.ads_user_retention.json /warehouse/gmall/ads/ads_user_retention
  93.   export_data /opt/module/datax/job/export/gmall_report.ads_user_stats.json /warehouse/gmall/ads/ads_user_stats
  94.   export_data /opt/module/datax/job/export/gmall_report.ads_activity_stats.json /warehouse/gmall/ads/ads_activity_stats
  95.   export_data /opt/module/datax/job/export/gmall_report.ads_coupon_stats.json /warehouse/gmall/ads/ads_coupon_stats
  96.   export_data /opt/module/datax/job/export/gmall_report.ads_sku_cart_num_top3_by_cate.json /warehouse/gmall/ads/ads_sku_cart_num_top3_by_cate
  97.   ;;
  98. esac
复制代码
(3)增加脚本执行权限
[atguigu@hadoop102 bin]$ chmod +x hdfs_to_mysql.sh
(4)脚本用法
[atguigu@hadoop102 bin]$ hdfs_to_mysql.sh all

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

滴水恩情

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表