[SPARK-56125][SQL] Simplify schema calculation for Merge Into Schema Evolution #55124

szehon-ho wants to merge 2 commits into apache:master
Conversation
Force-pushed from 389ffac to 5637683
// * INSERT (a, nested.b) VALUES (source.a, source.nested.b)
// New columns/nested fields in this schema that are not existing in target schema
// will be added for schema evolution.
def sourceSchemaForSchemaEvolution(merge: MergeIntoTable): StructType = {
aokolnychyi left a comment
This was the most complicated method and made many recursive calls, so I agree we should avoid it.
The idea I had is compatible with the changes Johan made and could look like this:
private def computeSchemaChanges(merge: MergeIntoTable): Seq[TableChange] = {
  val actions = merge.matchedActions ++ merge.notMatchedActions
  val assignments = actions.flatMap {
    case a: UpdateAction => a.assignments
    case a: InsertAction => a.assignments
    case _ => Seq.empty
  }
  val changes = new mutable.HashSet[TableChange]()
  assignments.foreach {
    case a if isFieldAdditionCandidate(a, merge) =>
      val fieldPath = extractFieldPath(a.key)
      changes += TableChange.addColumn(fieldPath.toArray, a.value.dataType)
    case a if a.resolved && a.key.dataType != a.value.dataType =>
      changes ++= ResolveSchemaEvolution.computeSchemaChanges(
        a.key.dataType,
        a.value.dataType,
        merge.targetTable.schema,
        merge.sourceTable.schema,
        fieldPath = extractFieldPath(a.key),
        isByName = true)
    case _ =>
      // OK
  }
  changes.toSeq
}

private def extractFieldPath(expr: Expression): Seq[String] = {
  expr match {
    case UnresolvedAttribute(nameParts) => nameParts
    case a: AttributeReference => Seq(a.name)
    case Alias(child, _) => extractFieldPath(child)
    case GetStructField(child, ordinal, nameOpt) =>
      extractFieldPath(child) :+ nameOpt.getOrElse(s"col$ordinal")
    case _ => Seq.empty
  }
}

private def areSchemaEvolutionReady(
    assignments: Seq[Assignment],
    merge: MergeIntoTable): Boolean = {
  assignments.forall(assign => assign.resolved || isFieldAdditionCandidate(assign, merge))
}

// TODO: clean up and add doc
private def isFieldAdditionCandidate(
    assignment: Assignment,
    merge: MergeIntoTable): Boolean = {
  val key = assignment.key
  val keyPath = extractFieldPath(key)
  val value = assignment.value
  val valuePath = extractFieldPath(value)
  !key.resolved &&
    value.resolved &&
    keyPath == valuePath &&
    assignment.value.references.subsetOf(merge.sourceTable.outputSet) &&
    merge.targetTable.resolve(keyPath, SQLConf.get.resolver).isEmpty
}
With that said, yours may be better. Let me explore it in more detail.
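As a concrete illustration of the kind of assignment isFieldAdditionCandidate is meant to catch, here is a hypothetical sketch (the table and column names are invented, a SparkSession named spark is assumed, and this relies on the WITH SCHEMA EVOLUTION clause available in recent Spark):

// Hypothetical tables: target(id INT, data STRING) and
// source(id INT, data STRING, extra STRING). The assignment t.extra = s.extra
// has an unresolved key (the target has no `extra` column yet), a resolved
// value drawn from the source, and matching key/value paths, so a candidate
// check like the one above would emit
// TableChange.addColumn(Array("extra"), StringType) to evolve the target
// schema before resolving the rest of the MERGE.
spark.sql("""
  MERGE WITH SCHEMA EVOLUTION INTO target t
  USING source s
  ON t.id = s.id
  WHEN MATCHED THEN UPDATE SET t.extra = s.extra
""")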
Chatted offline with @aokolnychyi: we want to check that one scenario still behaves the same before and after this refactor, so I added the test in a separate PR (#55173) so it runs before, and added it here too to verify it runs after.
filterSchema(merge.sourceTable.schema, Seq.empty)
val evolutionPaths = assignments
aokolnychyi left a comment
I wonder why we can't do it the other way around, though:

- Find all isSameColumn assignments (compares key and value column paths that must match).
- Iterate over assignments:
  - If the key is unresolved and the target table doesn't have the column yet, add it.
  - If the key and value are resolved but the types differ, recursively call ResolveSchemaEvolution?
I put an alternate PR here: #55302, based on @aokolnychyi's above comment, to compare. It's longer but maybe cleaner (it explicitly shows the two paths: missing or existing column).
johanl-db left a comment
My vote goes to the approach in this PR; the alternative in https://github.com/apache/spark/pull/55302/changes is a lot less intuitive in my opinion.
Collecting all schema changes and then filtering after the fact is a simpler model.
On the other hand, going over all assignments, collecting all schema changes each assignment triggers (both at its own level and in fields nested under it), and deduplicating across all assignments is more complex, especially due to the split recursion: computeSchemaChanges recurses into nested fields to find schema changes, and assignments are themselves already nested (in a sense), with, e.g., a and a.x overlapping.
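To make that overlap concrete, here is a hypothetical sketch (table and column names invented, SparkSession named spark assumed) of two assignments touching the same nested struct:

// Suppose column `a` is a struct containing a field `x`. The two assignments
// below overlap: t.a rewrites the whole struct while t.a.x rewrites a field
// nested inside it. If s.a.x carries a new nested field, collecting changes
// per assignment would surface the same addition under path a.x from both
// assignments, hence the cross-assignment deduplication mentioned above.
// (Whether the analyzer ultimately accepts overlapping assignments is a
// separate question; this only illustrates the overlap the analysis sees.)
spark.sql("""
  MERGE INTO target t
  USING source s
  ON t.id = s.id
  WHEN MATCHED THEN UPDATE SET t.a = s.a, t.a.x = s.a.x
""")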
Force-pushed from 059b966 to e815446
What changes were proposed in this pull request?
Simplifies the schema evolution logic for MERGE INTO in two ways:
1. Replace pre-filtering the source schema with post-filtering schema changes: instead of building a pruned source schema (sourceSchemaForSchemaEvolution) and then diffing it against the target, we now compute all schema diffs against the full source schema and filter the resulting TableChange objects to only those whose field paths fall under identity assignment paths. This eliminates the recursive filterSchema function, the isEqual helper (which was a duplicate of isSameColumnAssignment), and the isPrefix helper.
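A rough sketch of what such post-filtering could look like (TableChange.AddColumn, TableChange.UpdateColumnType, and fieldNames are real Spark connector APIs; filterChanges, changes, and evolutionPaths are invented here for illustration and are not the PR's exact code):

import org.apache.spark.sql.connector.catalog.TableChange

// Illustrative only: keep a computed change when its field path sits under
// one of the identity assignment paths (paths where key and value match).
def filterChanges(
    changes: Seq[TableChange],
    evolutionPaths: Seq[Seq[String]]): Seq[TableChange] = {
  // Extract the field path of the kinds of changes schema evolution produces.
  def fieldPath(change: TableChange): Seq[String] = change match {
    case add: TableChange.AddColumn => add.fieldNames.toSeq
    case upd: TableChange.UpdateColumnType => upd.fieldNames.toSeq
    case _ => Seq.empty
  }
  changes.filter(change => evolutionPaths.exists(fieldPath(change).startsWith(_)))
}

The key property is that filtering happens on the final TableChange objects rather than by pruning the source schema up front.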
2. Replace originalTarget/originalSource threading with an onIncompatible callback in ResolveSchemaEvolution.computeSchemaChanges: the original target and source schemas were passed through every recursive call (7+ call sites across 3 methods) but only used once, in the error case. A by-name error parameter replaces both, reducing the parameter count and improving readability.
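A minimal sketch of the by-name-parameter pattern described above (the signatures are invented for illustration and do not match the PR exactly):

import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.types.{DataType, StructType}

// Before (illustrative): the original schemas ride along through every
// recursive call, even though they are only read when an incompatibility
// is found and an error must be raised.
def computeSchemaChangesBefore(
    target: DataType,
    source: DataType,
    originalTarget: StructType,
    originalSource: StructType): Seq[TableChange] = ???

// After (illustrative): a by-name parameter defers building the error until
// (and unless) the incompatible case is hit, so recursive calls forward one
// argument instead of two schemas that are captured once at the top level.
def computeSchemaChangesAfter(
    target: DataType,
    source: DataType,
    error: => Throwable): Seq[TableChange] = ???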
Also removes dead code: the containsStarAction check in sourceSchemaForSchemaEvolution was unreachable, because actionsSchemaEvolutionReady returns false for star actions (via case _ => false), preventing pendingSchemaChanges from calling sourceSchemaForSchemaEvolution until ResolveReferences has expanded all star actions.

Why are the changes needed?
Code simplification and dead code removal. The existing logic was harder to follow due to the recursive schema pruning approach and redundant parameter threading.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing tests (the logic is semantically equivalent).