铁佛 发表于 2024-8-26 13:31:26

大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD

点一下关注吧!!!非常感谢!!持续更新!!!

现在已经更新到了:



[*]Hadoop(已更完)
[*]HDFS(已更完)
[*]MapReduce(已更完)
[*]Hive(已更完)
[*]Flume(已更完)
[*]Sqoop(已更完)
[*]Zookeeper(已更完)
[*]HBase(已更完)
[*]Redis (已更完)
[*]Kafka(已更完)
[*]Spark(正在更新!)
章节内容

上节完成的内容如下:


[*]Spark程序的优化:广播变量、累加器
[*]研究广播变量
[*]研究累加器
https://i-blog.csdnimg.cn/direct/84881427a5764bfe888fa2f9435631db.png
Standalone提交

Standalone组成

Standalone 模式下有四个重要的组成部门,分别是:


[*]Driver: 用户编写的Spark应用程序就运行在Driver上,由Driver进程执行
[*]Master:主要负责资源的调理和分配,并进行集群的监控等职责
[*]Worker:Worker运行在集群中的一台服务器上,负责管理该节点上的资源,负责启动节点上的Executor
[*]Executor:一个Worker上可以运行多个Executor,Executor通过启动多个线程(Task)对RDD的分区进行盘算
SparkContext组件

什么是SparkContext

SparkContext 是 Spark 应用程序的主控制器,它负责与 Spark 集群的管理节点(Driver)和工作节点(Workers)进行交互。通过 SparkContext,用户可以提交作业、管理 RDD(弹性分布式数据集)和其他数据集,并执行各种操作。SparkContext 是 Spark 应用程序的基础,每个应用程序在启动时都会创建一个 SparkContext 实例。
SparkContext 的主要职责



[*]集群连接: SparkContext 负责连接到集群管理器(如 YARN、Mesos 或 Spark 的独立集群管理器),并获取集群的资源,以便在集群上执行任务。
[*]作业调理: SparkContext 通过 DAG(有向无环图)将用户的应用程序逻辑转换为一系列任务(Tasks),然后将这些任务分配给集群中的工作节点执行。
[*]RDD 管理: RDD 是 Spark 的焦点抽象,用于表现分布式数据集。SparkContext 提供了创建 RDD 的方法,如从外部存储系统(HDFS、S3 等)中加载数据,或者从 Scala 聚集创建 RDD。
[*]广播变量和累加器: SparkContext 提供了广播变量和累加器的支持,广播变量用于在集群中的所有节点间共享只读数据,累加器用于在集群中执行全局计数或求和操作。
[*]检查点: 为了支持容错,SparkContext 提供了将 RDD 存储到可靠存储中的功能,这称为检查点。如许,在发生故障时,Spark 可以从检查点恢复 RDD。
SparkContext中的三大组件:



[*]DAGScheduler:负责将DAG划分若干个Stage
[*]TaskScheduler:将DAGScheduler提交的Stage(Taskset)进行优先排序,再将Task发送到Executor
[*]SchedulerBackend:定义了许多与Executor事件相关的处理,包罗:新的Executor注册进来的时候记录Executor的信息,增加全局的资源量(核数),Executor更新状态,若任务完成的话,采取Core,其他停止Executor、Remove Executor等事件
https://i-blog.csdnimg.cn/direct/e4d740a30c544637aa7cbd0d0ee8e2ec.png
常用的 SparkContext 方法



[*]parallelize: 将本地聚集转换为 RDD。
[*]textFile: 从文本文件中读取数据并创建 RDD。
[*]stop: 停止 SparkContext。
[*]broadcast: 创建广播变量。
[*]accumulator: 创建累加器。
Standalone提交



[*]启动应用程序,完成SparkContext的初始化
[*]Driver向Master注册,申请资源
[*]Master检查集群资源状态,若集群资源满足,通知Worker启动Executor
[*]Executor启动后向Driver注册(称为反向注册)
[*]Driver完成DAG的解析,得到Tasks,然后向Executor发送Task
[*]Executor向Driver汇总任务的执行环境
[*]应用程序执行完毕,采取资源
https://i-blog.csdnimg.cn/direct/b7c938ad648c420882ea228457647e6d.png
Shuffle原理

基本概念

Shuffle的本意是洗牌,目的是为了把牌弄乱。


[*]Spark、Hadoop中的Shuffle可不是为了把数据弄乱,而是为了将随机排列的数据转换成具有一定规则的数据。
[*]Shuffle是MapReduce盘算框架中的一个特殊的阶段,介于Map和Reduce之间。
[*]Shuffle涉及到了本地磁盘(非HDFS)的读写和网络传输,大多数Spark作业的性能都斲丧在了Shuffle阶段,因此Shuffle性能的高低直接影响到了整个程序的运行效率
Shuffle汗青



