Skip to content

Commit 0cbfeed

Browse files
authored
[AURON #2163] Support native Iceberg scans with residual filters via scan pruning and post-scan native filter (#2164)
# Which issue does this PR close? Closes #2163 # Rationale for this change The previous behavior was too conservative for Iceberg scans with residual filters. Even when the scan could still be executed natively and the remaining filter logic could be handled above the scan, the planner would fall back entirely. This PR improves native coverage for Iceberg reads by: - preserving correctness for unsupported predicates - increasing native scan applicability for common filter patterns - reusing the existing native filter path instead of requiring full scan-level predicate support up front This is an incremental improvement to Iceberg native execution, not full Iceberg feature coverage. # What changes are included in this PR? This PR: - removes the unconditional fallback for Iceberg scans with non-`alwaysTrue` residual filters - extends `IcebergScanPlan` to carry `pruningPredicates` - extracts Iceberg scan filter expressions and converts a supported subset into Spark expressions - converts those Spark expressions into native scan pruning predicates - passes pruning predicates down through `NativeIcebergTableScanExec` - keeps unsupported predicates on the upper `NativeFilter` path - adds integration coverage for: - equality-based pruning - `IN`-based pruning - partial pushdown where only part of the predicate is pushed to scan pruning ## Supported predicate scope in this PR The scan-pruning conversion added here supports a limited subset of Iceberg expressions, including: - `AND` - `OR` - `NOT` - `IS NULL` - `IS NOT NULL` - `IS NAN` - `NOT NAN` - comparison predicates such as `=`, `!=`, `<`, `<=`, `>`, `>=` - `IN` - `NOT IN` The current implementation intentionally avoids pushing some types through scan pruning, including: - `StringType` - `BinaryType` - `DecimalType` Unsupported predicates are not pushed into scan pruning and are instead left for post-scan native filtering. # How was this patch tested? Integration coverage was added in `AuronIcebergIntegrationSuite`
1 parent f251bbb commit 0cbfeed

4 files changed

Lines changed: 326 additions & 18 deletions

File tree

spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1102,6 +1102,8 @@ object AuronConverters extends Logging {
11021102
case _: NativeParquetScanBase | _: NativeOrcScanBase | _: NativeHiveTableScanBase |
11031103
_: NativeUnionBase =>
11041104
true
1105+
case exec if exec.nodeName == "NativeIcebergTableScan" =>
1106+
true
11051107
case _: ConvertToNativeBase => needRenameColumns(plan.children.head)
11061108
case exec if NativeHelper.isNative(exec) =>
11071109
NativeHelper.getUnderlyingNativePlan(exec).output != plan.output

thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala

Lines changed: 248 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,22 @@ import scala.collection.JavaConverters._
2020
import scala.util.control.NonFatal
2121

2222
import org.apache.iceberg.{FileFormat, FileScanTask, MetadataColumns}
23-
import org.apache.iceberg.expressions.Expressions
23+
import org.apache.iceberg.expressions.{And => IcebergAnd, BoundPredicate, Expression => IcebergExpression, Not => IcebergNot, Or => IcebergOr, UnboundPredicate}
2424
import org.apache.spark.internal.Logging
2525
import org.apache.spark.sql.auron.NativeConverters
26+
import org.apache.spark.sql.catalyst.expressions.{And => SparkAnd, AttributeReference, EqualTo, Expression => SparkExpression, GreaterThan, GreaterThanOrEqual, In, IsNaN, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, Not => SparkNot, Or => SparkOr}
2627
import org.apache.spark.sql.connector.read.InputPartition
2728
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
28-
import org.apache.spark.sql.types.StructType
29+
import org.apache.spark.sql.internal.SQLConf
30+
import org.apache.spark.sql.types.{BinaryType, DataType, DecimalType, StringType, StructField, StructType}
31+
32+
import org.apache.auron.{protobuf => pb}
2933

3034
final case class IcebergScanPlan(
3135
fileTasks: Seq[FileScanTask],
3236
fileFormat: FileFormat,
33-
readSchema: StructType)
37+
readSchema: StructType,
38+
pruningPredicates: Seq[pb.PhysicalExprNode])
3439

3540
object IcebergScanSupport extends Logging {
3641

@@ -61,7 +66,7 @@ object IcebergScanSupport extends Logging {
6166
// Empty scan (e.g. empty table) should still build a plan to return no rows.
6267
if (partitions.isEmpty) {
6368
logWarning(s"Native Iceberg scan planned with empty partitions for $scanClassName.")
64-
return Some(IcebergScanPlan(Seq.empty, FileFormat.PARQUET, readSchema))
69+
return Some(IcebergScanPlan(Seq.empty, FileFormat.PARQUET, readSchema, Seq.empty))
6570
}
6671

6772
val icebergPartitions = partitions.flatMap(icebergPartition)
@@ -77,11 +82,6 @@ object IcebergScanSupport extends Logging {
7782
return None
7883
}
7984

80-
// Residual filters require row-level evaluation, not supported in native scan.
81-
if (!fileTasks.forall(task => Expressions.alwaysTrue().equals(task.residual()))) {
82-
return None
83-
}
84-
8585
// Native scan handles a single file format; mixed formats must fallback.
8686
val formats = fileTasks.map(_.file().format()).distinct
8787
if (formats.size > 1) {
@@ -93,7 +93,9 @@ object IcebergScanSupport extends Logging {
9393
return None
9494
}
9595

96-
Some(IcebergScanPlan(fileTasks, format, readSchema))
96+
val pruningPredicates = collectPruningPredicates(scan.asInstanceOf[AnyRef], readSchema)
97+
98+
Some(IcebergScanPlan(fileTasks, format, readSchema, pruningPredicates))
9799
}
98100

99101
private def hasMetadataColumns(schema: StructType): Boolean =
@@ -188,4 +190,240 @@ object IcebergScanSupport extends Logging {
188190
None
189191
}
190192
}
193+
194+
private def collectPruningPredicates(
195+
scan: AnyRef,
196+
readSchema: StructType): Seq[pb.PhysicalExprNode] = {
197+
scanFilterExpressions(scan).flatMap { expr =>
198+
convertIcebergFilterExpression(expr, readSchema) match {
199+
case Some(converted) =>
200+
Some(NativeConverters.convertScanPruningExpr(converted))
201+
case None =>
202+
logDebug(s"Skip unsupported Iceberg pruning expression: $expr")
203+
None
204+
}
205+
}
206+
}
207+
208+
private def scanFilterExpressions(scan: AnyRef): Seq[IcebergExpression] = {
209+
invokeDeclaredMethod(scan, "filterExpressions") match {
210+
case Some(values: java.util.Collection[_]) =>
211+
values.asScala.collect { case expr: IcebergExpression => expr }.toSeq
212+
case Some(values: Seq[_]) =>
213+
values.collect { case expr: IcebergExpression => expr }
214+
case _ =>
215+
Seq.empty
216+
}
217+
}
218+
219+
private def invokeDeclaredMethod(target: AnyRef, methodName: String): Option[Any] = {
220+
try {
221+
var cls: Class[_] = target.getClass
222+
while (cls != null) {
223+
cls.getDeclaredMethods.find(_.getName == methodName) match {
224+
case Some(method) =>
225+
method.setAccessible(true)
226+
return Some(method.invoke(target))
227+
case None =>
228+
cls = cls.getSuperclass
229+
}
230+
}
231+
None
232+
} catch {
233+
case NonFatal(t) =>
234+
logDebug(s"Failed to invoke $methodName on ${target.getClass.getName}.", t)
235+
None
236+
}
237+
}
238+
239+
private def convertIcebergFilterExpression(
240+
expr: IcebergExpression,
241+
readSchema: StructType): Option[SparkExpression] = {
242+
expr match {
243+
case and: IcebergAnd =>
244+
for {
245+
left <- convertIcebergFilterExpression(and.left(), readSchema)
246+
right <- convertIcebergFilterExpression(and.right(), readSchema)
247+
} yield SparkAnd(left, right)
248+
case or: IcebergOr =>
249+
for {
250+
left <- convertIcebergFilterExpression(or.left(), readSchema)
251+
right <- convertIcebergFilterExpression(or.right(), readSchema)
252+
} yield SparkOr(left, right)
253+
case not: IcebergNot =>
254+
convertIcebergFilterExpression(not.child(), readSchema).map(SparkNot)
255+
case predicate: UnboundPredicate[_] =>
256+
convertUnboundPredicate(predicate, readSchema)
257+
case predicate: BoundPredicate[_] =>
258+
convertBoundPredicate(predicate, readSchema)
259+
case _ =>
260+
expr.op() match {
261+
case org.apache.iceberg.expressions.Expression.Operation.TRUE =>
262+
Some(Literal(true))
263+
case org.apache.iceberg.expressions.Expression.Operation.FALSE =>
264+
Some(Literal(false))
265+
case _ =>
266+
None
267+
}
268+
}
269+
}
270+
271+
private def convertUnboundPredicate(
272+
predicate: UnboundPredicate[_],
273+
readSchema: StructType): Option[SparkExpression] = {
274+
findField(predicate.ref().name(), readSchema).flatMap { field =>
275+
val attr = toAttribute(field)
276+
val op = predicate.op()
277+
278+
op match {
279+
case org.apache.iceberg.expressions.Expression.Operation.IS_NULL =>
280+
Some(IsNull(attr))
281+
case org.apache.iceberg.expressions.Expression.Operation.NOT_NULL =>
282+
Some(IsNotNull(attr))
283+
case org.apache.iceberg.expressions.Expression.Operation.IS_NAN =>
284+
Some(IsNaN(attr))
285+
case org.apache.iceberg.expressions.Expression.Operation.NOT_NAN =>
286+
Some(SparkNot(IsNaN(attr)))
287+
case org.apache.iceberg.expressions.Expression.Operation.IN =>
288+
convertInPredicate(
289+
attr,
290+
field.dataType,
291+
predicate.literals().asScala.map(_.value()).toSeq)
292+
case org.apache.iceberg.expressions.Expression.Operation.NOT_IN =>
293+
convertInPredicate(
294+
attr,
295+
field.dataType,
296+
predicate.literals().asScala.map(_.value()).toSeq).map(SparkNot)
297+
case _ =>
298+
convertBinaryPredicate(attr, field.dataType, op, predicate.literal().value())
299+
}
300+
}
301+
}
302+
303+
private def convertBoundPredicate(
304+
predicate: BoundPredicate[_],
305+
readSchema: StructType): Option[SparkExpression] = {
306+
findField(predicate.ref().name(), readSchema).flatMap { field =>
307+
val attr = toAttribute(field)
308+
val op = predicate.op()
309+
310+
if (predicate.isUnaryPredicate()) {
311+
op match {
312+
case org.apache.iceberg.expressions.Expression.Operation.IS_NULL =>
313+
Some(IsNull(attr))
314+
case org.apache.iceberg.expressions.Expression.Operation.NOT_NULL =>
315+
Some(IsNotNull(attr))
316+
case org.apache.iceberg.expressions.Expression.Operation.IS_NAN =>
317+
Some(IsNaN(attr))
318+
case org.apache.iceberg.expressions.Expression.Operation.NOT_NAN =>
319+
Some(SparkNot(IsNaN(attr)))
320+
case _ =>
321+
None
322+
}
323+
} else if (predicate.isLiteralPredicate()) {
324+
val literalValue = predicate.asLiteralPredicate().literal().value()
325+
op match {
326+
case _ =>
327+
convertBinaryPredicate(attr, field.dataType, op, literalValue)
328+
}
329+
} else if (predicate.isSetPredicate()) {
330+
val values = predicate.asSetPredicate().literalSet().asScala.toSeq
331+
op match {
332+
case org.apache.iceberg.expressions.Expression.Operation.IN =>
333+
convertInPredicate(attr, field.dataType, values)
334+
case org.apache.iceberg.expressions.Expression.Operation.NOT_IN =>
335+
convertInPredicate(attr, field.dataType, values).map(SparkNot)
336+
case _ =>
337+
None
338+
}
339+
} else {
340+
None
341+
}
342+
}
343+
}
344+
345+
private def convertBinaryPredicate(
346+
attr: AttributeReference,
347+
dataType: DataType,
348+
op: org.apache.iceberg.expressions.Expression.Operation,
349+
literalValue: Any): Option[SparkExpression] = {
350+
if (!supportsScanPruningLiteralType(dataType)) {
351+
return None
352+
}
353+
toLiteral(literalValue, dataType).flatMap { literal =>
354+
op match {
355+
case org.apache.iceberg.expressions.Expression.Operation.EQ =>
356+
Some(EqualTo(attr, literal))
357+
case org.apache.iceberg.expressions.Expression.Operation.NOT_EQ =>
358+
Some(SparkNot(EqualTo(attr, literal)))
359+
case org.apache.iceberg.expressions.Expression.Operation.LT =>
360+
Some(LessThan(attr, literal))
361+
case org.apache.iceberg.expressions.Expression.Operation.LT_EQ =>
362+
Some(LessThanOrEqual(attr, literal))
363+
case org.apache.iceberg.expressions.Expression.Operation.GT =>
364+
Some(GreaterThan(attr, literal))
365+
case org.apache.iceberg.expressions.Expression.Operation.GT_EQ =>
366+
Some(GreaterThanOrEqual(attr, literal))
367+
case _ =>
368+
None
369+
}
370+
}
371+
}
372+
373+
private def convertInPredicate(
374+
attr: AttributeReference,
375+
dataType: DataType,
376+
values: Seq[Any]): Option[SparkExpression] = {
377+
if (!supportsScanPruningLiteralType(dataType)) {
378+
return None
379+
}
380+
val literals = values.map(toLiteral(_, dataType))
381+
if (literals.forall(_.nonEmpty)) {
382+
Some(In(attr, literals.flatten))
383+
} else {
384+
None
385+
}
386+
}
387+
388+
private def supportsScanPruningLiteralType(dataType: DataType): Boolean = {
389+
dataType match {
390+
case StringType | BinaryType => false
391+
case _: DecimalType => false
392+
case _ => true
393+
}
394+
}
395+
396+
private def toLiteral(value: Any, dataType: DataType): Option[Literal] = {
397+
if (value == null) {
398+
return Some(Literal.create(null, dataType))
399+
}
400+
dataType match {
401+
case _: DecimalType =>
402+
None
403+
case BinaryType =>
404+
value match {
405+
case bytes: Array[Byte] =>
406+
Some(Literal(bytes, BinaryType))
407+
case byteBuffer: java.nio.ByteBuffer =>
408+
val duplicated = byteBuffer.duplicate()
409+
val bytes = new Array[Byte](duplicated.remaining())
410+
duplicated.get(bytes)
411+
Some(Literal(bytes, BinaryType))
412+
case _ =>
413+
None
414+
}
415+
case StringType =>
416+
Some(Literal.create(value.toString, StringType))
417+
case _ =>
418+
Some(Literal.create(value, dataType))
419+
}
420+
}
421+
422+
private def toAttribute(field: StructField): AttributeReference =
423+
AttributeReference(field.name, field.dataType, nullable = true)()
424+
425+
private def findField(name: String, readSchema: StructType): Option[StructField] = {
426+
val resolver = SQLConf.get.resolver
427+
readSchema.fields.find(field => resolver(field.name, name))
428+
}
191429
}

thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca
5959

6060
private lazy val readSchema: StructType = plan.readSchema
6161
private lazy val fileTasks: Seq[FileScanTask] = plan.fileTasks
62+
private lazy val pruningPredicates: Seq[pb.PhysicalExprNode] = plan.pruningPredicates
6263

6364
private lazy val partitions: Array[FilePartition] = buildFilePartitions()
6465
private lazy val fileSizes: Map[String, Long] = buildFileSizes()
@@ -166,8 +167,7 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca
166167
.newBuilder()
167168
.setBaseConf(nativeFileScanConf)
168169
.setFsResourceId(resourceId)
169-
// No pruning predicates are pushed down in the native scan yet.
170-
.addAllPruningPredicates(new java.util.ArrayList())
170+
.addAllPruningPredicates(pruningPredicates.asJava)
171171

172172
pb.PhysicalPlanNode
173173
.newBuilder()
@@ -178,8 +178,7 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca
178178
.newBuilder()
179179
.setBaseConf(nativeFileScanConf)
180180
.setFsResourceId(resourceId)
181-
// No pruning predicates are pushed down in the native scan yet.
182-
.addAllPruningPredicates(new java.util.ArrayList())
181+
.addAllPruningPredicates(pruningPredicates.asJava)
183182

184183
pb.PhysicalPlanNode
185184
.newBuilder()

0 commit comments

Comments (0)