点一下关注吧!!!非常感谢!!持续更新!!!
Java篇开始了!
目前开始更新 MyBatis,一起深入浅出!
目前已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka(已更完)
- Spark(已更完)
- Flink(已更完)
- ClickHouse(已更完)
- Kudu(已更完)
- Druid(已更完)
- Kylin(已更完)
- Elasticsearch(已更完)
- DataX(已更完)
- Tez(已更完)
- 数据发掘(已更完)
- Prometheus(已更完)
- Grafana(已更完)
- 离线数仓(正在更新…)
章节内容
上节我们完成了如下的内容:
- 广告业务 点击次数 ADS层
- 广告效果分析 ADS 层
- 需求分析与加载
导入数据
Flume Agent
Flume 是一个分布式、可靠且可扩展的系统,用于收集、聚合和传输大量日记数据。它常用于从各种数据源(例如日记文件、应用程序、系统等)收集数据并将其传输到 Hadoop 生态系统(例如 HDFS、Hive、HBase 等)进行进一步处理。Flume 主要由多个组件构成,其中 Flume Agent 是焦点的执行单位。
Flume Agent 是 Flume 架构中的基本执行单位,负责处理数据流的吸取、传输和存储。它可以独立运行或作为 Flume 集群的一部分来提供更高的可扩展性。每个 Flume Agent 由以下几部分构成:
- Source:用于吸取数据。
- Channel:用于在 Source 和 Sink 之间暂时存储数据。
- Sink:用于将数据传送到外部存储系统(如 HDFS、HBase 等)。
- flume-ng agent --conf-file /opt/wzk/flume-conf/flume-log2hdfs3.conf -name a1 -Dflume.roog.logger=INFO,console
复制代码 详细内容如下所示:
扩展性与容错性
Flume Agent 支持分布式摆设,可以通过多个 Agent 在不同节点之间通报数据,适应大规模数据流转的需求。它的容错机制保证了即使某个组件出现故障,数据也不会丢失,依靠 Channel 和 Sink 的队列机制,事件可以被持久化,直到乐成传送。
利用场景
Flume Agent 广泛应用于各种日记收集和大数据处理场景:
- 日记收集:Flume 可以从多个日记源收集数据,并将其统一存储到 Hadoop 系统(如 HDFS、HBase)中进行后续分析。
- 实时数据传输:Flume 可以作为实时数据流管道,将数据从不同的数据源实时地传输到目标存储。
- 数据聚合与整合:Flume 支持将多种范例的数据源进行聚合,提供统一的流处理方式。
准备数据
这里准备了 event 数据,如下图所示,将这批数据上传到指定的目录下,Flume会根据设置进行解析:
上传到服务器上:
观察效果
观察Flume的日记,可以看到如下的效果:
- 24/08/31 16:47:59 INFO hdfs.BucketWriter: Creating /user/data/logs/event/dt=Unknown//startlog..1725094067630.tmp
- 24/08/31 16:48:00 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
- 24/08/31 16:48:00 INFO hdfs.BucketWriter: Creating /user/data/logs/event/dt=2020-07-29//startlog..1725094080328.tmp
- 24/08/31 16:48:01 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
- 24/08/31 16:48:01 INFO hdfs.BucketWriter: Creating /user/data/logs/event/dt=2020-07-30//startlog..1725094081742.tmp
- 24/08/31 16:48:03 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
- 24/08/31 16:48:03 INFO hdfs.BucketWriter: Creating /user/data/logs/event/dt=2020-07-31//startlog..1725094083220.tmp
- 24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0726.log, inode: 2172609, pos: 7815141
- 24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0725.log, inode: 2172608, pos: 7817434
- 24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0727.log, inode: 2172610, pos: 7813191
- 24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0729.log, inode: 2172613, pos: 7833524
- 24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0728.log, inode: 2172612, pos: 7841254
- 24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0721.log, inode: 2164743, pos: 7795972
- 24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0731.log, inode: 2172615, pos: 7804311
- 24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0723.log, inode: 2164790, pos: 7810323
- 24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0730.log, inode: 2172614, pos: 7814852
- 24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0722.log, inode: 2164744, pos: 7841582
- 24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0724.log, inode: 2164798, pos: 7860701
复制代码 对应的截图如下所示:
同时我们检察HDFS中的数据情况,如下所示:
调用次序
脚本的调用次序,下面是我们在广告业务中编写的脚本:
- ods_load_event_log.sh
- dwd_load_event_log.sh
- dwd_load_ad_log.sh
- ads_load_ad_show.sh
- ads_load_ad_show_rate.sh
- ads_load_ad_show_page.sh
- ads_load_ad_show_page_window.sh
复制代码 加载ODS层
之前编写的:
- sh /opt/wzk/hive/ods_load_event_log.sh 2020-07-21
复制代码 执行脚本,效果如下所示:
在Hive中检察对应的数据:
- hive
- use ods;
- select * from ods_log_event limit 5;
复制代码 可以看到数据已经加载进来了:
这里我是把全部数据都加载了,后续执行:
- sh /opt/wzk/hive/ods_load_event_log.sh 2020-07-22
- sh /opt/wzk/hive/ods_load_event_log.sh 2020-07-23
- sh /opt/wzk/hive/ods_load_event_log.sh 2020-07-24
- ...省略
复制代码 最终的数据大约有:
加载DWD层
event_log
- sh /opt/wzk/hive/dwd_load_event_log.sh 2020-07-21
复制代码 执行效果如下所示:
在Hive中检察对应的内容:
- hive
- use dwd;
- select * from dwd_event_log limit 5;
复制代码 执行效果如下:
这里我是把全部的数据都加载了,如下所示:
- sh /opt/wzk/hive/dwd_load_event_log.sh 2020-07-22
- sh /opt/wzk/hive/dwd_load_event_log.sh 2020-07-23
- sh /opt/wzk/hive/dwd_load_event_log.sh 2020-07-24
- ...省略
复制代码 加载完成之后,Hive中的数据量如下所示:
ad_log
- sh /opt/wzk/hive/dwd_load_ad_log.sh 2020-07-21
复制代码 运行效果如下图所示:
检察Hive中的数据:
- hive
- use dwd;
- select * from dwd_ad limit 5;
复制代码 运行效果如下图所示:
继续加载其他的数据:
- sh /opt/wzk/hive/dwd_load_ad_log.sh 2020-07-22
- sh /opt/wzk/hive/dwd_load_ad_log.sh 2020-07-23
- sh /opt/wzk/hive/dwd_load_ad_log.sh 2020-07-24
- ...省略
复制代码 最终Hive中的数据总量的效果是:
- select count(*) from dwd_ad;
复制代码 执行效果如下图所示:
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |