From 951d5b91d594b83bff686dcc83254e56e420b909 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Thu, 25 Jan 2024 15:34:37 +0800 Subject: [PATCH 1/2] support sequential exchange --- proto/batch_plan.proto | 1 + src/batch/src/executor/generic_exchange.rs | 53 +++++++++++-------- .../src/executor/join/local_lookup_join.rs | 1 + src/batch/src/task/task_execution.rs | 6 +++ .../tests/testdata/output/basic_query.yaml | 2 +- .../tests/testdata/output/except.yaml | 8 +-- .../tests/testdata/output/intersect.yaml | 8 +-- .../tests/testdata/output/limit.yaml | 2 +- .../tests/testdata/output/order_by.yaml | 4 +- .../tests/testdata/output/union.yaml | 8 +-- .../src/optimizer/plan_node/batch_exchange.rs | 31 +++++++++-- .../src/optimizer/plan_node/batch_limit.rs | 18 +++++-- .../src/scheduler/distributed/stage.rs | 2 + 13 files changed, 101 insertions(+), 43 deletions(-) diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index f6164f12226bf..f81637dd89f8c 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -241,6 +241,7 @@ message ExchangeSource { message ExchangeNode { repeated ExchangeSource sources = 1; + bool sequential = 2; repeated plan_common.Field input_schema = 3; } diff --git a/src/batch/src/executor/generic_exchange.rs b/src/batch/src/executor/generic_exchange.rs index 704a085fec245..4a50be2c2665a 100644 --- a/src/batch/src/executor/generic_exchange.rs +++ b/src/batch/src/executor/generic_exchange.rs @@ -14,7 +14,6 @@ use futures::StreamExt; use futures_async_stream::try_stream; -use itertools::Itertools; use risingwave_common::array::DataChunk; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::util::iter_util::ZipEqFast; @@ -39,6 +38,7 @@ pub struct GenericExchangeExecutor { proto_sources: Vec, /// Mock-able CreateSource. source_creators: Vec, + sequential: bool, context: C, schema: Schema, @@ -126,6 +126,8 @@ impl BoxedExecutorBuilder for GenericExchangeExecutorBuilder { NodeBody::Exchange )?; + let sequential = node.get_sequential(); + ensure!(!node.get_sources().is_empty()); let proto_sources: Vec = node.get_sources().to_vec(); let source_creators = @@ -136,6 +138,7 @@ impl BoxedExecutorBuilder for GenericExchangeExecutorBuilder { Ok(Box::new(ExchangeExecutor:: { proto_sources, source_creators, + sequential, context: source.context().clone(), schema: Schema { fields }, task_id: source.task_id.clone(), @@ -164,26 +167,33 @@ impl Executor impl GenericExchangeExecutor { #[try_stream(boxed, ok = DataChunk, error = BatchError)] async fn do_execute(self: Box) { - let mut stream = select_all( - self.proto_sources - .into_iter() - .zip_eq_fast(self.source_creators) - .map(|(prost_source, source_creator)| { - Self::data_chunk_stream( - prost_source, - source_creator, - self.context.clone(), - self.metrics.clone(), - self.identity.clone(), - ) - }) - .collect_vec(), - ) - .boxed(); - - while let Some(data_chunk) = stream.next().await { - let data_chunk = data_chunk?; - yield data_chunk + let streams = self + .proto_sources + .into_iter() + .zip_eq_fast(self.source_creators) + .map(|(prost_source, source_creator)| { + Self::data_chunk_stream( + prost_source, + source_creator, + self.context.clone(), + self.metrics.clone(), + self.identity.clone(), + ) + }); + + if self.sequential { + for mut stream in streams { + while let Some(data_chunk) = stream.next().await { + let data_chunk = data_chunk?; + yield data_chunk + } + } + } else { + let mut stream = select_all(streams).boxed(); + while let Some(data_chunk) = stream.next().await { + let data_chunk = data_chunk?; + yield data_chunk + } } } @@ -262,6 +272,7 @@ mod tests { metrics: None, proto_sources, source_creators, + sequential: false, context, schema: Schema { fields: vec![Field::unnamed(DataType::Int32)], diff --git a/src/batch/src/executor/join/local_lookup_join.rs b/src/batch/src/executor/join/local_lookup_join.rs index 85762d5831a60..3ad165b1032d6 100644 --- a/src/batch/src/executor/join/local_lookup_join.rs +++ b/src/batch/src/executor/join/local_lookup_join.rs @@ -224,6 +224,7 @@ impl LookupExecutorBuilder for InnerSideExecutorBuilder let exchange_node = NodeBody::Exchange(ExchangeNode { sources, + sequential: true, input_schema: self.inner_side_schema.to_prost(), }); diff --git a/src/batch/src/task/task_execution.rs b/src/batch/src/task/task_execution.rs index 9c5f94953ea20..d50cb025f9902 100644 --- a/src/batch/src/task/task_execution.rs +++ b/src/batch/src/task/task_execution.rs @@ -196,6 +196,12 @@ pub struct TaskOutput { failure: Arc>>>, } +impl Drop for TaskOutput { + fn drop(&mut self) { + println!("drop TaskOutput {:?}", self.output_id); + } +} + impl std::fmt::Debug for TaskOutput { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("TaskOutput") diff --git a/src/frontend/planner_test/tests/testdata/output/basic_query.yaml b/src/frontend/planner_test/tests/testdata/output/basic_query.yaml index 2bd37bdadf606..9342f68bcb161 100644 --- a/src/frontend/planner_test/tests/testdata/output/basic_query.yaml +++ b/src/frontend/planner_test/tests/testdata/output/basic_query.yaml @@ -189,7 +189,7 @@ select * from t limit 1 batch_plan: |- BatchLimit { limit: 1, offset: 0 } - └─BatchExchange { order: [], dist: Single } + └─BatchExchange { order: [], dist: Single, sequential: true } └─BatchLimit { limit: 1, offset: 0 } └─BatchScan { table: t, columns: [t.v1, t.v2], limit: 1, distribution: SomeShard } - sql: | diff --git a/src/frontend/planner_test/tests/testdata/output/except.yaml b/src/frontend/planner_test/tests/testdata/output/except.yaml index e14327988af6e..87c041d65532e 100644 --- a/src/frontend/planner_test/tests/testdata/output/except.yaml +++ b/src/frontend/planner_test/tests/testdata/output/except.yaml @@ -220,12 +220,12 @@ └─BatchHashJoin { type: LeftAnti, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all } ├─BatchExchange { order: [], dist: HashShard(t1.a, t1.b, t1.c) } │ └─BatchLimit { limit: 1, offset: 0 } - │ └─BatchExchange { order: [], dist: Single } + │ └─BatchExchange { order: [], dist: Single, sequential: true } │ └─BatchLimit { limit: 1, offset: 0 } │ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], limit: 1, distribution: SomeShard } └─BatchExchange { order: [], dist: HashShard(t2.a, t2.b, t2.c) } └─BatchLimit { limit: 1, offset: 0 } - └─BatchExchange { order: [], dist: Single } + └─BatchExchange { order: [], dist: Single, sequential: true } └─BatchLimit { limit: 1, offset: 0 } └─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], limit: 1, distribution: SomeShard } - sql: | @@ -247,12 +247,12 @@ └─BatchHashJoin { type: LeftAnti, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all } ├─BatchExchange { order: [], dist: HashShard(t1.a, t1.b, t1.c) } │ └─BatchLimit { limit: 1, offset: 0 } - │ └─BatchExchange { order: [], dist: Single } + │ └─BatchExchange { order: [], dist: Single, sequential: true } │ └─BatchLimit { limit: 1, offset: 0 } │ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], limit: 1, distribution: SomeShard } └─BatchExchange { order: [], dist: HashShard(t2.a, t2.b, t2.c) } └─BatchLimit { limit: 1, offset: 0 } - └─BatchExchange { order: [], dist: Single } + └─BatchExchange { order: [], dist: Single, sequential: true } └─BatchLimit { limit: 1, offset: 0 } └─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], limit: 1, distribution: SomeShard } - sql: | diff --git a/src/frontend/planner_test/tests/testdata/output/intersect.yaml b/src/frontend/planner_test/tests/testdata/output/intersect.yaml index 579ee4ffe0665..ed5ed65a0e9ce 100644 --- a/src/frontend/planner_test/tests/testdata/output/intersect.yaml +++ b/src/frontend/planner_test/tests/testdata/output/intersect.yaml @@ -220,12 +220,12 @@ └─BatchHashJoin { type: LeftSemi, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all } ├─BatchExchange { order: [], dist: HashShard(t1.a, t1.b, t1.c) } │ └─BatchLimit { limit: 1, offset: 0 } - │ └─BatchExchange { order: [], dist: Single } + │ └─BatchExchange { order: [], dist: Single, sequential: true } │ └─BatchLimit { limit: 1, offset: 0 } │ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], limit: 1, distribution: SomeShard } └─BatchExchange { order: [], dist: HashShard(t2.a, t2.b, t2.c) } └─BatchLimit { limit: 1, offset: 0 } - └─BatchExchange { order: [], dist: Single } + └─BatchExchange { order: [], dist: Single, sequential: true } └─BatchLimit { limit: 1, offset: 0 } └─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], limit: 1, distribution: SomeShard } - sql: | @@ -247,12 +247,12 @@ └─BatchHashJoin { type: LeftSemi, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all } ├─BatchExchange { order: [], dist: HashShard(t1.a, t1.b, t1.c) } │ └─BatchLimit { limit: 1, offset: 0 } - │ └─BatchExchange { order: [], dist: Single } + │ └─BatchExchange { order: [], dist: Single, sequential: true } │ └─BatchLimit { limit: 1, offset: 0 } │ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], limit: 1, distribution: SomeShard } └─BatchExchange { order: [], dist: HashShard(t2.a, t2.b, t2.c) } └─BatchLimit { limit: 1, offset: 0 } - └─BatchExchange { order: [], dist: Single } + └─BatchExchange { order: [], dist: Single, sequential: true } └─BatchLimit { limit: 1, offset: 0 } └─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], limit: 1, distribution: SomeShard } - sql: | diff --git a/src/frontend/planner_test/tests/testdata/output/limit.yaml b/src/frontend/planner_test/tests/testdata/output/limit.yaml index 679e6ba9bd0a7..71e9d94f20ac8 100644 --- a/src/frontend/planner_test/tests/testdata/output/limit.yaml +++ b/src/frontend/planner_test/tests/testdata/output/limit.yaml @@ -158,7 +158,7 @@ select * from t limit 1; batch_plan: |- BatchLimit { limit: 1, offset: 0 } - └─BatchExchange { order: [], dist: Single } + └─BatchExchange { order: [], dist: Single, sequential: true } └─BatchLimit { limit: 1, offset: 0 } └─BatchScan { table: t, columns: [t.a], limit: 1, distribution: UpstreamHashShard(t.a) } stream_plan: |- diff --git a/src/frontend/planner_test/tests/testdata/output/order_by.yaml b/src/frontend/planner_test/tests/testdata/output/order_by.yaml index 30da302f20220..f1c83d79f2537 100644 --- a/src/frontend/planner_test/tests/testdata/output/order_by.yaml +++ b/src/frontend/planner_test/tests/testdata/output/order_by.yaml @@ -96,7 +96,7 @@ select * from t limit 3 offset 4; batch_plan: |- BatchLimit { limit: 3, offset: 4 } - └─BatchExchange { order: [], dist: Single } + └─BatchExchange { order: [], dist: Single, sequential: true } └─BatchLimit { limit: 7, offset: 0 } └─BatchScan { table: t, columns: [t.v1, t.v2], limit: 7, distribution: SomeShard } - sql: | @@ -104,7 +104,7 @@ select * from t limit 5; batch_plan: |- BatchLimit { limit: 5, offset: 0 } - └─BatchExchange { order: [], dist: Single } + └─BatchExchange { order: [], dist: Single, sequential: true } └─BatchLimit { limit: 5, offset: 0 } └─BatchScan { table: t, columns: [t.v1, t.v2], limit: 5, distribution: SomeShard } - sql: | diff --git a/src/frontend/planner_test/tests/testdata/output/union.yaml b/src/frontend/planner_test/tests/testdata/output/union.yaml index 6df8d08264dec..667d7cb1c5055 100644 --- a/src/frontend/planner_test/tests/testdata/output/union.yaml +++ b/src/frontend/planner_test/tests/testdata/output/union.yaml @@ -252,11 +252,11 @@ └─BatchExchange { order: [], dist: HashShard(t1.a, t1.b, t1.c) } └─BatchUnion { all: true } ├─BatchLimit { limit: 1, offset: 0 } - │ └─BatchExchange { order: [], dist: Single } + │ └─BatchExchange { order: [], dist: Single, sequential: true } │ └─BatchLimit { limit: 1, offset: 0 } │ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], limit: 1, distribution: SomeShard } └─BatchLimit { limit: 1, offset: 0 } - └─BatchExchange { order: [], dist: Single } + └─BatchExchange { order: [], dist: Single, sequential: true } └─BatchLimit { limit: 1, offset: 0 } └─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], limit: 1, distribution: SomeShard } - sql: | @@ -278,11 +278,11 @@ └─BatchExchange { order: [], dist: HashShard(t1.a, t1.b, t1.c) } └─BatchUnion { all: true } ├─BatchLimit { limit: 1, offset: 0 } - │ └─BatchExchange { order: [], dist: Single } + │ └─BatchExchange { order: [], dist: Single, sequential: true } │ └─BatchLimit { limit: 1, offset: 0 } │ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], limit: 1, distribution: SomeShard } └─BatchLimit { limit: 1, offset: 0 } - └─BatchExchange { order: [], dist: Single } + └─BatchExchange { order: [], dist: Single, sequential: true } └─BatchLimit { limit: 1, offset: 0 } └─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], limit: 1, distribution: SomeShard } - sql: | diff --git a/src/frontend/src/optimizer/plan_node/batch_exchange.rs b/src/frontend/src/optimizer/plan_node/batch_exchange.rs index d9985aabde8f7..5927dce274497 100644 --- a/src/frontend/src/optimizer/plan_node/batch_exchange.rs +++ b/src/frontend/src/optimizer/plan_node/batch_exchange.rs @@ -30,14 +30,27 @@ use crate::optimizer::property::{Distribution, DistributionDisplay, Order, Order pub struct BatchExchange { pub base: PlanBase, input: PlanRef, + sequential: bool, } impl BatchExchange { pub fn new(input: PlanRef, order: Order, dist: Distribution) -> Self { + Self::new_inner(input, order, dist, false) + } + + pub fn new_with_sequential(input: PlanRef, order: Order, dist: Distribution) -> Self { + Self::new_inner(input, order, dist, true) + } + + fn new_inner(input: PlanRef, order: Order, dist: Distribution, sequential: bool) -> Self { let ctx = input.ctx(); let schema = input.schema().clone(); let base = PlanBase::new_batch(ctx, schema, dist, order); - BatchExchange { base, input } + BatchExchange { + base, + input, + sequential, + } } } @@ -53,7 +66,11 @@ impl Distill for BatchExchange { distribution: self.base.distribution(), input_schema, }); - childless_record("BatchExchange", vec![("order", order), ("dist", dist)]) + let mut fields = vec![("order", order), ("dist", dist)]; + if self.sequential { + fields.push(("sequential", Pretty::display(&true))); + } + childless_record("BatchExchange", fields) } } @@ -63,7 +80,12 @@ impl PlanTreeNodeUnary for BatchExchange { } fn clone_with_input(&self, input: PlanRef) -> Self { - Self::new(input, self.order().clone(), self.distribution().clone()) + Self::new_inner( + input, + self.order().clone(), + self.distribution().clone(), + self.sequential, + ) } } impl_plan_tree_node_for_unary! {BatchExchange} @@ -80,12 +102,15 @@ impl ToBatchPb for BatchExchange { if self.base.order().is_any() { NodeBody::Exchange(ExchangeNode { sources: vec![], + sequential: self.sequential, input_schema: self.base.schema().to_prost(), }) } else { + assert!(!self.sequential); NodeBody::MergeSortExchange(MergeSortExchangeNode { exchange: Some(ExchangeNode { sources: vec![], + sequential: self.sequential, input_schema: self.base.schema().to_prost(), }), column_orders: self.base.order().to_protobuf(), diff --git a/src/frontend/src/optimizer/plan_node/batch_limit.rs b/src/frontend/src/optimizer/plan_node/batch_limit.rs index e91f2192356f6..2425c98b12e1d 100644 --- a/src/frontend/src/optimizer/plan_node/batch_limit.rs +++ b/src/frontend/src/optimizer/plan_node/batch_limit.rs @@ -23,8 +23,8 @@ use super::{ generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, }; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; -use crate::optimizer::plan_node::ToLocalBatch; -use crate::optimizer::property::{Order, RequiredDist}; +use crate::optimizer::plan_node::{BatchExchange, ToLocalBatch}; +use crate::optimizer::property::{Distribution, Order, RequiredDist}; /// `BatchLimit` implements [`super::LogicalLimit`] to fetch specified rows from input #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -33,6 +33,8 @@ pub struct BatchLimit { core: generic::Limit, } +const LIMIT_SEQUENTIAL_EXCHANGE_THRESHOLD: u64 = 1024; + impl BatchLimit { pub fn new(core: generic::Limit) -> Self { let base = PlanBase::new_batch_with_core( @@ -52,7 +54,17 @@ impl BatchLimit { let single_dist = RequiredDist::single(); let ensure_single_dist = if !batch_partial_limit.distribution().satisfies(&single_dist) { - single_dist.enforce_if_not_satisfies(batch_partial_limit.into(), &any_order)? + if new_limit < LIMIT_SEQUENTIAL_EXCHANGE_THRESHOLD { + BatchExchange::new_with_sequential( + batch_partial_limit.into(), + any_order, + Distribution::Single, + ) + .into() + } else { + BatchExchange::new(batch_partial_limit.into(), any_order, Distribution::Single) + .into() + } } else { // The input's distribution is singleton, so use one phase limit is enough. return Ok(self.clone_with_input(new_input).into()); diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs index b9a271e75dbd3..5d5d6755b27d5 100644 --- a/src/frontend/src/scheduler/distributed/stage.rs +++ b/src/frontend/src/scheduler/distributed/stage.rs @@ -940,6 +940,7 @@ impl StageRunner { identity, node_body: Some(NodeBody::Exchange(ExchangeNode { sources: exchange_sources, + sequential: true, input_schema: execution_plan_node.schema.clone(), })), }, @@ -949,6 +950,7 @@ impl StageRunner { node_body: Some(NodeBody::MergeSortExchange(MergeSortExchangeNode { exchange: Some(ExchangeNode { sources: exchange_sources, + sequential: true, input_schema: execution_plan_node.schema.clone(), }), column_orders: sort_merge_exchange_node.column_orders.clone(), From 354d961035486d98d0034b32f3b9706549edc689 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Fri, 26 Jan 2024 15:22:51 +0800 Subject: [PATCH 2/2] remove print --- src/batch/src/task/task_execution.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/batch/src/task/task_execution.rs b/src/batch/src/task/task_execution.rs index d50cb025f9902..9c5f94953ea20 100644 --- a/src/batch/src/task/task_execution.rs +++ b/src/batch/src/task/task_execution.rs @@ -196,12 +196,6 @@ pub struct TaskOutput { failure: Arc>>>, } -impl Drop for TaskOutput { - fn drop(&mut self) { - println!("drop TaskOutput {:?}", self.output_id); - } -} - impl std::fmt::Debug for TaskOutput { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("TaskOutput")