Spark RDD sortBy算子执行时进行数据 “采样”是什么意思? ...

打印 上一主题 下一主题

主题 823|帖子 823|积分 2469

一、sortBy 和 RangePartitioner

sortBy 在 Spark 中会在执行排序时采用 rangePartitioner 进行分区,这会影响数据的分区方式,而且这一步调是通过对数据进行 “采样” 来计算分区的范围。不过,紧张的是,sortBy 本身仍然是一个 transformation,它不会立刻触发计算,但在执行过程中会涉及到对数据的排序、分区和终极计算。
1. sortBy 和 RangePartitioner

sortBy 会利用 RangePartitioner 来决定数据如何进行分区。RangePartitioner 会在排序之前,首先对数据进行采样,从而得出每个分区的范围,然后根据这些范围进行数据的分区。这是因为数据排序是一个全局操作,而 RangePartitioner 提供了一个合理的分别策略,使得 Spark 在执行排序时可以或许并行化。


  • 采样过程
    当调用 sortBy 时,Spark 会对数据进行 采样,通常利用的是 SampledRDD,这种采样会用来估计数据的分布范围,并为后续的分区计算提供依据。
  • RangePartitioner 的利用
    RangePartitioner 会根据数据的值分别成差别的范围。通常在分布式环境中,我们必要将数据按某种方式分别为多个分区,这个过程会利用一个范围来决定数据分布。
2. 是否会触发 runJob

sortBy 作为 transformation 不会立刻触发作业执行。它返回一个新的 RDD,并仅在后续执行 action 操作时才会触发实际的计算。因此,sortBy 不会直接导致 runJob 的执行。只有在你执行类似 collect(), count(), saveAsTextFile() 等行动算子时,整个作业才会执行。
但是,sortBy 内部会涉及到 采样范围分区,这些过程是为了确保排序可以或许在多个分区上并行高效地完成,全部这些操作都在 Spark 内部的 task 中完成。runJob 会在行动算子执行时启动,但在执行过程中,rangePartitioner 的计算、数据的重新分区等步调会被渐渐执行。
3. 源码分析

我们可以通过检察 Spark 源码来更清楚地理解这些步调。以下是关于 sortBy 和其内部处理的一些关键源码:
RDD.sortBy 源码

  1. def sortBy[K: ClassTag, U: Ordering](f: T => K, ascending: Boolean = true, numPartitions: Int)(implicit ord: Ordering[K]): RDD[T] = {
  2.   val partitioner = new RangePartitioner(numPartitions, this) // 使用 RangePartitioner
  3.   val map = this.mapPartitionsWithIndex { (index, iter) =>
  4.     // 计算分区内的排序
  5.     val partitioned = iter.toArray.sortBy(f)
  6.     partitioned.iterator
  7.   }
  8.   map
  9. }
复制代码
在这个方法中,RangePartitioner 被用来决定如何将数据分成多个分区。而在实际执行时,分区是通过 mapPartitionsWithIndex 来执行的。
RangePartitioner 源码

  1. class RangePartitioner[K, V](partitions: Int, rdd: RDD[(K, V)]) extends Partitioner {
  2.   def getPartition(key: Any): Int = {
  3.     // 根据 key 的范围来决定在哪个分区
  4.     val partitionIndex = rangePartition(key)
  5.     partitionIndex
  6.   }
  7.   def rangePartition(key: Any): Int = {
  8.     // 进行采样,并将数据按范围分到对应的分区
  9.   }
  10. }
复制代码
4. 触发计算的条件



  • sortBy 是一个 transformation 操作,它会生成一个新的 RDD,并不会立刻执行排序。
  • RangePartitioner 会在背景进行数据的分区计算和范围分割,但这统统都不会触发作业执行,直到 action 操作 被调用。
5. 总结



  • sortBy 会利用 RangePartitioner 进行数据的分区和范围分别,这过程中会对数据进行采样以确定每个分区的范围。
  • 这个过程本身不会触发作业执行,只有当你执行一个 action 操作时(如 collect() 或 saveAsTextFile()),Spark 才会触发计算,并启动实际的作业执行,进行排序和分区。

二、 RangePartitioner 的 采样过程

在 Spark 中,RangePartitioner 的 采样过程 是其焦点部分之一,它确保可以或许为数据分配适当的分区,并保证每个分区的数据范围在排序时可以或许合理地分布。这里我们将深入探讨 RangePartitioner 是如何通过采样来计算分区范围的。
1. RangePartitioner 概述

