Skip to content

Commit 28946b0

Browse files
committed
[AURON #2183] Implement native support for ORC InsertIntoHiveTable writes
Signed-off-by: weimingdiit <weimingdiit@gmail.com>
1 parent 193b5cb commit 28946b0

File tree

20 files changed

+1855
-20
lines changed

20 files changed

+1855
-20
lines changed

native-engine/auron-jni-bridge/src/jni_bridge.rs

Lines changed: 41 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -448,6 +448,7 @@ pub struct JavaClasses<'a> {
448448
pub cSparkUDAFMemTracker: SparkUDAFMemTracker<'a>,
449449
pub cAuronRssPartitionWriterBase: AuronRssPartitionWriterBase<'a>,
450450
pub cAuronOnHeapSpillManager: AuronOnHeapSpillManager<'a>,
451+
pub cAuronNativeOrcSinkUtils: AuronNativeOrcSinkUtils<'a>,
451452
pub cAuronNativeParquetSinkUtils: AuronNativeParquetSinkUtils<'a>,
452453
pub cAuronBlockObject: AuronBlockObject<'a>,
453454
pub cAuronJsonFallbackWrapper: AuronJsonFallbackWrapper<'a>,
@@ -504,6 +505,7 @@ impl JavaClasses<'static> {
504505
c_spark_udaf_mem_tracker,
505506
c_auron_rss_partition_writer_base,
506507
c_auron_on_heap_spill_manager,
508+
c_auron_native_orc_sink_utils,
507509
c_auron_native_parquet_sink_utils,
508510
c_auron_block_object,
509511
c_auron_json_fallback_wrapper,
@@ -517,6 +519,7 @@ impl JavaClasses<'static> {
517519
SparkUDAFMemTracker::new(env)?,
518520
AuronRssPartitionWriterBase::new(env)?,
519521
AuronOnHeapSpillManager::new(env)?,
522+
AuronNativeOrcSinkUtils::new(env)?,
520523
AuronNativeParquetSinkUtils::new(env)?,
521524
AuronBlockObject::new(env)?,
522525
AuronJsonFallbackWrapper::new(env)?,
@@ -530,6 +533,7 @@ impl JavaClasses<'static> {
530533
SparkUDAFMemTracker::default(),
531534
AuronRssPartitionWriterBase::default(),
532535
AuronOnHeapSpillManager::default(),
536+
AuronNativeOrcSinkUtils::default(),
533537
AuronNativeParquetSinkUtils::default(),
534538
AuronBlockObject::default(),
535539
AuronJsonFallbackWrapper::default(),
@@ -568,6 +572,7 @@ impl JavaClasses<'static> {
568572
cSparkUDAFMemTracker: c_spark_udaf_mem_tracker,
569573
cAuronRssPartitionWriterBase: c_auron_rss_partition_writer_base,
570574
cAuronOnHeapSpillManager: c_auron_on_heap_spill_manager,
575+
cAuronNativeOrcSinkUtils: c_auron_native_orc_sink_utils,
571576
cAuronNativeParquetSinkUtils: c_auron_native_parquet_sink_utils,
572577
cAuronBlockObject: c_auron_block_object,
573578
cAuronJsonFallbackWrapper: c_auron_json_fallback_wrapper,
@@ -1603,6 +1608,42 @@ impl<'a> AuronNativeParquetSinkUtils<'a> {
16031608
}
16041609
}
16051610

1611+
/// JNI handles for the JVM class named by `AuronNativeOrcSinkUtils::SIG_TYPE`.
///
/// Each `method_*` field stores the ID of one static Java method, and the
/// matching `method_*_ret` field stores the return type recorded for it.
// Field names embed the Java method names verbatim, hence the lint allowance.
#[allow(non_snake_case)]
1612+
pub struct AuronNativeOrcSinkUtils<'a> {
1613+
/// Class handle obtained from `get_global_jclass` in `new`.
pub class: JClass<'a>,
1614+
/// ID of the static Java method `getTaskOutputPath`.
pub method_getTaskOutputPath: JStaticMethodID,
1615+
/// Return type of `getTaskOutputPath`; `new` sets it to `ReturnType::Object`.
pub method_getTaskOutputPath_ret: ReturnType,
1616+
/// ID of the static Java method `completeOutput`.
pub method_completeOutput: JStaticMethodID,
1617+
/// Return type of `completeOutput`; `new` sets it to primitive void.
pub method_completeOutput_ret: ReturnType,
1618+
}
1619+
/// Construction of the JNI handles for the class named by `SIG_TYPE`.
impl<'a> AuronNativeOrcSinkUtils<'a> {
1620+
/// Slash-separated JVM class name handed to `get_global_jclass` in `new`.
pub const SIG_TYPE: &'static str =
1621+
"org/apache/spark/sql/execution/auron/plan/NativeOrcSinkUtils";
1622+
1623+
/// Looks up the class and resolves the IDs of its two static methods.
///
/// # Errors
/// Any failure from the class lookup or from either
/// `get_static_method_id` call is returned to the caller via `?`.
pub fn new(env: &JNIEnv<'a>) -> JniResult<AuronNativeOrcSinkUtils<'a>> {
1624+
let class = get_global_jclass(env, Self::SIG_TYPE)?;
1625+
Ok(AuronNativeOrcSinkUtils {
1626+
class,
1627+
// Descriptor `()Ljava/lang/String;`: no arguments, returns a String.
method_getTaskOutputPath: env.get_static_method_id(
1628+
class,
1629+
"getTaskOutputPath",
1630+
"()Ljava/lang/String;",
1631+
)?,
1632+
method_getTaskOutputPath_ret: ReturnType::Object,
1633+
// Descriptor `(Ljava/lang/String;JJ)V`: a String and two longs, returns void.
method_completeOutput: env.get_static_method_id(
1634+
class,
1635+
"completeOutput",
1636+
"(Ljava/lang/String;JJ)V",
1637+
)?,
1638+
method_completeOutput_ret: ReturnType::Primitive(Primitive::Void),
1639+
})
1640+
}
1641+
1642+
/// Returns a value whose bytes are all zero, without any JNI lookup.
// NOTE(review): presumably a placeholder for when the class is not
// initialized; the zeroed handles should not be used for JNI calls — confirm.
// NOTE(review): `mem::zeroed` is only sound if all-zero bits are a valid
// value for every field type (`JClass`, `JStaticMethodID`, `ReturnType`)
// in the jni crate version in use — confirm.
fn default() -> Self {
1643+
unsafe { std::mem::zeroed() }
1644+
}
1645+
}
1646+
16061647
#[allow(non_snake_case)]
16071648
pub struct AuronBlockObject<'a> {
16081649
pub class: JClass<'a>,

native-engine/auron-planner/proto/auron.proto

Lines changed: 14 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -52,6 +52,7 @@ message PhysicalPlanNode {
5252
ParquetSinkExecNode parquet_sink = 24;
5353
OrcScanExecNode orc_scan = 25;
5454
KafkaScanExecNode kafka_scan = 26;
55+
OrcSinkExecNode orc_sink = 27;
5556
}
5657
}
5758

