Skip to content

Commit

Permalink
support batch limit push down
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 committed Nov 24, 2023
1 parent 9ae8705 commit 88b1ff3
Show file tree
Hide file tree
Showing 18 changed files with 167 additions and 29 deletions.
5 changes: 5 additions & 0 deletions proto/batch_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ message RowSeqScanNode {
}
// If used together with `batch_limit`, `chunk_size` will be set.
ChunkSize chunk_size = 6;
message Limit {
uint64 limit = 1;
}
// The pushed-down `batch_limit`: the maximum number of rows this scan needs to return.
Limit limit = 7;
}

message SysRowSeqScanNode {
Expand Down
1 change: 1 addition & 0 deletions src/batch/src/executor/join/local_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
ordered: false,
vnode_bitmap: Some(vnode_bitmap.finish().to_protobuf()),
chunk_size: None,
limit: None,
});

Ok(row_seq_scan_node)
Expand Down
30 changes: 28 additions & 2 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub struct RowSeqScanExecutor<S: StateStore> {
scan_ranges: Vec<ScanRange>,
ordered: bool,
epoch: BatchQueryEpoch,
limit: Option<u64>,
}

/// Range for batch scan.
Expand Down Expand Up @@ -138,6 +139,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
epoch: BatchQueryEpoch,
chunk_size: usize,
identity: String,
limit: Option<u64>,
metrics: Option<BatchMetricsWithTaskLabels>,
) -> Self {
Self {
Expand All @@ -148,6 +150,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
scan_ranges,
ordered,
epoch,
limit,
}
}
}
Expand Down Expand Up @@ -252,6 +255,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
} else {
source.context.get_config().developer.chunk_size as u32
};
let limit = seq_scan_node.limit.clone().map(|limit| limit.limit);
let metrics = source.context().batch_metrics();

dispatch_state_store!(source.context().state_store(), state_store, {
Expand All @@ -275,6 +279,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
epoch,
chunk_size as usize,
source.plan_node().get_identity().clone(),
limit,
metrics,
)))
})
Expand Down Expand Up @@ -306,6 +311,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
scan_ranges,
ordered,
epoch,
limit,
} = *self;
let table = Arc::new(table);

Expand All @@ -328,6 +334,11 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
.into_iter()
.partition(|x| x.pk_prefix.len() == table.pk_indices().len());

// The number of rows that have already been returned as the execution result.
let mut returned = 0;
if let Some(limit) = &limit && returned >= *limit {
return Ok(());
}
let mut data_chunk_builder = DataChunkBuilder::new(table.schema().data_types(), chunk_size);
// Point Get
for point_get in point_gets {
Expand All @@ -336,12 +347,20 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
Self::execute_point_get(table, point_get, epoch.clone(), histogram.clone()).await?
{
if let Some(chunk) = data_chunk_builder.append_one_row(row) {
returned += chunk.cardinality() as u64;
yield chunk;
if let Some(limit) = &limit && returned >= *limit {
return Ok(());
}
}
}
}
if let Some(chunk) = data_chunk_builder.consume_all() {
returned += chunk.cardinality() as u64;
yield chunk;
if let Some(limit) = &limit && returned >= *limit {
return Ok(());
}
}

// Range Scan
Expand All @@ -354,12 +373,18 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
ordered,
epoch.clone(),
chunk_size,
limit,
histogram,
))
}));
#[for_await]
for chunk in range_scans {
yield chunk?;
let chunk = chunk?;
returned += chunk.cardinality() as u64;
yield chunk;
if let Some(limit) = &limit && returned >= *limit {
return Ok(());
}
}
}

Expand Down Expand Up @@ -391,6 +416,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
ordered: bool,
epoch: BatchQueryEpoch,
chunk_size: usize,
limit: Option<u64>,
histogram: Option<impl Deref<Target = Histogram>>,
) {
let ScanRange {
Expand Down Expand Up @@ -443,7 +469,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
},
),
ordered,
PrefetchOptions::new_for_large_range_scan(),
PrefetchOptions::new_with_exhaust_iter(limit.is_none()),
)
.await?;

Expand Down
1 change: 1 addition & 0 deletions src/compute/tests/cdc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ async fn test_cdc_backfill() -> StreamResult<()> {
1024,
"RowSeqExecutor2".to_string(),
None,
None,
));
let mut stream = scan.execute();
while let Some(message) = stream.next().await {
Expand Down
4 changes: 4 additions & 0 deletions src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ async fn test_table_materialize() -> StreamResult<()> {
1024,
"RowSeqExecutor2".to_string(),
None,
None,
));
let mut stream = scan.execute();
let result = stream.next().await;
Expand Down Expand Up @@ -333,6 +334,7 @@ async fn test_table_materialize() -> StreamResult<()> {
1024,
"RowSeqScanExecutor2".to_string(),
None,
None,
));

