Skip to content

Commit

Permalink
chore(batch): deprecate chunk size in scan (#13692)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Dec 11, 2023
1 parent ddbd74b commit d270f10
Show file tree
Hide file tree
Showing 8 changed files with 8 additions and 45 deletions.
7 changes: 1 addition & 6 deletions proto/batch_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,8 @@ message RowSeqScanNode {
// Whether the order on output columns should be preserved.
bool ordered = 5;

message ChunkSize {
uint32 chunk_size = 1;
}
// If along with `batch_limit`, `chunk_size` will be set.
ChunkSize chunk_size = 6;
// The pushed down `batch_limit`. Max rows needed to return.
optional uint64 limit = 7;
optional uint64 limit = 6;
}

message SysRowSeqScanNode {
Expand Down
1 change: 0 additions & 1 deletion src/batch/src/executor/join/local_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
scan_ranges,
ordered: false,
vnode_bitmap: Some(vnode_bitmap.finish().to_protobuf()),
chunk_size: None,
limit: None,
});

Expand Down
8 changes: 3 additions & 5 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,14 +248,12 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
let ordered = seq_scan_node.ordered;

let epoch = source.epoch.clone();
let chunk_size = if let Some(chunk_size_) = &seq_scan_node.chunk_size {
chunk_size_
.get_chunk_size()
.min(source.context.get_config().developer.chunk_size as u32)
let limit = seq_scan_node.limit;
let chunk_size = if let Some(limit) = seq_scan_node.limit {
(limit as u32).min(source.context.get_config().developer.chunk_size as u32)
} else {
source.context.get_config().developer.chunk_size as u32
};
let limit = seq_scan_node.limit;
let metrics = source.context().batch_metrics();

dispatch_state_store!(source.context().state_store(), state_store, {
Expand Down
5 changes: 0 additions & 5 deletions src/frontend/src/optimizer/plan_node/batch_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use risingwave_common::error::Result;
use risingwave_common::types::ScalarImpl;
use risingwave_common::util::scan_range::{is_full_range, ScanRange};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::row_seq_scan_node::ChunkSize;
use risingwave_pb::batch_plan::RowSeqScanNode;

use super::batch::prelude::*;
Expand Down Expand Up @@ -247,10 +246,6 @@ impl ToBatchPb for BatchSeqScan {
// To be filled by the scheduler.
vnode_bitmap: None,
ordered: !self.order().is_any(),
chunk_size: self
.core
.chunk_size
.map(|chunk_size| ChunkSize { chunk_size }),
limit: *self.limit(),
})
}
Expand Down
5 changes: 1 addition & 4 deletions src/frontend/src/optimizer/plan_node/generic/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ pub struct Scan {
pub indexes: Vec<Rc<IndexCatalog>>,
/// The pushed down predicates. It refers to column indexes of the table.
pub predicate: Condition,
/// Help RowSeqScan executor use a better chunk size
pub chunk_size: Option<u32>,
/// syntax `FOR SYSTEM_TIME AS OF PROCTIME()` is used for temporal join.
pub for_system_time_as_of_proctime: bool,
/// The cardinality of the table **without** applying the predicate.
Expand Down Expand Up @@ -290,10 +288,9 @@ impl Scan {
table_desc,
indexes,
predicate,
chunk_size: None,
for_system_time_as_of_proctime,
ctx,
table_cardinality,
ctx,
}
}

Expand Down
10 changes: 0 additions & 10 deletions src/frontend/src/optimizer/plan_node/logical_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,16 +193,6 @@ impl LogicalScan {
}
}

/// used by optimizer (currently `top_n_on_index_rule`) to help reduce useless `chunk_size` at
/// executor
pub fn set_chunk_size(&mut self, chunk_size: u32) {
self.core.chunk_size = Some(chunk_size);
}

pub fn chunk_size(&self) -> Option<u32> {
self.core.chunk_size
}

pub fn primary_key(&self) -> &[ColumnOrder] {
self.core.primary_key()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,8 @@ impl Rule for BatchPushLimitToScanRule {
return None;
}
let pushed_limit = limit.limit() + limit.offset();
let mut scan_core = scan.core().clone();
scan_core.chunk_size = Some((u32::MAX as u64).min(pushed_limit) as u32);
let new_scan = BatchSeqScan::new_with_dist(
scan_core,
scan.core().clone(),
scan.base.distribution().clone(),
scan.scan_ranges().iter().cloned().collect_vec(),
Some(pushed_limit),
Expand Down
13 changes: 2 additions & 11 deletions src/frontend/src/optimizer/rule/top_n_on_index_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,7 @@ impl TopNOnIndexRule {
) -> Option<PlanRef> {
let order_satisfied_index = logical_scan.indexes_satisfy_order(required_order);
for index in order_satisfied_index {
if let Some(mut index_scan) = logical_scan.to_index_scan_if_index_covered(index) {
index_scan.set_chunk_size(
((u32::MAX as u64)
.min(logical_top_n.limit_attr().limit() + logical_top_n.offset()))
as u32,
);
if let Some(index_scan) = logical_scan.to_index_scan_if_index_covered(index) {
return Some(logical_top_n.clone_with_input(index_scan.into()).into());
}
}
Expand All @@ -76,7 +71,7 @@ impl TopNOnIndexRule {
fn try_on_pk(
&self,
logical_top_n: &LogicalTopN,
mut logical_scan: LogicalScan,
logical_scan: LogicalScan,
order: &Order,
) -> Option<PlanRef> {
let output_col_map = logical_scan
Expand All @@ -102,10 +97,6 @@ impl TopNOnIndexRule {
.collect::<Vec<_>>(),
};
if primary_key_order.satisfies(order) {
logical_scan.set_chunk_size(
((u32::MAX as u64).min(logical_top_n.limit_attr().limit() + logical_top_n.offset()))
as u32,
);
Some(logical_top_n.clone_with_input(logical_scan.into()).into())
} else {
None
Expand Down

0 comments on commit d270f10

Please sign in to comment.