Skip to content

Commit 193b5cb

Browse files
authored
[AURON #2175] Add native support for the _file metadata column (#2184)
# Which issue does this PR close? Closes #2175 # Rationale for this change This PR adds native support for Iceberg metadata columns in Auron, starting with `_file`. Previously, Iceberg scans fell back whenever metadata columns were projected. With this change, queries that read `_file` can remain on the native Iceberg scan path. Iceberg metadata columns are useful in real workloads for debugging, lineage, and inspection queries. However, Auron previously treated metadata columns as unsupported and fell back to Spark. This PR improves native Iceberg scan coverage by supporting metadata columns that can be represented as file-level constant values, while still falling back for unsupported row-level metadata columns. # What changes are included in this PR? This PR: - adds native support for the Iceberg `_file` metadata column - keeps unsupported metadata columns such as `_pos` on the fallback path - extends `IcebergScanPlan` to distinguish between: - file-backed data columns - metadata columns materialized outside the file payload - updates `IcebergScanSupport` to stop rejecting all metadata columns unconditionally - passes supported metadata values through the native Iceberg scan path as per-file constant values - updates `NativeIcebergTableScanExec` to project both normal data columns and supported metadata columns - adds integration tests in `AuronIcebergIntegrationSuite` # Scope of support in this PR This PR intentionally takes a conservative approach. Supported in native scan: - `_file` Still falls back: - `_pos` - other unsupported metadata columns that require row-level metadata handling # Why this design? `_file` is a file-level metadata column: every row coming from the same file shares the same value. That makes it a good fit for the existing native file-scan path by treating it as a per-file constant column. 
In contrast, `_pos` is row-level metadata and cannot be represented correctly with the same mechanism, so it remains unsupported in native execution for now. # How was this patch tested? CI. --------- Signed-off-by: weimingdiit <weimingdiit@gmail.com>
1 parent 0cbfeed commit 193b5cb

5 files changed

Lines changed: 136 additions & 25 deletions

File tree

spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronFunctionSuite.scala

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,7 @@ class AuronFunctionSuite extends AuronQueryTest with BaseAuronSQLSuite {
358358
val err = intercept[Exception] {
359359
df.collect()
360360
}
361-
assert(err.getMessage.contains("null map keys"))
361+
assert(allCauseMessages(err).toLowerCase.contains("null map keys"))
362362
val plan = stripAQEPlan(df.queryExecution.executedPlan)
363363
plan
364364
.collectFirst { case op if !isNativeOrPassThrough(op) => op }
@@ -406,7 +406,7 @@ class AuronFunctionSuite extends AuronQueryTest with BaseAuronSQLSuite {
406406
|plan:
407407
|${plan}""".stripMargin)
408408
}
409-
assert(err.getMessage.toLowerCase.contains("null map key"))
409+
assert(allCauseMessages(err).toLowerCase.contains("null map key"))
410410
}
411411
}
412412

@@ -430,10 +430,20 @@ class AuronFunctionSuite extends AuronQueryTest with BaseAuronSQLSuite {
430430
|plan:
431431
|${plan}""".stripMargin)
432432
}
433-
assert(err.getMessage.toLowerCase.contains("duplicate key"))
433+
assert(allCauseMessages(err).toLowerCase.contains("duplicate key"))
434434
}
435435
}
436436

