feat: support sequential exchange #14795

Merged · 2 commits · Jan 26, 2024

This PR adds a `sequential` flag to the batch `ExchangeNode`. When the flag is set, the exchange executor drains its upstream sources one at a time instead of merging them concurrently with `select_all`. The planner and scheduler set it where sequential consumption is wanted, e.g. for the single-distribution gather exchange under a small `LIMIT` (below a 1024-row threshold) and for the inner side of a local lookup join, as the diffs below show.
1 change: 1 addition & 0 deletions proto/batch_plan.proto
@@ -241,6 +241,7 @@ message ExchangeSource {

message ExchangeNode {
repeated ExchangeSource sources = 1;
bool sequential = 2;
repeated plan_common.Field input_schema = 3;
}

53 changes: 32 additions & 21 deletions src/batch/src/executor/generic_exchange.rs
@@ -14,7 +14,6 @@

use futures::StreamExt;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::util::iter_util::ZipEqFast;
@@ -39,6 +38,7 @@ pub struct GenericExchangeExecutor<CS, C> {
proto_sources: Vec<PbExchangeSource>,
/// Mock-able CreateSource.
source_creators: Vec<CS>,
sequential: bool,
context: C,

schema: Schema,
@@ -126,6 +126,8 @@ impl BoxedExecutorBuilder for GenericExchangeExecutorBuilder {
NodeBody::Exchange
)?;

let sequential = node.get_sequential();

ensure!(!node.get_sources().is_empty());
let proto_sources: Vec<PbExchangeSource> = node.get_sources().to_vec();
let source_creators =
@@ -136,6 +138,7 @@ impl BoxedExecutorBuilder for GenericExchangeExecutorBuilder {
Ok(Box::new(ExchangeExecutor::<C> {
proto_sources,
source_creators,
sequential,
context: source.context().clone(),
schema: Schema { fields },
task_id: source.task_id.clone(),
@@ -164,26 +167,33 @@ impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> Executor
impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> GenericExchangeExecutor<CS, C> {
#[try_stream(boxed, ok = DataChunk, error = BatchError)]
async fn do_execute(self: Box<Self>) {
let mut stream = select_all(
self.proto_sources
.into_iter()
.zip_eq_fast(self.source_creators)
.map(|(prost_source, source_creator)| {
Self::data_chunk_stream(
prost_source,
source_creator,
self.context.clone(),
self.metrics.clone(),
self.identity.clone(),
)
})
.collect_vec(),
)
.boxed();

while let Some(data_chunk) = stream.next().await {
let data_chunk = data_chunk?;
yield data_chunk
let streams = self
.proto_sources
.into_iter()
.zip_eq_fast(self.source_creators)
.map(|(prost_source, source_creator)| {
Self::data_chunk_stream(
prost_source,
source_creator,
self.context.clone(),
self.metrics.clone(),
self.identity.clone(),
)
});

> Review comment (Contributor), on `let streams = self`: How about moving this part into the `self.sequential` block? It's not easy to understand the difference between the collections.

if self.sequential {
for mut stream in streams {
while let Some(data_chunk) = stream.next().await {
let data_chunk = data_chunk?;
yield data_chunk
}
}
} else {
let mut stream = select_all(streams).boxed();
while let Some(data_chunk) = stream.next().await {
let data_chunk = data_chunk?;
yield data_chunk
}
}
}

@@ -262,6 +272,7 @@ mod tests {
metrics: None,
proto_sources,
source_creators,
sequential: false,
context,
schema: Schema {
fields: vec![Field::unnamed(DataType::Int32)],
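To make the new flag concrete, here is a minimal standalone sketch of the two polling strategies the executor chooses between: sequential draining versus `select_all` merging. It assumes only the `futures` and `tokio` crates, with integers standing in for `DataChunk`s; it illustrates the technique and is not RisingWave code.

```rust
use futures::stream::{self, select_all, BoxStream, StreamExt};

fn make_sources() -> Vec<BoxStream<'static, i32>> {
    vec![
        stream::iter(vec![1, 2]).boxed(),
        stream::iter(vec![3, 4]).boxed(),
    ]
}

#[tokio::main]
async fn main() {
    // Sequential: source 0 is fully drained before source 1 is ever polled,
    // so output order across sources is deterministic.
    let mut seq = Vec::new();
    for mut s in make_sources() {
        while let Some(x) = s.next().await {
            seq.push(x);
        }
    }
    assert_eq!(seq, vec![1, 2, 3, 4]);

    // Concurrent: all sources are polled together; items interleave in
    // arrival order, so we sort before comparing.
    let mut merged = select_all(make_sources());
    let mut all = Vec::new();
    while let Some(x) = merged.next().await {
        all.push(x);
    }
    all.sort();
    assert_eq!(all, vec![1, 2, 3, 4]);
}
```

The payoff for a small `LIMIT` is that, in sequential mode, the downstream limit can stop polling after the first upstream yields enough rows, so the remaining sources may never be read at all.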
1 change: 1 addition & 0 deletions src/batch/src/executor/join/local_lookup_join.rs
@@ -224,6 +224,7 @@ impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C>

let exchange_node = NodeBody::Exchange(ExchangeNode {
sources,
sequential: true,
input_schema: self.inner_side_schema.to_prost(),
});

(file name not captured in this view)
@@ -189,7 +189,7 @@
select * from t limit 1
batch_plan: |-
BatchLimit { limit: 1, offset: 0 }
└─BatchExchange { order: [], dist: Single }
└─BatchExchange { order: [], dist: Single, sequential: true }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchScan { table: t, columns: [t.v1, t.v2], limit: 1, distribution: SomeShard }
- sql: |
8 changes: 4 additions & 4 deletions src/frontend/planner_test/tests/testdata/output/except.yaml
@@ -220,12 +220,12 @@
└─BatchHashJoin { type: LeftAnti, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all }
├─BatchExchange { order: [], dist: HashShard(t1.a, t1.b, t1.c) }
│ └─BatchLimit { limit: 1, offset: 0 }
│ └─BatchExchange { order: [], dist: Single }
│ └─BatchExchange { order: [], dist: Single, sequential: true }
│ └─BatchLimit { limit: 1, offset: 0 }
│ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], limit: 1, distribution: SomeShard }
└─BatchExchange { order: [], dist: HashShard(t2.a, t2.b, t2.c) }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchExchange { order: [], dist: Single }
└─BatchExchange { order: [], dist: Single, sequential: true }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], limit: 1, distribution: SomeShard }
- sql: |
@@ -247,12 +247,12 @@
└─BatchHashJoin { type: LeftAnti, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all }
├─BatchExchange { order: [], dist: HashShard(t1.a, t1.b, t1.c) }
│ └─BatchLimit { limit: 1, offset: 0 }
│ └─BatchExchange { order: [], dist: Single }
│ └─BatchExchange { order: [], dist: Single, sequential: true }
│ └─BatchLimit { limit: 1, offset: 0 }
│ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], limit: 1, distribution: SomeShard }
└─BatchExchange { order: [], dist: HashShard(t2.a, t2.b, t2.c) }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchExchange { order: [], dist: Single }
└─BatchExchange { order: [], dist: Single, sequential: true }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], limit: 1, distribution: SomeShard }
- sql: |
(file name not captured in this view)
@@ -220,12 +220,12 @@
└─BatchHashJoin { type: LeftSemi, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all }
├─BatchExchange { order: [], dist: HashShard(t1.a, t1.b, t1.c) }
│ └─BatchLimit { limit: 1, offset: 0 }
│ └─BatchExchange { order: [], dist: Single }
│ └─BatchExchange { order: [], dist: Single, sequential: true }
│ └─BatchLimit { limit: 1, offset: 0 }
│ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], limit: 1, distribution: SomeShard }
└─BatchExchange { order: [], dist: HashShard(t2.a, t2.b, t2.c) }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchExchange { order: [], dist: Single }
└─BatchExchange { order: [], dist: Single, sequential: true }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], limit: 1, distribution: SomeShard }
- sql: |
@@ -247,12 +247,12 @@
└─BatchHashJoin { type: LeftSemi, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all }
├─BatchExchange { order: [], dist: HashShard(t1.a, t1.b, t1.c) }
│ └─BatchLimit { limit: 1, offset: 0 }
│ └─BatchExchange { order: [], dist: Single }
│ └─BatchExchange { order: [], dist: Single, sequential: true }
│ └─BatchLimit { limit: 1, offset: 0 }
│ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], limit: 1, distribution: SomeShard }
└─BatchExchange { order: [], dist: HashShard(t2.a, t2.b, t2.c) }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchExchange { order: [], dist: Single }
└─BatchExchange { order: [], dist: Single, sequential: true }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], limit: 1, distribution: SomeShard }
- sql: |
2 changes: 1 addition & 1 deletion src/frontend/planner_test/tests/testdata/output/limit.yaml
@@ -158,7 +158,7 @@
select * from t limit 1;
batch_plan: |-
BatchLimit { limit: 1, offset: 0 }
└─BatchExchange { order: [], dist: Single }
└─BatchExchange { order: [], dist: Single, sequential: true }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchScan { table: t, columns: [t.a], limit: 1, distribution: UpstreamHashShard(t.a) }
stream_plan: |-
4 changes: 2 additions & 2 deletions src/frontend/planner_test/tests/testdata/output/order_by.yaml
@@ -96,15 +96,15 @@
select * from t limit 3 offset 4;
batch_plan: |-
BatchLimit { limit: 3, offset: 4 }
└─BatchExchange { order: [], dist: Single }
└─BatchExchange { order: [], dist: Single, sequential: true }
└─BatchLimit { limit: 7, offset: 0 }
└─BatchScan { table: t, columns: [t.v1, t.v2], limit: 7, distribution: SomeShard }
- sql: |
create table t (v1 bigint, v2 double precision);
select * from t limit 5;
batch_plan: |-
BatchLimit { limit: 5, offset: 0 }
└─BatchExchange { order: [], dist: Single }
└─BatchExchange { order: [], dist: Single, sequential: true }
└─BatchLimit { limit: 5, offset: 0 }
└─BatchScan { table: t, columns: [t.v1, t.v2], limit: 5, distribution: SomeShard }
- sql: |
8 changes: 4 additions & 4 deletions src/frontend/planner_test/tests/testdata/output/union.yaml
@@ -252,11 +252,11 @@
└─BatchExchange { order: [], dist: HashShard(t1.a, t1.b, t1.c) }
└─BatchUnion { all: true }
├─BatchLimit { limit: 1, offset: 0 }
│ └─BatchExchange { order: [], dist: Single }
│ └─BatchExchange { order: [], dist: Single, sequential: true }
│ └─BatchLimit { limit: 1, offset: 0 }
│ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], limit: 1, distribution: SomeShard }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchExchange { order: [], dist: Single }
└─BatchExchange { order: [], dist: Single, sequential: true }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], limit: 1, distribution: SomeShard }
- sql: |
@@ -278,11 +278,11 @@
└─BatchExchange { order: [], dist: HashShard(t1.a, t1.b, t1.c) }
└─BatchUnion { all: true }
├─BatchLimit { limit: 1, offset: 0 }
│ └─BatchExchange { order: [], dist: Single }
│ └─BatchExchange { order: [], dist: Single, sequential: true }
│ └─BatchLimit { limit: 1, offset: 0 }
│ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], limit: 1, distribution: SomeShard }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchExchange { order: [], dist: Single }
└─BatchExchange { order: [], dist: Single, sequential: true }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], limit: 1, distribution: SomeShard }
- sql: |
31 changes: 28 additions & 3 deletions src/frontend/src/optimizer/plan_node/batch_exchange.rs
@@ -30,14 +30,27 @@ use crate::optimizer::property::{Distribution, DistributionDisplay, Order, Order
pub struct BatchExchange {
pub base: PlanBase<Batch>,
input: PlanRef,
sequential: bool,
}

impl BatchExchange {
pub fn new(input: PlanRef, order: Order, dist: Distribution) -> Self {
Self::new_inner(input, order, dist, false)
}

pub fn new_with_sequential(input: PlanRef, order: Order, dist: Distribution) -> Self {
Self::new_inner(input, order, dist, true)
}

fn new_inner(input: PlanRef, order: Order, dist: Distribution, sequential: bool) -> Self {
let ctx = input.ctx();
let schema = input.schema().clone();
let base = PlanBase::new_batch(ctx, schema, dist, order);
BatchExchange { base, input }
BatchExchange {
base,
input,
sequential,
}
}
}

@@ -53,7 +66,11 @@ impl Distill for BatchExchange {
distribution: self.base.distribution(),
input_schema,
});
childless_record("BatchExchange", vec![("order", order), ("dist", dist)])
let mut fields = vec![("order", order), ("dist", dist)];
if self.sequential {
fields.push(("sequential", Pretty::display(&true)));
}
childless_record("BatchExchange", fields)
}
}

@@ -63,7 +80,12 @@ impl PlanTreeNodeUnary for BatchExchange {
}

fn clone_with_input(&self, input: PlanRef) -> Self {
Self::new(input, self.order().clone(), self.distribution().clone())
Self::new_inner(
input,
self.order().clone(),
self.distribution().clone(),
self.sequential,
)
}
}
impl_plan_tree_node_for_unary! {BatchExchange}
@@ -80,12 +102,15 @@ impl ToBatchPb for BatchExchange {
if self.base.order().is_any() {
NodeBody::Exchange(ExchangeNode {
sources: vec![],
sequential: self.sequential,
input_schema: self.base.schema().to_prost(),
})
} else {
assert!(!self.sequential);
NodeBody::MergeSortExchange(MergeSortExchangeNode {
exchange: Some(ExchangeNode {
sources: vec![],
sequential: self.sequential,
input_schema: self.base.schema().to_prost(),
}),
column_orders: self.base.order().to_protobuf(),
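One note on the `assert!(!self.sequential)` in the `MergeSortExchange` branch above: an order-preserving merge must compare the current heads of all sources at once, so it cannot drain sources one by one. A std-only k-way merge sketch (illustrative, not RisingWave's merge-sort exchange) makes this visible: every input is opened and polled before the first output row can be emitted.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Merge already-sorted sources into one sorted output.
fn kway_merge(sources: Vec<Vec<i32>>) -> Vec<i32> {
    let mut iters: Vec<_> = sources.into_iter().map(|v| v.into_iter()).collect();
    let mut heap = BinaryHeap::new();
    // Seed the heap with the head of *every* source: all inputs live up front.
    for (i, it) in iters.iter_mut().enumerate() {
        if let Some(v) = it.next() {
            heap.push(Reverse((v, i)));
        }
    }
    let mut out = Vec::new();
    // Repeatedly emit the globally smallest head, refilling from its source.
    while let Some(Reverse((v, i))) = heap.pop() {
        out.push(v);
        if let Some(next) = iters[i].next() {
            heap.push(Reverse((next, i)));
        }
    }
    out
}

fn main() {
    assert_eq!(
        kway_merge(vec![vec![1, 4, 7], vec![2, 5], vec![3, 6]]),
        vec![1, 2, 3, 4, 5, 6, 7]
    );
}
```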
18 changes: 15 additions & 3 deletions src/frontend/src/optimizer/plan_node/batch_limit.rs
@@ -23,8 +23,8 @@ use super::{
generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch,
};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::ToLocalBatch;
use crate::optimizer::property::{Order, RequiredDist};
use crate::optimizer::plan_node::{BatchExchange, ToLocalBatch};
use crate::optimizer::property::{Distribution, Order, RequiredDist};

/// `BatchLimit` implements [`super::LogicalLimit`] to fetch specified rows from input
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -33,6 +33,8 @@ pub struct BatchLimit {
core: generic::Limit<PlanRef>,
}

const LIMIT_SEQUENTIAL_EXCHANGE_THRESHOLD: u64 = 1024;

impl BatchLimit {
pub fn new(core: generic::Limit<PlanRef>) -> Self {
let base = PlanBase::new_batch_with_core(
@@ -52,7 +54,17 @@

let single_dist = RequiredDist::single();
let ensure_single_dist = if !batch_partial_limit.distribution().satisfies(&single_dist) {
single_dist.enforce_if_not_satisfies(batch_partial_limit.into(), &any_order)?
if new_limit < LIMIT_SEQUENTIAL_EXCHANGE_THRESHOLD {
BatchExchange::new_with_sequential(
batch_partial_limit.into(),
any_order,
Distribution::Single,
)
.into()
} else {
BatchExchange::new(batch_partial_limit.into(), any_order, Distribution::Single)
.into()
}
} else {
// The input's distribution is singleton, so use one phase limit is enough.
return Ok(self.clone_with_input(new_input).into());
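The threshold logic above reads naturally as a predicate. The sketch below is a simplified restatement with a hypothetical helper name, not the planner's actual code; `new_limit` is the partial stage's limit, which equals `limit + offset` (compare the `LIMIT 3 OFFSET 4` plan in order_by.yaml earlier, whose partial stage is `BatchLimit { limit: 7, offset: 0 }`).

```rust
// Hypothetical restatement of the `if new_limit < LIMIT_SEQUENTIAL_EXCHANGE_THRESHOLD`
// branch shown in the diff above.
const LIMIT_SEQUENTIAL_EXCHANGE_THRESHOLD: u64 = 1024;

fn use_sequential_exchange(limit: u64, offset: u64) -> bool {
    // The partial limit stage fetches `limit + offset` rows.
    limit + offset < LIMIT_SEQUENTIAL_EXCHANGE_THRESHOLD
}

fn main() {
    assert!(use_sequential_exchange(1, 0)); // LIMIT 1 -> sequential: true
    assert!(use_sequential_exchange(3, 4)); // LIMIT 3 OFFSET 4 -> sequential: true
    assert!(!use_sequential_exchange(5000, 0)); // large LIMIT -> concurrent exchange
}
```

The design intuition, as far as the diff shows: below the threshold, a sequential gather exchange lets a satisfied limit leave the remaining upstreams unpolled, while a large limit keeps the concurrent exchange, presumably so that all upstreams stay busy.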
2 changes: 2 additions & 0 deletions src/frontend/src/scheduler/distributed/stage.rs
@@ -940,6 +940,7 @@ impl StageRunner {
identity,
node_body: Some(NodeBody::Exchange(ExchangeNode {
sources: exchange_sources,
sequential: true,
input_schema: execution_plan_node.schema.clone(),
})),
},
@@ -949,6 +950,7 @@
node_body: Some(NodeBody::MergeSortExchange(MergeSortExchangeNode {
exchange: Some(ExchangeNode {
sources: exchange_sources,
sequential: true,
input_schema: execution_plan_node.schema.clone(),
}),
column_orders: sort_merge_exchange_node.column_orders.clone(),