}
private void commitPendingResult(
NavigableMap<Long, WriteResult> pendingResults,
CommitSummary summary,
String newFlinkJobId,
String operatorId,
long checkpointId) {
…
continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints + 1 : 0;
// 数据文件不问 0 或者 一连最大空提交到达了配置的参数阈值触发提交
if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) {
if (replacePartitions) {
// replace 提交
// 利用 newReplacePartitions()
replacePartitions(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
} else {
// 普通提交
// 利用 newAppend()
commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
}
continuousEmptyCheckpoints = 0;
…
}
…
private void replacePartitions(
NavigableMap<Long, WriteResult> pendingResults,
CommitSummary summary,
String newFlinkJobId,
String operatorId,
long checkpointId) {
Preconditions.checkState(
summary.deleteFilesCount() == 0, “Cannot overwrite partitions with delete files.”);
// 利用 newReplacePartitions 提交
ReplacePartitions dynamicOverwrite = table.newReplacePartitions().scanManifestsWith(workerPool);
for (WriteResult result : pendingResults.values()) {
Preconditions.checkState(
result.referencedDataFiles().length == 0, “Should have no referenced data files.”);
Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile);
}
…
}
private void commitDeltaTxn(
NavigableMap<Long, WriteResult> pendingResults,
CommitSummary summary,
String newFlinkJobId,
String operatorId,
long checkpointId) {
if (summary.deleteFilesCount() == 0) {
// To be compatible with iceberg format V1.
AppendFiles appendFiles = table.newAppend().scanManifestsWith(workerPool);
for (WriteResult result : pendingResults.values()) {
Preconditions.checkState(
result.referencedDataFiles().length == 0,
“Should have no referenced data files for append.”);
Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
}
commitOperation(appendFiles, summary, “append”, newFlinkJobId, operatorId, checkpointId);
} else {
// To be compatible with iceberg format V2.
for (Map.Entry<Long, WriteResult> e : pendingResults.entrySet()) {
// We don’t commit the merged result into a single transaction because for the sequential
// transaction txn1 and txn2, the equality-delete files of txn2 are required to be applied
// to data files from txn1. Committing the merged one will lead to the incorrect delete
// semantic.
WriteResult result = e.getValue();