滴水恩情 发表于 2024-8-6 17:00:06

离线数仓数据导出-hive数据同步到mysql

为方便报表应用利用数据,需将ads各指标的统计结果导出到MySQL数据库中。
datax支持hive同步MySQL:仅仅支持hive存储的hdfs文件导出。以是reader选hdfs-reader,writer选mysql-writer。
https://img-blog.csdnimg.cn/direct/f1b4a5245c0e482d97a5b5d461d74b21.png
null值 在hive和mysql里的存储格式不一样,必要告诉DataX应该如何转换。
https://img-blog.csdnimg.cn/direct/107a3d6874d441ee9995cfcd83cba91a.png
MySQL建库建表

12.1.1 创建数据库
CREATE DATABASE IF NOT EXISTS gmall_report DEFAULT CHARSET utf8 COLLATE utf8_general_ci;
建mysql表的,
1字段个数要和hive中的ads层数据保持一致,
2字段类型要和hive对等替换,
3字段次序也要一致
每张表要有主键
1)各活动补贴率
dt activity_id activity_name 三个主键联合而成
DROP TABLE IF EXISTS `ads_activity_stats`;
CREATE TABLE `ads_activity_stats`(
`dt` date NOT NULL COMMENT '统计日期',
`activity_id` varchar(16) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '活动ID',
`activity_name` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '活动名称',
`start_date` varchar(16) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '活动开始日期',
`reduce_rate` decimal(16, 2) NULL DEFAULT NULL COMMENT '补贴率',
PRIMARY KEY (`dt`, `activity_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '活动统计' ROW_FORMAT = Dynamic;
数据导出

DataX配置文件天生脚本
方便起见,此处提供了DataX配置文件批量天生脚本,脚本内容及利用方式如下。
1)在~/bin目次下创建gen_export_config.py脚本
$ vim ~/bin/gen_export_config.py
脚本内容如下
# coding=utf-8
import json
import getopt
import os
import sys
import MySQLdb

#MySQL相关配置,需根据实际情况作出修改
mysql_host = "hadoop102"
mysql_port = "3306"
mysql_user = "root"
mysql_passwd = "000000"

#HDFS NameNode相关配置,需根据实际情况作出修改
hdfs_nn_host = "hadoop102"
hdfs_nn_port = "8020"

#生成配置文件的目标路径,可根据实际情况作出修改
output_path = "/opt/module/datax/job/export"


def get_connection():
    return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)


def get_mysql_meta(database, table):
    connection = get_connection()
    cursor = connection.cursor()
    sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    cursor.execute(sql, )
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall


def get_mysql_columns(database, table):
    return map(lambda x: x, get_mysql_meta(database, table))


def generate_json(target_database, target_table):
    job = {
      "job": {
            "setting": {
                "speed": {
                  "channel": 3
                },
                "errorLimit": {
                  "record": 0,
                  "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                  "name": "hdfsreader",
                  "parameter": {
                        "path": "${exportdir}",
                        "defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
                        "column": ["*"],
                        "fileType": "text",
                        "encoding": "UTF-8",
                        "fieldDelimiter": "\t",
                        "nullFormat": "\\N"
                  }
                },
                "writer": {
                  "name": "mysqlwriter",
                  "parameter": {
                        "writeMode": "replace",
                        "username": mysql_user,
                        "password": mysql_passwd,
                        "column": get_mysql_columns(target_database, target_table),
                        "connection": [
                            {
                              "jdbcUrl":
                                    "jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + target_database + "?useUnicode=true&characterEncoding=utf-8",
                              "table":
                            }
                        ]
                  }
                }
            }]
      }
    }
    if not os.path.exists(output_path):
      os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join()), "w") as f:
      json.dump(job, f)


def main(args):
    target_database = ""
    target_table = ""

    options, arguments = getopt.getopt(args, '-d:-t:', ['targetdb=', 'targettbl='])
    for opt_name, opt_value in options:
      if opt_name in ('-d', '--targetdb'):
            target_database = opt_value
      if opt_name in ('-t', '--targettbl'):
            target_table = opt_value

    generate_json(target_database, target_table)

if __name__ == '__main__':
    main(sys.argv)
在~/bin目次下创建gen_export_config.sh脚本
$ vim ~/bin/gen_export_config.sh
脚本内容如下。
#!/bin/bash

python ~/bin/gen_export_config.py -d gmall_report -t ads_activity_stats
python ~/bin/gen_export_config.py -d gmall_report -t ads_coupon_stats
python ~/bin/gen_export_config.py -d gmall_report -t ads_new_buyer_stats
python ~/bin/gen_export_config.py -d gmall_report -t ads_order_by_province
python ~/bin/gen_export_config.py -d gmall_report -t ads_page_path
python ~/bin/gen_export_config.py -d gmall_report -t ads_repeat_purchase_by_tm
python ~/bin/gen_export_config.py -d gmall_report -t ads_sku_cart_num_top3_by_cate
python ~/bin/gen_export_config.py -d gmall_report -t ads_trade_stats
python ~/bin/gen_export_config.py -d gmall_report -t ads_trade_stats_by_cate
python ~/bin/gen_export_config.py -d gmall_report -t ads_trade_stats_by_tm
python ~/bin/gen_export_config.py -d gmall_report -t ads_traffic_stats_by_channel
python ~/bin/gen_export_config.py -d gmall_report -t ads_user_action
python ~/bin/gen_export_config.py -d gmall_report -t ads_user_change
python ~/bin/gen_export_config.py -d gmall_report -t ads_user_retention
python ~/bin/gen_export_config.py -d gmall_report -t ads_user_stats
3)为gen_export_config.sh脚本增加执行权限
$ chmod +x ~/bin/gen_export_config.sh
4)执行gen_export_config.sh脚本,天生配置文件
$ gen_export_config.sh
5)观察天生的配置文件
$ ls /opt/module/datax/job/export/
编写每日导出脚本
(1)在hadoop102的/home/atguigu/bin目次下创建hdfs_to_mysql.sh
$ vim hdfs_to_mysql.sh
(2)编写如下内容

#! /bin/bash

DATAX_HOME=/opt/module/datax

#DataX导出路径不允许存在空文件,该函数作用为清理空文件
handle_export_path(){
target_file=$1
for i in `hadoop fs -ls -R $target_file | awk '{print $8}'`; do
    hadoop fs -test -z $i
    if [[ $? -eq 0 ]]; then
      echo "$i文件大小为0,正在删除"
      hadoop fs -rm -r -f $i
    fi
done

}


#数据导出
export_data() {
datax_config=$1
export_dir=$2
hadoop fs -test -e $export_dir
if [[ $? -eq 0 ]]
then
    handle_export_path $export_dir
    file_count=$(hadoop fs -ls $export_dir | wc -l)
    if [ $file_count -gt 0 ]
    then
      set -e;
      $DATAX_HOME/bin/datax.py -p"-Dexportdir=$export_dir" $datax_config
      set +e;
    else
      echo "$export_dir 目录为空,跳过~"
    fi
else
    echo "路径 $export_dir 不存在,跳过~"
fi
}


case $1 in
"ads_new_buyer_stats")
    export_data /opt/module/datax/job/export/gmall_report.ads_new_buyer_stats.json /warehouse/gmall/ads/ads_new_buyer_stats
;;
"ads_order_by_province")
    export_data /opt/module/datax/job/export/gmall_report.ads_order_by_province.json /warehouse/gmall/ads/ads_order_by_province
;;
"ads_page_path")
    export_data /opt/module/datax/job/export/gmall_report.ads_page_path.json /warehouse/gmall/ads/ads_page_path
;;
"ads_repeat_purchase_by_tm")
    export_data /opt/module/datax/job/export/gmall_report.ads_repeat_purchase_by_tm.json /warehouse/gmall/ads/ads_repeat_purchase_by_tm
;;
"ads_trade_stats")
    export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats.json /warehouse/gmall/ads/ads_trade_stats
;;
"ads_trade_stats_by_cate")
    export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats_by_cate.json /warehouse/gmall/ads/ads_trade_stats_by_cate
;;
"ads_trade_stats_by_tm")
    export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats_by_tm.json /warehouse/gmall/ads/ads_trade_stats_by_tm
;;
"ads_traffic_stats_by_channel")
    export_data /opt/module/datax/job/export/gmall_report.ads_traffic_stats_by_channel.json /warehouse/gmall/ads/ads_traffic_stats_by_channel
;;
"ads_user_action")
    export_data /opt/module/datax/job/export/gmall_report.ads_user_action.json /warehouse/gmall/ads/ads_user_action
;;
"ads_user_change")
    export_data /opt/module/datax/job/export/gmall_report.ads_user_change.json /warehouse/gmall/ads/ads_user_change
;;
"ads_user_retention")
    export_data /opt/module/datax/job/export/gmall_report.ads_user_retention.json /warehouse/gmall/ads/ads_user_retention
;;
"ads_user_stats")
    export_data /opt/module/datax/job/export/gmall_report.ads_user_stats.json /warehouse/gmall/ads/ads_user_stats
;;
"ads_activity_stats")
    export_data /opt/module/datax/job/export/gmall_report.ads_activity_stats.json /warehouse/gmall/ads/ads_activity_stats
;;
"ads_coupon_stats")
    export_data /opt/module/datax/job/export/gmall_report.ads_coupon_stats.json /warehouse/gmall/ads/ads_coupon_stats
;;
"ads_sku_cart_num_top3_by_cate")
    export_data /opt/module/datax/job/export/gmall_report.ads_sku_cart_num_top3_by_cate.json /warehouse/gmall/ads/ads_sku_cart_num_top3_by_cate
;;

"all")
export_data /opt/module/datax/job/export/gmall_report.ads_new_buyer_stats.json /warehouse/gmall/ads/ads_new_buyer_stats
export_data /opt/module/datax/job/export/gmall_report.ads_order_by_province.json /warehouse/gmall/ads/ads_order_by_province
export_data /opt/module/datax/job/export/gmall_report.ads_page_path.json /warehouse/gmall/ads/ads_page_path
export_data /opt/module/datax/job/export/gmall_report.ads_repeat_purchase_by_tm.json /warehouse/gmall/ads/ads_repeat_purchase_by_tm
export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats.json /warehouse/gmall/ads/ads_trade_stats
export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats_by_cate.json /warehouse/gmall/ads/ads_trade_stats_by_cate
export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats_by_tm.json /warehouse/gmall/ads/ads_trade_stats_by_tm
export_data /opt/module/datax/job/export/gmall_report.ads_traffic_stats_by_channel.json /warehouse/gmall/ads/ads_traffic_stats_by_channel
export_data /opt/module/datax/job/export/gmall_report.ads_user_action.json /warehouse/gmall/ads/ads_user_action
export_data /opt/module/datax/job/export/gmall_report.ads_user_change.json /warehouse/gmall/ads/ads_user_change
export_data /opt/module/datax/job/export/gmall_report.ads_user_retention.json /warehouse/gmall/ads/ads_user_retention
export_data /opt/module/datax/job/export/gmall_report.ads_user_stats.json /warehouse/gmall/ads/ads_user_stats
export_data /opt/module/datax/job/export/gmall_report.ads_activity_stats.json /warehouse/gmall/ads/ads_activity_stats
export_data /opt/module/datax/job/export/gmall_report.ads_coupon_stats.json /warehouse/gmall/ads/ads_coupon_stats
export_data /opt/module/datax/job/export/gmall_report.ads_sku_cart_num_top3_by_cate.json /warehouse/gmall/ads/ads_sku_cart_num_top3_by_cate
;;
esac
(3)增加脚本执行权限
$ chmod +x hdfs_to_mysql.sh
(4)脚本用法
$ hdfs_to_mysql.sh all

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 离线数仓数据导出-hive数据同步到mysql