deletion vector 是通过一组向量, 维护一个文件中被删除的行, 可以明白为一种索引. 这种方式可以以 Merge On Write 的方式, 来避免 Merge On Read 的过程, 从而以写入性能换取读取性能. 对于写少读多, 或者对读取性能有更高要求的场景会比较恰当.
避免 Merge On Read 会带来以下几个好处:
- 读取时仅须要直接读取文件, 并根据 bitmap 跳过对应的删除行即可, 跳过 Merge 过程, 直观的提升读取效率
- 和 Native 引擎联合更好, 可以通过 C++ reader, 直接读取文件并举行向量过滤, 避免了通过 jni 的 merge 过程
- 由于不再须要 merge, 因此 split 切分可以更细, 从而增多 split, 提升读取的并发度
- 可以举行 value 字段的 filter push down, 从而实现更好的过滤效果.
- 因为在 MOR 的情况下, 由于存在多个 Key value 须要归并的情况, 因此在归并之前不能举行 value filter 下推.
写入/compaction 过程
LookupChangelogMergeFunctionWrapper
deletion vector 的创建依赖于 compaction. 在 compaction 过程中通过 lookup 查找高层文件, 并举行文件删除行的标志.
开启 delete vector 后, 会强制举行 Lookup, 从而使用 ForceUpLevel0Compaction 的 Compaction 策略.
org.apache.paimon.operation.KeyValueFileStoreWrite#createRewriter - // 对于needLookup的场景, 采用ForceUpLevel0 compaction的机制
- CompactStrategy compactStrategy =
- options.needLookup()
- ? new ForceUpLevel0Compaction(universalCompaction)
- : universalCompaction;
复制代码- processor =
- lookupStrategy.deletionVector
- ? new PositionedKeyValueProcessor(
- valueType,
- lookupStrategy.produceChangelog
- || mergeEngine != DEDUPLICATE
- || !options.sequenceField().isEmpty())
- : new KeyValueProcessor(valueType);
复制代码 这里的 processor 对应于 lookup 过程中怎样处理 value 字段, 首先对于 deletion vector 场景, 我们查找到一个 key 时, 须要知道对应的"行号" position, 因此须要 PositionedKeyValueProcessor 即记载对应 KV pair 的行号.
其次对于以下三种场景, 还要求 lookup 的过程中, 读取完整的 value
- lookupStrategy.produceChangelog 由于要产生 Changelog, 所以须要知道前值, 因此须要完整的 value 读取
- mergeEngine != DEDUPLICATE
- !options.sequenceField().isEmpty() 和上面的一样, 这几类场景都是基于 L0 key 查找到高层值的时候, 不能简单的将高层标志为 delete, 而是须要执行一次 Merge 过程, 例如 Partial-Update, 或者根据 sequence field 比较后才 deduplicate, 所以这几类也须要读取完整value.
如果不是这几类, 比如不带排序字段的 deduplicate, 那么在 lookup 的过程中, 只须要读取对应的 key 即可, 那么就可以大大低落 lookup 的 IO 开销.
DeletionVectorsMaintainer
Lookup 过程中, 对于查找到高层的 key, 可以对高层数据标志删除. DeletionVectorsMaintainer 中维护了文件到 DeletionVector 的映射, DeletionVector 的实现通常是一个 RoaringBitmap.- if (lookupResult != null) {
- if (lookupStrategy.deletionVector) {
- PositionedKeyValue positionedKeyValue = (PositionedKeyValue) lookupResult;
- highLevel = positionedKeyValue.keyValue();
- deletionVectorsMaintainer.notifyNewDeletion(
- positionedKeyValue.fileName(), positionedKeyValue.rowPosition());
- } else {
- highLevel = (KeyValue) lookupResult;
- }
- }
复制代码 按照 pip-16 中的形貌, 每个 bucket 会维护一个 delete vector 文件 , 这个文件中维护了所有有删除 key 的文件和对应的 bitmap.
- 同步天生: 在 Compaction 完成后, 将内存中维护的 Map 数据布局写入对应的 index 文件. 这块其实会在内存内里维护每个有删除举动的文件的 deletion vector. 启动阶段也会从元数据中读取恢复. 当文件比较多的时候, 这块的内存开销大概也不容忽视. 而且由于 Map 维护, 只要有一个文件更新, 整个 index 文件也是要被重写的.
- 异步天生: delete vector 天生也支持异步化, 这样就可以不阻塞主链路的写入流程.
查询/读取
RawSplitRead
KeyValueTableRead- this.readProviders =
- Arrays.asList(
- new RawFileSplitReadProvider(batchRawReadSupplier, this::assignValues),
- new MergeFileSplitReadProvider(mergeReadSupplier, this::assignValues),
- new IncrementalChangelogReadProvider(mergeReadSupplier, this::assignValues),
- new IncrementalDiffReadProvider(mergeReadSupplier, this::assignValues));
复制代码 对于 KeyValueTableRead, 会创建一堆的 SplitReadProvider, 哪个 match 就走哪个读取.- public boolean match(DataSplit split, boolean forceKeepDelete) {
- boolean matched = !forceKeepDelete && !split.isStreaming() && split.rawConvertible();
- if (matched) {
- // for legacy version, we are not sure if there are delete rows, but in order to be
- // compatible with the query acceleration of the OLAP engine, we have generated raw
- // files.
- // Here, for the sake of correctness, we still need to perform drop delete filtering.
- for (DataFileMeta file : split.dataFiles()) {
- if (!file.deleteRowCount().isPresent()) {
- return false;
- }
- }
- }
- return matched;
- }
复制代码
- 对于 dv 表, 他的 split 是 rawConvertible 的, 即表示对应的 reader 可以转化为 raw reader.
ApplyDeletionFileRecordIterator
- public InternalRow next() throws IOException {
- while (true) {
- InternalRow next = iterator.next();
- if (next == null) {
- return null;
- }
- if (!deletionVector.isDeleted(returnedPosition())) {
- return next;
- }
- }
- }
复制代码 真正的读取过程, 就是根据提前加载的 delete vector 根据行号举行过滤.
另有一些其他关于读取的改动, 重要是 filter 下推相干的. 因为当文件可以 raw read, 不须要归并后, 非主键字段也就可以安全下推了.
例如: 开启 dv 的表, 可以应用其他的 value filter, 因此也就可以使用索引机制了.
Append Table DV support
除此之外, Paimon 还使用 deletion vector 实现了对 Append 表的删除
append 表的删除可以类比 iceberg 的实现, 根据输入数据, 构建删除的 deletion vector, 从而实现 append 表的删除逻辑.- if (deletionVectorsEnabled) {
- // Step2: collect all the deletion vectors that marks the deleted rows.
- val deletionVectors = collectDeletionVectors(
- candidateDataSplits,
- dataFilePathToMeta,
- condition,
- relation,
- sparkSession)
- deletionVectors.cache()
- try {
- // Step3: write these updated data
- val touchedDataSplits = deletionVectors.collect().map {
- SparkDeletionVectors.toDataSplit(_, root, pathFactory, dataFilePathToMeta)
- }
- val addCommitMessage = writeOnlyUpdatedData(sparkSession, touchedDataSplits)
- // Step4: write these deletion vectors.
- val indexCommitMsg = writer.persistDeletionVectors(deletionVectors)
- addCommitMessage ++ indexCommitMsg
- } finally {
- deletionVectors.unpersist()
- }
- } else {
复制代码
- 通过 filter 过滤, 先拿到 update 或 delete 语句大概影响的 split
- 构建 Reader 读取, 读取的 plan 额外添加 Metadata column, __paimon_file_path 和 __paimon_row_index , 这两个是上面 deletion vector 构建的依赖元信息
- 根据 update 输入构建 deletion vector (indexCommitMsg), 根据 update 输出构建addCommitMsg
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |