马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
配景
近来偶尔间看到了一篇文章一文把握 Velox orderby 算子的排序算法,里面主要说到了Velox PrefixSort怎么用排序算饭加速大数据的排序的,其中有说到:
- 排序的过程,主要考虑霎三件事情:
- 1. 选择比较函数
- 2. 选择排序算法
- 3. 排序过程中的数据移动,移动数据或者移动指针?
复制代码 最让我深有感触的是 这里面涉及到的比力函数,这里的主要思想就是:
- 把所有的类型的比较(无论是字符串还是整数等),都转换为二进制字符串的比较,那么这在比较的速度上就会能够充分利用硬件资源,使得加速。其中就会涉及到各个字段类型的规范化: 按照order by的顺序依次进行规范,如果遇到不能规范的字段类型,则后续的规范直接中断。
复制代码 固然详细的Velox的代码我是没有去看,但是我们可以解析一下Spark中的Sort是怎么实现的,作为大数据的标杆组件,我们可以看一下,本文基于Spark 3.5
分析
直接切入到SortExec类,其中有个createSorter 方法,这里会构建排序函数,我们这里的重点不是排序函数,而是比力函数的实现:
- val ordering = RowOrdering.create(sortOrder, output)
- // The comparator for comparing prefix
- val boundSortExpression = BindReferences.bindReference(sortOrder.head, output)
- val prefixComparator = SortPrefixUtils.getPrefixComparator(boundSortExpression)
- val canUseRadixSort = enableRadixSort && sortOrder.length == 1 &&
- SortPrefixUtils.canSortFullyWithPrefix(boundSortExpression)
- // The generator for prefix
- val prefixExpr = SortPrefix(boundSortExpression)
- val prefixProjection = UnsafeProjection.create(Seq(prefixExpr))
- val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer {
- private val result = new UnsafeExternalRowSorter.PrefixComputer.Prefix
- override def computePrefix(row: InternalRow):
- UnsafeExternalRowSorter.PrefixComputer.Prefix = {
- val prefix = prefixProjection.apply(row)
- result.isNull = prefix.isNullAt(0)
- result.value = if (result.isNull) prefixExpr.nullValue else prefix.getLong(0)
- result
- }
- }
- val pageSize = SparkEnv.get.memoryManager.pageSizeBytes
- rowSorter = UnsafeExternalRowSorter.create(
- schema, ordering, prefixComparator, prefixComputer, pageSize, canUseRadixSort)
复制代码 先说 UnsafeExternalRowSorter中 内存排序(UnsafeInMemorySorter)最根本的思想:先根据前缀比力算法进行比力,如果相等的话,则再遍历实际数据的指针去获取真正的数据进行比力,这种可以规避随机内存读取从而提交缓存的掷中率,进而提高比力的速率。
再说 这里自界说的前缀比力:
- BindReferences.bindReference(sortOrder.head, output) 这里指选择第一个排序的字段作为前缀比力的范例
- val prefixComparator = SortPrefixUtils.getPrefixComparator(boundSortExpression)
这里会根据排序的字段范例选择出对应的排序方法:
- sortOrder.dataType match {
- case StringType => stringPrefixComparator(sortOrder)
- case BinaryType => binaryPrefixComparator(sortOrder)
- case BooleanType | ByteType | ShortType | IntegerType | LongType | DateType | TimestampType |
- TimestampNTZType | _: AnsiIntervalType =>
- longPrefixComparator(sortOrder)
- case dt: DecimalType if dt.precision - dt.scale <= Decimal.MAX_LONG_DIGITS =>
- longPrefixComparator(sortOrder)
- case FloatType | DoubleType => doublePrefixComparator(sortOrder)
- case dt: DecimalType => doublePrefixComparator(sortOrder)
- case _ => NoOpPrefixComparator
- }
复制代码 最后 就只有两种前缀比力器 UnsignedPrefixComparator SignedPrefixComparator NoOpPrefixComparator
对于 String 以及Binary double float 这种会选择无符号的前缀比力
对于 double等根本数据范例会选择 有符号的前缀比力
这里为什么会这么选择,实在是跟内部的范例存储有关以及 prefixExpr 和 prefixComputer选择的Prefix有关
- 盘算前缀
主要涉及以下
- val prefixExpr = SortPrefix(boundSortExpression)
- val prefixProjection = UnsafeProjection.create(Seq(prefixExpr))
- val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer
复制代码 这里主要利用代码生成的方式,通过prefixProjection.apply(row) 这只拿了第一个sortOrder的表达式,所以是以第一个sort表达式来获取前缀比力的。
其中 SortPrefix中的方法calcPrefix会根据Spark的内部范例,获取Long范例的可以用于比力的值,所以我们可以看到在prefixComputer的computePrefix方法中可以通过getLong(0)来获取对应的值。这样在后续内存排序(UnsafeInMemorySorter)中就可以用该long值进行排序。
其他
这里特别说一下:两种范例的BinaryType(对应内部的范例为Array[Byte]) 和 StringType(对应的内部的范例为UTF8String) 获取prefix的.
注意UTF8String 内部也是以 Array[Byte]存储的.
这两个都是通过ByteArray.getPrefix方法来获取对应的值的。
其中 Platform.BYTE_ARRAY_OFFSET调用UNSAFE.arrayBaseOffset(byte[].class) 获取数组第一个元素相对于数组起始地址的偏移量.
- public static long getPrefix(byte[] bytes) {
- if (bytes == null) {
- return 0L;
- }
- return getPrefix(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length);
- }
- static long getPrefix(Object base, long offset, int numBytes) {
复制代码 getPrefix 这个方法从字节数组取numBytes 个字节数之后构成Long范例返回。
实在byte的内部的底层也是按照数字存储的,取值范围是[-128,127],所以在底层转换为long也是可以的。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |