ToB企服应用市场:ToB评测及商务社交产业平台

标题: Spark快速上手(4)Spark核心编程-Spark分区器(Partitioner)@(RDD-K_V) [打印本页]

作者: 反转基因福娃    时间: 2022-8-22 13:13
标题: Spark快速上手(4)Spark核心编程-Spark分区器(Partitioner)@(RDD-K_V)
@Spark分区器(Partitioner)

HashPartitioner(默认的分区器)

HashPartitioner分区原理是对于给定的key,计算其hashCode,并除以分区的个数取余,如果余数小于0,则余数+分区的个数,最后返回的值就是这个key所属的分区ID,当key为null值是返回0。
源码在org.apache.spark包下:
origin code:
  1. class HashPartitioner(partitions: Int) extends Partitioner {
  2.   require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
  3.   def numPartitions: Int = partitions
  4.   // 根据键的值来判断在哪一个分区
  5.   def getPartition(key: Any): Int = key match {
  6.     case null => 0   // 键为null始终在0分区
  7.     case _ => Utils.nonNegativeMod(key.hashCode, numPartitions) // 键不为0,根据键的hashCode值和分区数进行计算
  8.   }
  9.   
  10.   override def equals(other: Any): Boolean = other match {
  11.     case h: HashPartitioner =>
  12.       h.numPartitions == numPartitions
  13.     case _ =>
  14.       false
  15.   }
  16. …………
  17. }
  18. // 底层实质:取模运算
  19. def nonNegativeMod(x: Int, mod: Int): Int = {
  20.    val rawMod = x % mod
  21.    rawMod + (if (rawMod < 0) mod else 0)
  22. }
复制代码
RangePartitioner

HashPartitioner分区的实现可能会导致数据倾斜,极端情况下会导致某些分区拥有RDD的所有数据。而RangePartitioner分区器则尽量保证各个分区数据均匀,而且分区和分区之间是有序的,也就是说令一个分区中的元素均比另一个分区中的元素小或者大;但是分区内的元素是不能保证顺序的。简单地说就是将一定范围内的数据映射到一个分区内。
  sortByKey底层使用的数据分区器就是RangePartitioner分区器,该分区器的实现方式主要通过两个步骤实现:
