
[AURON #2183] Implement native support for ORC InsertIntoHiveTable writes #2191

Open
weimingdiit wants to merge 1 commit into apache:master from weimingdiit:feat/orc-sink_native_iceberg

Conversation

@weimingdiit
Contributor

Which issue does this PR close?

Closes #2183

Rationale for this change

Auron already supports native Parquet InsertIntoHiveTable writes, but ORC Hive writes still fall back to Spark’s regular execution path. This leaves native write coverage incomplete for a common Hive storage format.

This PR adds native support for ORC InsertIntoHiveTable writes so eligible Hive ORC write workloads can stay on the native path instead of falling back.

What changes are included in this PR?

This PR:

  • adds native ORC sink support in the native engine
  • adds planner / proto support for ORC sink execution
  • adds Spark-side physical plan support for native ORC InsertIntoHiveTable
  • extends AuronConverters to convert supported Hive ORC write plans to the native path
  • adds ORC sink utilities for task output path generation and output completion
  • preserves dynamic partition write handling on the native ORC write path
  • adapts input batches to the expected ORC/Hive output schema before writing
  • records output row and byte metrics for native ORC writes
  • adds execution coverage in AuronExecSuite

Are there any user-facing changes?

Yes.

Hive table writes using ORC may now remain on the native execution path when they match the supported InsertIntoHiveTable write pattern, instead of falling back to Spark’s regular write execution.

How was this patch tested?

CI.

@weimingdiit weimingdiit force-pushed the feat/orc-sink_native_iceberg branch 4 times, most recently from 4682ae0 to 65f7ae7 on April 13, 2026 at 05:27
- Shims.get.createNativeParquetInsertIntoHiveTableExec(cmd, sortedChild)
+ Shims.get.createNativeParquetInsertIntoHiveTableExec(cmd, sortInsertChild(cmd, child))

  case DataWritingCommandExec(cmd: InsertIntoHiveTable, child)
Contributor

Currently Auron only has auron.enable.data.writing to control whether writes are converted to the native path, with no per-format granularity. It is recommended to add separate toggles so each format can be controlled independently.

Contributor Author

Good point. I changed the write gating to support separate format-level controls on top of the existing global spark.auron.enable.data.writing switch. The converter now checks spark.auron.enable.data.writing.parquet and spark.auron.enable.data.writing.orc before converting InsertIntoHiveTable.
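As a sketch of the resulting gating (property values shown here are assumptions for illustration; the PR does not state defaults), the toggles could be set in spark-defaults.conf like:

```
# Existing global switch: convert InsertIntoHiveTable writes to the native path
spark.auron.enable.data.writing=true
# Per-format toggles added in this PR
spark.auron.enable.data.writing.parquet=true
spark.auron.enable.data.writing.orc=true
```

With this layout, disabling spark.auron.enable.data.writing.orc alone would keep Parquet writes native while ORC writes fall back to Spark's regular path.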

@weimingdiit weimingdiit force-pushed the feat/orc-sink_native_iceberg branch from 65f7ae7 to bc8c720 on April 13, 2026 at 09:44
…ble writes

Signed-off-by: weimingdiit <weimingdiit@gmail.com>
@weimingdiit weimingdiit force-pushed the feat/orc-sink_native_iceberg branch from bc8c720 to 28946b0 on April 13, 2026 at 11:20
@weimingdiit weimingdiit marked this pull request as ready for review April 13, 2026 12:31
@cxzl25 cxzl25 requested a review from Copilot April 14, 2026 04:24
Contributor

Copilot AI left a comment


Pull request overview

Implements native-engine execution for Hive InsertIntoHiveTable writes targeting ORC tables, reducing fallbacks to Spark’s non-native write path.

Changes:

  • Added Spark-side ORC native write physical operators (native ORC insert + ORC sink) and planner conversion support.
  • Added native-engine ORC sink execution plan plus proto/planner/runtime/JNI wiring.
  • Added Hive-focused execution tests to cover ORC insert conversion and execution (including dynamic partitions).

Reviewed changes

Copilot reviewed 20 out of 20 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcSinkBase.scala Spark-side native ORC sink node building a native plan with ORC properties + schema.
spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcInsertIntoHiveTableBase.scala Base Spark exec for ORC InsertIntoHiveTable rewrite using a dummy output format and per-task context.
spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/ConvertToNativeBase.scala Allows converting columnar children via executeColumnar() to avoid casting failures.
spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala Accepts mixed InternalRow / ColumnarBatch input and normalizes to row iteration.
spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala Adds shims hooks for native ORC insert + ORC sink creation.
spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala Adds ORC write plan conversion, ORC write schema/type support checks, and per-format write toggles.
spark-extension/src/main/java/org/apache/spark/sql/execution/auron/plan/NativeOrcSinkUtils.java JNI entrypoints for task output-path handoff and completion stats reporting.
spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java Adds config toggles for Parquet/ORC data writing conversions.
spark-extension-shims-spark/src/test/scala/org/apache/auron/exec/AuronHiveExecSuite.scala Adds conversion + execution tests for native ORC InsertIntoHiveTable (dynamic/static partitions).
spark-extension-shims-spark/src/test/scala/org/apache/auron/exec/AuronExecSuite.scala Minor formatting-only change.
spark-extension-shims-spark/src/test/scala/org/apache/auron/BaseAuronHiveSQLSuite.scala Introduces Hive-enabled base test suite configuration.
spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcSinkExec.scala Spark shims exec wrapper for NativeOrcSinkBase.
spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcInsertIntoHiveTableExec.scala Spark shims exec wrapper for ORC InsertIntoHiveTable across Spark versions.
spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala Wires shims implementations for native ORC insert + ORC sink.
native-engine/datafusion-ext-plans/src/orc_sink_exec.rs Implements native ORC sink execution (dynamic partitioning, schema adaptation, metrics, JNI hooks).
native-engine/datafusion-ext-plans/src/lib.rs Exposes orc_sink_exec module.
native-engine/auron/src/rt.rs Excludes ORC sink exec from output stream coalescing.
native-engine/auron-planner/src/planner.rs Adds planner mapping from proto to OrcSinkExec.
native-engine/auron-planner/proto/auron.proto Adds OrcSinkExecNode + OrcProp proto definitions.
native-engine/auron-jni-bridge/src/jni_bridge.rs Registers Java class/method bindings for NativeOrcSinkUtils.


},
};
use datafusion_ext_commons::{
arrow::{array_size::BatchSize, cast::cast},

Copilot AI Apr 14, 2026


BatchSize is imported but never used, which will generate an unused-import warning (and can fail builds if warnings are denied). Remove the array_size::BatchSize import or use it if intended.

Suggested change
arrow::{array_size::BatchSize, cast::cast},
arrow::cast::cast,

class OrcSinkTaskContext {
var isNative: Boolean = false
val processingOutputFiles = new LinkedBlockingDeque[String]()
val processedOutputFiles = new util.ArrayDeque[OutputFileStat]()

Copilot AI Apr 14, 2026


processedOutputFiles is a plain java.util.ArrayDeque, but it is mutated from both the native side (via NativeOrcSinkUtils.completeOutput) and the Spark task thread (via stats tracker remove() calls). ArrayDeque is not thread-safe; use a concurrent queue/deque (e.g., ConcurrentLinkedDeque/LinkedBlockingDeque) or add synchronization around push/remove to avoid racy failures under load.

Suggested change
val processedOutputFiles = new util.ArrayDeque[OutputFileStat]()
val processedOutputFiles = new LinkedBlockingDeque[OutputFileStat]()
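To illustrate why the thread-safe deque matters, here is a minimal Java sketch (class and method names are illustrative, not the PR's actual code): a background thread stands in for the native completeOutput callback pushing entries while the caller drains concurrently, which is safe on LinkedBlockingDeque but racy on a plain ArrayDeque.

```java
import java.util.concurrent.LinkedBlockingDeque;

public class DequeSketch {
    // Pushes `n` entries from a background thread (standing in for the native
    // completeOutput callback) while the caller drains concurrently with take().
    static int produceAndDrain(int n) throws InterruptedException {
        LinkedBlockingDeque<String> processed = new LinkedBlockingDeque<>();
        Thread nativeSide = new Thread(() -> {
            for (int i = 0; i < n; i++) {
                processed.add("part-" + i + ".orc"); // thread-safe insertion
            }
        });
        nativeSide.start();
        int drained = 0;
        for (int i = 0; i < n; i++) {
            processed.take(); // blocks until an entry is available; no race
            drained++;
        }
        nativeSide.join();
        return drained;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(produceAndDrain(1000));
    }
}
```

With an unsynchronized ArrayDeque, the same interleaving could corrupt internal state or lose entries under load; the blocking deque makes both the push and the drain safe without extra locking.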

Comment on lines +185 to +195
object OrcSinkTaskContext {
private val instances = mutable.Map[Long, OrcSinkTaskContext]()

def get: OrcSinkTaskContext = {
val taskId = TaskContext.get.taskAttemptId()
instances.getOrElseUpdate(
taskId, {
TaskContext.get().addTaskCompletionListener(_ => instances.remove(taskId))
new OrcSinkTaskContext
})
}

Copilot AI Apr 14, 2026


OrcSinkTaskContext.instances is a global mutable.Map accessed/updated from multiple Spark tasks concurrently, but it is not synchronized. This can race (e.g., concurrent getOrElseUpdate / remove) and lead to incorrect context reuse or runtime exceptions. Prefer a concurrent map (scala.collection.concurrent.TrieMap / ConcurrentHashMap) or use TaskContext.getLocalProperty/ThreadLocal for per-task storage.
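A minimal Java sketch of the suggested fix, assuming a ConcurrentHashMap-backed registry (class and method names here are illustrative stand-ins for OrcSinkTaskContext, not the PR's actual code):

```java
import java.util.concurrent.ConcurrentHashMap;

public class TaskContextRegistry {
    // Illustrative stand-in for the per-task OrcSinkTaskContext state.
    static class Ctx {}

    private static final ConcurrentHashMap<Long, Ctx> instances = new ConcurrentHashMap<>();

    // computeIfAbsent is atomic, so two threads racing on the same task id
    // always observe the same Ctx instance, unlike mutable.Map.getOrElseUpdate.
    static Ctx get(long taskAttemptId) {
        return instances.computeIfAbsent(taskAttemptId, id -> new Ctx());
    }

    // Called from a task-completion listener so contexts do not leak.
    static void remove(long taskAttemptId) {
        instances.remove(taskAttemptId);
    }
}
```

The same effect can be had in Scala with scala.collection.concurrent.TrieMap, which offers an atomic getOrElseUpdate.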



Development

Successfully merging this pull request may close these issues.

Implement native support for ORC InsertIntoHiveTable writes

3 participants