[*]Spark 0.8 及从前 Hash Based Shuflle
[*]Spark 0.8.1 为 Hash Based Shuflle 引入 File Consolidation 机制
[*]Spark 0.9 引入 External Append Only Map
[*]Spark 1.1 引入 Sort Based Shuffle,但默认仍为 Hash Based Shuffle
[*]Spark 1.2 默认的 Shuffle方式改为 Sort Based Shuffle
[*]Spark 1.4 引入 Tungsten-Sort Based Shuffle
[*]Spark 1.6 Tungsten-Sort 并入 Sort Based Shuffle
[*]Spark 2.0 Hash Based Shuffle 退出汗青舞台
https://i-blog.csdnimg.cn/direct/8b1c6e51713749b697d8f95e247f2051.png
Hash Base Shuffle V1

简单介绍

Hash-based Shuffle 是 Apache Spark 中数据分布和重新排序的一种方式。Shuffle 是指在不同阶段的任务之间重新分配数据的过程。Hash-based Shuffle 在 Spark 1.x 版本中引入,被称为 Shuffle V1。
Shuffle V1 是 Spark 最初版本利用的 Shuffle 机制,基于 Hash 方法实现数据分布。它的主要特点是通过对数据的键进行哈希处理,将数据分配到相应的 reducer 节点上。Shuffle V1 的实现相对简单,但在大规模数据处理时存在一些局限性,如磁盘 I/O 过多、垃圾采取压力大等。


[*]每个 Shuffle Map Task 必要为每个下游的Task创建一个单独的文件
[*]Shuffle 过程中会生成海量的小文件,同时打开过多的文件、IO效率低
https://i-blog.csdnimg.cn/direct/5ef6eb17a76d4fa2a375e7b71506ede1.png
工作原理

Map 端处理:


[*]每个 map 任务在完成后,会根据键的哈希值将数据划分到不同的 bucket 中,这些 bucket 对应下游的 reduce 任务。
[*]Map 任务会将这些数据块(称为 partition)写入本地磁盘,并为每个 reduce 任务生成一个文件(包罗索引文件和数据文件)。
Reduce 端处理:


[*]当 reduce 任务启动时,它会从所有 map 任务生成的输出中拉取对应的数据块。
[*]Reduce 任务根据 map 任务输出的索引文件来读取相应的 partition 数据,并在本地进行聚合或其他处理。
局限性



[*]磁盘 I/O: 每个 map 任务为每个 reduce 任务生成单独的文件,这会导致大量的小文件和频仍的磁盘 I/O 操作。当集群规模和数据量增大时,I/O 开销变得非常大。
[*]垃圾采取: Shuffle V1 在处理过程中会产生大量的中央结果,导致 JVM 内存中会积累大量对象,增加了垃圾采取的压力,可能导致频仍的 GC 暂停(Stop-the-world)。
[*]容错性: 如果某个任务失败,Spark 必要重新盘算该任务的所有中央结果,Shuffle V1 没有很好的机制来优化这一过程。
适用场景

只管 Shuffle V1 存在一些问题,但在小规模数据处理或集群中,Shuffle V1 的性能体现照旧可以接受的,特别是对资源斲丧较少的作业。不过,随着数据规模的增大,Shuffle V1 的局限性会变得明显,因今后续的 Spark 版本引入了更优化的 Shuffle 机制(Shuffle V2 和 Tungsten-Sort Based Shuffle)。
Hash Base Shuffle V2

简单介绍

Hash-Based Shuffle V2 是 Apache Spark 中对最初版本的 Hash-Based Shuffle 进行的改进,旨在解决 Shuffle V1 中存在的一些性能和稳固性问题。Shuffle 是分布式盘算中数据重新分布的重要机制,而 Shuffle V2 的引入大大进步了 Spark 在处理大规模数据集时的性能和效率。
焦点思想

Hash Base Shuffle V2 焦点思想:
允许不同Task复用同一批磁盘文件,有效将多个Task的磁盘文件进行一定水平上的归并,从而大幅度减少磁盘文件的数量,进而提升ShuffleWrite的性能,一定水平上解决了HashV1中的问题,但不彻底。
Hash Shuffle 规避了排序,进步了性能,总的来说在 Hash Shuffle过程中生成了海量的小文件
https://i-blog.csdnimg.cn/direct/e3b9e0c89c02428188455d7f127f596b.png
Shuffle V2 的改进点

归并输出文件:


[*]在 Shuffle V2 中,map 任务不再为每个 reduce 任务生成一个单独的文件,而是将多个 partition 的输出归并到一个文件中。如许,每个 map 任务只生成一个数据文件和一个索引文件,大大减少了生成的小文件数量。
[*]索引文件记录了每个 reduce 任务的数据在数据文件中的偏移量和长度,reduce 任务可以根据这个索引文件来定位它所需的数据。
磁盘 I/O 优化:


[*]通过归并输出文件,Shuffle V2 大幅减少了磁盘 I/O 操作,减少了文件系统的压力,并且低沉了与小文件相关的元数据管理开销。
内存斲丧优化:


[*]由于减少了文件数量,Shuffle V2 对 JVM 的内存压力也有所低沉,垃圾采取(GC)的频率和时长得到了优化。
容错性改进:


[*]Shuffle V2 采取了更加高效的数据管理机制,使得在任务失败时,重新拉取数据的开销更小。此外,数据文件的归并也使得在节点故障时可以更容易地恢复数据。
工作原理
Shuffle V2 的工作原理

Map 端处理:


[*]每个 map 任务在处理数据时,基于键的哈希值将数据分配到不同的 partition。与 Shuffle V1 不同的是,Shuffle V2 将多个 partition 的数据写入同一个文件。
[*]同时生成一个索引文件,记录每个 partition 在数据文件中的位置和长度。
Reduce 端处理:
[*]Reduce 任务通过索引文件,定位必要处理的数据块,并从 Map 任务的输出文件中读取相应的数据。
[*]通过这种方式,减少了 I/O 开销,并优化了数据拉取的效率。
适用场景

Shuffle V2 适用于绝大多数的 Spark 作业,特别是在处理大规模数据集时结果尤为明显。它减少了磁盘 I/O 操作,优化了内存斲丧,并进步了系统的容错性。对于必要高性能和稳固性的场景,Shuffle V2 是更好的选择。
Sort Base Shuffle



[*]Sort Base Shuffle 大大减少了 Shuffle 过程中产生的文件数,进步 Shuffle 的效率。
[*]Spark Shuffle 与 Hadoop Shuffle 从目的、意义、功能上看是雷同的,实现上有区别。
https://i-blog.csdnimg.cn/direct/bb03119920c54cfba76d300acb495d02.png
RDD编程优化

RDD复用

避免创建重复的RDD,在开辟过程中要注意,对于同一份数据,只应该创建一个RDD,不要创建过多个RDD来表现同一份数据。
RDD缓存/长期化



[*]当多次对同一个RDD执行算子操作时,每一次都会对这个RDD以之前的父RDD重新盘算一次,这种环境是必须要避免的,对同一个RDD的重复盘算是对资源的极大浪费
[*]对多次利用的RDD进行长期化,通过长期化将公共RDD的数据缓存到内存/磁盘中,之后对于公共RDD的盘算都会从内存/磁盘中直接获取RDD数据
[*]RDD的长期化是可以进行序列化的,当内存无法将RDD的数据完整的进行存放的时候,可以考虑利用序列化的方式减小数据的体积,将数据完整存储在内存中
巧用 filter



[*]尽可能过早地执行filter操作,过滤无用数据
[*]在filter过滤较多数据后,利用 coalesce 对数据进行重分区
利用高性能算子



[*]避免利用 groupByKey,根据场景选择利用高性能的聚合算子:reduceByKey、aggregateByKey
[*]coalesce、repartition,在可能得环境下优先选择没有Shuffle的操作
[*]foreachPartition 优化输出操作
[*]map、mapPartition,选择合理的选择算子,mapPartitions性能更好,但数据量过大时可能会OOM
[*]用 repartitionAndSortWithinPartitions 替代 repartition + Sort 操作
[*]合理利用 cache、persist、checkpoint,选择合理的数据存储级别
[*]filter 的利用
[*]减少对数据源的扫描(算法复杂)
设置合理的并行度



[*]Spark作业中的并行度指各个Stage的Task的数量
[*]设置合理的并行度,让并行度与资源相匹配,简单来说就是在资源允许的前提下,并行度尽可能大,到达充分利用集群资源。
广播大变量



[*]默认环境下,Task中的算子中如果利用了外部变量,每个Task都会获取一份变量的副本,这会造多余的网络传输和内存斲丧
[*]利用广播变量,只会在每个Executor保存一个副本,Executor的所有Task共用此广播变量,如许就节约了网络及内存资源

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD