ToB企服应用市场:ToB评测及商务社交产业平台
标题:
Spark RDD sortBy算子执行时进行数据 “采样”是什么意思?
[打印本页]
作者:
钜形不锈钢水箱
时间:
2024-12-7 19:50
标题:
Spark RDD sortBy算子执行时进行数据 “采样”是什么意思?
一、sortBy 和
RangePartitioner
sortBy
在 Spark 中会在执行排序时采用
rangePartitioner
进行分区,这会影响数据的分区方式,而且这一步调是通过对数据进行
“采样”
来计算分区的范围。不过,紧张的是,
sortBy
本身仍然是一个
transformation
,它不会立刻触发计算,但在执行过程中会涉及到对数据的排序、分区和终极计算。
1.
sortBy 和 RangePartitioner
sortBy 会利用 RangePartitioner 来决定数据如何进行分区。RangePartitioner 会在排序之前,首先对数据进行采样,从而得出每个分区的范围,然后根据这些范围进行数据的分区。这是因为数据排序是一个全局操作,而 RangePartitioner 提供了一个合理的分别策略,使得 Spark 在执行排序时可以或许并行化。
采样过程
:
当调用 sortBy 时,Spark 会对数据进行
采样
,通常利用的是 SampledRDD,这种采样会用来估计数据的分布范围,并为后续的分区计算提供依据。
RangePartitioner 的利用
:
RangePartitioner 会根据数据的值分别成差别的范围。通常在分布式环境中,我们必要将数据按某种方式分别为多个分区,这个过程会利用一个范围来决定数据分布。
2.
是否会触发 runJob
sortBy 作为
transformation
不会立刻触发作业执行。它返回一个新的 RDD,并仅在后续执行
action
操作时才会触发实际的计算。因此,sortBy 不会直接导致 runJob 的执行。只有在你执行类似 collect(), count(), saveAsTextFile() 等行动算子时,整个作业才会执行。
但是,sortBy 内部会涉及到
采样
和
范围分区
,这些过程是为了确保排序可以或许在多个分区上并行高效地完成,全部这些操作都在 Spark 内部的
task
中完成。runJob 会在行动算子执行时启动,但在执行过程中,
rangePartitioner
的计算、数据的重新分区等步调会被渐渐执行。
3.
源码分析
我们可以通过检察 Spark 源码来更清楚地理解这些步调。以下是关于 sortBy 和其内部处理的一些关键源码:
RDD.sortBy 源码
def sortBy[K: ClassTag, U: Ordering](f: T => K, ascending: Boolean = true, numPartitions: Int)(implicit ord: Ordering[K]): RDD[T] = {
val partitioner = new RangePartitioner(numPartitions, this) // 使用 RangePartitioner
val map = this.mapPartitionsWithIndex { (index, iter) =>
// 计算分区内的排序
val partitioned = iter.toArray.sortBy(f)
partitioned.iterator
}
map
}
复制代码
在这个方法中,RangePartitioner 被用来决定如何将数据分成多个分区。而在实际执行时,分区是通过 mapPartitionsWithIndex 来执行的。
RangePartitioner 源码
class RangePartitioner[K, V](partitions: Int, rdd: RDD[(K, V)]) extends Partitioner {
def getPartition(key: Any): Int = {
// 根据 key 的范围来决定在哪个分区
val partitionIndex = rangePartition(key)
partitionIndex
}
def rangePartition(key: Any): Int = {
// 进行采样,并将数据按范围分到对应的分区
}
}
复制代码
4.
触发计算的条件
sortBy
是一个 transformation 操作,它会生成一个新的 RDD,并不会立刻执行排序。
RangePartitioner
会在背景进行数据的分区计算和范围分割,但这统统都不会触发作业执行,直到
action 操作
被调用。
5.
总结
sortBy 会利用 RangePartitioner 进行数据的分区和范围分别,这过程中会对数据进行采样以确定每个分区的范围。
这个过程本身不会触发作业执行,只有当你执行一个
action
操作时(如 collect() 或 saveAsTextFile()),Spark 才会触发计算,并启动实际的作业执行,进行排序和分区。
二、 RangePartitioner 的 采样过程
在 Spark 中,RangePartitioner 的
采样过程
是其焦点部分之一,它确保可以或许为数据分配适当的分区,并保证每个分区的数据范围在排序时可以或许合理地分布。这里我们将深入探讨
RangePartitioner
是如何通过采样来计算分区范围的。
1.
RangePartitioner 概述
RangePartitioner 是 Spark 中的一个分区器,常用于按范围将数据进行分区。它通常用于类似 sortBy 这类必要全局排序的操作,目的是为了在分布式环境中进行高效的并行排序。
RangePartitioner 在执行分区时,会利用
采样
来估算每个分区的范围(即每个分区的界限)。这种采样过程通过从数据中提取一个小样本,帮助计算出数据在差别分区上的分布,从而保证数据可以或许匀称地分配到各个分区中。
2.
RangePartitioner 采样过程
采样是 RangePartitioner 计算每个分区的范围的关键。这个过程涉及到以下步调:
2.1
数据采样
RangePartitioner 会从数据中
随机采样
一部分元素,用来估算数据的分布和计算每个分区的界限。这个采样过程通常不会采用全部数据,而是通过肯定比例的数据来进行推测。这是为了镌汰计算开销,同时确保分区的均衡性。
采样操作通常是在
分布式环境中并行执行
的,Spark 会在多个分区上并行地获取样本数据。
采样的比例
:采样比例通常是一个相对较小的数值,目的是镌汰计算量。Spark 内部会在每个分区中执行采样,以确保终极分区的界限可以或许反映整个数据集的分布。
2.2
计算分区界限
一旦采样完成,RangePartitioner 就会利用这些采样数据来计算每个分区的界限。这个过程基于采样数据的排序:
排序样本数据
:首先,对采样数据进行排序,确保数据可以按顺序进行分区。
计算分割点
:然后,RangePartitioner 会根据排序后的数据分别出多个界限点。这些界限点代表了每个分区的数据范围。比方,如果数据有 1000 个元素,而且要求将数据分别为 10 个分区,那么就会在排序后的数据中选取 9 个分割点。
2.3
创建分区
RangePartitioner 利用这些界限点来创建新的分区。数据根据其值所在的范围,决定落入哪个分区。具体来说,RangePartitioner 会为每个分区计算出一个界限值,然后将全部数据按这些界限值进行分配。
分区计算
:对于每个数据元素,RangePartitioner 会根据元素的值和这些界限值,决定该元素属于哪个分区。
3.
代码实现中的采样部分
在 Spark 的源码中,RangePartitioner 的采样过程是通过以下代码来实现的:
3.1
RangePartitioner 类中的采样
class RangePartitioner[K, V](partitions: Int, rdd: RDD[(K, V)]) extends Partitioner {
// 进行数据的采样
val sample = rdd.sample(withReplacement = false, fraction = 0.1, seed = 12345)
val sortedSample = sample.map(_._1).sortBy(identity)
// 计算每个分区的分割点
val splits = sortedSample.zipWithIndex.map { case (key, index) =>
if (index % (sampleCount / partitions) == 0) key else null
}.filter(_ != null)
def getPartition(key: Any): Int = {
// 根据采样的分割点进行分区
var low = 0
var high = partitions - 1
while (low < high) {
val mid = (low + high) / 2
if (key < splits(mid)) high = mid - 1 else low = mid + 1
}
low
}
}
复制代码
在上面的代码中,sample 操作会对 RDD 中的数据进行采样,并将其按值排序。然后,通过分割排序后的数据,计算出每个分区的界限点。这些界限点随后用于 getPartition 方法中来确定数据的分配。
3.2
采样与排序
rdd.sample(withReplacement = false, fraction = 0.1):从原始 RDD 中采样 10% 的数据(fraction = 0.1),而且不进行重复采样。
sortBy(identity):对采样的数据进行排序,确保采样数据的顺序正确,便于后续计算界限。
4.
触发计算
在执行 sortBy 操作时,Spark 会根据 RangePartitioner 对数据进行采样、排序和分区计算。这些操作会在你执行
action 操作
(如 collect()、saveAsTextFile())时触发,具体的分区计算会在计算过程中完成。直到行动算子触发,计算过程才会开始,RangePartitioner 会根据采样数据生成分区,并终极执行数据的排序。
5.
总结
采样
:RangePartitioner 会从数据中随机采样一部分元素(通常是 10% 或其他比例),用来估算数据的分布。
排序与计算分区界限
:采样数据被排序,并根据排序后的数据计算出每个分区的界限。这样可以确保数据匀称分配到差别的分区。
数据分区
:根据采样和计算出的界限,RangePartitioner 会将数据分配到相应的分区中。
通过这种采样与分区机制,RangePartitioner 可以或许高效地支持 Spark 的排序操作,使得数据在分布式环境中可以或许有效地并行处理。
三、举例先容RangePartitioner采样过程
理解 RangePartitioner 如何通过采样来得到数据分布、计算界限,并将数据分配到相应分区的过程,确实比较抽象。我会通过一个简单的例子来帮助你更直观地理解这个过程。
问题场景
假设你有一个数据集,包含了以下的 10 个整数:
[10, 23, 1, 9, 15, 37, 2, 16, 40, 3]
复制代码
你想用 RangePartitioner 来将这些数据分为
3 个分区
,而且根据它们的值进行排序。
1.
采样数据
首先,为了计算每个分区的界限,RangePartitioner 会对数据集进行采样。假设我们采样 30% 的数据(即随机选择 3 个数据点)。假设采样到的数据是:
[10, 23, 3]
复制代码
2.
排序采样数据
然后,对采样的数据进行排序,确保它们按大小排列。对于这个例子,排序后的采样数据是:
[3, 10, 23]
复制代码
3.
计算分区界限
通过对采样数据进行排序,RangePartitioner 可以计算出分区的界限。在我们的例子中,我们有 3 个分区,因此我们必要为数据计算 2 个界限(因为 n 个分区必要 n-1 个界限)。
根据排序后的采样数据 [3, 10, 23]
,RangePartitioner 可以选择分割点来确定界限:
第一个界限
:选择采样数据的第一个元素(3)。
第二个界限
:选择采样数据的最后一个元素(23)。
现在我们有了两个界限:
分区 1
:全部小于 10 的数据
分区 2
:全部大于即是 10 小于 23 的数据
分区 3
:全部大于即是 23 的数据
4.
分配数据到分区
接下来,RangePartitioner 会根据这些界限将数据分配到相应的分区中。具体的分区规则是:
分区 1
:全部小于 10 的元素 → [1, 2, 3, 9]
分区 2
:全部大于即是 10 且小于 23 的元素 → [10, 15, 16]
分区 3
:全部大于即是 23 的元素 → [23, 37, 40]
所以终极的分区效果是:
分区 1
:[1, 2, 3, 9]
分区 2
:[10, 15, 16]
分区 3
:[23, 37, 40]
5.
总结过程
通过这个例子,我们可以看到 RangePartitioner 的整个过程:
采样数据
:从整个数据集中随机抽取一部分数据(这里是 30%)。
排序采样数据
:对采样数据进行排序,确保我们能根据数据的范围计算界限。
计算分区界限
:根据排序后的采样数据,选择界限来分别数据(比方第一个和最后一个元素)。
分配数据到分区
:根据界限将全部数据分配到相应的分区中。
6.
实际执行的环境
采样比例
:在实际的 Spark 中,采样比例并不肯定是 30%,通常是根据数据的大小和分区数量进行调解的。采样可以确保 RangePartitioner 在计算界限时不会消耗过多资源。
多个分区
:如果数据集更大,分区数量更多,RangePartitioner 会选择更多的采样点来分别分区。界限点会根据排序后的采样数据来动态选择。
7.
关键源码中的采样部分
在实际 Spark 的源码中,采样是通过 sample 方法实现的:
val sample = rdd.sample(withReplacement = false, fraction = 0.1, seed = 12345)
val sortedSample = sample.map(_._1).sortBy(identity)
复制代码
然后通过这些采样的排序数据,计算每个分区的界限。比方,当分区数量是 3 时,RangePartitioner 会选取采样数据的前几个元素作为界限,并用这些界限来确定每个分区的范围。
8.
进一步优化
在实际利用中,Spark 的 RangePartitioner 会通过自适应调解采样的比例和算法来优化性能,确保在处理大型数据集时依然高效。在某些环境下,Spark 会利用更智能的策略来决定采样的方式,以便在并行处理中制止过多的计算开销。
总结
通过采样、排序和计算界限,RangePartitioner 确保了数据可以匀称地分配到差别的分区中,从而为排序等操作提供并行化的支持。这一过程使得 Spark 在处理大规模数据时可以或许有效地进行全局排序。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4