
Title: Flink Connector Writing to Iceberg: A Source Code Walkthrough

Author: 络腮胡菲菲    Time: 2025-02-17 20:06
The write path is assembled in the FlinkSink builder: a Writer operator, a Committer operator, and a dummy sink are chained in sequence.

  // Add the Writer operator (parallelism configurable)
  SingleOutputStreamOperator<WriteResult> writerStream =
      appendWriter(distributeStream, flinkRowType, equalityFieldIds);
  // Add the Committer operator; its parallelism is fixed at 1
  SingleOutputStreamOperator<Void> committerStream = appendCommitter(writerStream);
  // Add the dummy sink
  return appendDummySink(committerStream);
}
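For context, here is a minimal sketch of how a job typically feeds a RowData stream into this sink. The checkpoint interval, warehouse path, and the buildSource helper are placeholders, not part of the analyzed source:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class IcebergSinkExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Commits only happen on completed checkpoints (see IcebergFilesCommitter below),
    // so checkpointing must be enabled.
    env.enableCheckpointing(60_000L);

    DataStream<RowData> rows = buildSource(env); // placeholder source

    // Hypothetical Hadoop-catalog table location.
    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs:///warehouse/db/tbl");

    FlinkSink.forRowData(rows)
        .tableLoader(tableLoader)
        .writeParallelism(4) // consumed by appendWriter as flinkWriteConf.writeParallelism()
        .append();           // chains writer -> committer -> dummy sink as shown above

    env.execute("write-to-iceberg");
  }

  private static DataStream<RowData> buildSource(StreamExecutionEnvironment env) {
    throw new UnsupportedOperationException("placeholder source");
  }
}
```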
### The appendWriter method
private SingleOutputStreamOperator<WriteResult> appendWriter(
    DataStream<RowData> input, RowType flinkRowType, List<Integer> equalityFieldIds) {
  ....
  if (flinkWriteConf.upsertMode()) {
    if (!table.spec().isUnpartitioned()) {
      // In upsert mode, every partition field must be included in equalityFieldIds
      for (PartitionField partitionField : table.spec().fields()) {
        Preconditions.checkState(
            equalityFieldIds.contains(partitionField.sourceId()),
            "In UPSERT mode, partition field '%s' should be included in equality fields: '%s'",
            partitionField,
            equalityFieldColumns);
      }
    }
  }
  // Create the IcebergStreamWriter
  IcebergStreamWriter<RowData> streamWriter =
      createStreamWriter(table, flinkWriteConf, flinkRowType, equalityFieldIds);
  // Set the parallelism; if none is specified, use the parallelism of the input stream
  int parallelism =
      flinkWriteConf.writeParallelism() == null
          ? input.getParallelism()
          : flinkWriteConf.writeParallelism();
  ....
  return writerStream;
}
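The precondition above is what fails when an upsert job's equality fields do not cover the table's partition source columns. A hedged configuration sketch, assuming the target table is partitioned on a ts column; the column names id and ts are hypothetical:

```java
import java.util.Arrays;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class UpsertSinkExample {
  // Assumes the target table is partitioned by ts; "id" and "ts" are hypothetical columns.
  public static void configure(DataStream<RowData> rows, TableLoader tableLoader) {
    FlinkSink.forRowData(rows)
        .tableLoader(tableLoader)
        .upsert(true)
        // Equality fields must include the partition source column ("ts"); otherwise the
        // Preconditions.checkState in appendWriter fails when the job is built.
        .equalityFieldColumns(Arrays.asList("id", "ts"))
        .append();
  }
}
```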
### The createStreamWriter method
static IcebergStreamWriter<RowData> createStreamWriter(
    Table table,
    FlinkWriteConf flinkWriteConf,
    RowType flinkRowType,
    List<Integer> equalityFieldIds) {

  Table serializableTable = SerializableTable.copyOf(table);
  FileFormat format = flinkWriteConf.dataFileFormat();
  // Create a TaskWriterFactory that builds the matching writer from the table schema
  TaskWriterFactory<RowData> taskWriterFactory =
      new RowDataTaskWriterFactory(
          serializableTable,
          flinkRowType,
          flinkWriteConf.targetDataFileSize(),
          format,
          writeProperties(table, format, flinkWriteConf),
          equalityFieldIds,
          flinkWriteConf.upsertMode());
  // Construct the IcebergStreamWriter
  return new IcebergStreamWriter<>(table.name(), taskWriterFactory);
}
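flinkWriteConf.dataFileFormat() resolves the data file format from the sink's write options and falls back to the table property write.format.default (TableProperties.DEFAULT_FILE_FORMAT). A small sketch of setting that default on the table side, assuming Parquet is the desired format:

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;

public class DataFileFormatConfig {
  // Sets the default data file format that flinkWriteConf.dataFileFormat() falls back to
  // when no explicit write option is supplied on the sink.
  public static void useParquet(Table table) {
    table.updateProperties()
        .set(TableProperties.DEFAULT_FILE_FORMAT, "parquet") // "write.format.default"
        .commit();
  }
}
```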
### The IcebergStreamWriter class
This class is a Flink OneInputStreamOperator, so it has the standard Flink operator lifecycle (open, processElement, checkpoint hooks).
class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
    implements OneInputStreamOperator<T, WriteResult>, BoundedOneInput {

@Override
public void open() {

// Initialize the writer metrics
this.writerMetrics = new IcebergStreamWriterMetrics(super.metrics, fullTableName);
// Initialize the taskWriterFactory used to create the writer
this.taskWriterFactory.initialize(subTaskId, attemptId);
// Create the writer.
// There are four main writer types, chosen by whether the Iceberg table is partitioned
// and whether upsert mode is enabled:
//   UnpartitionedWriter            : unpartitioned, insert only
//   RowDataPartitionedFanoutWriter : partitioned, insert only
//   UnpartitionedDeltaWriter       : unpartitioned, upsert
//   PartitionedDeltaWriter         : partitioned, upsert
this.writer = taskWriterFactory.create();
}
@Override
public void processElement(StreamRecord<T> element) throws Exception {
  // Write the incoming record
  writer.write(element.getValue());
}

// Emit the data files written in this round downstream to the committer for a unified commit
// (flush() is invoked from prepareSnapshotPreBarrier() before each checkpoint barrier and from endInput())
private void flush() throws IOException {
  if (writer == null) {
    return;
  }

  long startNano = System.nanoTime();
  WriteResult result = writer.complete();
  writerMetrics.updateFlushResult(result);
  output.collect(new StreamRecord<>(result));
  writerMetrics.flushDuration(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano));
  writer = null;
}
}
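The four writer types listed in open() are chosen inside the task writer factory based on two conditions: whether the table has a partition spec and whether equality fields (upsert mode) are configured. A simplified, illustrative sketch of that branching, not the factory's actual code:

```java
// Illustrative only: mirrors the branching behind taskWriterFactory.create(),
// returning the name of the writer class that would be instantiated.
public class WriterSelectionSketch {
  public static String selectWriter(boolean partitioned, boolean hasEqualityFields) {
    if (!hasEqualityFields) {
      // insert-only path
      return partitioned ? "RowDataPartitionedFanoutWriter" : "UnpartitionedWriter";
    }
    // upsert / equality-delete path
    return partitioned ? "PartitionedDeltaWriter" : "UnpartitionedDeltaWriter";
  }

  public static void main(String[] args) {
    System.out.println(selectWriter(false, false)); // UnpartitionedWriter
    System.out.println(selectWriter(true, false));  // RowDataPartitionedFanoutWriter
    System.out.println(selectWriter(false, true));  // UnpartitionedDeltaWriter
    System.out.println(selectWriter(true, true));   // PartitionedDeltaWriter
  }
}
```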
### The IcebergFilesCommitter class
class IcebergFilesCommitter extends AbstractStreamOperator<Void>
    implements OneInputStreamOperator<WriteResult, Void>, BoundedOneInput {

@Override
public void initializeState(StateInitializationContext context) throws Exception {
  // Maximum number of consecutive empty commits.
  // A commit (and the resulting snapshot) is only forced after the configured number of
  // consecutive checkpoints without data, which reduces the number of empty snapshots.
  maxContinuousEmptyCommits =
      PropertyUtil.propertyAsInt(table.properties(), MAX_CONTINUOUS_EMPTY_COMMITS, 10);
  // Create the OutputFileFactory for manifest files
  this.manifestOutputFileFactory =
      FlinkManifestUtil.createOutputFileFactory(
          table, flinkJobId, operatorUniqueId, subTaskId, attemptId);
  if (context.isRestored()) {
    // Restore the uncommitted data files from state
    NavigableMap<Long, byte[]> uncommittedDataFiles =
        Maps.newTreeMap(checkpointsState.get().iterator().next())
            .tailMap(maxCommittedCheckpointId, false);
    if (!uncommittedDataFiles.isEmpty()) {
      // Commit all uncommitted data files left by the old Flink job to the Iceberg table
      long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
      commitUpToCheckpoint(
          uncommittedDataFiles, restoredFlinkJobId, operatorUniqueId, maxUncommittedCheckpointId);
    }
  }
}
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
  // Turn the data files completed for this checkpointId into manifest files
  // and put them into dataFilesPerCheckpoint
  dataFilesPerCheckpoint.put(checkpointId, writeToManifest(checkpointId));
  // Reset the snapshot state to the latest state.
  checkpointsState.clear();
  // Store into operator state
  checkpointsState.add(dataFilesPerCheckpoint);
  jobIdState.clear();
  jobIdState.add(flinkJobId);
  // Clear the local buffer for the current checkpoint.
  writeResultsOfCurrentCkpt.clear();
  committerMetrics.checkpointDuration(
      TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano));
}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {

  if (checkpointId > maxCommittedCheckpointId) {
    LOG.info("Checkpoint {} completed. Attempting commit.", checkpointId);
    // The checkpoint is complete, so commit the pending data up to it
    commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, checkpointId);
    this.maxCommittedCheckpointId = checkpointId;
  } else {
    LOG.info(
        "Skipping committing checkpoint {}. {} is already committed.",
        checkpointId,
        maxCommittedCheckpointId);
  }
}
private void commitUpToCheckpoint(
    NavigableMap<Long, byte[]> deltaManifestsMap,
    String newFlinkJobId,
    String operatorId,
    long checkpointId)
    throws IOException {
  // Collect the data files that are waiting to be committed
  NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);
  List<ManifestFile> manifests = Lists.newArrayList();
  NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();
  for (Map.Entry<Long, byte[]> e : pendingMap.entrySet()) {
    // Skip checkpoints whose manifest data is empty
    if (Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue())) {
      continue;
    }

    DeltaManifests deltaManifests =
        SimpleVersionedSerialization.readVersionAndDeSerialize(
            DeltaManifestsSerializer.INSTANCE, e.getValue());
    pendingResults.put(
        e.getKey(),
        FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs()));
    manifests.addAll(deltaManifests.manifests());
  }
  // Summarize the record counts and data file sizes of the pending results
  CommitSummary summary = new CommitSummary(pendingResults);
  // Commit the data
  commitPendingResult(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
  committerMetrics.updateCommitSummary(summary);
  pendingMap.clear();
  // Delete the manifests that have already been committed
  deleteCommittedManifests(manifests, newFlinkJobId, checkpointId);
}
private void commitPendingResult(
    NavigableMap<Long, WriteResult> pendingResults,
    CommitSummary summary,
    String newFlinkJobId,
    String operatorId,
    long checkpointId) {
  long totalFiles = summary.dataFilesCount() + summary.deleteFilesCount();
  continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints + 1 : 0;
  // Commit when there are files to commit, or when the number of consecutive empty
  // checkpoints has reached the configured threshold
  if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) {
    if (replacePartitions) {
      // Overwrite commit via newReplacePartitions()
      replacePartitions(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
    } else {
      // Regular commit via newAppend() / newRowDelta()
      commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
    }
    continuousEmptyCheckpoints = 0;
  }
}

private void replacePartitions(
    NavigableMap<Long, WriteResult> pendingResults,
    CommitSummary summary,
    String newFlinkJobId,
    String operatorId,
    long checkpointId) {
  Preconditions.checkState(
      summary.deleteFilesCount() == 0, "Cannot overwrite partitions with delete files.");
  // Commit using newReplacePartitions()
  ReplacePartitions dynamicOverwrite = table.newReplacePartitions().scanManifestsWith(workerPool);
  for (WriteResult result : pendingResults.values()) {
    Preconditions.checkState(
        result.referencedDataFiles().length == 0, "Should have no referenced data files.");
    Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile);
  }

}
private void commitDeltaTxn(
    NavigableMap<Long, WriteResult> pendingResults,
    CommitSummary summary,
    String newFlinkJobId,
    String operatorId,
    long checkpointId) {
  if (summary.deleteFilesCount() == 0) {
    // To be compatible with iceberg format V1.
    AppendFiles appendFiles = table.newAppend().scanManifestsWith(workerPool);
    for (WriteResult result : pendingResults.values()) {
      Preconditions.checkState(
          result.referencedDataFiles().length == 0,
          "Should have no referenced data files for append.");
      Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
    }
    commitOperation(appendFiles, summary, "append", newFlinkJobId, operatorId, checkpointId);
  } else {
    // To be compatible with iceberg format V2.
    for (Map.Entry<Long, WriteResult> e : pendingResults.entrySet()) {
      // We don't commit the merged result into a single transaction because for the sequential
      // transaction txn1 and txn2, the equality-delete files of txn2 are required to be applied
      // to data files from txn1. Committing the merged one will lead to the incorrect delete
      // semantic.
      WriteResult result = e.getValue();

      RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool);
      // Add data files and delete files separately
      Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
      Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
      commitOperation(rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, e.getKey());
    }
  }
}
private void commitOperation(
SnapshotUpdate<?> operation,
CommitSummary summary,
String description,
String newFlinkJobId,
String operatorId,
long checkpointId) {

  // Perform the commit operation
operation.commit(); // abort is automatically called if this fails.

committerMetrics.commitDuration(durationMs);
}
@Override
public void processElement(StreamRecord<WriteResult> element) throws Exception {
  final WriteResult value = element.getValue();
  if ("DDL".equalsIgnoreCase(value.getType())) {
    this.writeResultsOfCurrentCkpt.add(element.getValue());
    doCommit();
  } else {
    this.writeResultsOfCurrentCkpt.add(element.getValue());
  }
}

}
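Two practical takeaways from the committer: commits are driven by notifyCheckpointComplete, so checkpointing must be enabled on the job, and the empty-commit threshold read in initializeState is a table property. A hedged sketch of both knobs; the property key is assumed to be flink.max-continuous-empty-commits (the value behind MAX_CONTINUOUS_EMPTY_COMMITS), so verify it against the Iceberg version in use:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.iceberg.Table;

public class CommitterTuningSketch {
  public static void tune(StreamExecutionEnvironment env, Table table) {
    // Without checkpointing, IcebergFilesCommitter never receives notifyCheckpointComplete,
    // so pending data files stay in state and are never committed to the table.
    env.enableCheckpointing(60_000L);

    // Force a commit after at most 5 consecutive empty checkpoints instead of the default 10.
    // Property key assumed: "flink.max-continuous-empty-commits" (MAX_CONTINUOUS_EMPTY_COMMITS).
    table.updateProperties()
        .set("flink.max-continuous-empty-commits", "5")
        .commit();
  }
}
```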