
[Bug][Connector-V2] IcebergAggregatedCommitter commits per worker instead of single atomic commit per checkpoint #10710

@suhyeon729

Description


Search before asking

  • I had searched in the issues and found no similar issues.

What happened

IcebergAggregatedCommitter.commitFiles() iterates over each worker's IcebergCommitInfo
and calls filesCommitter.doCommit() separately, resulting in N Iceberg snapshots
per checkpoint (where N = parallelism), instead of a single atomic snapshot.

Root cause (IcebergAggregatedCommitter.java:62):

  private void commitFiles(List<IcebergCommitInfo> commitInfos) {
      for (IcebergCommitInfo icebergCommitInfo : commitInfos) {
          ...
          filesCommitter.doCommit(icebergCommitInfo.getResults()); // called per worker
      }
  }

Each doCommit() calls table.newAppend().commit() independently.
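The effect of the loop can be sketched with a tiny stand-in for the table (the `FakeTable` class and file names below are hypothetical, not SeaTunnel or Iceberg APIs; the stand-in only models the fact that every `newAppend().commit()` produces one snapshot):

```java
import java.util.Arrays;
import java.util.List;

public class PerWorkerCommitSketch {
    // Hypothetical stand-in for an Iceberg table: each commit() models
    // table.newAppend().commit(), which always creates a new snapshot.
    static class FakeTable {
        int snapshots = 0;

        void commit(List<String> dataFiles) {
            snapshots++; // one snapshot per newAppend().commit()
        }
    }

    // Mirrors the buggy loop: one doCommit() call per worker's commit info.
    static int snapshotsForCheckpoint(List<List<String>> perWorkerFiles) {
        FakeTable table = new FakeTable();
        for (List<String> workerFiles : perWorkerFiles) {
            table.commit(workerFiles);
        }
        return table.snapshots;
    }

    public static void main(String[] args) {
        // Three parallel workers => three snapshots for a single checkpoint.
        List<List<String>> files = Arrays.asList(
                Arrays.asList("w0-data.parquet"),
                Arrays.asList("w1-data.parquet"),
                Arrays.asList("w2-data.parquet"));
        System.out.println(snapshotsForCheckpoint(files)); // prints 3
    }
}
```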

Problems caused by this:

  1. No atomicity per checkpoint: If a commit fails midway (e.g., workers 0 and 1 succeed but worker 2 fails), the Iceberg table contains partial data that is visible to readers.

  2. Duplicates on restore: On pipeline restore, preCommit() re-commits files that were already committed in the previous checkpoint, causing duplicate rows in Iceberg.

  3. Unnecessary snapshot proliferation: N snapshots per checkpoint instead of 1, increasing Iceberg metadata size proportionally to parallelism.

Expected behavior:
All workers' DataFiles/DeleteFiles should be merged into a single
AppendFiles (or RowDelta) operation per checkpoint, producing 1 snapshot:

    private void commitFiles(List<IcebergCommitInfo> commitInfos) {
        List<WriteResult> allResults =
                commitInfos.stream()
                        .filter(ci -> ci.getResults() != null && !ci.getResults().isEmpty())
                        .flatMap(ci -> ci.getResults().stream())
                        .collect(toList());
        if (!allResults.isEmpty()) {
            filesCommitter.doCommit(allResults);
        }
    }
                                                                        

This matches how Flink's IcebergFilesCommitter works.
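The merge step of the proposed fix can be exercised in isolation. A minimal sketch, assuming a simplified `CommitInfo` record in place of SeaTunnel's `IcebergCommitInfo` (types and file names here are illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MergedCommitSketch {
    // Simplified stand-in for IcebergCommitInfo (hypothetical type).
    record CommitInfo(List<String> results) {}

    // Same stream pipeline as the proposed fix: drop null/empty results,
    // flatten all workers' files into one list, then commit that list once.
    static List<String> mergeResults(List<CommitInfo> commitInfos) {
        return commitInfos.stream()
                .filter(ci -> ci.results() != null && !ci.results().isEmpty())
                .flatMap(ci -> ci.results().stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<CommitInfo> perWorker = Arrays.asList(
                new CommitInfo(List.of("w0-data.parquet")),
                new CommitInfo(List.of()), // idle worker contributes nothing
                new CommitInfo(List.of("w2-data.parquet", "w2-delete.parquet")));
        List<String> merged = mergeResults(perWorker);
        // A single filesCommitter.doCommit(merged) now yields one snapshot.
        System.out.println(merged.size()); // prints 3
    }
}
```

Because empty and null worker results are filtered out before the single commit, an idle worker no longer triggers an empty append.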

SeaTunnel Version

2.3.12

SeaTunnel Config

-

Running Command

This is a code-level bug, not reproducible via a specific command.
The issue can be verified by inspecting the Iceberg table's snapshots after
a job run with parallelism > 1:
                                                                                                                                   
SELECT * FROM <catalog>.<db>.<table>.snapshots;                                                                                  
-- Expected: 1 snapshot per checkpoint                                                                                         
-- Actual:   N snapshots per checkpoint (N = parallelism)

Error Exception

No exception is thrown; this is a silent data-correctness bug.
Partial commits and duplicate data occur without any error log.

Zeta or Flink or Spark Version

Zeta

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
