From d270f10d43d3f87af3afe4d6ef35ed7075b1c7f5 Mon Sep 17 00:00:00 2001
From: Dylan
Date: Mon, 11 Dec 2023 11:53:00 +0800
Subject: [PATCH] chore(batch): deprecate chunk size in scan (#13692)

---
 proto/batch_plan.proto                            |  7 +------
 src/batch/src/executor/join/local_lookup_join.rs  |  1 -
 src/batch/src/executor/row_seq_scan.rs            |  8 +++-----
 .../src/optimizer/plan_node/batch_seq_scan.rs     |  5 -----
 .../src/optimizer/plan_node/generic/scan.rs       |  5 +----
 .../src/optimizer/plan_node/logical_scan.rs       | 10 ----------
 .../rule/batch/batch_push_limit_to_scan_rule.rs   |  4 +---
 .../src/optimizer/rule/top_n_on_index_rule.rs     | 13 ++-----------
 8 files changed, 8 insertions(+), 45 deletions(-)

diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto
index 9e409f057807a..856317a867661 100644
--- a/proto/batch_plan.proto
+++ b/proto/batch_plan.proto
@@ -25,13 +25,8 @@ message RowSeqScanNode {
   // Whether the order on output columns should be preserved.
   bool ordered = 5;
 
-  message ChunkSize {
-    uint32 chunk_size = 1;
-  }
-  // If along with `batch_limit`, `chunk_size` will be set.
-  ChunkSize chunk_size = 6;
   // The pushed down `batch_limit`. Max rows needed to return.
-  optional uint64 limit = 7;
+  optional uint64 limit = 6;
 }
 
 message SysRowSeqScanNode {
diff --git a/src/batch/src/executor/join/local_lookup_join.rs b/src/batch/src/executor/join/local_lookup_join.rs
index 4113316a8542a..cb795aeef93fc 100644
--- a/src/batch/src/executor/join/local_lookup_join.rs
+++ b/src/batch/src/executor/join/local_lookup_join.rs
@@ -112,7 +112,6 @@ impl InnerSideExecutorBuilder {
             scan_ranges,
             ordered: false,
             vnode_bitmap: Some(vnode_bitmap.finish().to_protobuf()),
-            chunk_size: None,
             limit: None,
         });
 
diff --git a/src/batch/src/executor/row_seq_scan.rs b/src/batch/src/executor/row_seq_scan.rs
index f45342d4d1363..20d4a4a540ff5 100644
--- a/src/batch/src/executor/row_seq_scan.rs
+++ b/src/batch/src/executor/row_seq_scan.rs
@@ -248,14 +248,12 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
         let ordered = seq_scan_node.ordered;
 
         let epoch = source.epoch.clone();
-        let chunk_size = if let Some(chunk_size_) = &seq_scan_node.chunk_size {
-            chunk_size_
-                .get_chunk_size()
-                .min(source.context.get_config().developer.chunk_size as u32)
+        let limit = seq_scan_node.limit;
+        let chunk_size = if let Some(limit) = seq_scan_node.limit {
+            (limit as u32).min(source.context.get_config().developer.chunk_size as u32)
         } else {
             source.context.get_config().developer.chunk_size as u32
         };
-        let limit = seq_scan_node.limit;
         let metrics = source.context().batch_metrics();
 
         dispatch_state_store!(source.context().state_store(), state_store, {
diff --git a/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs b/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs
index e43673350fb43..e9c39bc99738c 100644
--- a/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs
@@ -20,7 +20,6 @@ use risingwave_common::error::Result;
 use risingwave_common::types::ScalarImpl;
 use risingwave_common::util::scan_range::{is_full_range, ScanRange};
 use risingwave_pb::batch_plan::plan_node::NodeBody;
-use risingwave_pb::batch_plan::row_seq_scan_node::ChunkSize;
 use risingwave_pb::batch_plan::RowSeqScanNode;
 
 use super::batch::prelude::*;
@@ -247,10 +246,6 @@ impl ToBatchPb for BatchSeqScan {
             // To be filled by the scheduler.
             vnode_bitmap: None,
             ordered: !self.order().is_any(),
-            chunk_size: self
-                .core
-                .chunk_size
-                .map(|chunk_size| ChunkSize { chunk_size }),
             limit: *self.limit(),
         })
     }
diff --git a/src/frontend/src/optimizer/plan_node/generic/scan.rs b/src/frontend/src/optimizer/plan_node/generic/scan.rs
index dbfc65313a8ea..c32d942242683 100644
--- a/src/frontend/src/optimizer/plan_node/generic/scan.rs
+++ b/src/frontend/src/optimizer/plan_node/generic/scan.rs
@@ -43,8 +43,6 @@ pub struct Scan {
     pub indexes: Vec<Rc<IndexCatalog>>,
     /// The pushed down predicates. It refers to column indexes of the table.
     pub predicate: Condition,
-    /// Help RowSeqScan executor use a better chunk size
-    pub chunk_size: Option<u32>,
     /// syntax `FOR SYSTEM_TIME AS OF PROCTIME()` is used for temporal join.
    pub for_system_time_as_of_proctime: bool,
     /// The cardinality of the table **without** applying the predicate.
@@ -290,10 +288,9 @@ impl Scan {
             table_desc,
             indexes,
             predicate,
-            chunk_size: None,
             for_system_time_as_of_proctime,
-            ctx,
             table_cardinality,
+            ctx,
         }
     }
diff --git a/src/frontend/src/optimizer/plan_node/logical_scan.rs b/src/frontend/src/optimizer/plan_node/logical_scan.rs
index 379e30606049d..269633d5d74bd 100644
--- a/src/frontend/src/optimizer/plan_node/logical_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_scan.rs
@@ -193,16 +193,6 @@ impl LogicalScan {
         }
     }
 
-    /// used by optimizer (currently `top_n_on_index_rule`) to help reduce useless `chunk_size` at
-    /// executor
-    pub fn set_chunk_size(&mut self, chunk_size: u32) {
-        self.core.chunk_size = Some(chunk_size);
-    }
-
-    pub fn chunk_size(&self) -> Option<u32> {
-        self.core.chunk_size
-    }
-
     pub fn primary_key(&self) -> &[ColumnOrder] {
         self.core.primary_key()
     }
diff --git a/src/frontend/src/optimizer/rule/batch/batch_push_limit_to_scan_rule.rs b/src/frontend/src/optimizer/rule/batch/batch_push_limit_to_scan_rule.rs
index 8de0a12715772..b1dcb4dd1fbfd 100644
--- a/src/frontend/src/optimizer/rule/batch/batch_push_limit_to_scan_rule.rs
+++ b/src/frontend/src/optimizer/rule/batch/batch_push_limit_to_scan_rule.rs
@@ -35,10 +35,8 @@ impl Rule for BatchPushLimitToScanRule {
             return None;
         }
         let pushed_limit = limit.limit() + limit.offset();
-        let mut scan_core = scan.core().clone();
-        scan_core.chunk_size = Some((u32::MAX as u64).min(pushed_limit) as u32);
         let new_scan = BatchSeqScan::new_with_dist(
-            scan_core,
+            scan.core().clone(),
             scan.base.distribution().clone(),
             scan.scan_ranges().iter().cloned().collect_vec(),
             Some(pushed_limit),
diff --git a/src/frontend/src/optimizer/rule/top_n_on_index_rule.rs b/src/frontend/src/optimizer/rule/top_n_on_index_rule.rs
index a08c0b9e258fe..f25456084abe7 100644
--- a/src/frontend/src/optimizer/rule/top_n_on_index_rule.rs
+++ b/src/frontend/src/optimizer/rule/top_n_on_index_rule.rs
@@ -60,12 +60,7 @@ impl TopNOnIndexRule {
     ) -> Option<PlanRef> {
         let order_satisfied_index = logical_scan.indexes_satisfy_order(required_order);
         for index in order_satisfied_index {
-            if let Some(mut index_scan) = logical_scan.to_index_scan_if_index_covered(index) {
-                index_scan.set_chunk_size(
-                    ((u32::MAX as u64)
-                        .min(logical_top_n.limit_attr().limit() + logical_top_n.offset()))
-                    as u32,
-                );
+            if let Some(index_scan) = logical_scan.to_index_scan_if_index_covered(index) {
                 return Some(logical_top_n.clone_with_input(index_scan.into()).into());
             }
         }
@@ -76,7 +71,7 @@ impl TopNOnIndexRule {
     fn try_on_pk(
         &self,
         logical_top_n: &LogicalTopN,
-        mut logical_scan: LogicalScan,
+        logical_scan: LogicalScan,
         order: &Order,
     ) -> Option<PlanRef> {
         let output_col_map = logical_scan
@@ -102,10 +97,6 @@ impl TopNOnIndexRule {
                 .collect::<Vec<_>>(),
         };
         if primary_key_order.satisfies(order) {
-            logical_scan.set_chunk_size(
-                ((u32::MAX as u64).min(logical_top_n.limit_attr().limit() + logical_top_n.offset()))
-                    as u32,
-            );
             Some(logical_top_n.clone_with_input(logical_scan.into()).into())
         } else {
             None