①先从整个RDD中抽取样本数据,将样本数据排序,计算出每个分区的最大key值,形成一个Array[key]类型的数组变量rangeBounds;
②判断key在rangeBounds中所处的范围,给出该key值在下一个RDD中的分区id下标。该分区器要求RDD中的key类型必须是可排序的。
origin code:
  1. class RangePartitioner[K : Ordering : ClassTag, V](
  2.     partitions: Int,
  3.     rdd: RDD[_ <: Product2[K, V]],
  4.     private var ascending: Boolean = true,
  5.     val samplePointsPerPartitionHint: Int = 20)
  6.   extends Partitioner {
  7.   // A constructor declared in order to maintain backward compatibility for Java, when we add the
  8.   // 4th constructor parameter samplePointsPerPartitionHint. See SPARK-22160.
  9.   // This is added to make sure from a bytecode point of view, there is still a 3-arg ctor.
  10.   def this(partitions: Int, rdd: RDD[_ <: Product2[K, V]], ascending: Boolean) = {
  11.     this(partitions, rdd, ascending, samplePointsPerPartitionHint = 20)
  12.   }
  13.   // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
  14.   require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")
  15.   require(samplePointsPerPartitionHint > 0,
  16.     s"Sample points per partition must be greater than 0 but found $samplePointsPerPartitionHint")
  17.   // 获取RDD中key类型数据的排序器
  18.   private var ordering = implicitly[Ordering[K]]
  19.   // An array of upper bounds for the first (partitions - 1) partitions
  20.   private var rangeBounds: Array[K] = {
  21.     if (partitions <= 1) {
  22.       // 如果给定的分区数是一个的情况下,直接返回一个空的集合,表示数据不进行分区
  23.       Array.empty
  24.     } else {
  25.       // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
  26.       // Cast to double to avoid overflowing ints or longs
  27.       // 给定总的数据抽样大小,最多1M的数据量(10^6),最少20倍的RDD分区数量,也就是每个RDD分区至少抽取20条数据
  28.       val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
  29.       // Assume the input partitions are roughly balanced and over-sample a little bit.
  30.       // 计算每个分区抽样的数据量大小,假设输入数据每个分区分布的比较均匀
  31.       // 对于超大数据集(分区数量超过5万的)乘以3会让数据稍微增大一点,对于分区数低于5万的数据集,每个分区抽取数据量为60条也不算多
  32.       val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
  33.       // 从RDD中抽取数据,返回值:(总RDD数据量,Array[分区id, 当前分区的数据量, 当前分区抽取的数据])
  34.       val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
  35.       if (numItems == 0L) {
  36.         // 如果总的数据量为0(RDD为空),那么直接返回一个空的数组
  37.         Array.empty
  38.       } else {
  39.         // If a partition contains much more than the average number of items, we re-sample from it
  40.         // to ensure that enough items are collected from that partition.
  41.         // 计算总样本数量和总记录数的占比,占比最大为1.0
  42.         val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
  43.         // 保存样本数据的集合buffer
  44.         val candidates = ArrayBuffer.empty[(K, Float)]
  45.         // 保存数据分布不均衡的分区id(数据量超过fraction比率的分区)
  46.         val imbalancedPartitions = mutable.Set.empty[Int]
  47.         // 计算抽取出来的样本数据
  48.         sketched.foreach { case (idx, n, sample) =>
  49.           if (fraction * n > sampleSizePerPartition) {
  50.             // 如果fraction乘以当前分区中的数据量大于之前计算的每个分区的抽样数据大小,那么表示当前分区抽取的数据太少了,该分区数据分布不均衡,需要重新抽取
  51.             imbalancedPartitions += idx
  52.           } else {
  53.             // 当前分区不属于数据分布不均衡的分区,计算占比权重,并添加到candidates集合中
  54.             // The weight is 1 over the sampling probability.
  55.             val weight = (n.toDouble / sample.length).toFloat
  56.             for (key <- sample) {
  57.               candidates += ((key, weight))
  58.             }
  59.           }
  60.         }
  61.         // 对数据分布不均衡的RDD分区,重新进行数据抽样
  62.         if (imbalancedPartitions.nonEmpty) {
  63.           // Re-sample imbalanced partitions with the desired sampling probability.
  64.           // 获取数据分布不均衡的RDD分区,并构成RDD
  65.           val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
  66.           // 随机种子
  67.           val seed = byteswap32(-rdd.id - 1)
  68.           // 利用RDD的sample抽样函数API进行数据抽样
  69.           val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
  70.           val weight = (1.0 / fraction).toFloat
  71.           candidates ++= reSampled.map(x => (x, weight))
  72.         }
  73.         // 将最终的抽样数据计算出rangeBounds
  74.         RangePartitioner.determineBounds(candidates, math.min(partitions, candidates.size))
  75.       }
  76.     }
  77.   }
  78.   // 下一个RDD的分区数量是rangeBounds数组中元素数量+1个
  79.   def numPartitions: Int = rangeBounds.length + 1
  80.   // 二分查找器,内部使用Java中的Arrays提供的二分查找方法
  81.   private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]
  82.   // 根据RDD的key值返回对应的分区id,从0开始
  83.   def getPartition(key: Any): Int = {
  84.     // 强制转换key类型为RDD中原本的数据类型
  85.     val k = key.asInstanceOf[K]
  86.     var partition = 0
  87.     if (rangeBounds.length <= 128) {
  88.       // If we have less than 128 partitions naive search
  89.       // 如果分区数据小于等于128个,那么直接本地循环寻找当前k所属的分区下标
  90.       while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
  91.         partition += 1
  92.       }
  93.     } else {
  94.       // Determine which binary search method to use only once.
  95.       // 如果分区数量大于128个,那么使用二分查找方法寻找对应k所属的下标
  96.       // 但是如果k在rangeBounds中没有出现,实质上返回的是一个负数(范围)或者是一个超过rangeBounds大小的数(最后一个分区,比所有的数据都大)
  97.       partition = binarySearch(rangeBounds, k)
  98.       // binarySearch either returns the match location or -[insertion point]-1
  99.       if (partition < 0) {
  100.         partition = -partition-1
  101.       }
  102.       if (partition > rangeBounds.length) {
  103.         partition = rangeBounds.length
  104.       }
  105.     }
  106.     // 根据数据排序是升序还是降序进行数据的排列,默认为升序
  107.     if (ascending) {
  108.       partition
  109.     } else {
  110.       rangeBounds.length - partition
  111.     }
  112.   }
  113.   override def equals(other: Any): Boolean = other match {
  114.     case r: RangePartitioner[_, _] =>
  115.       r.rangeBounds.sameElements(rangeBounds) && r.ascending == ascending
  116.     case _ =>
  117.       false
  118.   }
  119.   override def hashCode(): Int = {
  120.     val prime = 31
  121.     var result = 1
  122.     var i = 0
  123.     while (i < rangeBounds.length) {
  124.       result = prime * result + rangeBounds(i).hashCode
  125.       i += 1
  126.     }
  127.     result = prime * result + ascending.hashCode
  128.     result
  129.   }
  130.   @throws(classOf[IOException])
  131.   private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
  132.     val sfactory = SparkEnv.get.serializer
  133.     sfactory match {
  134.       case js: JavaSerializer => out.defaultWriteObject()
  135.       case _ =>
  136.         out.writeBoolean(ascending)
  137.         out.writeObject(ordering)
  138.         out.writeObject(binarySearch)
  139.         val ser = sfactory.newInstance()
  140.         Utils.serializeViaNestedStream(out, ser) { stream =>
  141.           stream.writeObject(scala.reflect.classTag[Array[K]])
  142.           stream.writeObject(rangeBounds)
  143.         }
  144.     }
  145.   }
  146.   @throws(classOf[IOException])
  147.   private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
  148.     val sfactory = SparkEnv.get.serializer
  149.     sfactory match {
  150.       case js: JavaSerializer => in.defaultReadObject()
  151.       case _ =>
  152.         ascending = in.readBoolean()
  153.         ordering = in.readObject().asInstanceOf[Ordering[K]]
  154.         binarySearch = in.readObject().asInstanceOf[(Array[K], K) => Int]
  155.         val ser = sfactory.newInstance()
  156.         Utils.deserializeViaNestedStream(in, ser) { ds =>
  157.           implicit val classTag = ds.readObject[ClassTag[Array[K]]]()
  158.           rangeBounds = ds.readObject[Array[K]]()
  159.         }
  160.     }
  161.   }
  162. }
