大数据-250 离线数仓 - 电商分析 ADS层 与 Airflow 使命调理系统基本先容
点一下关注吧!!!非常感谢!!连续更新!!!Java篇开始了!
目前开始更新 MyBatis,一起深入浅出!
目前已经更新到了:
[*]Hadoop(已更完)
[*]HDFS(已更完)
[*]MapReduce(已更完)
[*]Hive(已更完)
[*]Flume(已更完)
[*]Sqoop(已更完)
[*]Zookeeper(已更完)
[*]HBase(已更完)
[*]Redis (已更完)
[*]Kafka(已更完)
[*]Spark(已更完)
[*]Flink(已更完)
[*]ClickHouse(已更完)
[*]Kudu(已更完)
[*]Druid(已更完)
[*]Kylin(已更完)
[*]Elasticsearch(已更完)
[*]DataX(已更完)
[*]Tez(已更完)
[*]数据发掘(已更完)
[*]Prometheus(已更完)
[*]Grafana(已更完)
[*]离线数仓(正在更新…)
章节内容
上节我们完成了如下的内容:
[*]DWD层建表与脚本
[*]DWS层建表与脚本
https://i-blog.csdnimg.cn/direct/e660bc721b6645279e436d69288c8f18.png
ADS层开发
需求:计算当天
[*]全国所有订单信息
[*]全国、一级商品分类订单信息
[*]全国、二级商品分类订单信息
[*]大区所有订单信息
[*]大区、一级商品分类订单信息
[*]大区、二级商品分类订单信息
[*]都会所有订单信息
[*]都会、一级商品分类订单信息
[*]都会、二级商品分类订单信息
用到的表:
[*]dws.dws_trade_orders_w
ADS层建表
-- ADS层订单分析表
DROP TABLE IF EXISTS ads.ads_trade_order_analysis;
create table if not exists ads.ads_trade_order_analysis(
areatype string, -- 区域范围:区域类型(全国、大
区、城市)
regionname string, -- 区域名称
cityname string, -- 城市名称
categorytype string, -- 商品分类类型(一级、二级)
category1 string, -- 商品一级分类名称
category2 string, -- 商品二级分类名称
totalcount bigint, -- 订单数量
total_productnum bigint, -- 商品数量
totalmoney double -- 支付金额
)
partitioned by (dt string)
row format delimited fields terminated by ',';
ADS层加载数据
编写一个脚本加载数据
vim ads_load_trade_order_analysis.sh
1笔订单,有多个商品,多个商品有差别的分类,这会导致一笔订单有多个分类,它们是分别统计的:
#!/bin/bash
source /etc/profile
if [ -n "$1" ]
then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
sql="
with mid_orders as (
select regionname,
cityname,
firstname category1,
secondname category2,
count(distinct orderid) as totalcount,
sum(productsnum) as total_productnum,
sum(paymoney) as totalmoney
from dws.dws_trade_orders_w
where dt='$do_date'
group by regionname, cityname, firstname, secondname
)
insert overwrite table ads.ads_trade_order_analysis
partition(dt='$do_date')
select '全国' as areatype,
'' as regionname,
'' as cityname,
'' as categorytype,
'' as category1,
'' as category2,
sum(totalcount),
sum(total_productnum),
sum(totalmoney)
from mid_orders
union all
select '全国' as areatype,
'' as regionname,
'' as cityname,
'一级' as categorytype,
category1,
'' as category2,
sum(totalcount),
sum(total_productnum),
sum(totalmoney)
from mid_orders
group by category1
union all
select '全国' as areatype,
'' as regionname,
'' as cityname,
'二级' as categorytype,
'' as category1,
category2,
sum(totalcount),
sum(total_productnum),
sum(totalmoney)
from mid_orders
group by category2
union all
select '大区' as areatype,
regionname,
'' as cityname,
'' as categorytype,
'' as category1,
'' as category2,
sum(totalcount),
sum(total_productnum),
sum(totalmoney)
from mid_orders
group by regionname
union all
select '大区' as areatype,
regionname,
'' as cityname,
'一级' as categorytype,
category1,
'' as category2,
sum(totalcount),
sum(total_productnum),
sum(totalmoney)
from mid_orders
group by regionname, category1
union all
select '大区' as areatype,
regionname,
'' as cityname,
'二级' as categorytype,
'' as category1,
category2,
sum(totalcount),
sum(total_productnum),
sum(totalmoney)
from mid_orders
group by regionname, category2
union all
select '城市' as areatype,
'' as regionname,
cityname,
'' as categorytype,
'' as category1,
'' as category2,
sum(totalcount),
sum(total_productnum),
sum(totalmoney)
from mid_orders
group by cityname
union all
select '城市' as areatype,
'' as regionname,
cityname,
'一级' as categorytype,
category1,
'' as category2,
sum(totalcount),
sum(total_productnum),
sum(totalmoney)
from mid_orders
group by cityname, category1
union all
select '城市' as areatype,
'' as regionname,
cityname,
'二级' as categorytype,
'' as category1,
category2,
sum(totalcount),
sum(total_productnum),
sum(totalmoney)
from mid_orders
group by cityname, category2;
"
hive -e "$sql"
备注:由于在 dws.dws_trade_orders_w中,一笔订单大概有多条记录,所有在统计订单数量的时候用count(distinct orderid)
末了小结
https://i-blog.csdnimg.cn/direct/979c6095890e4540bc3c29689334c84a.png
# 加载ODS数据(含DataX迁移数据)
/data/lagoudw/script/trade/ods_load_trade.sh
# 加载DIM层数据
/data/lagoudw/script/trade/dim_load_product_cat.sh
/data/lagoudw/script/trade/dim_load_shop_org.sh
/data/lagoudw/script/trade/dim_load_payment.sh
/data/lagoudw/script/trade/dim_load_product_info.sh
# 加载DWD层数据
/data/lagoudw/script/trade/dwd_load_trade_orders.sh
# 加载DWS层数据
/data/lagoudw/script/trade/dws_load_trade_orders.sh
# 加载ADS层数据
/data/lagoudw/script/trade/ads_load_trade_order_analysis.sh
Airflow简介
Airflow是Airbnb开源的一个用Python编写的调理工具,与2014年启动,2015年春季开源,2016年加入Apache软件基金会的孵化计划。
Airflow将一个工作流制定为一组使命的有向无环图(DAG),并指派到一组计算节点上,根据相互之间的依赖关系,有序执行。
Airflow的优势如下:
[*]机动易用,Airflow是Python编写的,工作流的定义也利用Python编写
[*]功能强盛,支持多种差别范例的作业,可自定义差别范例的作业,如Shell、Python、MySQL、Oracle、Hive等
[*]简便优雅,作业的定义简单明了
[*]易扩展,提供各种基类供扩展,有多种执行器可供选择
Apache Airflow 是一个开源的使命调理和工作流管理平台,主要用于开发、调试和监控数据管道。Airflow 通过利用 Python 脚本定义使命和依赖关系,帮助用户以编程的方式构建动态的、可视化的工作流。
体系架构
https://i-blog.csdnimg.cn/direct/d15fdb24222b4daa92c3aec1697a17f1.png
[*]WebServer保卫进程,接受HTTP哀求,通过Python Flask Web应用程序与Airflow举行交互,WebServer提供功能的功能包罗:中止、规复、触发使命,监控正在运行的使命,断点续跑使命,查询使命的状态,日记等详细信息。
[*]Scheduler保卫进程,周期性地轮询使命的调理计算,以确定是否触发使命执行
[*]Worker保卫进程,Worker负责启动机器上的Executor来执行使命,利用CeleryExecutor后可以在多个机器人摆设Worker服务
焦点特性
基于代码的工作流定义
工作流以 Python 脚本定义,称为 DAG(Directed Acyclic Graph,有向无环图)。每个 DAG 包罗一组使命及其依赖关系。
支持参数化和动态天生 DAG,适合复杂工作流场景。
使命调理
支持机动的使命调理,用户可以通逾期间间隔、特定时间点等方式定义使命的运行周期。
调理器自动根据依赖关系按次序触发使命执行。
使命分布式执行
使命可以分布式运行,支持横向扩展,能够通过多个 Worker 节点进步使命处置惩罚能力。
可视化界面
Airflow 提供了功能强盛的 Web UI,用于查看 DAG 的运行状态、日记以及汗青记录。
图形化展示 DAG 结构,方便直观地相识工作流执行环境。
高可扩展性
支持通过插件机制扩展功能,比方自定义操作符(Operator)或 Hook。
内置多种操作符和连接器(如 BashOperator、PythonOperator、MySqlOperator 等)用于集成各种使命和外部系统。
依赖管理
Airflow 支持基于使命之间的依赖关系举行调理。用户可以显式地定义使命次序和依赖。
重点概念
DAG
DAG(Directed Acyclic Graph)有向无环图
[*]在AirFlow中,一个DAG定义了一个完备的作业,同一个DAG中的所有Task拥有相同的调理时间
[*]参数:dag_id(唯一识别DAG),default_args(默认参数,如果当前DAG实例的作业没有设置相应参数,则采用DAG实例的default_agrs中的相应参数)
[*]schedule_interval:设置GAG的执行周期,可采用crontab语法
Task
[*]Task为DAG中具体的作业使命,依赖于DAG,必须存在于某个DAG中,Task在DAG中可以设置依赖关系
[*]dag:当前作业属于相应的DAG
[*]task_id 使命标识符
[*]owner:使命的拥有者
[*]start_date:使命的开始时间
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]