437+
private def allCauseMessages(err: Throwable): String = {
438+
val messages = scala.collection.mutable.ArrayBuffer.empty[String]
439+
var current = err
440+
while (current != null) {
441+
Option(current.getMessage).foreach(messages += _)
442+
current = current.getCause
443+
}
444+
messages.mkString(" | caused by: ")
445+
}
446+
437447
test("map_from_entries last win dedup policy") {
438448
withTable("t1") {
439449
sql("create table t1(c1 array<struct<k:string,v:int>>) using parquet")

spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,9 @@ object NativeConverters extends Logging {
469469
.setReturnNullable(subquery.nullable))
470470
}
471471

472+
case expr if isNoOpAnsiCast(expr) =>
473+
convertExprWithFallback(expr.children.head, isPruningExpr, fallback)
474+
472475
// cast
473476
case cast: Cast =>
474477
val involvesDateOrTimestamp =
@@ -1439,6 +1442,11 @@ object NativeConverters extends Logging {
14391442
Cast(expr, dataType)
14401443
}
14411444

1445+
private def isNoOpAnsiCast(expr: Expression): Boolean =
1446+
expr.getClass.getSimpleName == "AnsiCast" &&
1447+
expr.children.size == 1 &&
1448+
expr.children.head.dataType == expr.dataType
1449+
14421450
def unpackBinaryTypeCast(expr: Expression): Expression =
14431451
expr match {
14441452
case expr: Cast if expr.dataType == BinaryType => expr.child

thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,14 @@ import org.apache.spark.sql.types.{BinaryType, DataType, DecimalType, StringType
3131

3232
import org.apache.auron.{protobuf => pb}
3333

34+
// fileSchema is read from the data files. partitionSchema carries supported metadata columns
35+
// (for example _file) that are materialized as per-file constant values in the native scan.
3436
final case class IcebergScanPlan(
3537
fileTasks: Seq[FileScanTask],
3638
fileFormat: FileFormat,
3739
readSchema: StructType,
40+
fileSchema: StructType,
41+
partitionSchema: StructType,
3842
pruningPredicates: Seq[pb.PhysicalExprNode])
3943

4044
object IcebergScanSupport extends Logging {
@@ -53,20 +57,39 @@ object IcebergScanSupport extends Logging {
5357
}
5458

5559
val readSchema = scan.readSchema
56-
// Native scan does not support Iceberg metadata columns (e.g. _file, _pos).
57-
if (hasMetadataColumns(readSchema)) {
60+
val unsupportedMetadataColumns = collectUnsupportedMetadataColumns(readSchema)
61+
// Native scan can project file-level metadata columns such as _file via partition values.
62+
// Metadata columns that require per-row materialization (for example _pos) still fall back.
63+
if (unsupportedMetadataColumns.nonEmpty) {
5864
return None
5965
}
6066

61-
if (!readSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType))) {
67+
val fileSchema = StructType(readSchema.fields.filterNot(isSupportedMetadataColumn))
68+
// Supported metadata columns are materialized via per-file constant values rather than
69+
// read from the Iceberg data file payload.
70+
val partitionSchema = StructType(readSchema.fields.filter(isSupportedMetadataColumn))
71+
72+
if (!fileSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType))) {
73+
return None
74+
}
75+
76+
if (!partitionSchema.fields.forall(field =>
77+
NativeConverters.isTypeSupported(field.dataType))) {
6278
return None
6379
}
6480

6581
val partitions = inputPartitions(exec)
6682
// Empty scan (e.g. empty table) should still build a plan to return no rows.
6783
if (partitions.isEmpty) {
6884
logWarning(s"Native Iceberg scan planned with empty partitions for $scanClassName.")
69-
return Some(IcebergScanPlan(Seq.empty, FileFormat.PARQUET, readSchema, Seq.empty))
85+
return Some(
86+
IcebergScanPlan(
87+
Seq.empty,
88+
FileFormat.PARQUET,
89+
readSchema,
90+
fileSchema,
91+
partitionSchema,
92+
Seq.empty))
7093
}
7194

7295
val icebergPartitions = partitions.flatMap(icebergPartition)
@@ -94,12 +117,26 @@ object IcebergScanSupport extends Logging {
94117
}
95118

96119
val pruningPredicates = collectPruningPredicates(scan.asInstanceOf[AnyRef], readSchema)
97-
98-
Some(IcebergScanPlan(fileTasks, format, readSchema, pruningPredicates))
120+
Some(
121+
IcebergScanPlan(
122+
fileTasks,
123+
format,
124+
readSchema,
125+
fileSchema,
126+
partitionSchema,
127+
pruningPredicates))
99128
}
100129

101-
private def hasMetadataColumns(schema: StructType): Boolean =
102-
schema.fields.exists(field => MetadataColumns.isMetadataColumn(field.name))
130+
private def collectUnsupportedMetadataColumns(schema: StructType): Seq[String] =
131+
schema.fields.collect {
132+
case field
133+
if MetadataColumns.isMetadataColumn(field.name) &&
134+
!isSupportedMetadataColumn(field) =>
135+
field.name
136+
}
137+
138+
private def isSupportedMetadataColumn(field: org.apache.spark.sql.types.StructField): Boolean =
139+
field.name == MetadataColumns.FILE_PATH.name()
103140

104141
private def inputPartitions(exec: BatchScanExec): Seq[InputPartition] = {
105142
// Prefer DataSource V2 batch API; if not available, fallback to exec methods via reflection.

thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import java.util.UUID
2424
import scala.collection.JavaConverters._
2525

2626
import org.apache.hadoop.fs.FileSystem
27-
import org.apache.iceberg.{FileFormat, FileScanTask}
27+
import org.apache.iceberg.{FileFormat, FileScanTask, MetadataColumns}
2828
import org.apache.spark.Partition
2929
import org.apache.spark.TaskContext
3030
import org.apache.spark.broadcast.Broadcast
@@ -33,13 +33,14 @@ import org.apache.spark.sql.SparkSession
3333
import org.apache.spark.sql.auron.{EmptyNativeRDD, NativeConverters, NativeHelper, NativeRDD, NativeSupports, Shims}
3434
import org.apache.spark.sql.auron.iceberg.IcebergScanPlan
3535
import org.apache.spark.sql.catalyst.InternalRow
36+
import org.apache.spark.sql.catalyst.expressions.Literal
3637
import org.apache.spark.sql.execution.LeafExecNode
3738
import org.apache.spark.sql.execution.SparkPlan
3839
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
3940
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
4041
import org.apache.spark.sql.execution.metric.SQLMetric
4142
import org.apache.spark.sql.internal.SQLConf
42-
import org.apache.spark.sql.types.StructType
43+
import org.apache.spark.sql.types.{StringType, StructType}
4344
import org.apache.spark.util.SerializableConfiguration
4445

4546
import org.apache.auron.{protobuf => pb}
@@ -57,32 +58,37 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca
5758
override val output = basedScan.output
5859
override val outputPartitioning = basedScan.outputPartitioning
5960

60-
private lazy val readSchema: StructType = plan.readSchema
61+
private lazy val fileSchema: StructType = plan.fileSchema
62+
private lazy val partitionSchema: StructType = plan.partitionSchema
63+
private lazy val projectableSchema: StructType =
64+
StructType(fileSchema.fields ++ partitionSchema.fields)
6165
private lazy val fileTasks: Seq[FileScanTask] = plan.fileTasks
6266
private lazy val pruningPredicates: Seq[pb.PhysicalExprNode] = plan.pruningPredicates
6367

6468
private lazy val partitions: Array[FilePartition] = buildFilePartitions()
6569
private lazy val fileSizes: Map[String, Long] = buildFileSizes()
6670

67-
private lazy val nativeFileSchema: pb.Schema = NativeConverters.convertSchema(readSchema)
71+
private lazy val nativeFileSchema: pb.Schema = NativeConverters.convertSchema(fileSchema)
6872
private lazy val nativePartitionSchema: pb.Schema =
69-
NativeConverters.convertSchema(StructType(Nil))
73+
NativeConverters.convertSchema(partitionSchema)
7074

7175
private lazy val caseSensitive: Boolean = SQLConf.get.caseSensitiveAnalysis
7276

7377
private lazy val fieldIndexByName: Map[String, Int] = {
7478
if (caseSensitive) {
75-
readSchema.fieldNames.zipWithIndex.toMap
79+
projectableSchema.fieldNames.zipWithIndex.toMap
7680
} else {
77-
readSchema.fieldNames.map(_.toLowerCase(Locale.ROOT)).zipWithIndex.toMap
81+
projectableSchema.fieldNames.map(_.toLowerCase(Locale.ROOT)).zipWithIndex.toMap
7882
}
7983
}
8084

8185
private def fieldIndexFor(name: String): Int = {
8286
if (caseSensitive) {
83-
fieldIndexByName.getOrElse(name, readSchema.fieldIndex(name))
87+
fieldIndexByName.getOrElse(name, projectableSchema.fieldIndex(name))
8488
} else {
85-
fieldIndexByName.getOrElse(name.toLowerCase(Locale.ROOT), readSchema.fieldIndex(name))
89+
fieldIndexByName.getOrElse(
90+
name.toLowerCase(Locale.ROOT),
91+
projectableSchema.fieldIndex(name))
8692
}
8793
}
8894

@@ -99,6 +105,7 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca
99105
.setPath(filePath)
100106
.setSize(size)
101107
.setLastModifiedNs(0)
108+
.addAllPartitionValues(metadataPartitionValues(filePath).asJava)
102109
.setRange(
103110
pb.FileRange
104111
.newBuilder()
@@ -113,6 +120,17 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca
113120
.build()
114121
}
115122

123+
private def metadataPartitionValues(filePath: String): Seq[pb.ScalarValue] =
124+
partitionSchema.fields.map { field =>
125+
field.name match {
126+
case name if name == MetadataColumns.FILE_PATH.name() =>
127+
NativeConverters.convertExpr(Literal.create(filePath, StringType)).getLiteral
128+
case name =>
129+
throw new IllegalStateException(
130+
s"unsupported Iceberg metadata column in native scan: $name")
131+
}
132+
}
133+
116134
override def doExecuteNative(): NativeRDD = {
117135
if (partitions.isEmpty) {
118136
return new EmptyNativeRDD(sparkContext)

thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -207,13 +207,36 @@ class AuronIcebergIntegrationSuite
207207
}
208208
}
209209

210-
test("iceberg scan falls back when reading metadata columns") {
210+
test("iceberg native scan supports _file metadata column") {
211211
withTable("local.db.t4") {
212212
sql("create table local.db.t4 using iceberg as select 1 as id, 'a' as v")
213-
val df = sql("select _file from local.db.t4")
214-
df.collect()
215-
val plan = df.queryExecution.executedPlan.toString()
216-
assert(!plan.contains("NativeIcebergTableScan"))
213+
checkSparkAnswerAndOperator("select _file from local.db.t4")
214+
}
215+
}
216+
217+
test("iceberg native scan supports data columns with _file metadata column") {
218+
withTable("local.db.t4_mixed") {
219+
sql("create table local.db.t4_mixed using iceberg as select 1 as id, 'a' as v")
220+
checkSparkAnswerAndOperator("select id, _file from local.db.t4_mixed")
221+
}
222+
}
223+
224+
test("iceberg native scan preserves projected order for _file metadata column") {
225+
withTable("local.db.t4_metadata_first") {
226+
sql("create table local.db.t4_metadata_first using iceberg as select 1 as id, 'a' as v")
227+
checkSparkAnswerAndOperator("select _file, id from local.db.t4_metadata_first")
228+
}
229+
}
230+
231+
test("iceberg scan falls back when reading unsupported metadata columns") {
232+
withTable("local.db.t4_pos") {
233+
sql("create table local.db.t4_pos using iceberg as select 1 as id, 'a' as v")
234+
withSQLConf("spark.auron.enable" -> "true", "spark.auron.enable.iceberg.scan" -> "true") {
235+
val df = sql("select _pos from local.db.t4_pos")
236+
df.collect()
237+
val plan = df.queryExecution.executedPlan.toString()
238+
assert(!plan.contains("NativeIcebergTableScan"))
239+
}
217240
}
218241
}
219242

@@ -303,6 +326,21 @@ class AuronIcebergIntegrationSuite
303326
}
304327
}
305328

329+
private def checkSparkAnswerAndOperator(sqlText: String): DataFrame = {
330+
var expected: Seq[Row] = Nil
331+
withSQLConf("spark.auron.enable" -> "false") {
332+
expected = sql(sqlText).collect().toSeq
333+
}
334+
335+
var df: DataFrame = null
336+
withSQLConf("spark.auron.enable" -> "true", "spark.auron.enable.iceberg.scan" -> "true") {
337+
df = sql(sqlText)
338+
checkAnswer(df, expected)
339+
val plan = df.queryExecution.executedPlan.toString()
340+
assert(plan.contains("NativeIcebergTableScan"))
341+
}
342+
df
343+
}
306344
private def icebergScanPlan(df: DataFrame) =
307345
df.queryExecution.sparkPlan.collectFirst { case scan: BatchScanExec =>
308346
IcebergScanSupport.plan(scan)

0 commit comments

Comments (0)