Paimon Deletion Vector
deletion vector 是通过一组向量, 维护一个文件中被删除的行, 可以明白为一种索引. 这种方式可以以 Merge On Write 的方式, 来避免 Merge On Read 的过程, 从而以写入性能换取读取性能. 对于写少读多, 或者对读取性能有更高要求的场景会比较恰当.避免 Merge On Read 会带来以下几个好处:
[*]读取时仅须要直接读取文件, 并根据 bitmap 跳过对应的删除行即可, 跳过 Merge 过程, 直观的提升读取效率
[*]和 Native 引擎联合更好, 可以通过 C++ reader, 直接读取文件并举行向量过滤, 避免了通过 jni 的 merge 过程
[*]由于不再须要 merge, 因此 split 切分可以更细, 从而增多 split, 提升读取的并发度
[*]可以举行 value 字段的 filter push down, 从而实现更好的过滤效果.
[*]因为在 MOR 的情况下, 由于存在多个 Key value 须要归并的情况, 因此在归并之前不能举行 value filter 下推.
写入/compaction 过程
LookupChangelogMergeFunctionWrapper
deletion vector 的创建依赖于 compaction. 在 compaction 过程中通过 lookup 查找高层文件, 并举行文件删除行的标志.
开启 delete vector 后, 会强制举行 Lookup, 从而使用 ForceUpLevel0Compaction 的 Compaction 策略.
org.apache.paimon.operation.KeyValueFileStoreWrite#createRewriter
// 对于needLookup的场景, 采用ForceUpLevel0 compaction的机制
CompactStrategy compactStrategy =
options.needLookup()
? new ForceUpLevel0Compaction(universalCompaction)
: universalCompaction;processor =
lookupStrategy.deletionVector
? new PositionedKeyValueProcessor(
valueType,
lookupStrategy.produceChangelog
|| mergeEngine != DEDUPLICATE
|| !options.sequenceField().isEmpty())
: new KeyValueProcessor(valueType);这里的 processor 对应于 lookup 过程中怎样处理 value 字段, 首先对于 deletion vector 场景, 我们查找到一个 key 时, 须要知道对应的"行号" position, 因此须要 PositionedKeyValueProcessor 即记载对应 KV pair 的行号.
其次对于以下三种场景, 还要求 lookup 的过程中, 读取完整的 value
[*]lookupStrategy.produceChangelog由于要产生 Changelog, 所以须要知道前值, 因此须要完整的 value 读取
[*]mergeEngine != DEDUPLICATE
[*]!options.sequenceField().isEmpty() 和上面的一样, 这几类场景都是基于 L0 key 查找到高层值的时候, 不能简单的将高层标志为 delete, 而是须要执行一次 Merge 过程, 例如 Partial-Update, 或者根据 sequence field 比较后才 deduplicate, 所以这几类也须要读取完整value.
如果不是这几类, 比如不带排序字段的 deduplicate, 那么在 lookup 的过程中, 只须要读取对应的 key 即可, 那么就可以大大低落 lookup 的 IO 开销.
DeletionVectorsMaintainer
Lookup 过程中, 对于查找到高层的 key, 可以对高层数据标志删除. DeletionVectorsMaintainer 中维护了文件到 DeletionVector 的映射, DeletionVector 的实现通常是一个 RoaringBitmap.
if (lookupResult != null) {
if (lookupStrategy.deletionVector) {
PositionedKeyValue positionedKeyValue = (PositionedKeyValue) lookupResult;
highLevel = positionedKeyValue.keyValue();
deletionVectorsMaintainer.notifyNewDeletion(
positionedKeyValue.fileName(), positionedKeyValue.rowPosition());
} else {
highLevel = (KeyValue) lookupResult;
}
}按照 pip-16 中的形貌, 每个 bucket 会维护一个 delete vector 文件 , 这个文件中维护了所有有删除 key 的文件和对应的 bitmap.
https://aitozi.oss-cn-hangzhou.aliyuncs.com/img.Pasted%20image%2020241103220849.png
[*]同步天生: 在 Compaction 完成后, 将内存中维护的 Map 数据布局写入对应的 index 文件. 这块其实会在内存内里维护每个有删除举动的文件的 deletion vector. 启动阶段也会从元数据中读取恢复. 当文件比较多的时候, 这块的内存开销大概也不容忽视. 而且由于 Map 维护, 只要有一个文件更新, 整个 index 文件也是要被重写的.
[*]异步天生: delete vector 天生也支持异步化, 这样就可以不阻塞主链路的写入流程.
查询/读取
RawSplitRead
KeyValueTableRead
this.readProviders =
Arrays.asList(
new RawFileSplitReadProvider(batchRawReadSupplier, this::assignValues),
new MergeFileSplitReadProvider(mergeReadSupplier, this::assignValues),
new IncrementalChangelogReadProvider(mergeReadSupplier, this::assignValues),
new IncrementalDiffReadProvider(mergeReadSupplier, this::assignValues));对于 KeyValueTableRead, 会创建一堆的 SplitReadProvider, 哪个 match 就走哪个读取.
public boolean match(DataSplit split, boolean forceKeepDelete) {
boolean matched = !forceKeepDelete && !split.isStreaming() && split.rawConvertible();
if (matched) {
// for legacy version, we are not sure if there are delete rows, but in order to be
// compatible with the query acceleration of the OLAP engine, we have generated raw
// files.
// Here, for the sake of correctness, we still need to perform drop delete filtering.
for (DataFileMeta file : split.dataFiles()) {
if (!file.deleteRowCount().isPresent()) {
return false;
}
}
}
return matched;
}
[*]对于 dv 表, 他的 split 是 rawConvertible 的, 即表示对应的 reader 可以转化为 raw reader.
ApplyDeletionFileRecordIterator
public InternalRow next() throws IOException {
while (true) {
InternalRow next = iterator.next();
if (next == null) {
return null;
}
if (!deletionVector.isDeleted(returnedPosition())) {
return next;
}
}
}真正的读取过程, 就是根据提前加载的 delete vector 根据行号举行过滤.
另有一些其他关于读取的改动, 重要是 filter 下推相干的. 因为当文件可以 raw read, 不须要归并后, 非主键字段也就可以安全下推了.
例如: 开启 dv 的表, 可以应用其他的 value filter, 因此也就可以使用索引机制了.
Append Table DV support
除此之外, Paimon 还使用 deletion vector 实现了对 Append 表的删除
append 表的删除可以类比 iceberg 的实现, 根据输入数据, 构建删除的 deletion vector, 从而实现 append 表的删除逻辑.
if (deletionVectorsEnabled) {
// Step2: collect all the deletion vectors that marks the deleted rows.
val deletionVectors = collectDeletionVectors(
candidateDataSplits,
dataFilePathToMeta,
condition,
relation,
sparkSession)
deletionVectors.cache()
try {
// Step3: write these updated data
val touchedDataSplits = deletionVectors.collect().map {
SparkDeletionVectors.toDataSplit(_, root, pathFactory, dataFilePathToMeta)
}
val addCommitMessage = writeOnlyUpdatedData(sparkSession, touchedDataSplits)
// Step4: write these deletion vectors.
val indexCommitMsg = writer.persistDeletionVectors(deletionVectors)
addCommitMessage ++ indexCommitMsg
} finally {
deletionVectors.unpersist()
}
} else {
[*]通过 filter 过滤, 先拿到 update 或 delete 语句大概影响的 split
[*]构建 Reader 读取, 读取的 plan 额外添加 Metadata column, __paimon_file_path 和 __paimon_row_index , 这两个是上面 deletion vector 构建的依赖元信息
[*]根据 update 输入构建 deletion vector (indexCommitMsg), 根据 update 输出构建addCommitMsg
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]