Skip to content

Commit

Permalink
refactor(batch): simplify batch executor builder (#19731)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Dec 10, 2024
1 parent d2bb1db commit 152cab2
Show file tree
Hide file tree
Showing 49 changed files with 165 additions and 230 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ clap = { version = "4", features = ["cargo", "derive", "env"] }
deltalake = { version = "0.20.1", features = ["s3", "gcs", "datafusion"] }
itertools = "0.13.0"
jsonbb = "0.1.4"
linkme = { version = "0.3", features = ["used_linker"] }
lru = { git = "https://github.com/risingwavelabs/lru-rs.git", rev = "2682b85" }
parquet = { version = "53.2", features = ["async"] }
mysql_async = { version = "0.34", default-features = false, features = [
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/execution/local_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub struct LocalExchangeSource {
impl LocalExchangeSource {
pub fn create(
output_id: TaskOutputId,
context: impl BatchTaskContext,
context: &dyn BatchTaskContext,
task_id: TaskId,
) -> Result<Self> {
let task_output = context.get_task_output(output_id)?;
Expand Down
6 changes: 2 additions & 4 deletions src/batch/src/executor/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use crate::error::{BatchError, Result};
use crate::executor::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
};
use crate::task::BatchTaskContext;

/// [`DeleteExecutor`] implements table deletion with values from its child executor.
// Note: multiple `DELETE`s in a single epoch, or concurrent `DELETE`s may lead to conflicting
Expand Down Expand Up @@ -164,10 +163,9 @@ impl DeleteExecutor {
}
}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for DeleteExecutor {
async fn new_boxed_executor<C: BatchTaskContext>(
source: &ExecutorBuilder<'_, C>,
async fn new_boxed_executor(
source: &ExecutorBuilder<'_>,
inputs: Vec<BoxedExecutor>,
) -> Result<BoxedExecutor> {
let [child]: [_; 1] = inputs.try_into().unwrap();
Expand Down
6 changes: 2 additions & 4 deletions src/batch/src/executor/expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;

use super::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
use crate::error::{BatchError, Result};
use crate::task::BatchTaskContext;

pub struct ExpandExecutor {
column_subsets: Vec<Vec<usize>>,
Expand Down Expand Up @@ -90,10 +89,9 @@ impl ExpandExecutor {
}
}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for ExpandExecutor {
async fn new_boxed_executor<C: BatchTaskContext>(
source: &ExecutorBuilder<'_, C>,
async fn new_boxed_executor(
source: &ExecutorBuilder<'_>,
inputs: Vec<BoxedExecutor>,
) -> Result<BoxedExecutor> {
let expand_node = try_match_expand!(
Expand Down
6 changes: 2 additions & 4 deletions src/batch/src/executor/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use crate::error::{BatchError, Result};
use crate::executor::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
};
use crate::task::BatchTaskContext;

pub struct FilterExecutor {
expr: BoxedExpression,
Expand Down Expand Up @@ -76,10 +75,9 @@ impl FilterExecutor {
}
}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for FilterExecutor {
async fn new_boxed_executor<C: BatchTaskContext>(
source: &ExecutorBuilder<'_, C>,
async fn new_boxed_executor(
source: &ExecutorBuilder<'_>,
inputs: Vec<BoxedExecutor>,
) -> Result<BoxedExecutor> {
let [input]: [_; 1] = inputs.try_into().unwrap();
Expand Down
56 changes: 25 additions & 31 deletions src/batch/src/executor/generic_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::time::Duration;

use futures::StreamExt;
Expand All @@ -33,16 +34,16 @@ use crate::execution::local_exchange::LocalExchangeSource;
use crate::executor::ExecutorBuilder;
use crate::task::{BatchTaskContext, TaskId};

pub type ExchangeExecutor<C> = GenericExchangeExecutor<DefaultCreateSource, C>;
pub type ExchangeExecutor = GenericExchangeExecutor<DefaultCreateSource>;
use crate::executor::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor};
use crate::monitor::BatchMetrics;

pub struct GenericExchangeExecutor<CS, C> {
pub struct GenericExchangeExecutor<CS> {
proto_sources: Vec<PbExchangeSource>,
/// Mock-able `CreateSource`.
source_creators: Vec<CS>,
sequential: bool,
context: C,
context: Arc<dyn BatchTaskContext>,

schema: Schema,
#[expect(dead_code)]
Expand All @@ -59,7 +60,7 @@ pub struct GenericExchangeExecutor<CS, C> {
pub trait CreateSource: Send {
async fn create_source(
&self,
context: impl BatchTaskContext,
context: &dyn BatchTaskContext,
prost_source: &PbExchangeSource,
) -> Result<ExchangeSourceImpl>;
}
Expand All @@ -79,7 +80,7 @@ impl DefaultCreateSource {
impl CreateSource for DefaultCreateSource {
async fn create_source(
&self,
context: impl BatchTaskContext,
context: &dyn BatchTaskContext,
prost_source: &PbExchangeSource,
) -> Result<ExchangeSourceImpl> {
let peer_addr = prost_source.get_host()?.into();
Expand Down Expand Up @@ -146,10 +147,9 @@ impl CreateSource for DefaultCreateSource {

pub struct GenericExchangeExecutorBuilder {}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for GenericExchangeExecutorBuilder {
async fn new_boxed_executor<C: BatchTaskContext>(
source: &ExecutorBuilder<'_, C>,
async fn new_boxed_executor(
source: &ExecutorBuilder<'_>,
inputs: Vec<BoxedExecutor>,
) -> Result<BoxedExecutor> {
ensure!(
Expand All @@ -170,7 +170,7 @@ impl BoxedExecutorBuilder for GenericExchangeExecutorBuilder {

let input_schema: Vec<NodeField> = node.get_input_schema().to_vec();
let fields = input_schema.iter().map(Field::from).collect::<Vec<Field>>();
Ok(Box::new(ExchangeExecutor::<C> {
Ok(Box::new(ExchangeExecutor {
proto_sources,
source_creators,
sequential,
Expand All @@ -183,9 +183,7 @@ impl BoxedExecutorBuilder for GenericExchangeExecutorBuilder {
}
}

impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> Executor
for GenericExchangeExecutor<CS, C>
{
impl<CS: 'static + Send + CreateSource> Executor for GenericExchangeExecutor<CS> {
fn schema(&self) -> &Schema {
&self.schema
}
Expand All @@ -199,7 +197,7 @@ impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> Executor
}
}

impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> GenericExchangeExecutor<CS, C> {
impl<CS: 'static + Send + CreateSource> GenericExchangeExecutor<CS> {
#[try_stream(boxed, ok = DataChunk, error = BatchError)]
async fn do_execute(self: Box<Self>) {
let streams = self
Expand All @@ -210,7 +208,7 @@ impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> GenericExchangeExec
Self::data_chunk_stream(
prost_source,
source_creator,
self.context.clone(),
&*self.context,
self.metrics.clone(),
)
});
Expand All @@ -235,12 +233,10 @@ impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> GenericExchangeExec
async fn data_chunk_stream(
prost_source: PbExchangeSource,
source_creator: CS,
context: C,
context: &dyn BatchTaskContext,
metrics: Option<BatchMetrics>,
) {
let mut source = source_creator
.create_source(context.clone(), &prost_source)
.await?;
let mut source = source_creator.create_source(context, &prost_source).await?;
// create the collector
let counter = metrics
.as_ref()
Expand Down Expand Up @@ -290,20 +286,18 @@ mod tests {
source_creators.push(fake_create_source);
}

let executor = Box::new(
GenericExchangeExecutor::<FakeCreateSource, ComputeNodeContext> {
metrics: None,
proto_sources,
source_creators,
sequential: false,
context,
schema: Schema {
fields: vec![Field::unnamed(DataType::Int32)],
},
task_id: TaskId::default(),
identity: "GenericExchangeExecutor2".to_string(),
let executor = Box::new(GenericExchangeExecutor::<FakeCreateSource> {
metrics: None,
proto_sources,
source_creators,
sequential: false,
context,
schema: Schema {
fields: vec![Field::unnamed(DataType::Int32)],
},
);
task_id: TaskId::default(),
identity: "GenericExchangeExecutor2".to_string(),
});

let mut stream = executor.execute();
let mut chunks: Vec<DataChunk> = vec![];
Expand Down
6 changes: 2 additions & 4 deletions src/batch/src/executor/group_top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use crate::error::{BatchError, Result};
use crate::executor::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
};
use crate::task::BatchTaskContext;

/// Group Top-N Executor
///
Expand Down Expand Up @@ -90,10 +89,9 @@ impl HashKeyDispatcher for GroupTopNExecutorBuilder {
}
}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for GroupTopNExecutorBuilder {
async fn new_boxed_executor<C: BatchTaskContext>(
source: &ExecutorBuilder<'_, C>,
async fn new_boxed_executor(
source: &ExecutorBuilder<'_>,
inputs: Vec<BoxedExecutor>,
) -> Result<BoxedExecutor> {
let [child]: [_; 1] = inputs.try_into().unwrap();
Expand Down
7 changes: 3 additions & 4 deletions src/batch/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use crate::spill::spill_op::SpillBackend::Disk;
use crate::spill::spill_op::{
SpillBackend, SpillBuildHasher, SpillOp, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY,
};
use crate::task::{BatchTaskContext, ShutdownToken, TaskId};
use crate::task::{ShutdownToken, TaskId};

type AggHashMap<K, A> = hashbrown::HashMap<K, Vec<AggregateState>, PrecomputedBuildHasher, A>;

Expand Down Expand Up @@ -149,10 +149,9 @@ impl HashAggExecutorBuilder {
}
}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for HashAggExecutorBuilder {
async fn new_boxed_executor<C: BatchTaskContext>(
source: &ExecutorBuilder<'_, C>,
async fn new_boxed_executor(
source: &ExecutorBuilder<'_>,
inputs: Vec<BoxedExecutor>,
) -> Result<BoxedExecutor> {
let [child]: [_; 1] = inputs.try_into().unwrap();
Expand Down
7 changes: 3 additions & 4 deletions src/batch/src/executor/hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::error::{BatchError, Result};
use crate::executor::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
};
use crate::task::BatchTaskContext;

pub struct HopWindowExecutor {
child: BoxedExecutor,
identity: String,
Expand All @@ -39,10 +39,9 @@ pub struct HopWindowExecutor {
output_indices: Vec<usize>,
}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for HopWindowExecutor {
async fn new_boxed_executor<C: BatchTaskContext>(
source: &ExecutorBuilder<'_, C>,
async fn new_boxed_executor(
source: &ExecutorBuilder<'_>,
inputs: Vec<BoxedExecutor>,
) -> Result<BoxedExecutor> {
let [child]: [_; 1] = inputs.try_into().unwrap();
Expand Down
6 changes: 2 additions & 4 deletions src/batch/src/executor/iceberg_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ use super::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder};
use crate::error::BatchError;
use crate::executor::{DataChunk, Executor};
use crate::monitor::BatchMetrics;
use crate::task::BatchTaskContext;

static POSITION_DELETE_FILE_FILE_PATH_INDEX: usize = 0;
static POSITION_DELETE_FILE_POS: usize = 1;
Expand Down Expand Up @@ -225,10 +224,9 @@ impl IcebergScanExecutor {

pub struct IcebergScanExecutorBuilder {}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for IcebergScanExecutorBuilder {
async fn new_boxed_executor<C: BatchTaskContext>(
source: &ExecutorBuilder<'_, C>,
async fn new_boxed_executor(
source: &ExecutorBuilder<'_>,
inputs: Vec<BoxedExecutor>,
) -> crate::error::Result<BoxedExecutor> {
ensure!(
Expand Down
6 changes: 2 additions & 4 deletions src/batch/src/executor/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use crate::error::{BatchError, Result};
use crate::executor::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
};
use crate::task::BatchTaskContext;

/// [`InsertExecutor`] implements table insertion with values from its child executor.
pub struct InsertExecutor {
Expand Down Expand Up @@ -208,10 +207,9 @@ impl InsertExecutor {
}
}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for InsertExecutor {
async fn new_boxed_executor<C: BatchTaskContext>(
source: &ExecutorBuilder<'_, C>,
async fn new_boxed_executor(
source: &ExecutorBuilder<'_>,
inputs: Vec<BoxedExecutor>,
) -> Result<BoxedExecutor> {
let [child]: [_; 1] = inputs.try_into().unwrap();
Expand Down
7 changes: 3 additions & 4 deletions src/batch/src/executor/join/distributed_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::executor::{
unix_timestamp_sec_to_epoch, AsOf, BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder,
BufferChunkExecutor, Executor, ExecutorBuilder, LookupExecutorBuilder, LookupJoinBase,
};
use crate::task::{BatchTaskContext, ShutdownToken};
use crate::task::ShutdownToken;

/// Distributed Lookup Join Executor.
/// High level Execution flow:
Expand Down Expand Up @@ -81,10 +81,9 @@ impl<K> DistributedLookupJoinExecutor<K> {

pub struct DistributedLookupJoinExecutorBuilder {}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {
async fn new_boxed_executor<C: BatchTaskContext>(
source: &ExecutorBuilder<'_, C>,
async fn new_boxed_executor(
source: &ExecutorBuilder<'_>,
inputs: Vec<BoxedExecutor>,
) -> Result<BoxedExecutor> {
let [outer_side_input]: [_; 1] = inputs.try_into().unwrap();
Expand Down
7 changes: 3 additions & 4 deletions src/batch/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use crate::spill::spill_op::SpillBackend::Disk;
use crate::spill::spill_op::{
SpillBackend, SpillBuildHasher, SpillOp, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY,
};
use crate::task::{BatchTaskContext, ShutdownToken};
use crate::task::ShutdownToken;

/// Hash Join Executor
///
Expand Down Expand Up @@ -2144,10 +2144,9 @@ impl DataChunkMutator {
}
}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for HashJoinExecutor<()> {
async fn new_boxed_executor<C: BatchTaskContext>(
context: &ExecutorBuilder<'_, C>,
async fn new_boxed_executor(
context: &ExecutorBuilder<'_>,
inputs: Vec<BoxedExecutor>,
) -> Result<BoxedExecutor> {
let [left_child, right_child]: [_; 2] = inputs.try_into().unwrap();
Expand Down
Loading

0 comments on commit 152cab2

Please sign in to comment.