Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(batch): Support log iter batch for table or mv #16487

Merged
merged 10 commits into from
Apr 28, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions proto/batch_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,17 @@ message FilterNode {
expr.ExprNode search_condition = 1;
}

// Plan node for a batch scan over the change log of a table or materialized view.
// It is dispatched through `PlanNode.log_row_seq_scan`.
message LogRowSeqScanNode{
// Descriptor of the storage table whose log is read.
plan_common.StorageTableDesc table_desc = 1;
// IDs of the columns to read; the batch executor converts each entry into a column id.
repeated int32 column_ids = 2;
xxhZs marked this conversation as resolved.
Show resolved Hide resolved
// The partition to read for scan tasks.
//
// Will be filled by the scheduler.
common.Buffer vnode_bitmap = 3;
// Epoch pair handed to the log iterator, in this order.
// NOTE(review): presumably the start and end of the change-log range; confirm
// whether each end is inclusive.
common.BatchQueryEpoch old_epoch = 4;
common.BatchQueryEpoch new_epoch = 5;
}

message InsertNode {
// Id of the table to perform inserting.
uint32 table_id = 1;
Expand Down Expand Up @@ -326,6 +337,7 @@ message PlanNode {
SourceNode source = 34;
SortOverWindowNode sort_over_window = 35;
MaxOneRowNode max_one_row = 36;
LogRowSeqScanNode log_row_seq_scan = 37;
// The following nodes are used for testing.
bool block_executor = 100;
bool busy_loop_executor = 101;
Expand Down
260 changes: 260 additions & 0 deletions src/batch/src/executor/log_row_seq_scan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::{Bound, Deref};
use std::sync::Arc;

use futures::prelude::stream::StreamExt;
use futures_async_stream::try_stream;
use futures_util::pin_mut;
use prometheus::Histogram;
use risingwave_common::array::DataChunk;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{ColumnId, Field, Schema};
use risingwave_common::row::{OwnedRow, Row};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::common::BatchQueryEpoch;
use risingwave_pb::plan_common::StorageTableDesc;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::{collect_data_chunk, TableDistribution};
use risingwave_storage::{dispatch_state_store, StateStore};

use super::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, ScanRange,
};
use crate::error::{BatchError, Result};
use crate::monitor::BatchMetricsWithTaskLabels;
use crate::task::BatchTaskContext;

/// Batch executor that iterates the log of a storage table between two epochs
/// and yields the result as data chunks.
pub struct LogRowSeqScanExecutor<S: StateStore> {
/// Row limit handed to `collect_data_chunk` for every yielded chunk.
chunk_size: usize,
/// Returned by `Executor::identity`; also used to build the metric labels.
identity: String,

/// Batch metrics.
/// None: local mode does not record metrics.
metrics: Option<BatchMetricsWithTaskLabels>,

/// Storage table whose log is iterated.
table: StorageTable<S>,
/// Ranges scanned one after another; the builder in this file always passes a single full range.
scan_ranges: Vec<ScanRange>,
/// First epoch argument of `batch_iter_log_with_pk_bounds`.
old_epoch: BatchQueryEpoch,
/// Second epoch argument of `batch_iter_log_with_pk_bounds`.
new_epoch: BatchQueryEpoch,
}

impl<S: StateStore> LogRowSeqScanExecutor<S> {
pub fn new(
table: StorageTable<S>,
scan_ranges: Vec<ScanRange>,
old_epoch: BatchQueryEpoch,
new_epoch: BatchQueryEpoch,
chunk_size: usize,
identity: String,
metrics: Option<BatchMetricsWithTaskLabels>,
) -> Self {
Self {
chunk_size,
identity,
metrics,
table,
scan_ranges,
old_epoch,
new_epoch,
}
}
}

/// Builder that turns a `NodeBody::LogRowSeqScan` plan node into a boxed
/// [`LogRowSeqScanExecutor`].
pub struct LogStoreRowSeqScanExecutorBuilder {}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder {
    /// Builds a boxed [`LogRowSeqScanExecutor`] from a `LogRowSeqScan` plan node.
    ///
    /// The executor always scans the full key range of the table; the vnodes to
    /// read come from the plan node's `vnode_bitmap`, falling back to all vnodes
    /// when the bitmap is absent.
    ///
    /// # Errors
    ///
    /// Returns an error if `inputs` is not empty, if the plan node body is not
    /// `NodeBody::LogRowSeqScan`, or if `table_desc`, `old_epoch` or `new_epoch`
    /// is missing from the node.
    ///
    /// # Panics
    ///
    /// Panics if the plan node carries no node body at all.
    async fn new_boxed_executor<C: BatchTaskContext>(
        source: &ExecutorBuilder<'_, C>,
        inputs: Vec<BoxedExecutor>,
    ) -> Result<BoxedExecutor> {
        ensure!(
            inputs.is_empty(),
            "LogStore row sequential scan should not have input executor!"
        );
        let log_store_seq_scan_node = try_match_expand!(
            source.plan_node().get_node_body().unwrap(),
            NodeBody::LogRowSeqScan
        )?;

        let table_desc: &StorageTableDesc = log_store_seq_scan_node.get_table_desc()?;
        // Both epochs come from the serialized plan. Report a missing one as an
        // error, like `table_desc` above, instead of panicking the task.
        let old_epoch = log_store_seq_scan_node.get_old_epoch()?.clone();
        let new_epoch = log_store_seq_scan_node.get_new_epoch()?.clone();
        let column_ids = log_store_seq_scan_node
            .column_ids
            .iter()
            .copied()
            .map(ColumnId::from)
            .collect();

        let vnodes = match &log_store_seq_scan_node.vnode_bitmap {
            Some(vnodes) => Some(Bitmap::from(vnodes).into()),
            // The bitmap may be absent: it is not filled by the scheduler for dml,
            // and a single distribution (e.g. distinct agg) is scanned by one executor.
            None => Some(TableDistribution::all_vnodes()),
        };
        // Only a full-range scan is supported by the log iterator path.
        let scan_ranges = vec![ScanRange::full()];

        let chunk_size = source.context.get_config().developer.chunk_size as u32;
        let metrics = source.context().batch_metrics();

        dispatch_state_store!(source.context().state_store(), state_store, {
            let table = StorageTable::new_partial(state_store, column_ids, vnodes, table_desc);
            Ok(Box::new(LogRowSeqScanExecutor::new(
                table,
                scan_ranges,
                old_epoch,
                new_epoch,
                chunk_size as usize,
                source.plan_node().get_identity().clone(),
                metrics,
            )))
        })
    }
}
/// `Executor` implementation: exposes the schema and identity, and runs the scan
/// as a boxed chunk stream.
impl<S: StateStore> Executor for LogRowSeqScanExecutor<S> {
/// Returns the schema of the underlying storage table.
///
/// NOTE(review): `do_execute` collects chunks against the table schema plus an
/// extra `op` column, so this schema looks one column short of what is
/// actually yielded. Confirm and align the two.
fn schema(&self) -> &Schema {
xxhZs marked this conversation as resolved.
Show resolved Hide resolved
self.table.schema()
}

/// Returns the identity string given at construction.
fn identity(&self) -> &str {
&self.identity
}

/// Consumes the executor and returns the stream produced by `do_execute`, boxed.
fn execute(self: Box<Self>) -> BoxedDataChunkStream {
self.do_execute().boxed()
}
}

impl<S: StateStore> LogRowSeqScanExecutor<S> {
/// Drives the whole scan: runs `execute_range` for every scan range, one after
/// another, and forwards each chunk it produces. The first error ends the stream.
#[try_stream(ok = DataChunk, error = BatchError)]
async fn do_execute(self: Box<Self>) {
let Self {
chunk_size,
identity,
metrics,
table,
scan_ranges,
old_epoch,
new_epoch,
} = *self;
// Shared so that every per-range stream can hold its own handle to the table.
let table = std::sync::Arc::new(table);
let mut schema = table.schema().clone();
// Append an Int16 column named "op" after the table columns; the chunks are
// collected against this extended schema.
schema.fields.push(Field::with_name(
risingwave_common::types::DataType::Int16,
"op",
));
let schema = Arc::new(schema);

// Histogram used to time each chunk fetch; None when no metrics are attached.
// It reuses the `row_seq_scan_next_duration` metric, labelled with this
// executor's identity.
let histogram = metrics.as_ref().map(|metrics| {
metrics
.executor_metrics()
.row_seq_scan_next_duration
.with_guarded_label_values(&metrics.executor_labels(&identity))
});
// Range Scan
// WARN: DO NOT use `select` to execute range scans concurrently;
// it can consume too much memory if there are too many ranges.
for range in scan_ranges {
xxhZs marked this conversation as resolved.
Show resolved Hide resolved
let stream = Self::execute_range(
table.clone(),
range,
old_epoch.clone(),
new_epoch.clone(),
chunk_size,
histogram.clone(),
schema.clone(),
);
#[for_await]
for chunk in stream {
let chunk = chunk?;
yield chunk;
}
}
}

/// Scans the table log for one range between `old_epoch` and `new_epoch` and
/// yields chunks limited to `chunk_size` rows, laid out according to `schema`.
///
/// Only ranges whose next-column bounds are both `Bound::Unbounded` are
/// supported; any other bound hits `unimplemented!`.
///
/// Panics if `pk_prefix` is not strictly shorter than the table's primary key.
#[try_stream(ok = DataChunk, error = BatchError)]
async fn execute_range(
table: Arc<StorageTable<S>>,
scan_range: ScanRange,
old_epoch: BatchQueryEpoch,
new_epoch: BatchQueryEpoch,
chunk_size: usize,
histogram: Option<impl Deref<Target = Histogram>>,
schema: Arc<Schema>,
) {
let ScanRange {
pk_prefix,
next_col_bounds,
} = scan_range;

// Order type of the first primary-key column after the prefix.
// NOTE(review): this indexing runs before the `assert!` below, so an
// over-long prefix would fail here first, presumably with an out-of-bounds
// panic rather than the assertion. Consider moving the assert up.
let order_type = table.pk_serializer().get_order_types()[pk_prefix.len()];
let (start_bound, end_bound) = (next_col_bounds.0, next_col_bounds.1);
assert!(pk_prefix.len() < table.pk_indices().len());
// Range Scan.
// NOTE(review): the NULL exclusion below depends only on the order type, not
// on whether the opposite bound is bounded. With the full range used by the
// builder in this file, rows whose next pk column is NULL are presumably
// dropped from the log scan. Confirm that this is intended.
let iter = table
.batch_iter_log_with_pk_bounds(
old_epoch.into(),
new_epoch.into(),
&pk_prefix,
(
match start_bound {
xxhZs marked this conversation as resolved.
Show resolved Hide resolved
Bound::Unbounded => {
if order_type.nulls_are_first() {
// `NULL`s are at the start bound side, we should exclude them to meet SQL semantics.
Bound::Excluded(OwnedRow::new(vec![None]))
} else {
// `NULL`s are not at the start bound side, so the start stays unbounded.
Bound::Unbounded
}
}
_ => unimplemented!("Log iter range need full"),
},
match end_bound {
Bound::Unbounded => {
if order_type.nulls_are_last() {
// `NULL`s are at the end bound side, we should exclude them to meet SQL semantics.
Bound::Excluded(OwnedRow::new(vec![None]))
} else {
// `NULL`s are not at the end bound side, so the end stays unbounded.
Bound::Unbounded
}
}
_ => unimplemented!("Log iter range need full"),
},
),
)
.await?;

// The iterator is created once, outside the loop, so each pass continues
// where the previous chunk stopped.
pin_mut!(iter);
loop {
// Time a single chunk fetch when a histogram is available.
let timer = histogram.as_ref().map(|histogram| histogram.start_timer());

let chunk = collect_data_chunk(&mut iter, &schema, Some(chunk_size))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to do it in a loop. Currently if there is more than chunk_size rows in iter, we will only collect the first chunk_size rows.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you referring to the loop above, or is it a nested loop

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops. I thought the iter is created inside the loop. So now we won't miss any rows.

nits: we can do iter.map(|r| { ... }) before the pin_mut, so that we don't have to recreate the map stream in every iteration.

.await
.map_err(BatchError::from)?;
if let Some(timer) = timer {
timer.observe_duration()
}

// `None` means no further chunk could be collected, which ends this range.
if let Some(chunk) = chunk {
yield chunk
} else {
break;
}
}
}
}
3 changes: 3 additions & 0 deletions src/batch/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod iceberg_scan;
mod insert;
mod join;
mod limit;
mod log_row_seq_scan;
mod managed;
mod max_one_row;
mod merge_sort_exchange;
Expand Down Expand Up @@ -80,6 +81,7 @@ pub use update::*;
pub use utils::*;
pub use values::*;

use self::log_row_seq_scan::LogStoreRowSeqScanExecutorBuilder;
use self::test_utils::{BlockExecutorBuilder, BusyLoopExecutorBuilder};
use crate::error::Result;
use crate::executor::sys_row_seq_scan::SysRowSeqScanExecutorBuilder;
Expand Down Expand Up @@ -239,6 +241,7 @@ impl<'a, C: BatchTaskContext> ExecutorBuilder<'a, C> {
// Follow NodeBody only used for test
NodeBody::BlockExecutor => BlockExecutorBuilder,
NodeBody::BusyLoopExecutor => BusyLoopExecutorBuilder,
NodeBody::LogRowSeqScan => LogStoreRowSeqScanExecutorBuilder,
}
.await?;

Expand Down
1 change: 0 additions & 1 deletion src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
.copied()
.map(ColumnId::from)
.collect();

let vnodes = match &seq_scan_node.vnode_bitmap {
Some(vnodes) => Some(Bitmap::from(vnodes).into()),
// This is possible for dml. vnode_bitmap is not filled by scheduler.
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::types::DataType;

/// Column ID is the unique identifier of a column in a table. Different from table ID, column ID is
/// not globally unique.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
xxhZs marked this conversation as resolved.
Show resolved Hide resolved
pub struct ColumnId(i32);

impl std::fmt::Debug for ColumnId {
Expand Down
5 changes: 3 additions & 2 deletions src/frontend/src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ pub use insert::BoundInsert;
use pgwire::pg_server::{Session, SessionId};
pub use query::BoundQuery;
pub use relation::{
BoundBaseTable, BoundJoin, BoundShare, BoundSource, BoundSystemTable, BoundWatermark,
BoundWindowTableFunction, Relation, ResolveQualifiedNameError, WindowTableFunctionKind,
BoundBaseTable, BoundJoin, BoundLogTable, BoundShare, BoundSource, BoundSystemTable,
BoundWatermark, BoundWindowTableFunction, Relation, ResolveQualifiedNameError,
WindowTableFunctionKind,
};
pub use select::{BoundDistinct, BoundSelect};
pub use set_expr::*;
Expand Down
Loading
Loading