大数据-234 离线数仓 - 异构数据源 DataX 将数据 从 HDFS 到 MySQL ...

打印 上一主题 下一主题

主题 790|帖子 790|积分 2370

点一下关注吧!!!非常感谢!!连续更新!!!

Java篇开始了!

目前开始更新 MyBatis,一起深入浅出!
目前已经更新到了:



  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(已更完)
  • Kylin(已更完)
  • Elasticsearch(已更完)
  • DataX(已更完)
  • Tez(已更完)
  • 数据挖掘(已更完)
  • Prometheus(已更完)
  • Grafana(已更完)
  • 离线数仓(正在更新…)
章节内容

上节我们完成了如下的内容(留存会员模块):


  • DWS 层
  • ADS 层
  • 创建 Hive 执行脚本

基本架构

之前已经完成了Flume的数据收罗到HDFS中,现在我们将依次走通流程:


  • ODS
  • DWD
  • DWS
  • ADS
  • DataX数据导出到MySQL

    ADS有4张表需要从数据堆栈的ADS层导入MySQL,即:Hive => MySQL
  1. ads.ads_member_active_count
  2. ads.ads_member_retention_count
  3. ads.ads_member_retention_rate
  4. ads.ads_new_member_cnt
复制代码
在Hive中可以看到这几张表:

创建库表

  1. -- MySQL 建表
  2. -- 活跃会员数
  3. create database dwads;
  4. drop table if exists dwads.ads_member_active_count;
  5. create table dwads.ads_member_active_count(
  6.   `dt` varchar(10) COMMENT '统计日期',
  7.   `day_count` int COMMENT '当日会员数量',
  8.   `week_count` int COMMENT '当周会员数量',
  9.   `month_count` int COMMENT '当月会员数量',
  10.   primary key (dt)
  11. );
  12. -- 新增会员数
  13. drop table if exists dwads.ads_new_member_cnt;
  14. create table dwads.ads_new_member_cnt
  15. (
  16.   `dt` varchar(10) COMMENT '统计日期',
  17.   `cnt` int,
  18.   primary key (dt)
  19. );
  20. -- 会员留存数
  21. drop table if exists dwads.ads_member_retention_count;
  22. create table dwads.ads_member_retention_count
  23. (
  24.   `dt` varchar(10) COMMENT '统计日期',
  25.   `add_date` varchar(10) comment '新增日期',
  26.   `retention_day` int comment '截止当前日期留存天数',
  27.   `retention_count` bigint comment '留存数',
  28.   primary key (dt)
  29. ) COMMENT '会员留存情况';
  30. -- 会员留存率
  31. drop table if exists dwads.ads_member_retention_rate;
  32. create table dwads.ads_member_retention_rate
  33. (
  34.   `dt` varchar(10) COMMENT '统计日期',
  35.   `add_date` varchar(10) comment '新增日期',
  36.   `retention_day` int comment '截止当前日期留存天数',
  37.   `retention_count` bigint comment '留存数',
  38.   `new_mid_count` bigint comment '当日会员新增数',
  39.   `retention_ratio` decimal(10,2) comment '留存率',
  40.   primary key (dt)
  41. ) COMMENT '会员留存率';
复制代码
执行结果如下图:

DataX

DataX 之前章节已经先容过了 这里就简单一说 详细教程看之前的

基本先容

DataX 是阿里巴巴开源的一款分布式数据同步工具,用于实现各种异构数据源之间高效、稳定的数据同步。其重要功能包括数据的批量导入、导出和实时传输,支持多种主流数据源,比方关系型数据库、NoSQL 数据库、大数据存储系统等。
DataX 的核心头脑是“插件化架构”,通过灵活的 Reader 和 Writer 插件实现不同数据源之间的数据互换。
DataX 的特点

插件化架构



  • Reader:用于从数据源读取数据。
  • Writer:用于将数据写入目标存储。
  • 插件开发简单,可以根据需要扩展支持新的数据源。
高性能与高扩展性



  • 支持大规模数据同步,处置处罚速率快。
  • 支持多线程并发传输,利用 CPU 和 IO 性能。
  • 可设置分片任务(Shard),实现分布式同步。
兼容性强



  • 支持丰富的异构数据源,包括 MySQL、Oracle、SQL Server、PostgreSQL、MongoDB、HDFS、Hive、ODPS、ElasticSearch 等。
  • 可在不同系统之间传输数据,比如从传统 RDBMS 数据库迁移到大数据系统。
易用性



  • 设置简单,基于 JSON 文件定义任务,易于上手。
  • 提供详尽的运行日记,便于定位和解决题目。
  • 开源代码,支持二次开发。
可监控性



  • 提供详细的任务运行指标,比如吞吐量、数据量等。
  • 支持失败任务自动重试,确保数据同步过程的可靠性。
设置文件

导出活跃会员数(ads_member_active_count),编写一个JSON出来:
  1. vim /opt/wzk/datax/export_member_active_count.json
复制代码
hdfsreader => mysqlwriter
  1. {
  2.   "job": {
  3.     "setting": {
  4.       "speed": {
  5.         "channel": 1
  6.       }
  7.     },
  8.     "content": [{
  9.       "reader": {
  10.         "name": "hdfsreader",
  11.         "parameter": {
  12.           "path":
  13.           "/user/hive/warehouse/ads.db/ads_member_active_count/dt=$do_date/*",
  14.           "defaultFS": "hdfs://h121.wzk.icu:9000",
  15.           "column": [{
  16.             "type": "string",
  17.             "value": "$do_date"
  18.           }, {
  19.             "index": 0,
  20.             "type": "string"
  21.           },
  22.             {
  23.               "index": 1,
  24.               "type": "string"
  25.             },
  26.             {
  27.               "index": 2,
  28.               "type": "string"
  29.             }
  30.           ],
  31.           "fileType": "text",
  32.           "encoding": "UTF-8",
  33.           "fieldDelimiter": ","
  34.         }
  35.       },
  36.       "writer": {
  37.         "name": "mysqlwriter",
  38.         "parameter": {
  39.           "writeMode": "replace",
  40.           "username": "hive",
  41.           "password": "hive@wzk.icu",
  42.           "column": ["dt","day_count","week_count","month_count"],
  43.           "preSql": [
  44.             ""
  45.           ],
  46.           "connection": [{
  47.             "jdbcUrl":
  48.             "jdbc:mysql://h122.wzk.icu:3306/dwads?
  49.             useUnicode=true&characterEncoding=utf-8",
  50.             "table": [
  51.               "ads_member_active_count"
  52.             ]
  53.           }]
  54.         }
  55.       }
  56.     }]
  57.   }
  58. }
复制代码
写入的内容如下所示:

编写命令

DataX的运行的方式如下所示:
  1. python datax.py -p "-Ddo_date=2020-07-21" /opt/wzk/datax/export_member_active_count.json
复制代码
编写脚本

编写一个脚本用来完成这个流程:
  1. vim /opt/wzk/hive/export_member_active_count.sh
复制代码
写入的内容如下所示:
  1. #!/bin/bash
  2. JSON= /opt/wzk/datax
  3. source /etc/profile
  4. if [ -n "$1" ] ;then
  5. do_date=$1
  6. else
  7. do_date=`date -d "-1 day" +%F`
  8. fi
  9. python $DATAX_HOME/bin/datax.py -p "-Ddo_date=$do_date" $JSON/export_member_active_count.json
复制代码
写入的内容如下所示:


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

tsx81428

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表