Skip to content

Commit c3f04d6

Browse files
committed
[SPARK-56125][SQL] Simplify schema calculation for Merge Into Schema Evolution
1 parent d580b65 commit c3f04d6

2 files changed

Lines changed: 59 additions & 119 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSchemaEvolution.scala

Lines changed: 30 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -98,64 +98,48 @@ object ResolveSchemaEvolution extends Rule[LogicalPlan] {
9898
}
9999

100100
/**
101-
* Computes the set of table changes needed to evolve `originalTarget` schema
102-
* to accommodate `originalSource` schema. When `isByName` is true, fields are matched
101+
* Computes the set of table changes needed to evolve `target` schema
102+
* to accommodate `source` schema. When `isByName` is true, fields are matched
103103
* by name. When false, fields are matched by position.
104104
*/
105105
def computeSchemaChanges(
106-
originalTarget: StructType,
107-
originalSource: StructType,
106+
target: StructType,
107+
source: StructType,
108108
isByName: Boolean): Array[TableChange] =
109109
computeSchemaChanges(
110-
originalTarget,
111-
originalSource,
112-
originalTarget,
113-
originalSource,
110+
target,
111+
source,
114112
fieldPath = Nil,
115-
isByName)
113+
isByName,
114+
error = throw QueryExecutionErrors.failedToMergeIncompatibleSchemasError(
115+
target, source, null))
116116

117117
private def computeSchemaChanges(
118118
currentType: DataType,
119119
newType: DataType,
120-
originalTarget: StructType,
121-
originalSource: StructType,
122120
fieldPath: List[String],
123-
isByName: Boolean): Array[TableChange] = {
121+
isByName: Boolean,
122+
error: => Nothing): Array[TableChange] = {
124123
(currentType, newType) match {
125124
case (StructType(currentFields), StructType(newFields)) =>
126125
if (isByName) {
127-
computeSchemaChangesByName(
128-
currentFields, newFields, originalTarget, originalSource, fieldPath)
126+
computeSchemaChangesByName(currentFields, newFields, fieldPath, error)
129127
} else {
130-
computeSchemaChangesByPosition(
131-
currentFields, newFields, originalTarget, originalSource, fieldPath)
128+
computeSchemaChangesByPosition(currentFields, newFields, fieldPath, error)
132129
}
133130

134131
case (ArrayType(currentElementType, _), ArrayType(newElementType, _)) =>
135132
computeSchemaChanges(
136-
currentElementType,
137-
newElementType,
138-
originalTarget,
139-
originalSource,
140-
fieldPath :+ "element",
141-
isByName)
142-
143-
case (MapType(currentKeyType, currentValueType, _),
144-
MapType(newKeyType, newValueType, _)) =>
133+
currentElementType, newElementType,
134+
fieldPath :+ "element", isByName, error)
135+
136+
case (MapType(currentKeyType, currentValueType, _), MapType(newKeyType, newValueType, _)) =>
145137
val keyChanges = computeSchemaChanges(
146-
currentKeyType,
147-
newKeyType,
148-
originalTarget,
149-
originalSource,
150-
fieldPath :+ "key",
151-
isByName)
138+
currentKeyType, newKeyType,
139+
fieldPath :+ "key", isByName, error)
152140
val valueChanges = computeSchemaChanges(
153-
currentValueType,
154-
newValueType,
155-
originalTarget,
156-
originalSource,
157-
fieldPath :+ "value",
158-
isByName)
141+
currentValueType, newValueType,
142+
fieldPath :+ "value", isByName, error)
159143
keyChanges ++ valueChanges
160144

161145
case (currentType: AtomicType, newType: AtomicType) if currentType != newType =>
@@ -167,8 +151,7 @@ object ResolveSchemaEvolution extends Rule[LogicalPlan] {
167151

168152
case _ =>
169153
// Do not support change between atomic and complex types for now
170-
throw QueryExecutionErrors.failedToMergeIncompatibleSchemasError(
171-
originalTarget, originalSource, null)
154+
error
172155
}
173156
}
174157

@@ -179,9 +162,8 @@ object ResolveSchemaEvolution extends Rule[LogicalPlan] {
179162
private def computeSchemaChangesByName(
180163
currentFields: Array[StructField],
181164
newFields: Array[StructField],
182-
originalTarget: StructType,
183-
originalSource: StructType,
184-
fieldPath: List[String]): Array[TableChange] = {
165+
fieldPath: List[String],
166+
onIncompatible: => Nothing): Array[TableChange] = {
185167
val currentFieldMap = toFieldMap(currentFields)
186168
val newFieldMap = toFieldMap(newFields)
187169

@@ -190,12 +172,8 @@ object ResolveSchemaEvolution extends Rule[LogicalPlan] {
190172
.filter(f => newFieldMap.contains(f.name))
191173
.flatMap { f =>
192174
computeSchemaChanges(
193-
f.dataType,
194-
newFieldMap(f.name).dataType,
195-
originalTarget,
196-
originalSource,
197-
fieldPath :+ f.name,
198-
isByName = true)
175+
f.dataType, newFieldMap(f.name).dataType,
176+
fieldPath :+ f.name, isByName = true, onIncompatible)
199177
}
200178

201179
// Collect newly added fields
@@ -213,18 +191,13 @@ object ResolveSchemaEvolution extends Rule[LogicalPlan] {
213191
private def computeSchemaChangesByPosition(
214192
currentFields: Array[StructField],
215193
newFields: Array[StructField],
216-
originalTarget: StructType,
217-
originalSource: StructType,
218-
fieldPath: List[String]): Array[TableChange] = {
194+
fieldPath: List[String],
195+
onIncompatible: => Nothing): Array[TableChange] = {
219196
// Update existing field types by pairing fields at the same position.
220197
val updates = currentFields.zip(newFields).flatMap { case (currentField, newField) =>
221198
computeSchemaChanges(
222-
currentField.dataType,
223-
newField.dataType,
224-
originalTarget,
225-
originalSource,
226-
fieldPath :+ currentField.name,
227-
isByName = false)
199+
currentField.dataType, newField.dataType,
200+
fieldPath :+ currentField.name, isByName = false, onIncompatible)
228201
}
229202

230203
// Extra source fields beyond the target's field count are new additions.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala

Lines changed: 29 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1041,9 +1041,10 @@ case class MergeIntoTable(
10411041

10421042
override lazy val pendingSchemaChanges: Seq[TableChange] = {
10431043
if (schemaEvolutionEnabled && schemaEvolutionReady) {
1044-
val referencedSourceSchema = MergeIntoTable.sourceSchemaForSchemaEvolution(this)
1045-
ResolveSchemaEvolution.computeSchemaChanges(
1046-
targetTable.schema, referencedSourceSchema, isByName = true).toSeq
1044+
val allChanges = ResolveSchemaEvolution.computeSchemaChanges(
1045+
targetTable.schema, sourceTable.schema, isByName = true)
1046+
MergeIntoTable.filterValidSchemaEvolution(
1047+
allChanges, matchedActions ++ notMatchedActions, sourceTable)
10471048
} else {
10481049
Seq.empty
10491050
}
@@ -1097,52 +1098,36 @@ object MergeIntoTable {
10971098
.toSet
10981099
}
10991100

1100-
// A pruned version of source schema that only contains columns/nested fields
1101-
// explicitly and directly assigned to a target counterpart in MERGE INTO actions,
1102-
// which are relevant for schema evolution.
1103-
// Examples:
1104-
// * UPDATE SET target.a = source.a
1105-
// * UPDATE SET nested.a = source.nested.a
1106-
// * INSERT (a, nested.b) VALUES (source.a, source.nested.b)
1107-
// New columns/nested fields in this schema that are not existing in target schema
1108-
// will be added for schema evolution.
1109-
def sourceSchemaForSchemaEvolution(merge: MergeIntoTable): StructType = {
1110-
val actions = merge.matchedActions ++ merge.notMatchedActions
1101+
/**
1102+
* Filters schema changes to only those relevant to identity assignments
1103+
* (e.g. `target.x = source.x`) in the MERGE actions. Only identity assignments can
1104+
* introduce new columns or type changes via schema evolution.
1105+
*
1106+
* A schema change is kept if its field path is equal to or nested under the key path
1107+
* of an identity assignment.
1108+
*/
1109+
private def filterValidSchemaEvolution(
1110+
changes: Array[TableChange],
1111+
actions: Seq[MergeAction],
1112+
source: LogicalPlan): Seq[TableChange] = {
11111113
val assignments = actions.collect {
11121114
case a: UpdateAction => a.assignments
11131115
case a: InsertAction => a.assignments
11141116
}.flatten
11151117

1116-
val containsStarAction = actions.exists {
1117-
case _: UpdateStarAction => true
1118-
case _: InsertStarAction => true
1119-
case _ => false
1120-
}
1121-
1122-
def filterSchema(sourceSchema: StructType, basePath: Seq[String]): StructType =
1123-
StructType(sourceSchema.flatMap { field =>
1124-
val fieldPath = basePath :+ field.name
1125-
1126-
field.dataType match {
1127-
// Specifically assigned to in one clause:
1128-
// always keep, including all nested attributes
1129-
case _ if assignments.exists(isEqual(_, fieldPath)) => Some(field)
1130-
// If this is a struct and one of the children is being assigned to in a merge clause,
1131-
// keep it and continue filtering children.
1132-
case struct: StructType if assignments.exists(assign =>
1133-
isPrefix(fieldPath, extractFieldPath(assign.key, allowUnresolved = true))) =>
1134-
Some(field.copy(dataType = filterSchema(struct, fieldPath)))
1135-
// The field isn't assigned to directly or indirectly (i.e. its children) in any non-*
1136-
// clause. Check if it should be kept with any * action.
1137-
case struct: StructType if containsStarAction =>
1138-
Some(field.copy(dataType = filterSchema(struct, fieldPath)))
1139-
case _ if containsStarAction => Some(field)
1140-
// The field and its children are not assigned to in any * or non-* action, drop it.
1141-
case _ => None
1142-
}
1143-
})
1144-
1145-
filterSchema(merge.sourceTable.schema, Seq.empty)
1118+
val evolutionPaths = assignments
1119+
.filter(isSameColumnAssignment(_, source))
1120+
.map(a => extractFieldPath(a.key, allowUnresolved = true))
1121+
.filter(_.nonEmpty)
1122+
1123+
val resolver = SQLConf.get.resolver
1124+
changes.filter { case change: TableChange.ColumnChange =>
1125+
val changePath = change.fieldNames().toSeq
1126+
evolutionPaths.exists { ep =>
1127+
ep.length <= changePath.length &&
1128+
ep.zip(changePath).forall { case (a, b) => resolver(a, b) }
1129+
}
1130+
}.toSeq
11461131
}
11471132

11481133
// Helper method to extract field path from an Expression.
@@ -1156,24 +1141,6 @@ object MergeIntoTable {
11561141
}
11571142
}
11581143

1159-
// Helper method to check if a given field path is a prefix of another path.
1160-
private def isPrefix(prefix: Seq[String], path: Seq[String]): Boolean =
1161-
prefix.length <= path.length && prefix.zip(path).forall {
1162-
case (prefixNamePart, pathNamePart) =>
1163-
SQLConf.get.resolver(prefixNamePart, pathNamePart)
1164-
}
1165-
1166-
// Helper method to check if an assignment key is equal to a source column
1167-
// and if the assignment value is that same source column.
1168-
// Example: UPDATE SET target.a = source.a
1169-
private def isEqual(assignment: Assignment, sourceFieldPath: Seq[String]): Boolean = {
1170-
// key must be a non-qualified field path that may be added to target schema via evolution
1171-
val assignmentKeyExpr = extractFieldPath(assignment.key, allowUnresolved = true)
1172-
// value should always be resolved (from source)
1173-
val assignmentValueExpr = extractFieldPath(assignment.value, allowUnresolved = false)
1174-
assignmentKeyExpr == assignmentValueExpr && assignmentKeyExpr == sourceFieldPath
1175-
}
1176-
11771144
private def areSchemaEvolutionReady(
11781145
assignments: Seq[Assignment],
11791146
source: LogicalPlan): Boolean = {

0 commit comments

Comments (0)