IT评测·应用市场-qidao123.com技术社区

标题: Spark中排序--前缀排序prefixSort [打印本页]

作者: 石小疯    时间: 2025-4-8 01:53
标题: Spark中排序--前缀排序prefixSort
配景

近来偶尔间看到了一篇文章一文把握 Velox orderby 算子的排序算法,里面主要说到了Velox PrefixSort怎么用排序算饭加速大数据的排序的,其中有说到:
  1. 排序的过程,主要考虑霎三件事情:
  2. 1. 选择比较函数
  3. 2. 选择排序算法
  4. 3. 排序过程中的数据移动,移动数据或者移动指针?
复制代码
最让我深有感触的是 这里面涉及到的比力函数,这里的主要思想就是:
  1. 把所有的类型的比较(无论是字符串还是整数等),都转换为二进制字符串的比较,那么这在比较的速度上就会能够充分利用硬件资源,使得加速。其中就会涉及到各个字段类型的规范化: 按照order by的顺序依次进行规范,如果遇到不能规范的字段类型,则后续的规范直接中断。
复制代码
固然详细的Velox的代码我是没有去看,但是我们可以解析一下Spark中的Sort是怎么实现的,作为大数据的标杆组件,我们可以看一下,本文基于Spark 3.5
分析

直接切入到SortExec类,其中有个createSorter 方法,这里会构建排序函数,我们这里的重点不是排序函数,而是比力函数的实现:
  1.     val ordering = RowOrdering.create(sortOrder, output)
  2.     // The comparator for comparing prefix
  3.     val boundSortExpression = BindReferences.bindReference(sortOrder.head, output)
  4.     val prefixComparator = SortPrefixUtils.getPrefixComparator(boundSortExpression)
  5.     val canUseRadixSort = enableRadixSort && sortOrder.length == 1 &&
  6.       SortPrefixUtils.canSortFullyWithPrefix(boundSortExpression)
  7.     // The generator for prefix
  8.     val prefixExpr = SortPrefix(boundSortExpression)
  9.     val prefixProjection = UnsafeProjection.create(Seq(prefixExpr))
  10.     val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer {
  11.       private val result = new UnsafeExternalRowSorter.PrefixComputer.Prefix
  12.       override def computePrefix(row: InternalRow):
  13.           UnsafeExternalRowSorter.PrefixComputer.Prefix = {
  14.         val prefix = prefixProjection.apply(row)
  15.         result.isNull = prefix.isNullAt(0)
  16.         result.value = if (result.isNull) prefixExpr.nullValue else prefix.getLong(0)
  17.         result
  18.       }
  19.     }
  20.     val pageSize = SparkEnv.get.memoryManager.pageSizeBytes
  21.     rowSorter = UnsafeExternalRowSorter.create(
  22.       schema, ordering, prefixComparator, prefixComputer, pageSize, canUseRadixSort)
复制代码
先说 UnsafeExternalRowSorter中 内存排序(UnsafeInMemorySorter)最根本的思想:先根据前缀比力算法进行比力,如果相等的话,则再遍历实际数据的指针去获取真正的数据进行比力,这种可以规避随机内存读取从而提交缓存的掷中率,进而提高比力的速率。
再说 这里自界说的前缀比力:

其他

这里特别说一下:两种范例的BinaryType(对应内部的范例为Array[Byte]) 和 StringType(对应的内部的范例为UTF8String) 获取prefix的.
注意UTF8String 内部也是以 Array[Byte]存储的.
这两个都是通过ByteArray.getPrefix方法来获取对应的值的。
其中 Platform.BYTE_ARRAY_OFFSET调用UNSAFE.arrayBaseOffset(byte[].class) 获取数组第一个元素相对于数组起始地址的偏移量.
  1.   public static long getPrefix(byte[] bytes) {
  2.     if (bytes == null) {
  3.       return 0L;
  4.     }
  5.     return getPrefix(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length);
  6.   }
  7.   static long getPrefix(Object base, long offset, int numBytes) {
复制代码
getPrefix 这个方法从字节数组取numBytes 个字节数之后构成Long范例返回。
实在byte的内部的底层也是按照数字存储的,取值范围是[-128,127],所以在底层转换为long也是可以的。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 IT评测·应用市场-qidao123.com技术社区 (https://dis.qidao123.com/) Powered by Discuz! X3.4