大数据-246 离线数仓 - 电商分析 拉链表的分析与构建与回滚
点一下关注吧!!!非常感谢!!持续更新!!!Java篇开始了!
目前开始更新 MyBatis,一起深入浅出!
目前已经更新到了:
[*]Hadoop(已更完)
[*]HDFS(已更完)
[*]MapReduce(已更完)
[*]Hive(已更完)
[*]Flume(已更完)
[*]Sqoop(已更完)
[*]Zookeeper(已更完)
[*]HBase(已更完)
[*]Redis (已更完)
[*]Kafka(已更完)
[*]Spark(已更完)
[*]Flink(已更完)
[*]ClickHouse(已更完)
[*]Kudu(已更完)
[*]Druid(已更完)
[*]Kylin(已更完)
[*]Elasticsearch(已更完)
[*]DataX(已更完)
[*]Tez(已更完)
[*]数据挖掘(已更完)
[*]Prometheus(已更完)
[*]Grafana(已更完)
[*]离线数仓(正在更新…)
章节内容
上节我们完成了如下的内容:
[*]电商分析 缓慢变化维
[*]拉链表 SCD Slowly Changing Dimensions
https://i-blog.csdnimg.cn/direct/ad4277d056cd45638ce086976d750a95.png
拉链表的实现
userinfo(分区表) => userid、mobile、regdate => 每日变动的数据(修改的+新增的)/ 历史数据(第一天)
userhis(拉链表)=> 多个两个字段 start_date / end_date
拉链表(Zipper Table)
拉链表是一种数据库计划模式,用于跟踪数据随时间的变化,同时保持高效的查询性能。这种模式广泛应用于数据仓库和数据分析场景,因为它能够很好地记录历史数据的变化环境。
拉链表的基本概念
拉链表的核心思想是将每条记录的有效时间范围存储起来,通过“拉链”方式记录版本变化。每一条记录都包罗以下关键信息:
[*]开始时间(Start Date/Effective Date):表示这条记录的生效时间。
[*]竣事时间(End Date/Expiration Date):表示这条记录的失效时间。
[*]是否当前有效(Is Current):表示这条记录是否为最新版本(通常通过标志位存储,如1表示当前记录,0表示历史记录)。
工作原理
[*]新增数据:当有新数据插入时,系统创建一条新记录,设置其开始时间为当前时间,竣事时间为一个默认的最大时间(如9999-12-31),同时将is_current字段设为1。
[*]更新数据:起首将现有的有效记录的竣事时间更新为当前时间,表示它的有效期竣事,同时将is_current标志设为0。
[*]然后插入一条新的记录,表示更新后的版本,开始时间为当前时间,竣事时间为默认最大时间,is_current标志为1。
[*]删除数据:一般通过逻辑删除方式(更新竣事时间和is_current字段)实现,而不是直接物理删除。
预备数据
这里的数据刚才已经全部都写入进去了
-- 1、userinfo初始化(2020-06-20)。获取历史数据
001,13551111111,2020-03-01,2020-06-20
002,13561111111,2020-04-01,2020-06-20
003,13571111111,2020-05-01,2020-06-20
004,13581111111,2020-06-01,2020-06-20
初始化拉链表
将2020-06-20的数据写入到表中
-- 2、初始化拉链表(2020-06-20)。userinfo => userhis
INSERT OVERWRITE TABLE test.userhis
SELECT
userid,
mobile,
regdate,
dt AS start_date,
'9999-12-31' AS end_date
FROM
test.userinfo
WHERE
dt = '2020-06-20';
实行结果如下所示:
https://i-blog.csdnimg.cn/direct/9b11cf7bd05548329cfc618ea2d194eb.png
继续预备数据
这批数据也已经写入了:
-- 3、次日新增数据(2020-06-21);获取新增数据
002,13562222222,2020-04-01,2020-06-21
004,13582222222,2020-06-01,2020-06-21
005,13552222222,2020-06-21,2020-06-21
构建拉链表
-- 4、构建拉链表(userhis)(2020-06-21)【核心】 userinfo(2020-06-21) + userhis => userhis
-- userinfo: 新增数据
-- userhis:历史数据
-- 第一步:处理新增数据【userinfo】(处理逻辑与加载历史数据类似)
SELECT
userid,
mobile,
regdate,
dt AS start_date,
'9999-12-31' AS end_date
FROM
test.userinfo
WHERE
dt = '2020-06-21';
-- 第二步:处理历史数据【userhis】(历史包括两部分:变化的、未变化的)
-- 变化的:start_date:不变;end_date:传入日期-1
-- 未变化的:不做处理
-- 观察数据
SELECT
A.userid,
B.userid,
B.mobile,
B.regdate,
B.start_date,
B.end_date
FROM
(SELECT *
FROM test.userinfo
WHERE dt = '2020-06-21') A
RIGHT JOIN
test.userhis B
ON
A.userid = B.userid;
-- 编写SQL,处理历史数据
SELECT
B.userid,
B.mobile,
B.regdate,
B.start_Date,
CASE
WHEN B.end_date = '9999-12-31' AND A.userid IS NOT NULL
THEN DATE_ADD('2020-06-21', INTERVAL -1 DAY)
ELSE B.end_date
END AS end_date
FROM
(SELECT * FROM test.userinfo WHERE dt = '2020-06-21') A
RIGHT JOIN
test.userhis B
ON
A.userid = B.userid;
-- 最终的处理(新增+历史数据)
INSERT OVERWRITE TABLE test.userhis
SELECT
userid,
mobile,
regdate,
dt AS start_date,
'9999-12-31' AS end_date
FROM
test.userinfo
WHERE
dt = '2020-06-21'
UNION ALL
SELECT
B.userid,
B.mobile,
B.regdate,
B.start_date,
CASE
WHEN B.end_date = '9999-12-31' AND A.userid IS NOT NULL
THEN date_add('2020-06-21', -1)
ELSE B.end_date
END AS end_date
FROM
(SELECT * FROM test.userinfo WHERE dt = '2020-06-21') A
RIGHT JOIN
test.userhis B
ON
A.userid = B.userid;
实行过程如下图所示:
https://i-blog.csdnimg.cn/direct/9ccd518edfd5471c860d9dfc18533064.png
拉链表测试脚本
vim test_zipper.sh
写入的内容如下所示:
#!/bin/bash
# 加载环境变量
source /etc/profile
# 判断是否传入日期参数,如果没有则使用前一天的日期
if [ -n "$1" ]; then
do_date=$1
else
do_date=$(date -d "-1 day" +%F)
fi
# SQL 语句
sql="
INSERT OVERWRITE TABLE test.userhis
SELECT
userid,
mobile,
regdate,
dt AS start_date,
'9999-12-31' AS end_date
FROM
test.userinfo
WHERE
dt = '$do_date'
UNION ALL
SELECT
B.userid,
B.mobile,
B.regdate,
B.start_date,
CASE
WHEN B.end_date = '9999-12-31' AND A.userid IS NOT NULL THEN date_add('$do_date', -1)
ELSE B.end_date
END AS end_date
FROM
(SELECT * FROM test.userinfo WHERE dt = '$do_date') A
RIGHT JOIN
test.userhis B
ON
A.userid = B.userid;
"
# 执行 Hive SQL
hive -e "$sql"
拉链表的回滚
由于种种原因必要将拉链表规复到rollback_date那一天的数据,此时有:
[*]end_date < rollback_date,即竣事日期<回滚日期,表示该行数据在roll_back_date之前产生,这些数据必要原样保留
[*]start_date <= rollback_date <= end_date,即开始日期 <= 回滚日期 <= 竣事日期,这些数据是回滚日期之后产生的,但是必要修改,将end_date改为9999-12-31
[*]其他数据不用管
https://i-blog.csdnimg.cn/direct/14890ef7f736404c9dd020d1a52e932a.png
按照上述方案进行编码:
处理end_date < rollback_date 的数据
SELECT
userid,
mobile,
regdate,
start_date,
end_date,
'1' AS tag
FROM
test.userhis
WHERE
end_date < '2020-06-22';
处理start_date <= rollback_date <= end_date 的数据,设置 end_date=9999-12-31
SELECT
userid,
mobile,
regdate,
start_date,
'9999-12-31' AS end_date,
'2' AS tag
FROM
test.userhis
WHERE
start_date <= '2020-06-22'
AND end_date >= '2020-06-22';
将前面两步的数据写入暂时表tmp(拉链表)
-- 删除暂时表DROP TABLE IF EXISTS test.tmp;-- 创建暂时表CREATE TABLE test.tmp ASSELECT userid, mobile, regdate, start_date, end_date, '1' AS tagFROM test.userhisWHERE end_date < '2020-06-22'UNION ALLSELECT
userid,
mobile,
regdate,
start_date,
'9999-12-31' AS end_date,
'2' AS tag
FROM
test.userhis
WHERE
start_date <= '2020-06-22'
AND end_date >= '2020-06-22';
-- 查询结果并按照 userid 和 start_date 进行聚集SELECT * FROM test.tmp CLUSTER BY userid, start_date; 模拟脚本:
zippertmp.sh
写入的内容如下所示:
#!/bin/bash
# 加载环境变量
source /etc/profile
# 判断是否传递日期参数,如果没有则使用前一天的日期
if [ -n "$1" ]; then
do_date=$1
else
do_date=$(date -d "-1 day" +%F)
fi
# 定义SQL查询语句
sql="
DROP TABLE IF EXISTS test.tmp;
CREATE TABLE test.tmp AS
SELECT userid, mobile, regdate, start_date, end_date, '1' AS tag
FROM test.userhis
WHERE end_date < '${do_date}'
UNION ALL
SELECT userid, mobile, regdate, start_date, '9999-12-31' AS end_date, '2' AS tag
FROM test.userhis
WHERE start_date <= '${do_date}'
AND end_date >= '${do_date}';
"
# 执行Hive查询
hive -e "$sql"
逐天回滚,检查数据
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]