Skip to content

Commit

Permalink
fix(batch): fix sequential exchange (#14924) (#14926)
Browse files Browse the repository at this point in the history
Co-authored-by: Dylan <[email protected]>
Co-authored-by: lmatz <[email protected]>
  • Loading branch information
3 people authored Feb 4, 2024
1 parent b502587 commit a2ec7cf
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 3 deletions.
1 change: 1 addition & 0 deletions proto/batch_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ message ExchangeSource {

message ExchangeNode {
repeated ExchangeSource sources = 1;
// sequential means each tasks of the exchange node will be executed sequentially.
bool sequential = 2;
repeated plan_common.Field input_schema = 3;
}
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/plan_node/batch_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ use crate::optimizer::property::{Distribution, DistributionDisplay, Order, Order
pub struct BatchExchange {
pub base: PlanBase<Batch>,
input: PlanRef,
// sequential means each tasks of the exchange node will be executed sequentially.
// Currently, it is used to avoid spawn too many tasks for limit operator.
sequential: bool,
}

Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/scheduler/distributed/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -935,12 +935,12 @@ impl StageRunner {
let exchange_sources = child_stage.all_exchange_sources_for(task_id);

match &execution_plan_node.node {
NodeBody::Exchange(_exchange_node) => PlanNodePb {
NodeBody::Exchange(exchange_node) => PlanNodePb {
children: vec![],
identity,
node_body: Some(NodeBody::Exchange(ExchangeNode {
sources: exchange_sources,
sequential: true,
sequential: exchange_node.sequential,
input_schema: execution_plan_node.schema.clone(),
})),
},
Expand All @@ -950,7 +950,7 @@ impl StageRunner {
node_body: Some(NodeBody::MergeSortExchange(MergeSortExchangeNode {
exchange: Some(ExchangeNode {
sources: exchange_sources,
sequential: true,
sequential: false,
input_schema: execution_plan_node.schema.clone(),
}),
column_orders: sort_merge_exchange_node.column_orders.clone(),
Expand Down

0 comments on commit a2ec7cf

Please sign in to comment.