Paimon Deletion Vector

打印 上一主题 下一主题

主题 1498|帖子 1498|积分 4494

deletion vector 是通过一组向量, 维护一个文件中被删除的行, 可以明白为一种索引. 这种方式可以以 Merge On Write 的方式, 来避免 Merge On Read 的过程, 从而以写入性能换取读取性能. 对于写少读多, 或者对读取性能有更高要求的场景会比较恰当.
避免 Merge On Read 会带来以下几个好处:

  • 读取时仅须要直接读取文件, 并根据 bitmap 跳过对应的删除行即可, 跳过 Merge 过程, 直观的提升读取效率
  • 和 Native 引擎联合更好, 可以通过 C++ reader, 直接读取文件并举行向量过滤, 避免了通过 jni 的 merge 过程
  • 由于不再须要 merge, 因此 split 切分可以更细, 从而增多 split, 提升读取的并发度
  • 可以举行 value 字段的 filter push down, 从而实现更好的过滤效果.

    • 因为在 MOR 的情况下, 由于存在多个 Key value 须要归并的情况, 因此在归并之前不能举行 value filter 下推.

写入/compaction 过程

LookupChangelogMergeFunctionWrapper

deletion vector 的创建依赖于 compaction. 在 compaction 过程中通过 lookup 查找高层文件, 并举行文件删除行的标志.
开启 delete vector 后, 会强制举行 Lookup, 从而使用 ForceUpLevel0Compaction 的 Compaction 策略.
org.apache.paimon.operation.KeyValueFileStoreWrite#createRewriter
  1. // 对于needLookup的场景, 采用ForceUpLevel0 compaction的机制
  2. CompactStrategy compactStrategy =  
  3.         options.needLookup()  
  4.                 ? new ForceUpLevel0Compaction(universalCompaction)  
  5.                 : universalCompaction;
复制代码
  1. processor =  
  2.         lookupStrategy.deletionVector  
  3.                 ? new PositionedKeyValueProcessor(  
  4.                         valueType,  
  5.                         lookupStrategy.produceChangelog  
  6.                                 || mergeEngine != DEDUPLICATE
  7.                                 || !options.sequenceField().isEmpty())  
  8.                 : new KeyValueProcessor(valueType);
复制代码
这里的 processor 对应于 lookup 过程中怎样处理 value 字段, 首先对于 deletion vector 场景, 我们查找到一个 key 时, 须要知道对应的"行号" position, 因此须要 PositionedKeyValueProcessor 即记载对应 KV pair 的行号.
其次对于以下三种场景, 还要求 lookup 的过程中, 读取完整的 value

  • lookupStrategy.produceChangelog  由于要产生 Changelog, 所以须要知道前值, 因此须要完整的 value 读取
  • mergeEngine != DEDUPLICATE
  • !options.sequenceField().isEmpty() 和上面的一样, 这几类场景都是基于 L0 key 查找到高层值的时候, 不能简单的将高层标志为 delete, 而是须要执行一次 Merge 过程, 例如 Partial-Update, 或者根据 sequence field 比较后才 deduplicate, 所以这几类也须要读取完整value.
如果不是这几类, 比如不带排序字段的 deduplicate, 那么在 lookup 的过程中, 只须要读取对应的 key 即可, 那么就可以大大低落 lookup 的 IO 开销.
DeletionVectorsMaintainer

Lookup 过程中, 对于查找到高层的 key, 可以对高层数据标志删除. DeletionVectorsMaintainer 中维护了文件到 DeletionVector 的映射, DeletionVector 的实现通常是一个 RoaringBitmap.
  1. if (lookupResult != null) {  
  2.     if (lookupStrategy.deletionVector) {  
  3.         PositionedKeyValue positionedKeyValue = (PositionedKeyValue) lookupResult;  
  4.         highLevel = positionedKeyValue.keyValue();  
  5.         deletionVectorsMaintainer.notifyNewDeletion(  
  6.                 positionedKeyValue.fileName(), positionedKeyValue.rowPosition());  
  7.     } else {  
  8.         highLevel = (KeyValue) lookupResult;  
  9.     }  
  10. }
