IT评测·应用市场-qidao123.com技术社区
标题:
Spark中排序--前缀排序prefixSort
[打印本页]
作者:
石小疯
时间:
2025-4-8 01:53
标题:
Spark中排序--前缀排序prefixSort
配景
近来偶尔间看到了一篇文章一文把握 Velox orderby 算子的排序算法,里面主要说到了Velox PrefixSort怎么用排序算饭加速大数据的排序的,其中有说到:
排序的过程,主要考虑霎三件事情:
1. 选择比较函数
2. 选择排序算法
3. 排序过程中的数据移动,移动数据或者移动指针?
复制代码
最让我深有感触的是 这里面涉及到的比力函数,这里的主要思想就是:
把所有的类型的比较(无论是字符串还是整数等),都转换为二进制字符串的比较,那么这在比较的速度上就会能够充分利用硬件资源,使得加速。其中就会涉及到各个字段类型的规范化: 按照order by的顺序依次进行规范,如果遇到不能规范的字段类型,则后续的规范直接中断。
复制代码
固然详细的Velox的代码我是没有去看,但是我们可以解析一下Spark中的Sort是怎么实现的,作为大数据的标杆组件,我们可以看一下,本文基于Spark 3.5
分析
直接切入到SortExec类,其中有个createSorter 方法,这里会构建排序函数,我们这里的重点不是排序函数,而是比力函数的实现:
val ordering = RowOrdering.create(sortOrder, output)
// The comparator for comparing prefix
val boundSortExpression = BindReferences.bindReference(sortOrder.head, output)
val prefixComparator = SortPrefixUtils.getPrefixComparator(boundSortExpression)
val canUseRadixSort = enableRadixSort && sortOrder.length == 1 &&
SortPrefixUtils.canSortFullyWithPrefix(boundSortExpression)
// The generator for prefix
val prefixExpr = SortPrefix(boundSortExpression)
val prefixProjection = UnsafeProjection.create(Seq(prefixExpr))
val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer {
private val result = new UnsafeExternalRowSorter.PrefixComputer.Prefix
override def computePrefix(row: InternalRow):
UnsafeExternalRowSorter.PrefixComputer.Prefix = {
val prefix = prefixProjection.apply(row)
result.isNull = prefix.isNullAt(0)
result.value = if (result.isNull) prefixExpr.nullValue else prefix.getLong(0)
result
}
}
val pageSize = SparkEnv.get.memoryManager.pageSizeBytes
rowSorter = UnsafeExternalRowSorter.create(
schema, ordering, prefixComparator, prefixComputer, pageSize, canUseRadixSort)
复制代码
先说 UnsafeExternalRowSorter中 内存排序(UnsafeInMemorySorter)最根本的思想:先根据前缀比力算法进行比力,如果相等的话,则再遍历实际数据的指针去获取真正的数据进行比力,这种可以规避随机内存读取从而提交缓存的掷中率,进而提高比力的速率。
再说 这里自界说的前缀比力:
BindReferences.bindReference(sortOrder.head, output) 这里指选择第一个排序的字段作为前缀比力的范例
val prefixComparator = SortPrefixUtils.getPrefixComparator(boundSortExpression)
这里会根据排序的字段范例选择出对应的排序方法:
sortOrder.dataType match {
case StringType => stringPrefixComparator(sortOrder)
case BinaryType => binaryPrefixComparator(sortOrder)
case BooleanType | ByteType | ShortType | IntegerType | LongType | DateType | TimestampType |
TimestampNTZType | _: AnsiIntervalType =>
longPrefixComparator(sortOrder)
case dt: DecimalType if dt.precision - dt.scale <= Decimal.MAX_LONG_DIGITS =>
longPrefixComparator(sortOrder)
case FloatType | DoubleType => doublePrefixComparator(sortOrder)
case dt: DecimalType => doublePrefixComparator(sortOrder)
case _ => NoOpPrefixComparator
}
复制代码
最后 就只有两种前缀比力器 UnsignedPrefixComparator SignedPrefixComparator NoOpPrefixComparator
对于 String 以及Binary double float 这种会选择无符号的前缀比力
对于 double等根本数据范例会选择 有符号的前缀比力
这里为什么会这么选择,实在是跟内部的范例存储有关以及 prefixExpr 和 prefixComputer选择的Prefix有关
盘算前缀
主要涉及以下
val prefixExpr = SortPrefix(boundSortExpression)
val prefixProjection = UnsafeProjection.create(Seq(prefixExpr))
val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer
复制代码
这里主要利用代码生成的方式,通过prefixProjection.apply(row) 这只拿了第一个sortOrder的表达式,所以是以第一个sort表达式来获取前缀比力的。
其中 SortPrefix中的方法calcPrefix会根据Spark的内部范例,获取Long范例的可以用于比力的值,所以我们可以看到在prefixComputer的computePrefix方法中可以通过getLong(0)来获取对应的值。这样在后续内存排序(UnsafeInMemorySorter)中就可以用该long值进行排序。
其他
这里特别说一下:两种范例的BinaryType(对应内部的范例为Array[Byte]) 和 StringType(对应的内部的范例为UTF8String) 获取prefix的.
注意UTF8String 内部也是以 Array[Byte]存储的.
这两个都是通过ByteArray.getPrefix方法来获取对应的值的。
其中 Platform.BYTE_ARRAY_OFFSET调用UNSAFE.arrayBaseOffset(byte[].class) 获取数组第一个元素相对于数组起始地址的偏移量.
public static long getPrefix(byte[] bytes) {
if (bytes == null) {
return 0L;
}
return getPrefix(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length);
}
static long getPrefix(Object base, long offset, int numBytes) {
复制代码
getPrefix 这个方法从字节数组取numBytes 个字节数之后构成Long范例返回。
实在byte的内部的底层也是按照数字存储的,取值范围是[-128,127],所以在底层转换为long也是可以的。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 IT评测·应用市场-qidao123.com技术社区 (https://dis.qidao123.com/)
Powered by Discuz! X3.4