RangePartitioner 是 Spark 中的一个分区器,常用于按范围将数据进行分区。它通常用于类似 sortBy 这类必要全局排序的操作,目的是为了在分布式环境中进行高效的并行排序。
RangePartitioner 在执行分区时,会利用 采样 来估算每个分区的范围(即每个分区的界限)。这种采样过程通过从数据中提取一个小样本,帮助计算出数据在差别分区上的分布,从而保证数据可以或许匀称地分配到各个分区中。
2. RangePartitioner 采样过程

采样是 RangePartitioner 计算每个分区的范围的关键。这个过程涉及到以下步调:
2.1 数据采样

RangePartitioner 会从数据中 随机采样 一部分元素,用来估算数据的分布和计算每个分区的界限。这个采样过程通常不会采用全部数据,而是通过肯定比例的数据来进行推测。这是为了镌汰计算开销,同时确保分区的均衡性。
采样操作通常是在 分布式环境中并行执行 的,Spark 会在多个分区上并行地获取样本数据。


  • 采样的比例:采样比例通常是一个相对较小的数值,目的是镌汰计算量。Spark 内部会在每个分区中执行采样,以确保终极分区的界限可以或许反映整个数据集的分布。
2.2 计算分区界限

一旦采样完成,RangePartitioner 就会利用这些采样数据来计算每个分区的界限。这个过程基于采样数据的排序:


  • 排序样本数据:首先,对采样数据进行排序,确保数据可以按顺序进行分区。
  • 计算分割点:然后,RangePartitioner 会根据排序后的数据分别出多个界限点。这些界限点代表了每个分区的数据范围。比方,如果数据有 1000 个元素,而且要求将数据分别为 10 个分区,那么就会在排序后的数据中选取 9 个分割点。
2.3 创建分区

RangePartitioner 利用这些界限点来创建新的分区。数据根据其值所在的范围,决定落入哪个分区。具体来说,RangePartitioner 会为每个分区计算出一个界限值,然后将全部数据按这些界限值进行分配。


  • 分区计算:对于每个数据元素,RangePartitioner 会根据元素的值和这些界限值,决定该元素属于哪个分区。
3. 代码实现中的采样部分

在 Spark 的源码中,RangePartitioner 的采样过程是通过以下代码来实现的:
3.1 RangePartitioner 类中的采样

  1. class RangePartitioner[K, V](partitions: Int, rdd: RDD[(K, V)]) extends Partitioner {
  2.   // 进行数据的采样
  3.   val sample = rdd.sample(withReplacement = false, fraction = 0.1, seed = 12345)
  4.   val sortedSample = sample.map(_._1).sortBy(identity)
  5.   // 计算每个分区的分割点
  6.   val splits = sortedSample.zipWithIndex.map { case (key, index) =>
  7.     if (index % (sampleCount / partitions) == 0) key else null
  8.   }.filter(_ != null)
  9.   def getPartition(key: Any): Int = {
  10.     // 根据采样的分割点进行分区
  11.     var low = 0
  12.     var high = partitions - 1
  13.     while (low < high) {
  14.       val mid = (low + high) / 2
  15.       if (key < splits(mid)) high = mid - 1 else low = mid + 1
  16.     }
  17.     low
  18.   }
  19. }
复制代码
在上面的代码中,sample 操作会对 RDD 中的数据进行采样,并将其按值排序。然后,通过分割排序后的数据,计算出每个分区的界限点。这些界限点随后用于 getPartition 方法中来确定数据的分配。
3.2 采样与排序



  • rdd.sample(withReplacement = false, fraction = 0.1):从原始 RDD 中采样 10% 的数据(fraction = 0.1),而且不进行重复采样。
  • sortBy(identity):对采样的数据进行排序,确保采样数据的顺序正确,便于后续计算界限。
4. 触发计算

在执行 sortBy 操作时,Spark 会根据 RangePartitioner 对数据进行采样、排序和分区计算。这些操作会在你执行 action 操作(如 collect()、saveAsTextFile())时触发,具体的分区计算会在计算过程中完成。直到行动算子触发,计算过程才会开始,RangePartitioner 会根据采样数据生成分区,并终极执行数据的排序。
5. 总结



  • 采样:RangePartitioner 会从数据中随机采样一部分元素(通常是 10% 或其他比例),用来估算数据的分布。
  • 排序与计算分区界限:采样数据被排序,并根据排序后的数据计算出每个分区的界限。这样可以确保数据匀称分配到差别的分区。
  • 数据分区:根据采样和计算出的界限,RangePartitioner 会将数据分配到相应的分区中。
