大数据-250 离线数仓 - 电商分析 ADS层 与 Airflow 使命调理系统基本先容 ...

打印 上一主题 下一主题

主题 800|帖子 800|积分 2400

点一下关注吧!!!非常感谢!!连续更新!!!

Java篇开始了!

目前开始更新 MyBatis,一起深入浅出!
目前已经更新到了:



  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(已更完)
  • Kylin(已更完)
  • Elasticsearch(已更完)
  • DataX(已更完)
  • Tez(已更完)
  • 数据发掘(已更完)
  • Prometheus(已更完)
  • Grafana(已更完)
  • 离线数仓(正在更新…)
章节内容

上节我们完成了如下的内容:


  • DWD层建表与脚本
  • DWS层建表与脚本

ADS层开发

需求:计算当天


  • 全国所有订单信息
  • 全国、一级商品分类订单信息
  • 全国、二级商品分类订单信息
  • 大区所有订单信息
  • 大区、一级商品分类订单信息
  • 大区、二级商品分类订单信息
  • 都会所有订单信息
  • 都会、一级商品分类订单信息
  • 都会、二级商品分类订单信息
用到的表:


  • dws.dws_trade_orders_w
ADS层建表

  1. -- ADS层订单分析表
  2. DROP TABLE IF EXISTS ads.ads_trade_order_analysis;
  3. create table if not exists ads.ads_trade_order_analysis(
  4. areatype string, -- 区域范围:区域类型(全国、大
  5. 区、城市)
  6. regionname string, -- 区域名称
  7. cityname string, -- 城市名称
  8. categorytype string, -- 商品分类类型(一级、二级)
  9. category1 string, -- 商品一级分类名称
  10. category2 string, -- 商品二级分类名称
  11. totalcount bigint, -- 订单数量
  12. total_productnum bigint, -- 商品数量
  13. totalmoney double -- 支付金额
  14. )
  15. partitioned by (dt string)
  16. row format delimited fields terminated by ',';
复制代码
ADS层加载数据

编写一个脚本加载数据
  1. vim ads_load_trade_order_analysis.sh
复制代码
1笔订单,有多个商品,多个商品有差别的分类,这会导致一笔订单有多个分类,它们是分别统计的:
  1. #!/bin/bash
  2. source /etc/profile
  3. if [ -n "$1" ]
  4. then
  5. do_date=$1
  6. else
  7. do_date=`date -d "-1 day" +%F`
  8. fi
  9. sql="
  10. with mid_orders as (
  11. select regionname,
  12. cityname,
  13. firstname category1,
  14. secondname category2,
  15. count(distinct orderid) as totalcount,
  16. sum(productsnum) as total_productnum,
  17. sum(paymoney) as totalmoney
  18. from dws.dws_trade_orders_w
  19. where dt='$do_date'
  20. group by regionname, cityname, firstname, secondname
  21. )
  22. insert overwrite table ads.ads_trade_order_analysis
  23. partition(dt='$do_date')
  24. select '全国' as areatype,
  25. '' as regionname,
  26. '' as cityname,
  27. '' as categorytype,
  28. '' as category1,
  29. '' as category2,
  30. sum(totalcount),
  31. sum(total_productnum),
  32. sum(totalmoney)
  33. from mid_orders
  34. union all
  35. select '全国' as areatype,
  36. '' as regionname,
  37. '' as cityname,
  38. '一级' as categorytype,
  39. category1,
  40. '' as category2,
  41. sum(totalcount),
  42. sum(total_productnum),
  43. sum(totalmoney)
  44. from mid_orders
  45. group by category1
  46. union all
  47. select '全国' as areatype,
  48. '' as regionname,
  49. '' as cityname,
  50. '二级' as categorytype,
  51. '' as category1,
  52. category2,
  53. sum(totalcount),
  54. sum(total_productnum),
  55. sum(totalmoney)
  56. from mid_orders
  57. group by category2
  58. union all
  59. select '大区' as areatype,
  60. regionname,
  61. '' as cityname,
  62. '' as categorytype,
  63. '' as category1,
  64. '' as category2,
  65. sum(totalcount),
  66. sum(total_productnum),
  67. sum(totalmoney)
  68. from mid_orders
  69. group by regionname
  70. union all
  71. select '大区' as areatype,
  72. regionname,
  73. '' as cityname,
  74. '一级' as categorytype,
  75. category1,
  76. '' as category2,
  77. sum(totalcount),
  78. sum(total_productnum),
  79. sum(totalmoney)
  80. from mid_orders
  81. group by regionname, category1
  82. union all
  83. select '大区' as areatype,
  84. regionname,
  85. '' as cityname,
  86. '二级' as categorytype,
  87. '' as category1,
  88. category2,
  89. sum(totalcount),
  90. sum(total_productnum),
  91. sum(totalmoney)
  92. from mid_orders
  93. group by regionname, category2
  94. union all
  95. select '城市' as areatype,
  96. '' as regionname,
  97. cityname,
  98. '' as categorytype,
  99. '' as category1,
  100. '' as category2,
  101. sum(totalcount),
  102. sum(total_productnum),
  103. sum(totalmoney)
  104. from mid_orders
  105. group by cityname
  106. union all
  107. select '城市' as areatype,
  108. '' as regionname,
  109. cityname,
  110. '一级' as categorytype,
  111. category1,
  112. '' as category2,
  113. sum(totalcount),
  114. sum(total_productnum),
  115. sum(totalmoney)
  116. from mid_orders
  117. group by cityname, category1
  118. union all
  119. select '城市' as areatype,
  120. '' as regionname,
  121. cityname,
  122. '二级' as categorytype,
  123. '' as category1,
  124. category2,
  125. sum(totalcount),
  126. sum(total_productnum),
  127. sum(totalmoney)
  128. from mid_orders
  129. group by cityname, category2;
  130. "
  131. hive -e "$sql"
复制代码
备注:由于在 dws.dws_trade_orders_w中,一笔订单大概有多条记录,所有在统计订单数量的时候用count(distinct orderid)
末了小结


  1. # 加载ODS数据(含DataX迁移数据)
  2. /data/lagoudw/script/trade/ods_load_trade.sh
  3. # 加载DIM层数据
  4. /data/lagoudw/script/trade/dim_load_product_cat.sh
  5. /data/lagoudw/script/trade/dim_load_shop_org.sh
  6. /data/lagoudw/script/trade/dim_load_payment.sh
  7. /data/lagoudw/script/trade/dim_load_product_info.sh
  8. # 加载DWD层数据
  9. /data/lagoudw/script/trade/dwd_load_trade_orders.sh
  10. # 加载DWS层数据
  11. /data/lagoudw/script/trade/dws_load_trade_orders.sh
  12. # 加载ADS层数据
  13. /data/lagoudw/script/trade/ads_load_trade_order_analysis.sh
复制代码
Airflow简介

Airflow是Airbnb开源的一个用Python编写的调理工具,与2014年启动,2015年春季开源,2016年加入Apache软件基金会的孵化计划。
Airflow将一个工作流制定为一组使命的有向无环图(DAG),并指派到一组计算节点上,根据相互之间的依赖关系,有序执行。
Airflow的优势如下:


  • 机动易用,Airflow是Python编写的,工作流的定义也利用Python编写
  • 功能强盛,支持多种差别范例的作业,可自定义差别范例的作业,如Shell、Python、MySQL、Oracle、Hive等
  • 简便优雅,作业的定义简单明了
  • 易扩展,提供各种基类供扩展,有多种执行器可供选择
Apache Airflow 是一个开源的使命调理和工作流管理平台,主要用于开发、调试和监控数据管道。Airflow 通过利用 Python 脚本定义使命和依赖关系,帮助用户以编程的方式构建动态的、可视化的工作流。
体系架构




  • WebServer保卫进程,接受HTTP哀求,通过Python Flask Web应用程序与Airflow举行交互,WebServer提供功能的功能包罗:中止、规复、触发使命,监控正在运行的使命,断点续跑使命,查询使命的状态,日记等详细信息。
  • Scheduler保卫进程,周期性地轮询使命的调理计算,以确定是否触发使命执行
  • Worker保卫进程,Worker负责启动机器上的Executor来执行使命,利用CeleryExecutor后可以在多个机器人摆设Worker服务
焦点特性

基于代码的工作流定义

工作流以 Python 脚本定义,称为 DAG(Directed Acyclic Graph,有向无环图)。每个 DAG 包罗一组使命及其依赖关系。
支持参数化和动态天生 DAG,适合复杂工作流场景。
使命调理

支持机动的使命调理,用户可以通逾期间间隔、特定时间点等方式定义使命的运行周期。
调理器自动根据依赖关系按次序触发使命执行。
使命分布式执行

使命可以分布式运行,支持横向扩展,能够通过多个 Worker 节点进步使命处置惩罚能力。
可视化界面

Airflow 提供了功能强盛的 Web UI,用于查看 DAG 的运行状态、日记以及汗青记录。
图形化展示 DAG 结构,方便直观地相识工作流执行环境。
高可扩展性

支持通过插件机制扩展功能,比方自定义操作符(Operator)或 Hook。
内置多种操作符和连接器(如 BashOperator、PythonOperator、MySqlOperator 等)用于集成各种使命和外部系统。
依赖管理

Airflow 支持基于使命之间的依赖关系举行调理。用户可以显式地定义使命次序和依赖。
重点概念

DAG

DAG(Directed Acyclic Graph)有向无环图


  • 在AirFlow中,一个DAG定义了一个完备的作业,同一个DAG中的所有Task拥有相同的调理时间
  • 参数:dag_id(唯一识别DAG),default_args(默认参数,如果当前DAG实例的作业没有设置相应参数,则采用DAG实例的default_agrs中的相应参数)
  • schedule_interval:设置GAG的执行周期,可采用crontab语法
Task



  • Task为DAG中具体的作业使命,依赖于DAG,必须存在于某个DAG中,Task在DAG中可以设置依赖关系
  • dag:当前作业属于相应的DAG
  • task_id 使命标识符
  • owner:使命的拥有者
  • start_date:使命的开始时间

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

瑞星

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表