From 6ee8bd2b3923fb6e438e135743ae73068865eee8 Mon Sep 17 00:00:00 2001 From: Dylan Date: Thu, 1 Feb 2024 17:01:06 +0800 Subject: [PATCH] fix(batch): fix sequential exchange (#14924) --- proto/batch_plan.proto | 1 + src/frontend/src/optimizer/plan_node/batch_exchange.rs | 2 ++ src/frontend/src/scheduler/distributed/stage.rs | 6 +++--- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index f81637dd89f8c..689217a4f011d 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -241,6 +241,7 @@ message ExchangeSource { message ExchangeNode { repeated ExchangeSource sources = 1; + // sequential means each tasks of the exchange node will be executed sequentially. bool sequential = 2; repeated plan_common.Field input_schema = 3; } diff --git a/src/frontend/src/optimizer/plan_node/batch_exchange.rs b/src/frontend/src/optimizer/plan_node/batch_exchange.rs index 5927dce274497..cd0f6ef8e1e0b 100644 --- a/src/frontend/src/optimizer/plan_node/batch_exchange.rs +++ b/src/frontend/src/optimizer/plan_node/batch_exchange.rs @@ -30,6 +30,8 @@ use crate::optimizer::property::{Distribution, DistributionDisplay, Order, Order pub struct BatchExchange { pub base: PlanBase, input: PlanRef, + // sequential means each tasks of the exchange node will be executed sequentially. + // Currently, it is used to avoid spawn too many tasks for limit operator. sequential: bool, } diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs index 5d5d6755b27d5..9585e18713d58 100644 --- a/src/frontend/src/scheduler/distributed/stage.rs +++ b/src/frontend/src/scheduler/distributed/stage.rs @@ -935,12 +935,12 @@ impl StageRunner { let exchange_sources = child_stage.all_exchange_sources_for(task_id); match &execution_plan_node.node { - NodeBody::Exchange(_exchange_node) => PlanNodePb { + NodeBody::Exchange(exchange_node) => PlanNodePb { children: vec![], identity, node_body: Some(NodeBody::Exchange(ExchangeNode { sources: exchange_sources, - sequential: true, + sequential: exchange_node.sequential, input_schema: execution_plan_node.schema.clone(), })), }, @@ -950,7 +950,7 @@ impl StageRunner { node_body: Some(NodeBody::MergeSortExchange(MergeSortExchangeNode { exchange: Some(ExchangeNode { sources: exchange_sources, - sequential: true, + sequential: false, input_schema: execution_plan_node.schema.clone(), }), column_orders: sort_merge_exchange_node.column_orders.clone(),