复制代码
将一定范围内的数映射到某一个分区内,在实现中,分界(rangeBounds)算法用到了水塘抽样算法。RangePartitioner的重点在于构建rangeBounds数组对象,主要步骤是:
RangePartitioner的sketch函数的作用是对RDD中的数据按照需要的样本数据量进行数据抽取,主要调用SamplingUtils类的reservoirSampleAndCount方法对每个分区进行数据抽取,抽取后计算出整体所有分区的数据量大小;reserviorSampleAndCount方法的抽取方式是先从迭代器中获取样本数量个数据(顺序获取),然后对剩余的数据进行判断,替换之前的样本数据,最终达到数据抽样的效果。RangePartitioner的determineBounds函数的作用是根据样本数据记忆权重大小确定数据边界。
RangePartitioner的determineBounds函数的作用是根据样本数据记忆权重大小确定数据边界,源代码如下:
origin code:
  1. /**
  2.    * Determines the bounds for range partitioning from candidates with weights indicating how many
  3.    * items each represents. Usually this is 1 over the probability used to sample this candidate.
  4.    *
  5.    * @param candidates unordered candidates with weights
  6.    * @param partitions number of partitions
  7.    * @return selected bounds
  8.    */
  9.   def determineBounds[K : Ordering : ClassTag](
  10.       candidates: ArrayBuffer[(K, Float)],
  11.       partitions: Int): Array[K] = {
  12.     val ordering = implicitly[Ordering[K]]
  13.     // 按照数据进行排序,默认升序排序
  14.     val ordered = candidates.sortBy(_._1)
  15.     // 获取总的样本数据大小
  16.     val numCandidates = ordered.size
  17.     // 计算总的权重大小
  18.     val sumWeights = ordered.map(_._2.toDouble).sum
  19.     // 计算步长
  20.     val step = sumWeights / partitions
  21.     var cumWeight = 0.0
  22.     var target = step
  23.     val bounds = ArrayBuffer.empty[K]
  24.     var i = 0
  25.     var j = 0
  26.     var previousBound = Option.empty[K]
  27.     while ((i < numCandidates) && (j < partitions - 1)) {
  28.       // 获取排序后的第i个数据及权重
  29.       val (key, weight) = ordered(i)
  30.       // 累计权重
  31.       cumWeight += weight
  32.       if (cumWeight >= target) {
  33.         // Skip duplicate values.
  34.         // 权重已经达到一个步长的范围,计算出一个分区id的值
  35.         if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {// 上一个边界值为空,或者当前边界值key数据大于上一个边界的值,那么当前key有效,进行计算
  36.           // 添加当前key到边界集合中
  37.           bounds += key
  38.           // 累计target步长界限
  39.           target += step
  40.           // 分区数量加1
  41.           j += 1
  42.           // 上一个边界的值重置为当前边界的值
  43.           previousBound = Some(key)
  44.         }
  45.       }
  46.       i += 1
  47.     }
  48.     // 返回结果
  49.     bounds.toArray
  50.   }
