大数据-239 离线数仓 - 广告业务 测试 FlumeAgent 加载ODS、DWD层 ...

打印 上一主题 下一主题

主题 776|帖子 776|积分 2328

点一下关注吧!!!非常感谢!!持续更新!!!

Java篇开始了!

目前开始更新 MyBatis,一起深入浅出!
目前已经更新到了:



  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(已更完)
  • Kylin(已更完)
  • Elasticsearch(已更完)
  • DataX(已更完)
  • Tez(已更完)
  • 数据发掘(已更完)
  • Prometheus(已更完)
  • Grafana(已更完)
  • 离线数仓(正在更新…)
章节内容

上节我们完成了如下的内容:


  • 广告业务 点击次数 ADS层
  • 广告效果分析 ADS 层
  • 需求分析与加载

导入数据


Flume Agent

Flume 是一个分布式、可靠且可扩展的系统,用于收集、聚合和传输大量日记数据。它常用于从各种数据源(例如日记文件、应用程序、系统等)收集数据并将其传输到 Hadoop 生态系统(例如 HDFS、Hive、HBase 等)进行进一步处理。Flume 主要由多个组件构成,其中 Flume Agent 是焦点的执行单位。
Flume Agent 是 Flume 架构中的基本执行单位,负责处理数据流的吸取、传输和存储。它可以独立运行或作为 Flume 集群的一部分来提供更高的可扩展性。每个 Flume Agent 由以下几部分构成:


  • Source:用于吸取数据。
  • Channel:用于在 Source 和 Sink 之间暂时存储数据。
  • Sink:用于将数据传送到外部存储系统(如 HDFS、HBase 等)。
  1. flume-ng agent --conf-file /opt/wzk/flume-conf/flume-log2hdfs3.conf -name a1 -Dflume.roog.logger=INFO,console
复制代码
详细内容如下所示:

扩展性与容错性

Flume Agent 支持分布式摆设,可以通过多个 Agent 在不同节点之间通报数据,适应大规模数据流转的需求。它的容错机制保证了即使某个组件出现故障,数据也不会丢失,依靠 Channel 和 Sink 的队列机制,事件可以被持久化,直到乐成传送。
利用场景

Flume Agent 广泛应用于各种日记收集和大数据处理场景:


  • 日记收集:Flume 可以从多个日记源收集数据,并将其统一存储到 Hadoop 系统(如 HDFS、HBase)中进行后续分析。
  • 实时数据传输:Flume 可以作为实时数据流管道,将数据从不同的数据源实时地传输到目标存储。
  • 数据聚合与整合:Flume 支持将多种范例的数据源进行聚合,提供统一的流处理方式。
准备数据

这里准备了 event 数据,如下图所示,将这批数据上传到指定的目录下,Flume会根据设置进行解析:

上传到服务器上:

观察效果

观察Flume的日记,可以看到如下的效果:
  1. 24/08/31 16:47:59 INFO hdfs.BucketWriter: Creating /user/data/logs/event/dt=Unknown//startlog..1725094067630.tmp
  2. 24/08/31 16:48:00 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
  3. 24/08/31 16:48:00 INFO hdfs.BucketWriter: Creating /user/data/logs/event/dt=2020-07-29//startlog..1725094080328.tmp
  4. 24/08/31 16:48:01 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
  5. 24/08/31 16:48:01 INFO hdfs.BucketWriter: Creating /user/data/logs/event/dt=2020-07-30//startlog..1725094081742.tmp
  6. 24/08/31 16:48:03 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
  7. 24/08/31 16:48:03 INFO hdfs.BucketWriter: Creating /user/data/logs/event/dt=2020-07-31//startlog..1725094083220.tmp
  8. 24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0726.log, inode: 2172609, pos: 7815141
  9. 24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0725.log, inode: 2172608, pos: 7817434
  10. 24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0727.log, inode: 2172610, pos: 7813191
  11. 24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0729.log, inode: 2172613, pos: 7833524
  12. 24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0728.log, inode: 2172612, pos: 7841254
  13. 24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0721.log, inode: 2164743, pos: 7795972
  14. 24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0731.log, inode: 2172615, pos: 7804311
  15. 24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0723.log, inode: 2164790, pos: 7810323
  16. 24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0730.log, inode: 2172614, pos: 7814852
  17. 24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0722.log, inode: 2164744, pos: 7841582
  18. 24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0724.log, inode: 2164798, pos: 7860701
复制代码
对应的截图如下所示:

同时我们检察HDFS中的数据情况,如下所示:

调用次序

脚本的调用次序,下面是我们在广告业务中编写的脚本:
  1. ods_load_event_log.sh
  2. dwd_load_event_log.sh
  3. dwd_load_ad_log.sh
  4. ads_load_ad_show.sh
  5. ads_load_ad_show_rate.sh
  6. ads_load_ad_show_page.sh
  7. ads_load_ad_show_page_window.sh
复制代码
加载ODS层

之前编写的:
  1. sh /opt/wzk/hive/ods_load_event_log.sh 2020-07-21
复制代码
执行脚本,效果如下所示:

在Hive中检察对应的数据:
  1. hive
  2. use ods;
  3. select * from ods_log_event limit 5;
复制代码
可以看到数据已经加载进来了:

这里我是把全部数据都加载了,后续执行:
  1. sh /opt/wzk/hive/ods_load_event_log.sh 2020-07-22
  2. sh /opt/wzk/hive/ods_load_event_log.sh 2020-07-23
  3. sh /opt/wzk/hive/ods_load_event_log.sh 2020-07-24
  4. ...省略
复制代码
最终的数据大约有:

加载DWD层

event_log

  1. sh /opt/wzk/hive/dwd_load_event_log.sh 2020-07-21
复制代码
执行效果如下所示:

在Hive中检察对应的内容:
  1. hive
  2. use dwd;
  3. select * from dwd_event_log limit 5;
复制代码
执行效果如下:

这里我是把全部的数据都加载了,如下所示:
  1. sh /opt/wzk/hive/dwd_load_event_log.sh 2020-07-22
  2. sh /opt/wzk/hive/dwd_load_event_log.sh 2020-07-23
  3. sh /opt/wzk/hive/dwd_load_event_log.sh 2020-07-24
  4. ...省略
复制代码
加载完成之后,Hive中的数据量如下所示:

ad_log

  1. sh /opt/wzk/hive/dwd_load_ad_log.sh 2020-07-21
复制代码
运行效果如下图所示:

检察Hive中的数据:
  1. hive
  2. use dwd;
  3. select * from dwd_ad limit 5;
复制代码
运行效果如下图所示:

继续加载其他的数据:
  1. sh /opt/wzk/hive/dwd_load_ad_log.sh 2020-07-22
  2. sh /opt/wzk/hive/dwd_load_ad_log.sh 2020-07-23
  3. sh /opt/wzk/hive/dwd_load_ad_log.sh 2020-07-24
  4. ...省略
复制代码
最终Hive中的数据总量的效果是:
  1. select count(*) from dwd_ad;
复制代码
执行效果如下图所示:


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

灌篮少年

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表