复制代码
按照 pip-16 中的形貌, 每个 bucket 会维护一个 delete vector 文件 , 这个文件中维护了所有有删除 key 的文件和对应的 bitmap.


  • 同步天生: 在 Compaction 完成后, 将内存中维护的 Map 数据布局写入对应的 index 文件. 这块其实会在内存内里维护每个有删除举动的文件的 deletion vector. 启动阶段也会从元数据中读取恢复. 当文件比较多的时候, 这块的内存开销大概也不容忽视. 而且由于 Map 维护, 只要有一个文件更新, 整个 index 文件也是要被重写的.
  • 异步天生: delete vector 天生也支持异步化, 这样就可以不阻塞主链路的写入流程.
查询/读取

RawSplitRead

KeyValueTableRead
  1. this.readProviders =
  2.                 Arrays.asList(
  3.                                 new RawFileSplitReadProvider(batchRawReadSupplier, this::assignValues),
  4.                                 new MergeFileSplitReadProvider(mergeReadSupplier, this::assignValues),
  5.                                 new IncrementalChangelogReadProvider(mergeReadSupplier, this::assignValues),
  6.                                 new IncrementalDiffReadProvider(mergeReadSupplier, this::assignValues));
复制代码
对于 KeyValueTableRead, 会创建一堆的 SplitReadProvider, 哪个 match 就走哪个读取.
  1. public boolean match(DataSplit split, boolean forceKeepDelete) {
  2.         boolean matched = !forceKeepDelete && !split.isStreaming() && split.rawConvertible();
  3.         if (matched) {
  4.                 // for legacy version, we are not sure if there are delete rows, but in order to be
  5.                 // compatible with the query acceleration of the OLAP engine, we have generated raw
  6.                 // files.
  7.                 // Here, for the sake of correctness, we still need to perform drop delete filtering.
  8.                 for (DataFileMeta file : split.dataFiles()) {
  9.                         if (!file.deleteRowCount().isPresent()) {
  10.                                 return false;
  11.                         }
  12.                 }
  13.         }
  14.         return matched;
  15. }
复制代码

  • 对于 dv 表, 他的 split 是 rawConvertible 的, 即表示对应的 reader 可以转化为 raw reader.
ApplyDeletionFileRecordIterator
  1. public InternalRow next() throws IOException {
  2.         while (true) {
  3.                 InternalRow next = iterator.next();
  4.                 if (next == null) {
  5.                         return null;
  6.                 }
  7.                 if (!deletionVector.isDeleted(returnedPosition())) {
  8.                         return next;
  9.                 }
  10.         }
  11. }
复制代码
真正的读取过程, 就是根据提前加载的 delete vector 根据行号举行过滤.
另有一些其他关于读取的改动, 重要是 filter 下推相干的. 因为当文件可以 raw read, 不须要归并后, 非主键字段也就可以安全下推了.
例如: 开启 dv 的表, 可以应用其他的 value filter, 因此也就可以使用索引机制了.
Append Table DV support

除此之外, Paimon 还使用 deletion vector 实现了对 Append 表的删除
append 表的删除可以类比 iceberg 的实现, 根据输入数据, 构建删除的 deletion vector, 从而实现 append 表的删除逻辑.
  1.   if (deletionVectorsEnabled) {
  2.         // Step2: collect all the deletion vectors that marks the deleted rows.
  3.         val deletionVectors = collectDeletionVectors(
  4.           candidateDataSplits,
  5.           dataFilePathToMeta,
  6.           condition,
  7.           relation,
  8.           sparkSession)
  9.         deletionVectors.cache()
  10.         try {
  11.           // Step3: write these updated data
  12.           val touchedDataSplits = deletionVectors.collect().map {
  13.                 SparkDeletionVectors.toDataSplit(_, root, pathFactory, dataFilePathToMeta)
  14.           }
  15.           val addCommitMessage = writeOnlyUpdatedData(sparkSession, touchedDataSplits)
  16.           // Step4: write these deletion vectors.
  17.           val indexCommitMsg = writer.persistDeletionVectors(deletionVectors)
  18.           addCommitMessage ++ indexCommitMsg
  19.         } finally {
  20.           deletionVectors.unpersist()
  21.         }
  22.   } else {
复制代码

  • 通过 filter 过滤, 先拿到 update 或 delete 语句大概影响的 split
  • 构建 Reader 读取, 读取的 plan 额外添加 Metadata column, __paimon_file_path 和 __paimon_row_index , 这两个是上面 deletion vector 构建的依赖元信息
  • 根据 update 输入构建 deletion vector (indexCommitMsg), 根据 update 输出构建addCommitMsg

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

圆咕噜咕噜

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表