离线数仓建设之数据导出

打印 上一主题 下一主题

主题 845|帖子 845|积分 2535

为了方便报表应用利用数据,需将ADS各项指标统计结果导出到MySQL,方便熟悉 SQL 职员利用。
1 MySQL建库建表

1.1 创建数据库

创建car_data_report数据库:
  1. CREATE DATABASE IF NOT EXISTS car_data_report
  2. # 字符集
  3. DEFAULT CHARSET utf8mb4
  4. # 排序规则
  5. COLLATE utf8mb4_general_ci;
复制代码
1.1.2 创建表

① 里程相关统计

创建ads_mileage_stat_last_month表,存储里程相关统计数据。
  1. DROP TABLE IF EXISTS ads_mileage_stat_last_month;
  2. CREATE TABLE ads_mileage_stat_last_month (
  3.   vin VARCHAR(20) COMMENT '汽车唯一ID',
  4.   mon VARCHAR(7) COMMENT '统计月份',
  5.   avg_mileage INT COMMENT '日均里程',
  6.   avg_speed DECIMAL(16, 2) COMMENT '平均时速分子',
  7.   danger_count DECIMAL(16, 2) COMMENT '平均百公里急加减速次数'
  8. ) COMMENT '里程相关统计';
复制代码
② 告警相关统计

创建ads_alarm_stat_last_month表,存储告警相关的统计数据。
  1. DROP TABLE IF EXISTS ads_alarm_stat_last_month;
  2. CREATE TABLE ads_alarm_stat_last_month (
  3.   vin VARCHAR(20) COMMENT '汽车唯一ID',
  4.   mon VARCHAR(7) COMMENT '统计月份',
  5.   alarm_count INT COMMENT '告警次数',
  6.   l1_alarm_count INT COMMENT '一级告警次数',
  7.   l2_alarm_count INT COMMENT '二级告警次数',
  8.   l3_alarm_count INT COMMENT '三级告警次数'
  9. ) COMMENT '告警相关统计';
复制代码
3)温控相关统计
创建ads_temperature_stat_last_month表,存储温控相关的统计数据。
  1. DROP TABLE IF EXISTS ads_temperature_stat_last_month;
  2. CREATE TABLE ads_temperature_stat_last_month (
  3.   vin VARCHAR(20) COMMENT '汽车唯一ID',
  4.   mon VARCHAR(7) COMMENT '统计月份',
  5.   max_motor_temperature INT COMMENT '电机最高温度',
  6.   avg_motor_temperature DECIMAL(16, 2) COMMENT '电机平均温度',
  7.   max_motor_controller_temperature INT COMMENT '电机控制器最高温度',
  8.   avg_motor_controller_temperature DECIMAL(16, 2) COMMENT '电机控制器平均温度',
  9.   max_battery_temperature INT COMMENT '最高电池温度',
  10.   battery_temperature_abnormal_count INT COMMENT '电池温度异常值次数'
  11. ) COMMENT '温控相关统计';
复制代码
4)能耗相关统计
创建ads_consume_stat_last_month表,存储能耗相关的统计数据。
  1. DROP TABLE IF EXISTS ads_consume_stat_last_month;
  2. CREATE TABLE ads_consume_stat_last_month (
  3.   vin VARCHAR(20) COMMENT '汽车唯一ID',
  4.   mon VARCHAR(7) COMMENT '统计月份',
  5.   soc_per_charge DECIMAL(16, 2) COMMENT '次均充电电量',
  6.   duration_per_charge DECIMAL(16, 2) COMMENT '次均充电时长',
  7.   charge_count INT COMMENT '充电次数',
  8.   fast_charge_count INT COMMENT '快充次数',
  9.   slow_charge_count INT COMMENT '慢充次数',
  10.   fully_charge_count INT COMMENT '深度充电次数',
  11.   soc_per_100km DECIMAL(16, 2) COMMENT 'soc百公里平均消耗',
  12.   soc_per_run DECIMAL(16, 2) COMMENT '每次里程soc平均消耗',
  13.   soc_last_100km DECIMAL(16, 2) COMMENT '最近百公里soc消耗'
  14. ) COMMENT '能耗主题统计';
复制代码
2 数据导出

DataX作为数据导出工具,并选择HDFSReader和MySQLWriter作为数据源和目标。
2.1 编写DataX设置文件

我们必要为每个表编写一个DataX设置文件。以ads_alarm_stat_last_month为例:
  1. {
  2.   "job": {
  3.     "setting": {
  4.       "speed": {
  5.         "channel": 1  // DataX 作业的并发通道数,一般根据系统资源进行调整,1 表示单通道
  6.       }
  7.     },
  8.     "content": [
  9.       {
  10.         "reader": {
  11.           ...
  12.         },
  13.         "writer": {
  14.           "name": "mysqlwriter",  // 写入数据的插件类型为 MySQL 数据库写入
  15.           "parameter": {
  16.             "writeMode": "replace",  // 写入模式为替换(如果表存在则先删除再写入)
  17.             "username": "root",  // 数据库用户名
  18.             "password": "000000",  // 数据库密码
  19.             "column": [  // 写入的列信息,包括 vin、mon、alarm_count、l1_alarm_count、l2_alarm_count、l3_alarm_count
  20.               "vin",
  21.               "mon",
  22.               "alarm_count",
  23.               "l1_alarm_count",
  24.               "l2_alarm_count",
  25.               "l3_alarm_count"
  26.             ],
  27.             "connection": [  // 数据库连接信息列表,支持多个数据库连接
  28.               {
  29.                 "jdbcUrl": "jdbc:mysql://hadoop102:3306/car_data_report?useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&characterEncoding=utf-8",  // MySQL 数据库连接地址,设置了 SSL、公钥检索、Unicode 编码等参数
  30.                 "table": [  // 写入的数据库表列表,这里只写入 ads_alarm_stat_last_month 表
  31.                   "ads_alarm_stat_last_month"
  32.                 ]
  33.               }
  34.             ]
  35.           }
  36.         }
  37.       }
  38.     ]
  39.   }
  40. }
复制代码
导出路径参数path并未写死,需在提交任务时通过参数动态传入,参数名称为exportdir。
模版设置参数解析:
HDFSReader:

即:
  1. "reader": {
  2.   "name": "hdfsreader",  // 读取数据的插件类型为 HDFS 文件读取
  3.   "parameter": {
  4.     "path": "${exportdir}",  // HDFS 文件路径,使用 ${exportdir} 变量表示动态路径
  5.     "defaultFS": "hdfs://hadoop102:8020",  // HDFS 默认文件系统地址
  6.     "column": [  // 需要读取的列信息,这里使用通配符 * 表示读取所有列
  7.       "*"
  8.     ],
  9.     "fileType": "text",  // 文件类型为文本文件
  10.     "encoding": "UTF-8",  // 文件编码格式为 UTF-8
  11.     "fieldDelimiter": "\t",  // 字段分隔符为制表符
  12.     "nullFormat": "\\N"  // 空值格式为 \N
  13.   }
  14. },
复制代码
MySQLWriter:
  1. "writer": {
  2.   "name": "mysqlwriter",  // 写入数据的插件类型为 MySQL 数据库写入
  3.   "parameter": {
  4.     "writeMode": "replace",  // 写入模式为替换(如果表存在则先删除再写入)
  5.     "username": "root",  // 数据库用户名
  6.     "password": "000000",  // 数据库密码
  7.     "column": [  // 写入的列信息,包括 vin、mon、alarm_count、l1_alarm_count、l2_alarm_count、l3_alarm_count
  8.       "vin",
  9.       "mon",
  10.       "alarm_count",
  11.       "l1_alarm_count",
  12.       "l2_alarm_count",
  13.       "l3_alarm_count"
  14.     ],
  15.     "connection": [  // 数据库连接信息列表,支持多个数据库连接
  16.       {
  17.         "jdbcUrl": "jdbc:mysql://hadoop102:3306/car_data_report?useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&characterEncoding=utf-8",  // MySQL 数据库连接地址,设置了 SSL、公钥检索、Unicode 编码等参数
  18.         "table": [  // 写入的数据库表列表,这里只写入 ads_alarm_stat_last_month 表
  19.           "ads_alarm_stat_last_month"
  20.         ]
  21.       }
  22.     ]
  23.   }
  24. }
复制代码
2.2 DataX设置文件生成脚本

TODO(在下载的资料压缩包里)datax_config_generator拷贝到/opt/module。
修改/opt/module/datax_config_generator/configuration.properties:
  1. mysql.username=root
  2. mysql.password=000000
  3. mysql.host=hadoop102
  4. mysql.port=3306
  5. mysql.database.import=car_data
  6. mysql.database.export=car_data_report
  7. mysql.tables.import=
  8. mysql.tables.export=
  9. is.seperated.tables=0
  10. hdfs.uri=hdfs://hadoop102:8020
  11. import_out_dir=/opt/module/datax/job/import
  12. export_out_dir=/opt/module/datax/job/export