复制代码
自定义分区器

自定义分区器是需要继承org.apache.spark.Partitioner类并实现以下三个方法:
e.g.1
  1. // CustomPartitioner
  2. import org.apache.spark.Partitioner
  3. /**
  4. * @param numPartition 分区数量
  5. */
  6. class CustomPartitioner(numPartition: Int) extends Partitioner{
  7.     // 返回分区的总数
  8.     override def numPartitions: Int = numPartition
  9.     // 根据传入的 key 返回分区的索引
  10.     override def getPartition(key: Any): Int = {
  11.         key.toString.toInt % numPartition
  12.     }
  13. }
  14. // CustomPartitionerDemo
  15. import com.work.util.SparkUtil
  16. import org.apache.spark.SparkContext
  17. import org.apache.spark.rdd.RDD
  18. object CustomPartitionerDemo {
  19.     def main(args: Array[String]): Unit = {
  20.         val sc: SparkContext = SparkUtil.getSparkContext()
  21.         println("=================== 原始数据 =====================")
  22.         // zipWithIndex 该函数将 RDD 中的元素和这个元素在 RDD 中的 ID(索引号)组合成键值对
  23.         val data: RDD[(Int, Long)] = sc.parallelize(0 to 10, 1).zipWithIndex()
  24.         println(data.collect().toBuffer)
  25.         println("=================== 分区和数据组合成 Map =====================")
  26.         val func: (Int, Iterator[(Int, Long)]) => Iterator[String] = (index: Int, iter: Iterator[(Int, Long)]) => {
  27.             iter.map(x => "[partID:" + index + ", value:" + x + "]")
  28.         }
  29.         val array: Array[String] = data.mapPartitionsWithIndex(func).collect()
  30.         for (i <- array) {
  31.             println(i)
  32.         }
  33.         println("=================== 自定义5个分区和数据组合成 Map =====================")
  34.         val rdd1: RDD[(Int, Long)] = data.partitionBy(new CustomPartitioner(5))
  35.         val array1: Array[String] = rdd1.mapPartitionsWithIndex(func).collect()
  36.         for (i <- array1) {
  37.             println(i)
  38.         }
  39.     }
  40. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4