@@ -622,6 +623,19 @@ message ParquetProp {
622623
string value = 2;
623624
}
624625

626+
// Plan node for the ORC sink; the planner builds an `OrcSinkExec` from it.
message OrcSinkExecNode {
627+
// Child plan node; required by the planner.
PhysicalPlanNode input = 1;
628+
// Passed unchanged to `OrcSinkExec::new`.
// NOTE(review): presumably identifies a filesystem resource registered on the JVM side — confirm.
string fs_resource_id = 2;
629+
// Cast to `usize` by the planner.
// NOTE(review): presumably the number of dynamic partition columns — confirm.
int32 num_dyn_parts = 3;
630+
// Required by the planner, which converts it before passing it to `OrcSinkExec::new`.
Schema schema = 4;
631+
// Key/value pairs; the planner copies them into a list of (key, value) strings.
repeated OrcProp prop = 5;
632+
}
633+
634+
// One key/value string pair carried in `OrcSinkExecNode.prop`.
message OrcProp {
635+
// Property name.
string key = 1;
636+
// Property value.
string value = 2;
637+
}
638+
625639
message IpcWriterExecNode {
626640
PhysicalPlanNode input = 1;
627641
string ipc_consumer_resource_id = 2;

native-engine/auron-planner/src/planner.rs

Lines changed: 14 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -79,6 +79,7 @@ use datafusion_ext_plans::{
7979
ipc_writer_exec::IpcWriterExec,
8080
limit_exec::LimitExec,
8181
orc_exec::OrcExec,
82+
orc_sink_exec::OrcSinkExec,
8283
parquet_exec::ParquetExec,
8384
parquet_sink_exec::ParquetSinkExec,
8485
project_exec::ProjectExec,
@@ -802,6 +803,19 @@ impl PhysicalPlanner {
802803
props,
803804
)))
804805
}
806+
PhysicalPlanType::OrcSink(orc_sink) => {
807+
let mut props: Vec<(String, String)> = vec![];
808+
for prop in &orc_sink.prop {
809+
props.push((prop.key.clone(), prop.value.clone()));
810+
}
811+
Ok(Arc::new(OrcSinkExec::new(
812+
convert_box_required!(self, orc_sink.input)?,
813+
orc_sink.fs_resource_id.clone(),
814+
orc_sink.num_dyn_parts as usize,
815+
Arc::new(convert_required!(orc_sink.schema)?),
816+
props,
817+
)))
818+
}
805819
PhysicalPlanType::KafkaScan(kafka_scan) => {
806820
let schema = Arc::new(convert_required!(kafka_scan.schema)?);
807821
if !kafka_scan.mock_data_json_array.is_empty() {

native-engine/auron/src/rt.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -46,6 +46,7 @@ use datafusion_ext_commons::{df_execution_err, downcast_any};
4646
use datafusion_ext_plans::{
4747
common::execution_context::{ExecutionContext, cancel_all_tasks},
4848
ipc_writer_exec::IpcWriterExec,
49+
orc_sink_exec::OrcSinkExec,
4950
parquet_sink_exec::ParquetSinkExec,
5051
shuffle_writer_exec::ShuffleWriterExec,
5152
};
@@ -156,6 +157,7 @@ impl NativeExecutionRuntime {
156157

157158
// coalesce output stream if necessary
158159
if downcast_any!(execution_plan_cloned, EmptyExec).is_err()
160+
&& downcast_any!(execution_plan_cloned, OrcSinkExec).is_err()
159161
&& downcast_any!(execution_plan_cloned, ParquetSinkExec).is_err()
160162
&& downcast_any!(execution_plan_cloned, IpcWriterExec).is_err()
161163
&& downcast_any!(execution_plan_cloned, ShuffleWriterExec).is_err()

native-engine/datafusion-ext-plans/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -47,6 +47,7 @@ pub mod ipc_writer_exec;
4747
pub mod joins;
4848
pub mod limit_exec;
4949
pub mod orc_exec;
50+
pub mod orc_sink_exec;
5051
pub mod parquet_exec;
5152
pub mod parquet_sink_exec;
5253
pub mod project_exec;

0 commit comments

Comments
 (0)