Spark中排序--前缀排序prefixSort

打印 上一主题 下一主题

主题 1869|帖子 1869|积分 5607

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
配景

近来偶尔间看到了一篇文章一文把握 Velox orderby 算子的排序算法,里面主要说到了Velox PrefixSort怎么用排序算饭加速大数据的排序的,其中有说到:
  1. 排序的过程,主要考虑霎三件事情:
  2. 1. 选择比较函数
  3. 2. 选择排序算法
  4. 3. 排序过程中的数据移动,移动数据或者移动指针?
复制代码
最让我深有感触的是 这里面涉及到的比力函数,这里的主要思想就是:
  1. 把所有的类型的比较(无论是字符串还是整数等),都转换为二进制字符串的比较,那么这在比较的速度上就会能够充分利用硬件资源,使得加速。其中就会涉及到各个字段类型的规范化: 按照order by的顺序依次进行规范,如果遇到不能规范的字段类型,则后续的规范直接中断。
复制代码
固然详细的Velox的代码我是没有去看,但是我们可以解析一下Spark中的Sort是怎么实现的,作为大数据的标杆组件,我们可以看一下,本文基于Spark 3.5
分析

直接切入到SortExec类,其中有个createSorter 方法,这里会构建排序函数,我们这里的重点不是排序函数,而是比力函数的实现:
  1.     val ordering = RowOrdering.create(sortOrder, output)
  2.     // The comparator for comparing prefix
  3.     val boundSortExpression = BindReferences.bindReference(sortOrder.head, output)
  4.     val prefixComparator = SortPrefixUtils.getPrefixComparator(boundSortExpression)
  5.     val canUseRadixSort = enableRadixSort && sortOrder.length == 1 &&
  6.       SortPrefixUtils.canSortFullyWithPrefix(boundSortExpression)
  7.     // The generator for prefix
  8.     val prefixExpr = SortPrefix(boundSortExpression)
  9.     val prefixProjection = UnsafeProjection.create(Seq(prefixExpr))
  10.     val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer {
  11.       private val result = new UnsafeExternalRowSorter.PrefixComputer.Prefix
  12.       override def computePrefix(row: InternalRow):
  13.           UnsafeExternalRowSorter.PrefixComputer.Prefix = {
  14.         val prefix = prefixProjection.apply(row)
  15.         result.isNull = prefix.isNullAt(0)
  16.         result.value = if (result.isNull) prefixExpr.nullValue else prefix.getLong(0)
  17.         result
  18.       }
  19.     }
  20.     val pageSize = SparkEnv.get.memoryManager.pageSizeBytes
  21.     rowSorter = UnsafeExternalRowSorter.create(
  22.       schema, ordering, prefixComparator, prefixComputer, pageSize, canUseRadixSort)
复制代码
先说 UnsafeExternalRowSorter中 内存排序(UnsafeInMemorySorter)最根本的思想:先根据前缀比力算法进行比力,如果相等的话,则再遍历实际数据的指针去获取真正的数据进行比力,这种可以规避随机内存读取从而提交缓存的掷中率,进而提高比力的速率。
再说 这里自界说的前缀比力:


  • BindReferences.bindReference(sortOrder.head, output) 这里指选择第一个排序的字段作为前缀比力的范例
  • val prefixComparator = SortPrefixUtils.getPrefixComparator(boundSortExpression)
    这里会根据排序的字段范例选择出对应的排序方法:
    1.     sortOrder.dataType match {
    2.     case StringType => stringPrefixComparator(sortOrder)
    3.     case BinaryType => binaryPrefixComparator(sortOrder)
    4.     case BooleanType | ByteType | ShortType | IntegerType | LongType | DateType | TimestampType |
    5.         TimestampNTZType | _: AnsiIntervalType =>
    6.       longPrefixComparator(sortOrder)
    7.     case dt: DecimalType if dt.precision - dt.scale <= Decimal.MAX_LONG_DIGITS =>
    8.       longPrefixComparator(sortOrder)
    9.     case FloatType | DoubleType => doublePrefixComparator(sortOrder)
    10.     case dt: DecimalType => doublePrefixComparator(sortOrder)
    11.     case _ => NoOpPrefixComparator
    12.   }
    复制代码
    最后 就只有两种前缀比力器 UnsignedPrefixComparator SignedPrefixComparator NoOpPrefixComparator
    对于 String 以及Binary double float 这种会选择无符号的前缀比力
    对于 double等根本数据范例会选择 有符号的前缀比力
    这里为什么会这么选择,实在是跟内部的范例存储有关以及 prefixExpr 和 prefixComputer选择的Prefix有关
  • 盘算前缀
    主要涉及以下
    1.   val prefixExpr = SortPrefix(boundSortExpression)
    2.   val prefixProjection = UnsafeProjection.create(Seq(prefixExpr))
    3.   val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer
    复制代码
    这里主要利用代码生成的方式,通过prefixProjection.apply(row) 这只拿了第一个sortOrder的表达式,所以是以第一个sort表达式来获取前缀比力的。
    其中 SortPrefix中的方法calcPrefix会根据Spark的内部范例,获取Long范例的可以用于比力的值,所以我们可以看到在prefixComputer的computePrefix方法中可以通过getLong(0)来获取对应的值。这样在后续内存排序(UnsafeInMemorySorter)中就可以用该long值进行排序。
其他

这里特别说一下:两种范例的BinaryType(对应内部的范例为Array[Byte]) 和 StringType(对应的内部的范例为UTF8String) 获取prefix的.
注意UTF8String 内部也是以 Array[Byte]存储的.
这两个都是通过ByteArray.getPrefix方法来获取对应的值的。
其中 Platform.BYTE_ARRAY_OFFSET调用UNSAFE.arrayBaseOffset(byte[].class) 获取数组第一个元素相对于数组起始地址的偏移量.
  1.   public static long getPrefix(byte[] bytes) {
  2.     if (bytes == null) {
  3.       return 0L;
  4.     }
  5.     return getPrefix(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length);
  6.   }
  7.   static long getPrefix(Object base, long offset, int numBytes) {
复制代码
getPrefix 这个方法从字节数组取numBytes 个字节数之后构成Long范例返回。
实在byte的内部的底层也是按照数字存储的,取值范围是[-128,127],所以在底层转换为long也是可以的。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

石小疯

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表