let mut stream = scan.execute();
Expand Down Expand Up @@ -409,6 +411,7 @@ async fn test_table_materialize() -> StreamResult<()> {
1024,
"RowSeqScanExecutor2".to_string(),
None,
None,
));

let mut stream = scan.execute();
Expand Down Expand Up @@ -479,6 +482,7 @@ async fn test_row_seq_scan() -> Result<()> {
1,
"RowSeqScanExecutor2".to_string(),
None,
None,
));

assert_eq!(executor.schema().fields().len(), 3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@
BatchLimit { limit: 1, offset: 0 }
└─BatchExchange { order: [], dist: Single }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard }
└─BatchScan { table: t, columns: [t.v1, t.v2], limit: 1, distribution: SomeShard }
- sql: |
create table t (v1 bigint, v2 double precision);
select * from t order by v1 limit 1
Expand Down
8 changes: 4 additions & 4 deletions src/frontend/planner_test/tests/testdata/output/except.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -230,12 +230,12 @@
│ └─BatchLimit { limit: 1, offset: 0 }
│ └─BatchExchange { order: [], dist: Single }
│ └─BatchLimit { limit: 1, offset: 0 }
│ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], distribution: SomeShard }
│ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], limit: 1, distribution: SomeShard }
└─BatchExchange { order: [], dist: HashShard(t2.a, t2.b, t2.c) }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchExchange { order: [], dist: Single }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], distribution: SomeShard }
└─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], limit: 1, distribution: SomeShard }
- sql: |
create table t1 (a int, b numeric, c bigint);
create table t2 (a int, b numeric, c bigint);
Expand All @@ -257,12 +257,12 @@
│ └─BatchLimit { limit: 1, offset: 0 }
│ └─BatchExchange { order: [], dist: Single }
│ └─BatchLimit { limit: 1, offset: 0 }
│ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], distribution: SomeShard }
│ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], limit: 1, distribution: SomeShard }
└─BatchExchange { order: [], dist: HashShard(t2.a, t2.b, t2.c) }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchExchange { order: [], dist: Single }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], distribution: SomeShard }
└─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], limit: 1, distribution: SomeShard }
- sql: |
select 1 except select 2 except select 3 except select 4 except select 5 except select 5
optimized_logical_plan_for_batch: |-
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@
BatchTopN { order: [idx1.a ASC], limit: 1, offset: 0 }
└─BatchExchange { order: [], dist: Single }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchScan { table: idx1, columns: [idx1.a, idx1.b], distribution: UpstreamHashShard(idx1.a) }
└─BatchScan { table: idx1, columns: [idx1.a, idx1.b], limit: 1, distribution: UpstreamHashShard(idx1.a) }
- name: topn on primary key
sql: |
create table t1 (a int primary key, b int);
Expand All @@ -597,7 +597,7 @@
BatchTopN { order: [t1.a ASC], limit: 1, offset: 0 }
└─BatchExchange { order: [], dist: Single }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchScan { table: t1, columns: [t1.a, t1.b], distribution: UpstreamHashShard(t1.a) }
└─BatchScan { table: t1, columns: [t1.a, t1.b], limit: 1, distribution: UpstreamHashShard(t1.a) }
- name: topn on index with descending ordering
sql: |
create table t1 (a int, b int);
Expand All @@ -607,7 +607,7 @@
BatchTopN { order: [idx1.a DESC], limit: 1, offset: 0 }
└─BatchExchange { order: [], dist: Single }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchScan { table: idx1, columns: [idx1.a, idx1.b], distribution: UpstreamHashShard(idx1.a) }
└─BatchScan { table: idx1, columns: [idx1.a, idx1.b], limit: 1, distribution: UpstreamHashShard(idx1.a) }
- name: topn on pk streaming case, should NOT optimized
sql: |
create table t1 (a int primary key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,12 +230,12 @@
│ └─BatchLimit { limit: 1, offset: 0 }
│ └─BatchExchange { order: [], dist: Single }
│ └─BatchLimit { limit: 1, offset: 0 }
│ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], distribution: SomeShard }
│ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], limit: 1, distribution: SomeShard }
└─BatchExchange { order: [], dist: HashShard(t2.a, t2.b, t2.c) }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchExchange { order: [], dist: Single }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], distribution: SomeShard }
└─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], limit: 1, distribution: SomeShard }
- sql: |
create table t1 (a int, b numeric, c bigint);
create table t2 (a int, b numeric, c bigint);
Expand All @@ -257,12 +257,12 @@
│ └─BatchLimit { limit: 1, offset: 0 }
│ └─BatchExchange { order: [], dist: Single }
│ └─BatchLimit { limit: 1, offset: 0 }
│ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], distribution: SomeShard }
│ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], limit: 1, distribution: SomeShard }
└─BatchExchange { order: [], dist: HashShard(t2.a, t2.b, t2.c) }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchExchange { order: [], dist: Single }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], distribution: SomeShard }
└─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], limit: 1, distribution: SomeShard }
- sql: |
select 1 intersect select 2 intersect select 3 intersect select 4 intersect select 5 intersect select 5
optimized_logical_plan_for_batch: |-
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/planner_test/tests/testdata/output/limit.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@
BatchLimit { limit: 1, offset: 0 }
└─BatchExchange { order: [], dist: Single }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchScan { table: t, columns: [t.a], distribution: UpstreamHashShard(t.a) }
└─BatchScan { table: t, columns: [t.a], limit: 1, distribution: UpstreamHashShard(t.a) }
stream_plan: |-
StreamMaterialize { columns: [a], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamProject { exprs: [t.a] }
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/planner_test/tests/testdata/output/order_by.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,15 @@
BatchLimit { limit: 3, offset: 4 }
└─BatchExchange { order: [], dist: Single }
└─BatchLimit { limit: 7, offset: 0 }
└─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard }
└─BatchScan { table: t, columns: [t.v1, t.v2], limit: 7, distribution: SomeShard }
- sql: |
create table t (v1 bigint, v2 double precision);
select * from t limit 5;
batch_plan: |-
BatchLimit { limit: 5, offset: 0 }
└─BatchExchange { order: [], dist: Single }
└─BatchLimit { limit: 5, offset: 0 }
└─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard }
└─BatchScan { table: t, columns: [t.v1, t.v2], limit: 5, distribution: SomeShard }
- sql: |
create table t (v1 bigint, v2 double precision);
select * from t order by v1 desc limit 5 offset 7;
Expand Down
8 changes: 4 additions & 4 deletions src/frontend/planner_test/tests/testdata/output/union.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,11 @@
├─BatchLimit { limit: 1, offset: 0 }
│ └─BatchExchange { order: [], dist: Single }
│ └─BatchLimit { limit: 1, offset: 0 }
│ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], distribution: SomeShard }
│ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], limit: 1, distribution: SomeShard }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchExchange { order: [], dist: Single }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], distribution: SomeShard }
└─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], limit: 1, distribution: SomeShard }
- sql: |
create table t1 (a int, b numeric, c bigint);
create table t2 (a int, b numeric, c bigint);
Expand All @@ -286,11 +286,11 @@
├─BatchLimit { limit: 1, offset: 0 }
│ └─BatchExchange { order: [], dist: Single }
│ └─BatchLimit { limit: 1, offset: 0 }
│ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], distribution: SomeShard }
│ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], limit: 1, distribution: SomeShard }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchExchange { order: [], dist: Single }
└─BatchLimit { limit: 1, offset: 0 }
└─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], distribution: SomeShard }
└─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], limit: 1, distribution: SomeShard }
- sql: |
select 1 union all select 1
optimized_logical_plan_for_batch: 'LogicalValues { rows: [[1:Int32], [1:Int32]], schema: Schema { fields: [1:Int32:Int32] } }'
Expand Down
12 changes: 12 additions & 0 deletions src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,12 @@ impl PlanRoot {
BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into();
}

let plan = plan.optimize_by_rules(&OptimizationStage::new(
"Push Limit To Scan",
vec![BatchPushLimitToScanRule::create()],
ApplyOrder::BottomUp,
));

Ok(plan)
}

Expand Down Expand Up @@ -292,6 +298,12 @@ impl PlanRoot {
ctx.trace(plan.explain_to_string());
}

let plan = plan.optimize_by_rules(&OptimizationStage::new(
"Push Limit To Scan",
vec![BatchPushLimitToScanRule::create()],
ApplyOrder::BottomUp,
));

Ok(plan)
}

Expand Down
Loading

0 comments on commit 88b1ff3

Please sign in to comment.