Skip to content

Commit

Permalink
AggExec: implement columnar accumulator states.
Browse files Browse the repository at this point in the history
refactor execution context
  • Loading branch information
zhangli20 committed Nov 20, 2024
1 parent 8204cdc commit deffd25
Show file tree
Hide file tree
Showing 81 changed files with 5,019 additions and 7,279 deletions.
16 changes: 3 additions & 13 deletions .github/workflows/tpcds-reusable.yml
Original file line number Diff line number Diff line change
Expand Up @@ -120,23 +120,13 @@ jobs:
- name: Install Blaze JAR
run: cp blaze-engine-*${{ inputs.sparkver }}*.jar spark-bin-${{ inputs.sparkver }}/jars/

- name: Run with BHJ
- name: Run
run: |
export RUST_LOG=ERROR
export RUST_BACKTRACE=1
SPARK_HOME=spark-bin-${{ inputs.sparkver }} dev/run-tpcds-test \
--data-location dev/tpcds_1g \
--conf spark.driver.memory=3g \
--conf spark.driver.memoryOverhead=2048 \
--query-filter ${{ matrix.query }}
- name: Run without BHJ
run: |
export RUST_LOG=ERROR
export RUST_BACKTRACE=1
SPARK_HOME=spark-bin-${{ inputs.sparkver }} dev/run-tpcds-test \
--conf spark.sql.autoBroadcastJoinThreshold=-1 \
--conf spark.driver.memory=3g \
--conf spark.driver.memoryOverhead=2048 \
--data-location dev/tpcds_1g \
--conf spark.driver.memoryOverhead=2536 \
--conf spark.sql.broadcastTimeout=900s \
--query-filter ${{ matrix.query }}
42 changes: 9 additions & 33 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions native-engine/blaze-serde/src/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use datafusion_ext_exprs::{
string_ends_with::StringEndsWithExpr, string_starts_with::StringStartsWithExpr,
};
use datafusion_ext_plans::{
agg::{create_agg, AggExecMode, AggExpr, AggFunction, AggMode, GroupingExpr},
agg::{agg::create_agg, AggExecMode, AggExpr, AggFunction, AggMode, GroupingExpr},
agg_exec::AggExec,
broadcast_join_build_hash_map_exec::BroadcastJoinBuildHashMapExec,
broadcast_join_exec::BroadcastJoinExec,
Expand Down Expand Up @@ -536,7 +536,6 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
exec_mode,
physical_groupings,
physical_aggs,
agg.initial_input_buffer_offset as usize,
agg.supports_partial_skipping,
input,
)?))
Expand Down
54 changes: 26 additions & 28 deletions native-engine/blaze/src/rt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,22 @@ use datafusion::{
common::Result,
error::DataFusionError,
execution::context::TaskContext,
physical_plan::{
metrics::{BaselineMetrics, ExecutionPlanMetricsSet},
ExecutionPlan,
},
physical_plan::{metrics::ExecutionPlanMetricsSet, ExecutionPlan},
};
use datafusion_ext_commons::df_execution_err;
use datafusion_ext_plans::{
common::execution_context::ExecutionContext, parquet_sink_exec::ParquetSinkExec,
};
use datafusion_ext_commons::{df_execution_err, streams::coalesce_stream::CoalesceInput};
use datafusion_ext_plans::{common::output::TaskOutputter, parquet_sink_exec::ParquetSinkExec};
use futures::{FutureExt, StreamExt};
use jni::objects::{GlobalRef, JObject};
use tokio::runtime::Runtime;

use crate::{handle_unwinded_scope, metrics::update_spark_metric_node};

pub struct NativeExecutionRuntime {
exec_ctx: Arc<ExecutionContext>,
native_wrapper: GlobalRef,
plan: Arc<dyn ExecutionPlan>,
task_context: Arc<TaskContext>,
partition: usize,
batch_receiver: Receiver<Result<Option<RecordBatch>>>,
rt: Runtime,
}
Expand All @@ -60,22 +58,15 @@ impl NativeExecutionRuntime {
partition: usize,
context: Arc<TaskContext>,
) -> Result<Self> {
// execute plan to output stream
let stream = plan.execute(partition, context.clone())?;
let schema = stream.schema();

// coalesce
let mut stream = if plan.as_any().downcast_ref::<ParquetSinkExec>().is_some() {
stream // cannot coalesce parquet sink output
} else {
context.coalesce_with_default_batch_size(
stream,
&BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), partition),
)?
};
let exec_ctx = ExecutionContext::new(
context.clone(),
partition,
plan.schema(),
&ExecutionPlanMetricsSet::new(),
);

// init ffi schema
let ffi_schema = FFI_ArrowSchema::try_from(schema.as_ref())?;
let ffi_schema = FFI_ArrowSchema::try_from(exec_ctx.output_schema().as_ref())?;
jni_call!(BlazeCallNativeWrapper(native_wrapper.as_obj())
.importSchema(&ffi_schema as *const FFI_ArrowSchema as i64) -> ()
)?;
Expand All @@ -98,17 +89,24 @@ impl NativeExecutionRuntime {

let (batch_sender, batch_receiver) = std::sync::mpsc::sync_channel(1);
let nrt = Self {
exec_ctx: exec_ctx.clone(),
native_wrapper: native_wrapper.clone(),
plan,
partition,
plan: plan.clone(),
rt,
batch_receiver,
task_context: context,
};

// spawn batch producer
let err_sender = batch_sender.clone();
let consume_stream = async move {
// execute and coalesce plan to output stream
let stream = exec_ctx.execute(&plan)?;
let mut stream = if plan.as_any().downcast_ref::<ParquetSinkExec>().is_some() {
stream // cannot coalesce parquet sink output
} else {
exec_ctx.coalesce_with_default_batch_size(stream)
};

while let Some(batch) = AssertUnwindSafe(stream.next())
.catch_unwind()
.await
Expand Down Expand Up @@ -188,7 +186,7 @@ impl NativeExecutionRuntime {
}
};

let partition = self.partition;
let partition = self.exec_ctx.partition_id();
match next_batch() {
Ok(ret) => return ret,
Err(err) => {
Expand All @@ -203,13 +201,13 @@ impl NativeExecutionRuntime {
}

pub fn finalize(self) {
let partition = self.partition;
let partition = self.exec_ctx.partition_id();

log::info!("[partition={partition}] native execution finalizing");
self.update_metrics().unwrap_or_default();
drop(self.plan);

self.task_context.cancel_task(); // cancel all pending streams
self.exec_ctx.cancel_task(); // cancel all pending streams
self.rt.shutdown_background();
log::info!("[partition={partition}] native execution finalized");
}
Expand Down
1 change: 0 additions & 1 deletion native-engine/datafusion-ext-commons/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ num = "0.4.2"
once_cell = "1.20.2"
paste = "1.0.15"
radsort = "0.1.1"
slimmer_box = "0.6.5"
tempfile = "3"
thrift = "0.17.0"
tokio = "1.41"
Expand Down
99 changes: 0 additions & 99 deletions native-engine/datafusion-ext-commons/src/bytes_arena.rs

This file was deleted.

Loading

0 comments on commit deffd25

Please sign in to comment.