ToB企服应用市场:ToB评测及商务社交产业平台

标题: Spark RDD sortBy算子执行时进行数据 “采样”是什么意思? [打印本页]

作者: 钜形不锈钢水箱    时间: 2024-12-7 19:50
标题: Spark RDD sortBy算子执行时进行数据 “采样”是什么意思?
一、sortBy 和 RangePartitioner

sortBy 在 Spark 中会在执行排序时采用 rangePartitioner 进行分区,这会影响数据的分区方式,而且这一步调是通过对数据进行 “采样” 来计算分区的范围。不过,紧张的是,sortBy 本身仍然是一个 transformation,它不会立刻触发计算,但在执行过程中会涉及到对数据的排序、分区和终极计算。
1. sortBy 和 RangePartitioner

sortBy 会利用 RangePartitioner 来决定数据如何进行分区。RangePartitioner 会在排序之前,首先对数据进行采样,从而得出每个分区的范围,然后根据这些范围进行数据的分区。这是因为数据排序是一个全局操作,而 RangePartitioner 提供了一个合理的分别策略,使得 Spark 在执行排序时可以或许并行化。

2. 是否会触发 runJob

sortBy 作为 transformation 不会立刻触发作业执行。它返回一个新的 RDD,并仅在后续执行 action 操作时才会触发实际的计算。因此,sortBy 不会直接导致 runJob 的执行。只有在你执行类似 collect(), count(), saveAsTextFile() 等行动算子时,整个作业才会执行。
但是,sortBy 内部会涉及到 采样范围分区,这些过程是为了确保排序可以或许在多个分区上并行高效地完成,全部这些操作都在 Spark 内部的 task 中完成。runJob 会在行动算子执行时启动,但在执行过程中,rangePartitioner 的计算、数据的重新分区等步调会被渐渐执行。
3. 源码分析

我们可以通过检察 Spark 源码来更清楚地理解这些步调。以下是关于 sortBy 和其内部处理的一些关键源码:
RDD.sortBy 源码

  1. def sortBy[K: ClassTag, U: Ordering](f: T => K, ascending: Boolean = true, numPartitions: Int)(implicit ord: Ordering[K]): RDD[T] = {
  2.   val partitioner = new RangePartitioner(numPartitions, this) // 使用 RangePartitioner
  3.   val map = this.mapPartitionsWithIndex { (index, iter) =>
  4.     // 计算分区内的排序
  5.     val partitioned = iter.toArray.sortBy(f)
  6.     partitioned.iterator
  7.   }
  8.   map
  9. }
复制代码
在这个方法中,RangePartitioner 被用来决定如何将数据分成多个分区。而在实际执行时,分区是通过 mapPartitionsWithIndex 来执行的。
RangePartitioner 源码

  1. class RangePartitioner[K, V](partitions: Int, rdd: RDD[(K, V)]) extends Partitioner {
  2.   def getPartition(key: Any): Int = {
  3.     // 根据 key 的范围来决定在哪个分区
  4.     val partitionIndex = rangePartition(key)
  5.     partitionIndex
  6.   }
  7.   def rangePartition(key: Any): Int = {
  8.     // 进行采样,并将数据按范围分到对应的分区
  9.   }
  10. }
复制代码
4. 触发计算的条件


5. 总结



二、 RangePartitioner 的 采样过程

在 Spark 中,RangePartitioner 的 采样过程 是其焦点部分之一,它确保可以或许为数据分配适当的分区,并保证每个分区的数据范围在排序时可以或许合理地分布。这里我们将深入探讨 RangePartitioner 是如何通过采样来计算分区范围的。
1. RangePartitioner 概述

RangePartitioner 是 Spark 中的一个分区器,常用于按范围将数据进行分区。它通常用于类似 sortBy 这类必要全局排序的操作,目的是为了在分布式环境中进行高效的并行排序。
RangePartitioner 在执行分区时,会利用 采样 来估算每个分区的范围(即每个分区的界限)。这种采样过程通过从数据中提取一个小样本,帮助计算出数据在差别分区上的分布,从而保证数据可以或许匀称地分配到各个分区中。
2. RangePartitioner 采样过程

采样是 RangePartitioner 计算每个分区的范围的关键。这个过程涉及到以下步调:
2.1 数据采样

RangePartitioner 会从数据中 随机采样 一部分元素,用来估算数据的分布和计算每个分区的界限。这个采样过程通常不会采用全部数据,而是通过肯定比例的数据来进行推测。这是为了镌汰计算开销,同时确保分区的均衡性。
采样操作通常是在 分布式环境中并行执行 的,Spark 会在多个分区上并行地获取样本数据。

2.2 计算分区界限

一旦采样完成,RangePartitioner 就会利用这些采样数据来计算每个分区的界限。这个过程基于采样数据的排序:

2.3 创建分区