复制代码
执行设置文件生成器:
  1. java -jar datax-config-generator-1.0.1-jar-with-dependencies.jar
复制代码
观察生成的设置文件:
  1. ll /opt/module/datax/job/export/
  2. 总用量 20
  3. -rw-rw-r--. 1 atguigu atguigu  961 4月  26 19:47 car_data_report.ads_alarm_stat_last_month.json
  4. -rw-rw-r--. 1 atguigu atguigu 1095 4月  26 19:47 car_data_report.ads_consume_stat_last_month.json
  5. -rw-rw-r--. 1 atguigu atguigu 1062 4月  26 19:47 car_data_report.ads_electric_stat_last_month.json
  6. -rw-rw-r--. 1 atguigu atguigu  939 4月  26 19:47 car_data_report.ads_mileage_stat_last_month.json
  7. -rw-rw-r--. 1 atguigu atguigu 1083 4月  26 19:47 car_data_report.ads_temperature_stat_last_month.json
复制代码
2.3 测试生成的DataX设置文件

以ads_trans_order_stats为例,测试用脚本生成的设置文件是否可用。
1)执行DataX同步命令
  1. python /opt/module/datax/bin/datax.py -p"-Dexportdir=/warehouse/car_data/ads/ads_order_stats" /opt/module/datax/job/export/tms_report.ads_order_stats.json
复制代码
2)观察同步结果
观察MySQL目标表是否出现数据。

2.4 编写导出脚本

创建hdfs_to_mysql.sh
  1. vim hdfs_to_mysql.sh
复制代码
  1. #!/bin/bash
  2. # 设置 DataX 的安装路径
  3. DATAX_HOME=/opt/module/datax
  4. # 清理指定路径下的空文件
  5. # 参数 $1: 待清理的路径
  6. handle_export_path() {
  7.   for file in $(hadoop fs -ls -R "$1" | awk '{print $8}'); do
  8.     # 检查文件是否为空
  9.     if hadoop fs -test -z "$file"; then
  10.       echo "$file 文件大小为0,正在删除..."
  11.       # 删除空文件
  12.       hadoop fs -rm -r -f "$file"
  13.     fi
  14.   done
  15. }
  16. # 导出数据到指定路径
  17. # 参数 $1: DataX 配置文件路径
  18. # 参数 $2: 导出路径
  19. export_data() {
  20.   datax_config="$1"
  21.   export_dir="$2"
  22.   # 调用清理空文件函数
  23.   handle_export_path "$export_dir"
  24.   # 执行 DataX 导出命令
  25.   $DATAX_HOME/bin/datax.py -p"-Dexportdir=$export_dir" "$datax_config"
  26. }
  27. # 主逻辑,根据传入的参数执行数据导出操作
  28. case $1 in
  29.   'ads_mileage_stat_last_month' | 'ads_alarm_stat_last_month' | 'ads_temperature_stat_last_month' | 'ads_electric_stat_last_month' | 'ads_consume_stat_last_month')
  30.     # 导出单个表的数据
  31.     export_data "/opt/module/datax/job/export/car_data_report.$1.json" "/warehouse/car_data/ads/$1"
  32.     ;;
  33.   'all')
  34.     # 导出所有表的数据
  35.     for table in 'ads_mileage_stat_last_month' 'ads_alarm_stat_last_month' 'ads_temperature_stat_last_month' 'ads_electric_stat_last_month' 'ads_consume_stat_last_month'; do
  36.       export_data "/opt/module/datax/job/export/car_data_report.$table.json" "/warehouse/car_data/ads/$table"
  37.     done
  38.     ;;
  39.   *)
  40.     # 未知参数,打印提示信息
  41.     echo "Usage: $0 {ads_table_name | all}"
  42.     echo "Example: $0 ads_mileage_stat_last_month"
  43.     ;;
  44. esac
复制代码
  1. chmod +x hdfs_to_mysql.sh
  2. hdfs_to_mysql.sh all
复制代码
关注我,紧跟本系列专栏文章,咱们下篇再续!
作者简介:魔都技术专家兼架构,多家大厂后端一线研发履历,各大技术社区头部专家博主。具有丰富的引领团队履历,深厚业务架构和解决方案的积累。
负责:

  • 中央/分销预订系统性能优化
  • 运动&优惠券等营销中台建设
  • 交易平台及数据中台等架构和开发设计
目前主攻降低软件复杂性设计、构建高可用系统方向。
参考:
本文由博客一文多发平台 OpenWrite 发布!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

万万哇

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表