通过这种采样与分区机制,RangePartitioner 可以或许高效地支持 Spark 的排序操作,使得数据在分布式环境中可以或许有效地并行处理。

三、举例先容RangePartitioner采样过程

理解 RangePartitioner 如何通过采样来得到数据分布、计算界限,并将数据分配到相应分区的过程,确实比较抽象。我会通过一个简单的例子来帮助你更直观地理解这个过程。
问题场景

假设你有一个数据集,包含了以下的 10 个整数:
  1. [10, 23, 1, 9, 15, 37, 2, 16, 40, 3]
复制代码
你想用 RangePartitioner 来将这些数据分为 3 个分区,而且根据它们的值进行排序。
1. 采样数据

首先,为了计算每个分区的界限,RangePartitioner 会对数据集进行采样。假设我们采样 30% 的数据(即随机选择 3 个数据点)。假设采样到的数据是:
  1. [10, 23, 3]
复制代码
2. 排序采样数据

然后,对采样的数据进行排序,确保它们按大小排列。对于这个例子,排序后的采样数据是:
  1. [3, 10, 23]
复制代码
3. 计算分区界限

通过对采样数据进行排序,RangePartitioner 可以计算出分区的界限。在我们的例子中,我们有 3 个分区,因此我们必要为数据计算 2 个界限(因为 n 个分区必要 n-1 个界限)。
根据排序后的采样数据 [3, 10, 23]
,RangePartitioner 可以选择分割点来确定界限:


  • 第一个界限:选择采样数据的第一个元素(3)。
  • 第二个界限:选择采样数据的最后一个元素(23)。
现在我们有了两个界限:


  • 分区 1:全部小于 10 的数据
  • 分区 2:全部大于即是 10 小于 23 的数据
  • 分区 3:全部大于即是 23 的数据
4. 分配数据到分区

接下来,RangePartitioner 会根据这些界限将数据分配到相应的分区中。具体的分区规则是:


  • 分区 1:全部小于 10 的元素 → [1, 2, 3, 9]
  • 分区 2:全部大于即是 10 且小于 23 的元素 → [10, 15, 16]
  • 分区 3:全部大于即是 23 的元素 → [23, 37, 40]
所以终极的分区效果是:


  • 分区 1:[1, 2, 3, 9]
  • 分区 2:[10, 15, 16]
  • 分区 3:[23, 37, 40]
5. 总结过程

通过这个例子,我们可以看到 RangePartitioner 的整个过程:

  • 采样数据:从整个数据集中随机抽取一部分数据(这里是 30%)。
  • 排序采样数据:对采样数据进行排序,确保我们能根据数据的范围计算界限。
  • 计算分区界限:根据排序后的采样数据,选择界限来分别数据(比方第一个和最后一个元素)。
  • 分配数据到分区:根据界限将全部数据分配到相应的分区中。
6. 实际执行的环境



  • 采样比例:在实际的 Spark 中,采样比例并不肯定是 30%,通常是根据数据的大小和分区数量进行调解的。采样可以确保 RangePartitioner 在计算界限时不会消耗过多资源。
  • 多个分区:如果数据集更大,分区数量更多,RangePartitioner 会选择更多的采样点来分别分区。界限点会根据排序后的采样数据来动态选择。
7. 关键源码中的采样部分

在实际 Spark 的源码中,采样是通过 sample 方法实现的:
  1. val sample = rdd.sample(withReplacement = false, fraction = 0.1, seed = 12345)
  2. val sortedSample = sample.map(_._1).sortBy(identity)
复制代码
然后通过这些采样的排序数据,计算每个分区的界限。比方,当分区数量是 3 时,RangePartitioner 会选取采样数据的前几个元素作为界限,并用这些界限来确定每个分区的范围。

8. 进一步优化

在实际利用中,Spark 的 RangePartitioner 会通过自适应调解采样的比例和算法来优化性能,确保在处理大型数据集时依然高效。在某些环境下,Spark 会利用更智能的策略来决定采样的方式,以便在并行处理中制止过多的计算开销。
总结

通过采样、排序和计算界限,RangePartitioner 确保了数据可以匀称地分配到差别的分区中,从而为排序等操作提供并行化的支持。这一过程使得 Spark 在处理大规模数据时可以或许有效地进行全局排序。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

钜形不锈钢水箱

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表