Search before asking
What happened
IcebergAggregatedCommitter.commitFiles() iterates over each worker's IcebergCommitInfo
and calls filesCommitter.doCommit() separately, resulting in N Iceberg snapshots
per checkpoint (where N = parallelism), instead of a single atomic snapshot.
Root cause (IcebergAggregatedCommitter.java:62):
```java
private void commitFiles(List<IcebergCommitInfo> commitInfos) {
    for (IcebergCommitInfo icebergCommitInfo : commitInfos) {
        ...
        filesCommitter.doCommit(icebergCommitInfo.getResults()); // called per worker
    }
}
```
Each doCommit() calls table.newAppend().commit() independently.
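The effect on snapshot count can be shown with a toy simulation (`ToyTable` is a stand-in for illustration only, not the real Iceberg or SeaTunnel API): each `commit()` produces one snapshot, so committing per worker yields N snapshots while merging first yields one.

```java
import java.util.ArrayList;
import java.util.List;

public class Main {
    // Toy stand-in for an Iceberg table: every commit() creates one snapshot.
    // Names here are illustrative; the real connector calls table.newAppend().commit().
    static class ToyTable {
        int snapshots = 0;
        void commit(List<String> files) { snapshots++; }
    }

    public static void main(String[] args) {
        // Files produced by three parallel workers in one checkpoint.
        List<List<String>> perWorkerFiles = List.of(
            List.of("w0-a.parquet"),
            List.of("w1-a.parquet"),
            List.of("w2-a.parquet", "w2-b.parquet"));

        // Current behavior: one commit (hence one snapshot) per worker.
        ToyTable current = new ToyTable();
        for (List<String> files : perWorkerFiles) {
            current.commit(files);
        }

        // Expected behavior: merge all workers' files, then commit once.
        ToyTable expected = new ToyTable();
        List<String> merged = new ArrayList<>();
        perWorkerFiles.forEach(merged::addAll);
        expected.commit(merged);

        System.out.println("per-worker commits: " + current.snapshots + " snapshots");
        System.out.println("batched commit: " + expected.snapshots + " snapshot");
    }
}
```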
Problems caused by this:

- No atomicity per checkpoint: if the commit fails midway (e.g., workers 0 and 1 succeed but worker 2 fails), the Iceberg table contains partial data visible to readers.
- Duplicates on restore: preCommit() on pipeline restore re-commits files already committed in the previous checkpoint, causing duplicate rows in Iceberg.
- Unnecessary snapshot proliferation: N snapshots per checkpoint instead of 1, increasing Iceberg metadata size proportionally to parallelism.
Expected behavior:
All workers' DataFiles/DeleteFiles should be merged into a single
AppendFiles (or RowDelta) operation per checkpoint, producing 1 snapshot:
```java
private void commitFiles(List<IcebergCommitInfo> commitInfos) {
    List<WriteResult> allResults =
        commitInfos.stream()
            .filter(ci -> ci.getResults() != null && !ci.getResults().isEmpty())
            .flatMap(ci -> ci.getResults().stream())
            .collect(toList());
    if (!allResults.isEmpty()) {
        filesCommitter.doCommit(allResults);
    }
}
```
This matches how Flink's IcebergFilesCommitter works.
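The merge logic in the proposed fix can be exercised in isolation with stand-in types (`CommitInfo` below is a hypothetical simplification of `IcebergCommitInfo`, with `String` file names in place of `WriteResult`): workers with null or empty results are skipped, and the remaining results are flattened into one list for a single commit.

```java
import java.util.List;
import java.util.stream.Collectors;

public class Main {
    // Hypothetical stand-in for IcebergCommitInfo: one worker's write results.
    record CommitInfo(List<String> results) {
        List<String> getResults() { return results; }
    }

    public static void main(String[] args) {
        List<CommitInfo> commitInfos = List.of(
            new CommitInfo(List.of("data-0.parquet")),
            new CommitInfo(List.of()), // worker that produced no files
            new CommitInfo(List.of("data-2a.parquet", "data-2b.parquet")));

        // Same filter + flatMap shape as the proposed commitFiles() fix.
        List<String> allResults = commitInfos.stream()
            .filter(ci -> ci.getResults() != null && !ci.getResults().isEmpty())
            .flatMap(ci -> ci.getResults().stream())
            .collect(Collectors.toList());

        // Prints [data-0.parquet, data-2a.parquet, data-2b.parquet]
        System.out.println(allResults);
    }
}
```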
SeaTunnel Version
2.3.12
SeaTunnel Config
Running Command
This is a code-level bug, not reproducible via a specific command.
The issue can be verified by inspecting Iceberg table snapshots after
a job run with parallelism > 1:
```sql
SELECT * FROM <catalog>.<db>.<table>.snapshots;
-- Expected: 1 snapshot per checkpoint
-- Actual: N snapshots per checkpoint (N = parallelism)
```
Error Exception
No exception thrown. This is a silent data correctness bug.
Partial commits and duplicate data occur without any error log.
Zeta or Flink or Spark Version
Zeta
Java or Scala Version
No response
Screenshots
No response
Are you willing to submit PR?
Code of Conduct