[AURON #2183] Implement native support for ORC InsertIntoHiveTable writes #2191
weimingdiit wants to merge 1 commit into apache:master
Conversation
```diff
- Shims.get.createNativeParquetInsertIntoHiveTableExec(cmd, sortedChild)
+ Shims.get.createNativeParquetInsertIntoHiveTableExec(cmd, sortInsertChild(cmd, child))
```

```scala
case DataWritingCommandExec(cmd: InsertIntoHiveTable, child)
```
Currently Auron only has `auron.enable.data.writing` to control whether writes are converted to the native path; it cannot be enabled or disabled per format. It is recommended to add per-format switches for separate control.
Good point. I changed the write gating to support separate format-level controls on top of the existing global spark.auron.enable.data.writing switch. The converter now checks spark.auron.enable.data.writing.parquet and spark.auron.enable.data.writing.orc before converting InsertIntoHiveTable.
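The layered gating described above can be sketched roughly as follows. Only the configuration key names (`spark.auron.enable.data.writing` plus the `.parquet`/`.orc` suffixes) come from this PR; the helper object, method name, and default values are illustrative assumptions, not the actual converter code:

```scala
// Illustrative sketch of per-format write gating layered on the global switch.
// Only the config key names come from the PR; everything else is hypothetical.
object WriteGating {
  private val GlobalKey = "spark.auron.enable.data.writing"

  def nativeWriteEnabled(conf: Map[String, String], format: String): Boolean = {
    // Global switch must be on for any native write conversion (default assumed off).
    val global = conf.getOrElse(GlobalKey, "false").toBoolean
    // Per-format toggle, e.g. spark.auron.enable.data.writing.orc (default assumed on).
    val perFormat = conf.getOrElse(s"$GlobalKey.$format", "true").toBoolean
    global && perFormat
  }
}
```

With this shape, existing jobs that only set the global key keep their behavior, while the new per-format keys can veto conversion for a single format.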
Commit: …ble writes
Signed-off-by: weimingdiit <weimingdiit@gmail.com>
Pull request overview
Implements native-engine execution for Hive InsertIntoHiveTable writes targeting ORC tables, reducing fallbacks to Spark’s non-native write path.
Changes:
- Added Spark-side ORC native write physical operators (native ORC insert + ORC sink) and planner conversion support.
- Added native-engine ORC sink execution plan plus proto/planner/runtime/JNI wiring.
- Added Hive-focused execution tests to cover ORC insert conversion and execution (including dynamic partitions).
Reviewed changes
Copilot reviewed 20 out of 20 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcSinkBase.scala | Spark-side native ORC sink node building a native plan with ORC properties + schema. |
| spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcInsertIntoHiveTableBase.scala | Base Spark exec for ORC InsertIntoHiveTable rewrite using a dummy output format and per-task context. |
| spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/ConvertToNativeBase.scala | Allows converting columnar children via executeColumnar() to avoid casting failures. |
| spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala | Accepts mixed InternalRow / ColumnarBatch input and normalizes to row iteration. |
| spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala | Adds shims hooks for native ORC insert + ORC sink creation. |
| spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala | Adds ORC write plan conversion, ORC write schema/type support checks, and per-format write toggles. |
| spark-extension/src/main/java/org/apache/spark/sql/execution/auron/plan/NativeOrcSinkUtils.java | JNI entrypoints for task output-path handoff and completion stats reporting. |
| spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java | Adds config toggles for Parquet/ORC data writing conversions. |
| spark-extension-shims-spark/src/test/scala/org/apache/auron/exec/AuronHiveExecSuite.scala | Adds conversion + execution tests for native ORC InsertIntoHiveTable (dynamic/static partitions). |
| spark-extension-shims-spark/src/test/scala/org/apache/auron/exec/AuronExecSuite.scala | Minor formatting-only change. |
| spark-extension-shims-spark/src/test/scala/org/apache/auron/BaseAuronHiveSQLSuite.scala | Introduces Hive-enabled base test suite configuration. |
| spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcSinkExec.scala | Spark shims exec wrapper for NativeOrcSinkBase. |
| spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcInsertIntoHiveTableExec.scala | Spark shims exec wrapper for ORC InsertIntoHiveTable across Spark versions. |
| spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala | Wires shims implementations for native ORC insert + ORC sink. |
| native-engine/datafusion-ext-plans/src/orc_sink_exec.rs | Implements native ORC sink execution (dynamic partitioning, schema adaptation, metrics, JNI hooks). |
| native-engine/datafusion-ext-plans/src/lib.rs | Exposes orc_sink_exec module. |
| native-engine/auron/src/rt.rs | Excludes ORC sink exec from output stream coalescing. |
| native-engine/auron-planner/src/planner.rs | Adds planner mapping from proto to OrcSinkExec. |
| native-engine/auron-planner/proto/auron.proto | Adds OrcSinkExecNode + OrcProp proto definitions. |
| native-engine/auron-jni-bridge/src/jni_bridge.rs | Registers Java class/method bindings for NativeOrcSinkUtils. |
```rust
    },
};
use datafusion_ext_commons::{
    arrow::{array_size::BatchSize, cast::cast},
```

`BatchSize` is imported but never used, which will generate an unused-import warning (and can fail builds if warnings are denied). Remove the `array_size::BatchSize` import, or use it if it was intended.

```diff
- arrow::{array_size::BatchSize, cast::cast},
+ arrow::cast::cast,
```
```scala
class OrcSinkTaskContext {
  var isNative: Boolean = false
  val processingOutputFiles = new LinkedBlockingDeque[String]()
  val processedOutputFiles = new util.ArrayDeque[OutputFileStat]()
```

`processedOutputFiles` is a plain `java.util.ArrayDeque`, but it is mutated from both the native side (via `NativeOrcSinkUtils.completeOutput`) and the Spark task thread (via stats tracker `remove()` calls). `ArrayDeque` is not thread-safe; use a concurrent queue/deque (e.g. `ConcurrentLinkedDeque` or `LinkedBlockingDeque`), or add synchronization around push/remove, to avoid racy failures under load.

```diff
- val processedOutputFiles = new util.ArrayDeque[OutputFileStat]()
+ val processedOutputFiles = new LinkedBlockingDeque[OutputFileStat]()
```
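A minimal sketch of the suggested fix, with a simplified stand-in for `OutputFileStat` (its fields here are assumptions, not the real class): `LinkedBlockingDeque` makes the push from the JNI completion callback and the poll from the Spark task thread safe without extra locking.

```scala
import java.util.concurrent.LinkedBlockingDeque

// Simplified stand-in for the real OutputFileStat (fields are assumptions).
final case class OutputFileStat(path: String, numRows: Long)

object ProcessedFiles {
  // Thread-safe deque: safe for concurrent offer() from the native-side
  // completion callback and poll()/remove() from the Spark task thread.
  val processedOutputFiles = new LinkedBlockingDeque[OutputFileStat]()
}
```

Because every method on `LinkedBlockingDeque` is internally locked, no external synchronization is needed around the producer and consumer calls.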
```scala
object OrcSinkTaskContext {
  private val instances = mutable.Map[Long, OrcSinkTaskContext]()

  def get: OrcSinkTaskContext = {
    val taskId = TaskContext.get.taskAttemptId()
    instances.getOrElseUpdate(
      taskId, {
        TaskContext.get().addTaskCompletionListener(_ => instances.remove(taskId))
        new OrcSinkTaskContext
      })
  }
```

`OrcSinkTaskContext.instances` is a global `mutable.Map` accessed and updated from multiple Spark tasks concurrently, but it is not synchronized. This can race (e.g. concurrent `getOrElseUpdate` / `remove`) and lead to incorrect context reuse or runtime exceptions. Prefer a concurrent map (`scala.collection.concurrent.TrieMap` / `ConcurrentHashMap`), or use `TaskContext.getLocalProperty`/`ThreadLocal` for per-task storage.
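One way to address this can be sketched with `TrieMap`, using a plain `Long` key standing in for the task attempt id; the class, method names, and fields here are illustrative (the real code would still register a `TaskContext` completion listener to call the release path):

```scala
import scala.collection.concurrent.TrieMap

// Illustrative per-task context; the real OrcSinkTaskContext carries more state.
class SinkTaskContext {
  var isNative: Boolean = false
}

object SinkTaskContext {
  // Lock-free concurrent map: getOrElseUpdate and remove are safe to call
  // from multiple task threads at once (getOrElseUpdate is atomic on
  // scala.collection.concurrent.Map implementations).
  private val instances = TrieMap.empty[Long, SinkTaskContext]

  def getOrCreate(taskId: Long): SinkTaskContext =
    instances.getOrElseUpdate(taskId, new SinkTaskContext)

  // Invoked from a task-completion listener in the real code.
  def release(taskId: Long): Unit = instances.remove(taskId)

  def activeCount: Int = instances.size
}
```

The same shape works with a `java.util.concurrent.ConcurrentHashMap` and `computeIfAbsent` if avoiding the Scala collection is preferred.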
Which issue does this PR close?
Closes #2183
Rationale for this change
Auron already supports native Parquet InsertIntoHiveTable writes, but ORC Hive writes still fall back to Spark’s regular execution path. This leaves native write coverage incomplete for a common Hive storage format.
This PR adds native support for ORC InsertIntoHiveTable writes so eligible Hive ORC write workloads can stay on the native path instead of falling back.
What changes are included in this PR?
This PR adds Spark-side native ORC insert and sink operators with planner conversion support, a native-engine ORC sink execution plan with proto/planner/runtime/JNI wiring, per-format data-writing configuration toggles, and Hive-focused execution tests covering static and dynamic partition inserts; see the per-file summary above for details.
Are there any user-facing changes?
Yes.
Hive table writes using ORC may now remain on the native execution path when they match the supported InsertIntoHiveTable write pattern, instead of falling back to Spark’s regular write execution.
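For example, a job could opt in to the new path per format. The session-builder shape below is standard Spark; the configuration key names come from this PR's discussion, while the chosen values and the idea of disabling Parquet while trialing ORC are purely illustrative:

```scala
import org.apache.spark.sql.SparkSession

// Enable native data writing globally, but keep it off for Parquet while
// trying out the new ORC path (key names from this PR; values illustrative).
val spark = SparkSession.builder()
  .config("spark.auron.enable.data.writing", "true")
  .config("spark.auron.enable.data.writing.orc", "true")
  .config("spark.auron.enable.data.writing.parquet", "false")
  .getOrCreate()
```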
How was this patch tested?
CI.