To make the statistics easy for reporting applications to consume, the results of each ADS-layer metric need to be exported to a MySQL database.
DataX supports syncing Hive to MySQL only in the sense of exporting the HDFS files that back Hive's storage, so the reader is hdfsreader and the writer is mysqlwriter.

NULL values are serialized differently in Hive and MySQL: Hive's default text-file format writes NULL as the literal \N, while MySQL expects a true SQL NULL, so DataX has to be told how to convert between the two.
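Concretely, the conversion is configured through the hdfsreader's nullFormat parameter, which the generator script later in this section sets as in this fragment of a DataX job config:

```json
"reader": {
    "name": "hdfsreader",
    "parameter": {
        "nullFormat": "\\N"
    }
}
```

With this setting, any field read as \N from HDFS is treated as null and written to MySQL as SQL NULL.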
Creating the MySQL Database and Tables

12.1.1 Creating the Database
```sql
CREATE DATABASE IF NOT EXISTS gmall_report DEFAULT CHARSET utf8 COLLATE utf8_general_ci;
```
When creating the MySQL tables:

1. The number of columns must match the corresponding ADS-layer table in Hive.
2. Each column type must be the MySQL equivalent of the corresponding Hive type (see the sketch after this list).
3. The column order must also match.

In addition, every table needs a primary key.
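For reference, this is roughly what the Hive-side DDL of the first table looks like in this kind of project; the exact definition may differ, so treat it as an illustrative sketch of the type mapping (a Hive string becomes a varchar, a Hive decimal carries over unchanged, and the statistics-date string becomes a MySQL date):

```sql
-- Illustrative Hive DDL (assumed; verify against your actual ADS layer)
CREATE EXTERNAL TABLE ads_activity_stats
(
    dt            STRING COMMENT 'statistics date',        -- -> MySQL date
    activity_id   STRING COMMENT 'activity ID',            -- -> MySQL varchar(16)
    activity_name STRING COMMENT 'activity name',          -- -> MySQL varchar(64)
    start_date    STRING COMMENT 'activity start date',    -- -> MySQL varchar(16)
    reduce_rate   DECIMAL(16, 2) COMMENT 'subsidy rate'    -- -> MySQL decimal(16, 2)
) COMMENT 'activity statistics'
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    LOCATION '/warehouse/gmall/ads/ads_activity_stats/';
```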
1) Subsidy rate per activity

The primary key is the composite of dt and activity_id, matching the PRIMARY KEY clause below:
```sql
DROP TABLE IF EXISTS `ads_activity_stats`;
CREATE TABLE `ads_activity_stats` (
  `dt` date NOT NULL COMMENT 'statistics date',
  `activity_id` varchar(16) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'activity ID',
  `activity_name` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'activity name',
  `start_date` varchar(16) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'activity start date',
  `reduce_rate` decimal(16, 2) NULL DEFAULT NULL COMMENT 'subsidy rate',
  PRIMARY KEY (`dt`, `activity_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = 'activity statistics' ROW_FORMAT = Dynamic;
```
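A note on why the primary key matters for the export: the generator script below sets mysqlwriter's writeMode to replace, which makes DataX issue REPLACE INTO statements. Together with the composite key, re-running a day's export is then idempotent: a row with an existing (dt, activity_id) is replaced rather than duplicated. The values below are made up purely to illustrate the effect:

```sql
-- Running this twice leaves exactly one row for ('2022-06-08', '1')
REPLACE INTO ads_activity_stats (dt, activity_id, activity_name, start_date, reduce_rate)
VALUES ('2022-06-08', '1', 'sample activity', '2022-06-01', 0.10);
```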
Exporting the Data

DataX Config File Generator Script

For convenience, a script that batch-generates the DataX config files is provided here; its content and usage are described below.

1) Create the gen_export_config.py script in the ~/bin directory
[atguigu@hadoop102 bin]$ vim ~/bin/gen_export_config.py
The script content is as follows:
```python
# coding=utf-8
import json
import getopt
import os
import sys
import MySQLdb

# MySQL connection settings; adjust to your environment
mysql_host = "hadoop102"
mysql_port = "3306"
mysql_user = "root"
mysql_passwd = "000000"

# HDFS NameNode settings; adjust to your environment
hdfs_nn_host = "hadoop102"
hdfs_nn_port = "8020"

# Target directory for the generated config files; adjust as needed
output_path = "/opt/module/datax/job/export"


def get_connection():
    return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)


def get_mysql_meta(database, table):
    # Read column names and types from information_schema, in table order
    connection = get_connection()
    cursor = connection.cursor()
    sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    cursor.execute(sql, [database, table])
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall


def get_mysql_columns(database, table):
    # wrap in list() so the result is JSON-serializable on Python 3 as well
    return list(map(lambda x: x[0], get_mysql_meta(database, table)))


def generate_json(target_database, target_table):
    # Assemble the DataX job: hdfsreader -> mysqlwriter.
    # The HDFS path is left as the ${exportdir} placeholder, to be
    # supplied at run time via DataX's -p option.
    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "path": "${exportdir}",
                        "defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
                        "column": ["*"],
                        "fileType": "text",
                        "encoding": "UTF-8",
                        "fieldDelimiter": "\t",
                        "nullFormat": "\\N"
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "replace",
                        "username": mysql_user,
                        "password": mysql_passwd,
                        "column": get_mysql_columns(target_database, target_table),
                        "connection": [{
                            "jdbcUrl": "jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + target_database + "?useUnicode=true&characterEncoding=utf-8",
                            "table": [target_table]
                        }]
                    }
                }
            }]
        }
    }
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    # File name pattern: <database>.<table>.json
    with open(os.path.join(output_path, ".".join([target_database, target_table, "json"])), "w") as f:
        json.dump(job, f)


def main(args):
    target_database = ""
    target_table = ""
    options, arguments = getopt.getopt(args, '-d:-t:', ['targetdb=', 'targettbl='])
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--targetdb'):
            target_database = opt_value
        if opt_name in ('-t', '--targettbl'):
            target_table = opt_value
    generate_json(target_database, target_table)


if __name__ == '__main__':
    main(sys.argv[1:])
```
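To sanity-check the generator on a single table, it can be invoked directly; given the naming logic in generate_json, the command below writes /opt/module/datax/job/export/gmall_report.ads_activity_stats.json. The generated job can then be run by filling in the ${exportdir} placeholder through DataX's -p option (the HDFS path shown is an assumed example):

```bash
python ~/bin/gen_export_config.py -d gmall_report -t ads_activity_stats
# run the generated job, supplying the HDFS directory at run time:
# python /opt/module/datax/bin/datax.py -p"-Dexportdir=/warehouse/gmall/ads/ads_activity_stats" \
#     /opt/module/datax/job/export/gmall_report.ads_activity_stats.json
```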
2) Create the gen_export_config.sh script in the ~/bin directory
[atguigu@hadoop102 bin]$ vim ~/bin/gen_export_config.sh
The script content is as follows:
```bash
#!/bin/bash
python ~/bin/gen_export_config.py -d gmall_report -t ads_activity_stats
python ~/bin/gen_export_config.py -d gmall_report -t ads_coupon_stats
python ~/bin/gen_export_config.py -d gmall_report -t ads_new_buyer_stats
python ~/bin/gen_export_config.py -d gmall_report -t ads_order_by_province
python ~/bin/gen_export_config.py -d gmall_report -t ads_page_path
python ~/bin/gen_export_config.py -d gmall_report -t ads_repeat_purchase_by_tm
python ~/bin/gen_export_config.py -d gmall_report -t ads_sku_cart_num_top3_by_cate
python ~/bin/gen_export_config.py -d gmall_report -t ads_trade_stats
python ~/bin/gen_export_config.py -d gmall_report -t ads_trade_stats_by_cate
python ~/bin/gen_export_config.py -d gmall_report -t ads_trade_stats_by_tm
python ~/bin/gen_export_config.py -d gmall_report -t ads_traffic_stats_by_channel
python ~/bin/gen_export_config.py -d gmall_report -t ads_user_action
python ~/bin/gen_export_config.py -d gmall_report -t ads_user_change
python ~/bin/gen_export_config.py -d gmall_report -t ads_user_retention
python ~/bin/gen_export_config.py -d gmall_report -t ads_user_stats
```
3) Grant execute permission to the gen_export_config.sh script
[atguigu@hadoop102 bin]$ chmod +x ~/bin/gen_export_config.sh
4) Run the gen_export_config.sh script to generate the config files
[atguigu@hadoop102 bin]$ gen_export_config.sh
5) Inspect the generated config files
[atguigu@hadoop102 bin]$ ls /opt/module/datax/job/export/
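Given the database.table.json naming in the generator, the listing should contain one file per ADS table. Reconstructed from the script above (not captured from a live run), it looks like this:

```
gmall_report.ads_activity_stats.json
gmall_report.ads_coupon_stats.json
gmall_report.ads_new_buyer_stats.json
gmall_report.ads_order_by_province.json
gmall_report.ads_page_path.json
gmall_report.ads_repeat_purchase_by_tm.json
gmall_report.ads_sku_cart_num_top3_by_cate.json
gmall_report.ads_trade_stats.json
gmall_report.ads_trade_stats_by_cate.json
gmall_report.ads_trade_stats_by_tm.json
gmall_report.ads_traffic_stats_by_channel.json
gmall_report.ads_user_action.json
gmall_report.ads_user_change.json
gmall_report.ads_user_retention.json
gmall_report.ads_user_stats.json
```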
Writing the Daily Export Script

(1) Create hdfs_to_mysql.sh in the /home/atguigu/bin directory on hadoop102
[atguigu@hadoop102 bin]$ vim hdfs_to_mysql.sh
(2) Write the script content; a minimal sketch of its usual structure is given after this step.
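The sketch below rests on stated assumptions rather than on the original script: it follows the common per-table-function-plus-case-statement pattern for export scripts in projects like this, assumes DataX is installed at /opt/module/datax, assumes the ADS data lives under /warehouse/gmall/ads, and passes each table's HDFS directory to DataX through the ${exportdir} placeholder via -p. Adjust paths and the table list to your environment:

```bash
#!/bin/bash

DATAX_HOME=/opt/module/datax   # assumed DataX install directory

# Run one DataX export job: $1 = job config file, $2 = HDFS dir of the ADS table
export_data() {
  datax_config=$1
  export_dir=$2
  python $DATAX_HOME/bin/datax.py -p"-Dexportdir=$export_dir" $datax_config
}

case $1 in
"ads_activity_stats")
  export_data /opt/module/datax/job/export/gmall_report.ads_activity_stats.json /warehouse/gmall/ads/ads_activity_stats
  ;;
# ... one case per ADS table, omitted here for brevity ...
"all")
  export_data /opt/module/datax/job/export/gmall_report.ads_activity_stats.json /warehouse/gmall/ads/ads_activity_stats
  export_data /opt/module/datax/job/export/gmall_report.ads_coupon_stats.json /warehouse/gmall/ads/ads_coupon_stats
  # ... and so on for the remaining ADS tables ...
  ;;
esac
```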
(3) Grant execute permission to the script
[atguigu@hadoop102 bin]$ chmod +x hdfs_to_mysql.sh
(4) Script usage
[atguigu@hadoop102 bin]$ hdfs_to_mysql.sh all