RangePartitioner 利用这些界限点来创建新的分区。数据根据其值所在的范围,决定落入哪个分区。具体来说,RangePartitioner 会为每个分区计算出一个界限值,然后将全部数据按这些界限值进行分配。

3. 代码实现中的采样部分

在 Spark 的源码中,RangePartitioner 的采样过程是通过以下代码来实现的:
3.1 RangePartitioner 类中的采样

  1. class RangePartitioner[K, V](partitions: Int, rdd: RDD[(K, V)]) extends Partitioner {
  2.   // 进行数据的采样
  3.   val sample = rdd.sample(withReplacement = false, fraction = 0.1, seed = 12345)
  4.   val sortedSample = sample.map(_._1).sortBy(identity)
  5.   // 计算每个分区的分割点
  6.   val splits = sortedSample.zipWithIndex.map { case (key, index) =>
  7.     if (index % (sampleCount / partitions) == 0) key else null
  8.   }.filter(_ != null)
  9.   def getPartition(key: Any): Int = {
  10.     // 根据采样的分割点进行分区
  11.     var low = 0
  12.     var high = partitions - 1
  13.     while (low < high) {
  14.       val mid = (low + high) / 2
  15.       if (key < splits(mid)) high = mid - 1 else low = mid + 1
  16.     }
  17.     low
  18.   }
  19. }
复制代码
在上面的代码中,sample 操作会对 RDD 中的数据进行采样,并将其按值排序。然后,通过分割排序后的数据,计算出每个分区的界限点。这些界限点随后用于 getPartition 方法中来确定数据的分配。
3.2 采样与排序


4. 触发计算

在执行 sortBy 操作时,Spark 会根据 RangePartitioner 对数据进行采样、排序和分区计算。这些操作会在你执行 action 操作(如 collect()、saveAsTextFile())时触发,具体的分区计算会在计算过程中完成。直到行动算子触发,计算过程才会开始,RangePartitioner 会根据采样数据生成分区,并终极执行数据的排序。
5. 总结


通过这种采样与分区机制,RangePartitioner 可以或许高效地支持 Spark 的排序操作,使得数据在分布式环境中可以或许有效地并行处理。

三、举例先容RangePartitioner采样过程

理解 RangePartitioner 如何通过采样来得到数据分布、计算界限,并将数据分配到相应分区的过程,确实比较抽象。我会通过一个简单的例子来帮助你更直观地理解这个过程。
问题场景

假设你有一个数据集,包含了以下的 10 个整数:
  1. [10, 23, 1, 9, 15, 37, 2, 16, 40, 3]
复制代码
你想用 RangePartitioner 来将这些数据分为 3 个分区,而且根据它们的值进行排序。
1. 采样数据

首先,为了计算每个分区的界限,RangePartitioner 会对数据集进行采样。假设我们采样 30% 的数据(即随机选择 3 个数据点)。假设采样到的数据是:
  1. [10, 23, 3]
复制代码
2. 排序采样数据

然后,对采样的数据进行排序,确保它们按大小排列。对于这个例子,排序后的采样数据是:
  1. [3, 10, 23]
复制代码
3. 计算分区界限

通过对采样数据进行排序,RangePartitioner 可以计算出分区的界限。在我们的例子中,我们有 3 个分区,因此我们必要为数据计算 2 个界限(因为 n 个分区必要 n-1 个界限)。
根据排序后的采样数据 [3, 10, 23]
,RangePartitioner 可以选择分割点来确定界限:

现在我们有了两个界限:

4. 分配数据到分区

接下来,RangePartitioner 会根据这些界限将数据分配到相应的分区中。具体的分区规则是:

所以终极的分区效果是:

5. 总结过程

通过这个例子,我们可以看到 RangePartitioner 的整个过程:
6. 实际执行的环境


7. 关键源码中的采样部分

在实际 Spark 的源码中,采样是通过 sample 方法实现的:
  1. val sample = rdd.sample(withReplacement = false, fraction = 0.1, seed = 12345)
  2. val sortedSample = sample.map(_._1).sortBy(identity)
复制代码
然后通过这些采样的排序数据,计算每个分区的界限。比方,当分区数量是 3 时,RangePartitioner 会选取采样数据的前几个元素作为界限,并用这些界限来确定每个分区的范围。

8. 进一步优化

在实际利用中,Spark 的 RangePartitioner 会通过自适应调解采样的比例和算法来优化性能,确保在处理大型数据集时依然高效。在某些环境下,Spark 会利用更智能的策略来决定采样的方式,以便在并行处理中制止过多的计算开销。
总结

通过采样、排序和计算界限,RangePartitioner 确保了数据可以匀称地分配到差别的分区中,从而为排序等操作提供并行化的支持。这一过程使得 Spark 在处理大规模数据时可以或许有效地进行全局排序。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4