Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(batch): Support log iter batch for table or mv #16487

Merged
merged 10 commits into from
Apr 28, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions proto/batch_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,18 @@ message FilterNode {
expr.ExprNode search_condition = 1;
}

message LogRowSeqScanNode {
plan_common.StorageTableDesc table_desc = 1;
// This records the mandatory column_ids of the original table, excluding op
repeated int32 column_ids = 2;
xxhZs marked this conversation as resolved.
Show resolved Hide resolved
// The partition to read for scan tasks.
//
// Will be filled by the scheduler.
common.Buffer vnode_bitmap = 3;
common.BatchQueryEpoch old_epoch = 4;
common.BatchQueryEpoch new_epoch = 5;
}

message InsertNode {
// Id of the table to perform inserting.
uint32 table_id = 1;
Expand Down Expand Up @@ -326,6 +338,7 @@ message PlanNode {
SourceNode source = 34;
SortOverWindowNode sort_over_window = 35;
MaxOneRowNode max_one_row = 36;
LogRowSeqScanNode log_row_seq_scan = 37;
// The following nodes are used for testing.
bool block_executor = 100;
bool busy_loop_executor = 101;
Expand Down
1 change: 1 addition & 0 deletions src/batch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ arrow-schema = { workspace = true }
assert_matches = "1"
async-recursion = "1"
async-trait = "0.1"
bytes = { version = "1", features = ["serde"] }
either = "1"
foyer = { workspace = true }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
Expand Down
251 changes: 251 additions & 0 deletions src/batch/src/executor/log_row_seq_scan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::{Bound, Deref};
use std::sync::Arc;

use bytes::Bytes;
use futures::prelude::stream::StreamExt;
use futures_async_stream::try_stream;
use futures_util::pin_mut;
use itertools::Itertools;
use prometheus::Histogram;
use risingwave_common::array::DataChunk;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{ColumnId, Field, Schema};
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::ScalarImpl;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::common::BatchQueryEpoch;
use risingwave_pb::plan_common::StorageTableDesc;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::{collect_data_chunk, KeyedRow, TableDistribution};
use risingwave_storage::{dispatch_state_store, StateStore};

use super::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
use crate::error::{BatchError, Result};
use crate::monitor::BatchMetricsWithTaskLabels;
use crate::task::BatchTaskContext;

pub struct LogRowSeqScanExecutor<S: StateStore> {
chunk_size: usize,
identity: String,
// It is table schema + op column
schema: Schema,

/// Batch metrics.
/// None: Local mode don't record mertics.
metrics: Option<BatchMetricsWithTaskLabels>,

table: StorageTable<S>,
old_epoch: BatchQueryEpoch,
new_epoch: BatchQueryEpoch,
}

impl<S: StateStore> LogRowSeqScanExecutor<S> {
pub fn new(
table: StorageTable<S>,
old_epoch: BatchQueryEpoch,
new_epoch: BatchQueryEpoch,
chunk_size: usize,
identity: String,
metrics: Option<BatchMetricsWithTaskLabels>,
) -> Self {
let mut schema = table.schema().clone();
schema.fields.push(Field::with_name(
risingwave_common::types::DataType::Int16,
"op",
));
Self {
chunk_size,
identity,
schema,
metrics,
table,
old_epoch,
new_epoch,
}
}
}

pub struct LogStoreRowSeqScanExecutorBuilder {}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder {
async fn new_boxed_executor<C: BatchTaskContext>(
source: &ExecutorBuilder<'_, C>,
inputs: Vec<BoxedExecutor>,
) -> Result<BoxedExecutor> {
ensure!(
inputs.is_empty(),
"LogStore row sequential scan should not have input executor!"
);
let log_store_seq_scan_node = try_match_expand!(
source.plan_node().get_node_body().unwrap(),
NodeBody::LogRowSeqScan
)?;

let table_desc: &StorageTableDesc = log_store_seq_scan_node.get_table_desc()?;
let column_ids = log_store_seq_scan_node
.column_ids
.iter()
.copied()
.map(ColumnId::from)
.collect();

let vnodes = match &log_store_seq_scan_node.vnode_bitmap {
Some(vnodes) => Some(Bitmap::from(vnodes).into()),
// This is possible for dml. vnode_bitmap is not filled by scheduler.
// Or it's single distribution, e.g., distinct agg. We scan in a single executor.
None => Some(TableDistribution::all_vnodes()),
};

let chunk_size = source.context.get_config().developer.chunk_size as u32;
let metrics = source.context().batch_metrics();

dispatch_state_store!(source.context().state_store(), state_store, {
let table = StorageTable::new_partial(state_store, column_ids, vnodes, table_desc);
Ok(Box::new(LogRowSeqScanExecutor::new(
table,
log_store_seq_scan_node.old_epoch.clone().unwrap(),
log_store_seq_scan_node.new_epoch.clone().unwrap(),
chunk_size as usize,
source.plan_node().get_identity().clone(),
metrics,
)))
})
}
}
impl<S: StateStore> Executor for LogRowSeqScanExecutor<S> {
fn schema(&self) -> &Schema {
xxhZs marked this conversation as resolved.
Show resolved Hide resolved
&self.schema
}

fn identity(&self) -> &str {
&self.identity
}

fn execute(self: Box<Self>) -> BoxedDataChunkStream {
self.do_execute().boxed()
}
}

impl<S: StateStore> LogRowSeqScanExecutor<S> {
#[try_stream(ok = DataChunk, error = BatchError)]
async fn do_execute(self: Box<Self>) {
let Self {
chunk_size,
identity,
metrics,
table,
old_epoch,
new_epoch,
schema,
} = *self;
let table = std::sync::Arc::new(table);

// Create collector.
let histogram = metrics.as_ref().map(|metrics| {
metrics
.executor_metrics()
.row_seq_scan_next_duration
.with_guarded_label_values(&metrics.executor_labels(&identity))
});
// Range Scan
// WARN: DO NOT use `select` to execute range scans concurrently
// it can consume too much memory if there're too many ranges.
let stream = Self::execute_range(
table.clone(),
old_epoch.clone(),
new_epoch.clone(),
chunk_size,
histogram.clone(),
Arc::new(schema.clone()),
);
#[for_await]
for chunk in stream {
let chunk = chunk?;
yield chunk;
}
}

#[try_stream(ok = DataChunk, error = BatchError)]
async fn execute_range(
table: Arc<StorageTable<S>>,
old_epoch: BatchQueryEpoch,
new_epoch: BatchQueryEpoch,
chunk_size: usize,
histogram: Option<impl Deref<Target = Histogram>>,
schema: Arc<Schema>,
) {
let pk_prefix = OwnedRow::default();

let order_type = table.pk_serializer().get_order_types()[pk_prefix.len()];
// Range Scan.
let iter = table
.batch_iter_log_with_pk_bounds(
old_epoch.into(),
new_epoch.into(),
&pk_prefix,
(
if order_type.nulls_are_first() {
// `NULL`s are at the start bound side, we should exclude them to meet SQL semantics.
Bound::Excluded(OwnedRow::new(vec![None]))
} else {
// Both start and end are unbounded, so we need to select all rows.
Bound::Unbounded
},
if order_type.nulls_are_last() {
// `NULL`s are at the end bound side, we should exclude them to meet SQL semantics.
Bound::Excluded(OwnedRow::new(vec![None]))
} else {
// Both start and end are unbounded, so we need to select all rows.
Bound::Unbounded
},
),
)
.await?;

pin_mut!(iter);
loop {
let timer = histogram.as_ref().map(|histogram| histogram.start_timer());

let mut iter = iter.as_mut().map(|r| match r {
Ok((op, value)) => {
let (k, row) = value.into_owned_row_key();
// Todo! To avoid create a full row.
let full_row = row
.into_iter()
.chain(vec![Some(ScalarImpl::Int16(op.to_i16()))])
.collect_vec();
let row = OwnedRow::new(full_row);
Ok(KeyedRow::<Bytes>::new(k, row))
xxhZs marked this conversation as resolved.
Show resolved Hide resolved
}
Err(e) => Err(e),
});
let chunk = collect_data_chunk(&mut iter, &schema, Some(chunk_size))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to do it in a loop. Currently if there is more than chunk_size rows in iter, we will only collect the first chunk_size rows.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you referring to the loop above, or is it a nested loop

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops. I thought the iter is created inside the loop. So now we won't miss any rows.

nits: we can do iter.map(|r| { ... }) before the pin_mut, so that we don't have to recreate the map stream in every iteration.

.await
.map_err(BatchError::from)?;
if let Some(timer) = timer {
timer.observe_duration()
}

if let Some(chunk) = chunk {
yield chunk
} else {
break;
}
}
}
}
3 changes: 3 additions & 0 deletions src/batch/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod iceberg_scan;
mod insert;
mod join;
mod limit;
mod log_row_seq_scan;
mod managed;
mod max_one_row;
mod merge_sort_exchange;
Expand Down Expand Up @@ -80,6 +81,7 @@ pub use update::*;
pub use utils::*;
pub use values::*;

use self::log_row_seq_scan::LogStoreRowSeqScanExecutorBuilder;
use self::test_utils::{BlockExecutorBuilder, BusyLoopExecutorBuilder};
use crate::error::Result;
use crate::executor::sys_row_seq_scan::SysRowSeqScanExecutorBuilder;
Expand Down Expand Up @@ -239,6 +241,7 @@ impl<'a, C: BatchTaskContext> ExecutorBuilder<'a, C> {
// Follow NodeBody only used for test
NodeBody::BlockExecutor => BlockExecutorBuilder,
NodeBody::BusyLoopExecutor => BusyLoopExecutorBuilder,
NodeBody::LogRowSeqScan => LogStoreRowSeqScanExecutorBuilder,
}
.await?;

Expand Down
1 change: 0 additions & 1 deletion src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
.copied()
.map(ColumnId::from)
.collect();

let vnodes = match &seq_scan_node.vnode_bitmap {
Some(vnodes) => Some(Bitmap::from(vnodes).into()),
// This is possible for dml. vnode_bitmap is not filled by scheduler.
Expand Down
9 changes: 9 additions & 0 deletions src/common/src/array/stream_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ impl Op {
Op::UpdateInsert => Op::Insert,
}
}

pub fn to_i16(self) -> i16 {
match self {
Op::Insert => 1,
Op::Delete => 2,
Op::UpdateInsert => 3,
Op::UpdateDelete => 4,
}
}
}

pub type Ops<'a> = &'a [Op];
Expand Down
Loading
Loading