diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index b6c00b1a14aa5..6a254332fc712 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -71,6 +71,18 @@ message FilterNode { expr.ExprNode search_condition = 1; } +message LogRowSeqScanNode { + plan_common.StorageTableDesc table_desc = 1; + // This records the mandatory column_ids of the original table, excluding op + repeated int32 column_ids = 2; + // The partition to read for scan tasks. + // + // Will be filled by the scheduler. + common.Buffer vnode_bitmap = 3; + common.BatchQueryEpoch old_epoch = 4; + common.BatchQueryEpoch new_epoch = 5; +} + message InsertNode { // Id of the table to perform inserting. uint32 table_id = 1; @@ -326,6 +338,7 @@ message PlanNode { SourceNode source = 34; SortOverWindowNode sort_over_window = 35; MaxOneRowNode max_one_row = 36; + LogRowSeqScanNode log_row_seq_scan = 37; // The following nodes are used for testing. bool block_executor = 100; bool busy_loop_executor = 101; diff --git a/src/batch/src/executor/log_row_seq_scan.rs b/src/batch/src/executor/log_row_seq_scan.rs new file mode 100644 index 0000000000000..c46de5b3a2944 --- /dev/null +++ b/src/batch/src/executor/log_row_seq_scan.rs @@ -0,0 +1,250 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::ops::{Bound, Deref}; +use std::sync::Arc; + +use futures::prelude::stream::StreamExt; +use futures_async_stream::try_stream; +use futures_util::pin_mut; +use itertools::Itertools; +use prometheus::Histogram; +use risingwave_common::array::DataChunk; +use risingwave_common::buffer::Bitmap; +use risingwave_common::catalog::{ColumnId, Field, Schema}; +use risingwave_common::row::{OwnedRow, Row}; +use risingwave_common::types::ScalarImpl; +use risingwave_pb::batch_plan::plan_node::NodeBody; +use risingwave_pb::common::BatchQueryEpoch; +use risingwave_pb::plan_common::StorageTableDesc; +use risingwave_storage::table::batch_table::storage_table::StorageTable; +use risingwave_storage::table::{collect_data_chunk, KeyedRow, TableDistribution}; +use risingwave_storage::{dispatch_state_store, StateStore}; + +use super::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder}; +use crate::error::{BatchError, Result}; +use crate::monitor::BatchMetricsWithTaskLabels; +use crate::task::BatchTaskContext; + +pub struct LogRowSeqScanExecutor { + chunk_size: usize, + identity: String, + // It is table schema + op column + schema: Schema, + + /// Batch metrics. + /// None: Local mode don't record mertics. 
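// [Editor's aside — illustrative sketch, not part of the patch.] How a frontend plan
// node might fill the `LogRowSeqScanNode` message defined in batch_plan.proto above;
// this mirrors the `TryToBatchPb` impl for `BatchLogSeqScan` added later in this patch.
// The pb paths come from the patch's own imports; the helper name is hypothetical.
use risingwave_pb::batch_plan::LogRowSeqScanNode;
use risingwave_pb::common::{batch_query_epoch::Epoch, BatchQueryEpoch};

fn make_log_row_seq_scan_node(
    old_epoch: u64,
    new_epoch: u64,
    column_ids: Vec<i32>,
) -> LogRowSeqScanNode {
    LogRowSeqScanNode {
        // Filled from the catalog's StorageTableDesc by the real planner code.
        table_desc: None,
        // Mandatory column ids of the base table; the trailing `op` column is implicit.
        column_ids,
        // Left empty here: the scheduler fills the per-task vnode bitmap.
        vnode_bitmap: None,
        old_epoch: Some(BatchQueryEpoch { epoch: Some(Epoch::Committed(old_epoch)) }),
        new_epoch: Some(BatchQueryEpoch { epoch: Some(Epoch::Committed(new_epoch)) }),
    }
}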
+ metrics: Option, + + table: StorageTable, + old_epoch: BatchQueryEpoch, + new_epoch: BatchQueryEpoch, +} + +impl LogRowSeqScanExecutor { + pub fn new( + table: StorageTable, + old_epoch: BatchQueryEpoch, + new_epoch: BatchQueryEpoch, + chunk_size: usize, + identity: String, + metrics: Option, + ) -> Self { + let mut schema = table.schema().clone(); + schema.fields.push(Field::with_name( + risingwave_common::types::DataType::Int16, + "op", + )); + Self { + chunk_size, + identity, + schema, + metrics, + table, + old_epoch, + new_epoch, + } + } +} + +pub struct LogStoreRowSeqScanExecutorBuilder {} + +#[async_trait::async_trait] +impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder { + async fn new_boxed_executor( + source: &ExecutorBuilder<'_, C>, + inputs: Vec, + ) -> Result { + ensure!( + inputs.is_empty(), + "LogStore row sequential scan should not have input executor!" + ); + let log_store_seq_scan_node = try_match_expand!( + source.plan_node().get_node_body().unwrap(), + NodeBody::LogRowSeqScan + )?; + + let table_desc: &StorageTableDesc = log_store_seq_scan_node.get_table_desc()?; + let column_ids = log_store_seq_scan_node + .column_ids + .iter() + .copied() + .map(ColumnId::from) + .collect(); + + let vnodes = match &log_store_seq_scan_node.vnode_bitmap { + Some(vnodes) => Some(Bitmap::from(vnodes).into()), + // This is possible for dml. vnode_bitmap is not filled by scheduler. + // Or it's single distribution, e.g., distinct agg. We scan in a single executor. + None => Some(TableDistribution::all_vnodes()), + }; + + let chunk_size = source.context.get_config().developer.chunk_size as u32; + let metrics = source.context().batch_metrics(); + + dispatch_state_store!(source.context().state_store(), state_store, { + let table = StorageTable::new_partial(state_store, column_ids, vnodes, table_desc); + Ok(Box::new(LogRowSeqScanExecutor::new( + table, + log_store_seq_scan_node.old_epoch.clone().unwrap(), + log_store_seq_scan_node.new_epoch.clone().unwrap(), + chunk_size as usize, + source.plan_node().get_identity().clone(), + metrics, + ))) + }) + } +} +impl Executor for LogRowSeqScanExecutor { + fn schema(&self) -> &Schema { + &self.schema + } + + fn identity(&self) -> &str { + &self.identity + } + + fn execute(self: Box) -> BoxedDataChunkStream { + self.do_execute().boxed() + } +} + +impl LogRowSeqScanExecutor { + #[try_stream(ok = DataChunk, error = BatchError)] + async fn do_execute(self: Box) { + let Self { + chunk_size, + identity, + metrics, + table, + old_epoch, + new_epoch, + schema, + } = *self; + let table = std::sync::Arc::new(table); + + // Create collector. + let histogram = metrics.as_ref().map(|metrics| { + metrics + .executor_metrics() + .row_seq_scan_next_duration + .with_guarded_label_values(&metrics.executor_labels(&identity)) + }); + // Range Scan + // WARN: DO NOT use `select` to execute range scans concurrently + // it can consume too much memory if there're too many ranges. 
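// [Editor's aside — illustrative sketch, not part of the patch.] The vnode selection
// in `LogStoreRowSeqScanExecutorBuilder` above: if the scheduler filled `vnode_bitmap`,
// only those vnodes are scanned; otherwise the executor falls back to all vnodes
// (e.g. local mode or single distribution). Plain `Vec<bool>`/`usize` stand in for
// `Bitmap`/`TableDistribution`.
fn resolve_vnodes(vnode_bitmap: Option<Vec<bool>>, vnode_count: usize) -> Vec<usize> {
    match vnode_bitmap {
        Some(bitmap) => bitmap
            .iter()
            .enumerate()
            .filter_map(|(vnode, &set)| set.then_some(vnode))
            .collect(),
        // No bitmap from the scheduler: scan every vnode of the table.
        None => (0..vnode_count).collect(),
    }
}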
+ let stream = Self::execute_range( + table.clone(), + old_epoch.clone(), + new_epoch.clone(), + chunk_size, + histogram.clone(), + Arc::new(schema.clone()), + ); + #[for_await] + for chunk in stream { + let chunk = chunk?; + yield chunk; + } + } + + #[try_stream(ok = DataChunk, error = BatchError)] + async fn execute_range( + table: Arc>, + old_epoch: BatchQueryEpoch, + new_epoch: BatchQueryEpoch, + chunk_size: usize, + histogram: Option>, + schema: Arc, + ) { + let pk_prefix = OwnedRow::default(); + + let order_type = table.pk_serializer().get_order_types()[pk_prefix.len()]; + // Range Scan. + let iter = table + .batch_iter_log_with_pk_bounds( + old_epoch.into(), + new_epoch.into(), + &pk_prefix, + ( + if order_type.nulls_are_first() { + // `NULL`s are at the start bound side, we should exclude them to meet SQL semantics. + Bound::Excluded(OwnedRow::new(vec![None])) + } else { + // Both start and end are unbounded, so we need to select all rows. + Bound::Unbounded + }, + if order_type.nulls_are_last() { + // `NULL`s are at the end bound side, we should exclude them to meet SQL semantics. + Bound::Excluded(OwnedRow::new(vec![None])) + } else { + // Both start and end are unbounded, so we need to select all rows. + Bound::Unbounded + }, + ), + ) + .await? + .map(|r| match r { + Ok((op, value)) => { + let (k, row) = value.into_owned_row_key(); + // Todo! To avoid create a full row. + let full_row = row + .into_iter() + .chain(vec![Some(ScalarImpl::Int16(op.to_i16()))]) + .collect_vec(); + let row = OwnedRow::new(full_row); + Ok(KeyedRow::<_>::new(k, row)) + } + Err(e) => Err(e), + }); + + pin_mut!(iter); + loop { + let timer = histogram.as_ref().map(|histogram| histogram.start_timer()); + + let chunk = collect_data_chunk(&mut iter, &schema, Some(chunk_size)) + .await + .map_err(BatchError::from)?; + if let Some(timer) = timer { + timer.observe_duration() + } + + if let Some(chunk) = chunk { + yield chunk + } else { + break; + } + } + } +} diff --git a/src/batch/src/executor/mod.rs b/src/batch/src/executor/mod.rs index ded2af7089826..66c9058cc52b4 100644 --- a/src/batch/src/executor/mod.rs +++ b/src/batch/src/executor/mod.rs @@ -24,6 +24,7 @@ mod iceberg_scan; mod insert; mod join; mod limit; +mod log_row_seq_scan; mod managed; mod max_one_row; mod merge_sort_exchange; @@ -80,6 +81,7 @@ pub use update::*; pub use utils::*; pub use values::*; +use self::log_row_seq_scan::LogStoreRowSeqScanExecutorBuilder; use self::test_utils::{BlockExecutorBuilder, BusyLoopExecutorBuilder}; use crate::error::Result; use crate::executor::sys_row_seq_scan::SysRowSeqScanExecutorBuilder; @@ -239,6 +241,7 @@ impl<'a, C: BatchTaskContext> ExecutorBuilder<'a, C> { // Follow NodeBody only used for test NodeBody::BlockExecutor => BlockExecutorBuilder, NodeBody::BusyLoopExecutor => BusyLoopExecutorBuilder, + NodeBody::LogRowSeqScan => LogStoreRowSeqScanExecutorBuilder, } .await?; diff --git a/src/batch/src/executor/row_seq_scan.rs b/src/batch/src/executor/row_seq_scan.rs index 4c1261363a72b..1dc52c1c9bb6a 100644 --- a/src/batch/src/executor/row_seq_scan.rs +++ b/src/batch/src/executor/row_seq_scan.rs @@ -177,7 +177,6 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder { .copied() .map(ColumnId::from) .collect(); - let vnodes = match &seq_scan_node.vnode_bitmap { Some(vnodes) => Some(Bitmap::from(vnodes).into()), // This is possible for dml. vnode_bitmap is not filled by scheduler. 
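// [Editor's aside — illustrative sketch, not part of the patch.] The shape of the
// chunk-collection loop in `execute_range` above: rows are pulled from the change-log
// iterator, packed into chunks of `chunk_size`, and each `collect_data_chunk` call is
// optionally timed for the scan-duration histogram. Plain iterators and `Instant`
// stand in for the keyed-row stream and the Prometheus histogram.
use std::time::Instant;

fn collect_chunks(
    mut rows: impl Iterator<Item = i64>,
    chunk_size: usize,
    timed: bool,
) -> Vec<Vec<i64>> {
    let mut chunks = Vec::new();
    loop {
        let timer = timed.then(Instant::now);
        let chunk: Vec<i64> = rows.by_ref().take(chunk_size).collect();
        if let Some(start) = timer {
            // The executor feeds this duration into `row_seq_scan_next_duration`.
            let _elapsed = start.elapsed();
        }
        if chunk.is_empty() {
            break;
        }
        chunks.push(chunk);
    }
    chunks
}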
diff --git a/src/common/src/array/stream_chunk.rs b/src/common/src/array/stream_chunk.rs index 776574dc843c9..e3776de6d0fbc 100644 --- a/src/common/src/array/stream_chunk.rs +++ b/src/common/src/array/stream_chunk.rs @@ -80,6 +80,15 @@ impl Op { Op::UpdateInsert => Op::Insert, } } + + pub fn to_i16(self) -> i16 { + match self { + Op::Insert => 1, + Op::Delete => 2, + Op::UpdateInsert => 3, + Op::UpdateDelete => 4, + } + } } pub type Ops<'a> = &'a [Op]; diff --git a/src/frontend/src/handler/declare_cursor.rs b/src/frontend/src/handler/declare_cursor.rs index 27f029bf3e4a5..db4616770bc93 100644 --- a/src/frontend/src/handler/declare_cursor.rs +++ b/src/frontend/src/handler/declare_cursor.rs @@ -12,18 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::rc::Rc; + +use fixedbitset::FixedBitSet; use pgwire::pg_field_descriptor::PgFieldDescriptor; use pgwire::pg_response::{PgResponse, StatementType}; +use risingwave_common::session_config::QueryMode; use risingwave_common::util::epoch::Epoch; use risingwave_sqlparser::ast::{DeclareCursorStatement, ObjectName, Query, Since, Statement}; -use super::query::{gen_batch_plan_by_statement, gen_batch_plan_fragmenter}; +use super::query::{gen_batch_plan_by_statement, gen_batch_plan_fragmenter, BatchQueryPlanResult}; use super::util::{convert_epoch_to_logstore_i64, convert_unix_millis_to_logstore_i64}; use super::RwPgResponse; use crate::error::{ErrorCode, Result}; use crate::handler::query::create_stream; use crate::handler::HandlerArgs; -use crate::{Binder, OptimizerContext, PgResponseStream}; +use crate::optimizer::plan_node::{generic, BatchLogSeqScan}; +use crate::optimizer::property::{Order, RequiredDist}; +use crate::optimizer::PlanRoot; +use crate::{Binder, OptimizerContext, PgResponseStream, PlanRef, TableCatalog}; pub async fn handle_declare_cursor( handle_args: HandlerArgs, @@ -132,3 +139,60 @@ pub async fn create_stream_for_cursor( }; create_stream(session, plan_fragmenter_result, vec![]).await } + +pub fn create_batch_plan_for_cursor( + table_catalog: std::sync::Arc, + handle_args: HandlerArgs, + old_epoch: u64, + new_epoch: u64, +) -> Result { + let context = OptimizerContext::from_handler_args(handle_args.clone()); + let out_col_idx = table_catalog + .columns + .iter() + .enumerate() + .filter(|(_, v)| !v.is_hidden) + .map(|(i, _)| i) + .collect::>(); + let core = generic::LogScan::new( + table_catalog.name.clone(), + out_col_idx, + Rc::new(table_catalog.table_desc()), + Rc::new(context), + old_epoch, + new_epoch, + ); + let batch_log_seq_scan = BatchLogSeqScan::new(core); + let out_fields = FixedBitSet::from_iter(0..batch_log_seq_scan.core().schema().len()); + let out_names = batch_log_seq_scan.core().column_names(); + // Here we just need a plan_root to call the method, only out_fields and out_names will be used + let mut plan_root = PlanRoot::new( + PlanRef::from(batch_log_seq_scan.clone()), + RequiredDist::single(), + Order::default(), + out_fields, + out_names, + ); + let schema = batch_log_seq_scan.core().schema().clone(); + let (batch_log_seq_scan, query_mode) = match handle_args.session.config().query_mode() { + QueryMode::Auto => ( + plan_root.gen_batch_distributed_plan(PlanRef::from(batch_log_seq_scan))?, + QueryMode::Local, + ), + QueryMode::Local => ( + plan_root.gen_batch_local_plan(PlanRef::from(batch_log_seq_scan))?, + QueryMode::Local, + ), + QueryMode::Distributed => ( + plan_root.gen_batch_distributed_plan(PlanRef::from(batch_log_seq_scan))?, + 
QueryMode::Distributed, + ), + }; + Ok(BatchQueryPlanResult { + plan: batch_log_seq_scan, + query_mode, + schema, + stmt_type: StatementType::SELECT, + dependent_relations: table_catalog.dependent_relations.clone(), + }) +} diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index e0b69d27f647c..7a93144d48b83 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -972,6 +972,10 @@ fn require_additional_exchange_on_root_in_distributed_mode(plan: PlanRef) -> boo plan.node_type() == PlanNodeType::BatchSeqScan } + fn is_log_table(plan: &PlanRef) -> bool { + plan.node_type() == PlanNodeType::BatchLogSeqScan + } + fn is_source(plan: &PlanRef) -> bool { plan.node_type() == PlanNodeType::BatchSource || plan.node_type() == PlanNodeType::BatchKafkaScan @@ -996,6 +1000,7 @@ fn require_additional_exchange_on_root_in_distributed_mode(plan: PlanRef) -> boo || exist_and_no_exchange_before(&plan, is_insert) || exist_and_no_exchange_before(&plan, is_update) || exist_and_no_exchange_before(&plan, is_delete) + || exist_and_no_exchange_before(&plan, is_log_table) } /// The purpose is same as `require_additional_exchange_on_root_in_distributed_mode`. We separate diff --git a/src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs b/src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs new file mode 100644 index 0000000000000..93132ce65e51c --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs @@ -0,0 +1,142 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use pretty_xmlish::{Pretty, XmlNode}; +use risingwave_pb::batch_plan::plan_node::NodeBody; +use risingwave_pb::batch_plan::LogRowSeqScanNode; +use risingwave_pb::common::BatchQueryEpoch; + +use super::batch::prelude::*; +use super::utils::{childless_record, Distill}; +use super::{generic, ExprRewritable, PlanBase, PlanRef, ToDistributedBatch, TryToBatchPb}; +use crate::catalog::ColumnId; +use crate::error::Result; +use crate::optimizer::plan_node::expr_visitable::ExprVisitable; +use crate::optimizer::plan_node::ToLocalBatch; +use crate::optimizer::property::{Distribution, DistributionDisplay, Order}; +use crate::scheduler::SchedulerResult; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct BatchLogSeqScan { + pub base: PlanBase, + core: generic::LogScan, +} + +impl BatchLogSeqScan { + fn new_inner(core: generic::LogScan, dist: Distribution) -> Self { + let order = Order::any(); + let base = PlanBase::new_batch(core.ctx(), core.schema(), dist, order); + + Self { base, core } + } + + pub fn new(core: generic::LogScan) -> Self { + // Use `Single` by default, will be updated later with `clone_with_dist`. 
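// [Editor's aside — illustrative sketch, not part of the patch.] The query mode
// reported by `create_batch_plan_for_cursor` above: a session configured with `Auto`
// is reported as `Local` for the cursor's log-scan plan, while an explicit
// `Distributed` setting is kept. A local enum stands in for
// risingwave_common::session_config::QueryMode.
#[derive(Debug, Clone, Copy, PartialEq)]
enum QueryMode { Auto, Local, Distributed }

fn cursor_query_mode(configured: QueryMode) -> QueryMode {
    match configured {
        // `Auto` and `Local` are both reported as local execution.
        QueryMode::Auto | QueryMode::Local => QueryMode::Local,
        QueryMode::Distributed => QueryMode::Distributed,
    }
}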
+ Self::new_inner(core, Distribution::Single) + } + + fn clone_with_dist(&self) -> Self { + Self::new_inner( + self.core.clone(), + match self.core.distribution_key() { + None => Distribution::SomeShard, + Some(distribution_key) => { + if distribution_key.is_empty() { + Distribution::Single + } else { + Distribution::UpstreamHashShard( + distribution_key, + self.core.table_desc.table_id, + ) + } + } + }, + ) + } + + /// Get a reference to the batch seq scan's logical. + #[must_use] + pub fn core(&self) -> &generic::LogScan { + &self.core + } +} + +impl_plan_tree_node_for_leaf! { BatchLogSeqScan } + +impl Distill for BatchLogSeqScan { + fn distill<'a>(&self) -> XmlNode<'a> { + let verbose = self.base.ctx().is_explain_verbose(); + let mut vec = Vec::with_capacity(3); + vec.push(("table", Pretty::from(self.core.table_name.clone()))); + vec.push(("columns", self.core.columns_pretty(verbose))); + + if verbose { + let dist = Pretty::display(&DistributionDisplay { + distribution: self.distribution(), + input_schema: self.base.schema(), + }); + vec.push(("distribution", dist)); + } + + childless_record("BatchScan", vec) + } +} + +impl ToDistributedBatch for BatchLogSeqScan { + fn to_distributed(&self) -> Result { + Ok(self.clone_with_dist().into()) + } +} + +impl TryToBatchPb for BatchLogSeqScan { + fn try_to_batch_prost_body(&self) -> SchedulerResult { + Ok(NodeBody::LogRowSeqScan(LogRowSeqScanNode { + table_desc: Some(self.core.table_desc.try_to_protobuf()?), + column_ids: self + .core + .output_column_ids() + .iter() + .map(ColumnId::get_id) + .collect(), + vnode_bitmap: None, + old_epoch: Some(BatchQueryEpoch { + epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::Committed( + self.core.old_epoch, + )), + }), + new_epoch: Some(BatchQueryEpoch { + epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::Committed( + self.core.new_epoch, + )), + }), + })) + } +} + +impl ToLocalBatch for BatchLogSeqScan { + fn to_local(&self) -> Result { + let dist = if let Some(distribution_key) = self.core.distribution_key() + && !distribution_key.is_empty() + { + Distribution::UpstreamHashShard(distribution_key, self.core.table_desc.table_id) + } else { + Distribution::SomeShard + }; + Ok(Self::new_inner(self.core.clone(), dist).into()) + } +} + +impl ExprRewritable for BatchLogSeqScan {} + +impl ExprVisitable for BatchLogSeqScan {} diff --git a/src/frontend/src/optimizer/plan_node/generic/log_scan.rs b/src/frontend/src/optimizer/plan_node/generic/log_scan.rs new file mode 100644 index 0000000000000..cd5ddebdc0724 --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/generic/log_scan.rs @@ -0,0 +1,147 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
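// [Editor's aside — illustrative sketch, not part of the patch.] The distribution
// picked by `clone_with_dist` above: no usable distribution key => SomeShard, an
// empty key => Single, otherwise hash-distributed on the upstream key (the `to_local`
// path treats an empty key as SomeShard instead). Plain types stand in for
// `Distribution`/`TableId`.
#[derive(Debug, PartialEq)]
enum Dist {
    SomeShard,
    Single,
    UpstreamHashShard(Vec<usize>),
}

fn scan_distribution(distribution_key: Option<Vec<usize>>) -> Dist {
    match distribution_key {
        None => Dist::SomeShard,
        Some(key) if key.is_empty() => Dist::Single,
        Some(key) => Dist::UpstreamHashShard(key),
    }
}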
+ +use std::collections::HashMap; +use std::rc::Rc; + +use educe::Educe; +use pretty_xmlish::Pretty; +use risingwave_common::catalog::{Field, Schema, TableDesc}; +use risingwave_common::types::DataType; +use risingwave_common::util::sort_util::ColumnOrder; + +use crate::catalog::ColumnId; +use crate::optimizer::optimizer_context::OptimizerContextRef; + +const OP_NAME: &str = "op"; +const OP_TYPE: DataType = DataType::Int16; + +#[derive(Debug, Clone, Educe)] +#[educe(PartialEq, Eq, Hash)] +pub struct LogScan { + pub table_name: String, + /// Include `output_col_idx` and `op_column` + pub output_col_idx: Vec, + /// Descriptor of the table + pub table_desc: Rc, + /// Help `RowSeqLogScan` executor use a better chunk size + pub chunk_size: Option, + + #[educe(PartialEq(ignore))] + #[educe(Hash(ignore))] + pub ctx: OptimizerContextRef, + + pub old_epoch: u64, + pub new_epoch: u64, +} + +impl LogScan { + // Used for create batch exec, without op + pub fn output_column_ids(&self) -> Vec { + self.output_col_idx + .iter() + .map(|i| self.table_desc.columns[*i].column_id) + .collect() + } + + pub fn primary_key(&self) -> &[ColumnOrder] { + &self.table_desc.pk + } + + fn column_names_with_table_prefix(&self) -> Vec { + let mut out_column_names: Vec<_> = self + .output_col_idx + .iter() + .map(|&i| format!("{}.{}", self.table_name, self.table_desc.columns[i].name)) + .collect(); + out_column_names.push(format!("{}.{}", self.table_name, OP_NAME)); + out_column_names + } + + pub(crate) fn column_names(&self) -> Vec { + let mut out_column_names: Vec<_> = self + .output_col_idx + .iter() + .map(|&i| self.table_desc.columns[i].name.clone()) + .collect(); + out_column_names.push(OP_NAME.to_string()); + out_column_names + } + + pub fn distribution_key(&self) -> Option> { + let tb_idx_to_op_idx = self + .output_col_idx + .iter() + .enumerate() + .map(|(op_idx, tb_idx)| (*tb_idx, op_idx)) + .collect::>(); + self.table_desc + .distribution_key + .iter() + .map(|&tb_idx| tb_idx_to_op_idx.get(&tb_idx).cloned()) + .collect() + } + + /// Create a logical scan node for log table scan + pub(crate) fn new( + table_name: String, + output_col_idx: Vec, + table_desc: Rc, + ctx: OptimizerContextRef, + old_epoch: u64, + new_epoch: u64, + ) -> Self { + Self { + table_name, + output_col_idx, + table_desc, + chunk_size: None, + ctx, + old_epoch, + new_epoch, + } + } + + pub(crate) fn columns_pretty<'a>(&self, verbose: bool) -> Pretty<'a> { + Pretty::Array( + match verbose { + true => self.column_names_with_table_prefix(), + false => self.column_names(), + } + .into_iter() + .map(Pretty::from) + .collect(), + ) + } + + pub(crate) fn schema(&self) -> Schema { + let mut fields: Vec<_> = self + .output_col_idx + .iter() + .map(|tb_idx| { + let col = &self.table_desc.columns[*tb_idx]; + Field::from_with_table_name_prefix(col, &self.table_name) + }) + .collect(); + fields.push(Field::with_name( + OP_TYPE, + format!("{}.{}", &self.table_name, OP_NAME), + )); + Schema { fields } + } + + pub(crate) fn ctx(&self) -> OptimizerContextRef { + self.ctx.clone() + } +} diff --git a/src/frontend/src/optimizer/plan_node/generic/mod.rs b/src/frontend/src/optimizer/plan_node/generic/mod.rs index 5154c84017b87..0fcd255cdb03b 100644 --- a/src/frontend/src/optimizer/plan_node/generic/mod.rs +++ b/src/frontend/src/optimizer/plan_node/generic/mod.rs @@ -44,6 +44,8 @@ mod table_scan; pub use table_scan::*; mod sys_scan; pub use sys_scan::*; +mod log_scan; +pub use log_scan::*; mod cdc_scan; pub use cdc_scan::*; diff --git 
a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index e7ad78f373bac..27e1c140b5983 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -827,6 +827,7 @@ mod batch_hash_join; mod batch_hop_window; mod batch_insert; mod batch_limit; +mod batch_log_seq_scan; mod batch_lookup_join; mod batch_max_one_row; mod batch_nested_loop_join; @@ -926,6 +927,7 @@ pub use batch_iceberg_scan::BatchIcebergScan; pub use batch_insert::BatchInsert; pub use batch_kafka_scan::BatchKafkaScan; pub use batch_limit::BatchLimit; +pub use batch_log_seq_scan::BatchLogSeqScan; pub use batch_lookup_join::BatchLookupJoin; pub use batch_max_one_row::BatchMaxOneRow; pub use batch_nested_loop_join::BatchNestedLoopJoin; @@ -1073,6 +1075,7 @@ macro_rules! for_all_plan_nodes { , { Batch, Update } , { Batch, SeqScan } , { Batch, SysSeqScan } + , { Batch, LogSeqScan } , { Batch, HashJoin } , { Batch, NestedLoopJoin } , { Batch, Values } @@ -1181,6 +1184,7 @@ macro_rules! for_batch_plan_nodes { , { Batch, Filter } , { Batch, SeqScan } , { Batch, SysSeqScan } + , { Batch, LogSeqScan } , { Batch, HashJoin } , { Batch, NestedLoopJoin } , { Batch, Values } diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs index fc83ad0b5c25a..0b597f91a3cfb 100644 --- a/src/frontend/src/scheduler/distributed/stage.rs +++ b/src/frontend/src/scheduler/distributed/stage.rs @@ -985,6 +985,22 @@ impl StageRunner { node_body: Some(NodeBody::RowSeqScan(scan_node)), } } + PlanNodeType::BatchLogSeqScan => { + let node_body = execution_plan_node.node.clone(); + let NodeBody::LogRowSeqScan(mut scan_node) = node_body else { + unreachable!(); + }; + let partition = partition + .expect("no partition info for seq scan") + .into_table() + .expect("PartitionInfo should be TablePartitionInfo"); + scan_node.vnode_bitmap = Some(partition.vnode_bitmap); + PlanNodePb { + children: vec![], + identity, + node_body: Some(NodeBody::LogRowSeqScan(scan_node)), + } + } PlanNodeType::BatchSource | PlanNodeType::BatchKafkaScan | PlanNodeType::BatchIcebergScan => { diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs index a97de0629f5fb..9e6d7451d9ef5 100644 --- a/src/frontend/src/scheduler/local.rs +++ b/src/frontend/src/scheduler/local.rs @@ -472,6 +472,26 @@ impl LocalQueryExecution { node_body: Some(node_body), }) } + PlanNodeType::BatchLogSeqScan => { + let mut node_body = execution_plan_node.node.clone(); + match &mut node_body { + NodeBody::LogRowSeqScan(ref mut scan_node) => { + if let Some(partition) = partition { + let partition = partition + .into_table() + .expect("PartitionInfo should be TablePartitionInfo here"); + scan_node.vnode_bitmap = Some(partition.vnode_bitmap); + } + } + _ => unreachable!(), + } + + Ok(PlanNodePb { + children: vec![], + identity, + node_body: Some(node_body), + }) + } PlanNodeType::BatchSource | PlanNodeType::BatchKafkaScan | PlanNodeType::BatchIcebergScan => { diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index 33113028e97dc..6dfa31a845b18 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -1064,18 +1064,7 @@ impl BatchPlanFragmenter { /// If there are multiple scan nodes in this stage, they must have the same distribution, but /// maybe different vnodes partition. We just use the same partition for all the scan nodes. 
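// [Editor's aside — illustrative sketch, not part of the patch.] The index remapping
// done by `generic::LogScan::distribution_key` earlier in this patch: the table's
// distribution-key column indices are translated to positions among the scan's output
// columns, and the whole key is dropped (None) if any key column is not in the output.
use std::collections::HashMap;

fn remap_distribution_key(output_col_idx: &[usize], dist_key: &[usize]) -> Option<Vec<usize>> {
    let tb_idx_to_op_idx: HashMap<usize, usize> = output_col_idx
        .iter()
        .enumerate()
        .map(|(op_idx, &tb_idx)| (tb_idx, op_idx))
        .collect();
    dist_key
        .iter()
        .map(|tb_idx| tb_idx_to_op_idx.get(tb_idx).copied())
        .collect()
}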
fn collect_stage_table_scan(&self, node: PlanRef) -> SchedulerResult> { - if node.node_type() == PlanNodeType::BatchExchange { - // Do not visit next stage. - return Ok(None); - } - if let Some(scan_node) = node.as_batch_sys_seq_scan() { - let name = scan_node.core().table_name.to_owned(); - return Ok(Some(TableScanInfo::system_table(name))); - } - - if let Some(scan_node) = node.as_batch_seq_scan() { - let name = scan_node.core().table_name.to_owned(); - let table_desc = &*scan_node.core().table_desc; + let build_table_scan_info = |name, table_desc: &TableDesc, scan_range| { let table_catalog = self .catalog_reader .read_guard() @@ -1085,10 +1074,29 @@ impl BatchPlanFragmenter { let vnode_mapping = self .worker_node_manager .fragment_mapping(table_catalog.fragment_id)?; - let partitions = - derive_partitions(scan_node.scan_ranges(), table_desc, &vnode_mapping)?; + let partitions = derive_partitions(scan_range, table_desc, &vnode_mapping)?; let info = TableScanInfo::new(name, partitions); Ok(Some(info)) + }; + if node.node_type() == PlanNodeType::BatchExchange { + // Do not visit next stage. + return Ok(None); + } + if let Some(scan_node) = node.as_batch_sys_seq_scan() { + let name = scan_node.core().table_name.to_owned(); + Ok(Some(TableScanInfo::system_table(name))) + } else if let Some(scan_node) = node.as_batch_log_seq_scan() { + build_table_scan_info( + scan_node.core().table_name.to_owned(), + &scan_node.core().table_desc, + &[], + ) + } else if let Some(scan_node) = node.as_batch_seq_scan() { + build_table_scan_info( + scan_node.core().table_name.to_owned(), + &scan_node.core().table_desc, + scan_node.scan_ranges(), + ) } else { node.inputs() .into_iter() diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs index 3cf02d31ce87c..791e958b28774 100644 --- a/src/storage/src/table/batch_table/storage_table.rs +++ b/src/storage/src/table/batch_table/storage_table.rs @@ -25,6 +25,7 @@ use futures::future::try_join_all; use futures::{Stream, StreamExt}; use futures_async_stream::try_stream; use itertools::{Either, Itertools}; +use risingwave_common::array::Op; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption}; use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; @@ -45,7 +46,7 @@ use crate::hummock::CachePolicy; use crate::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode}; use crate::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew}; use crate::row_serde::{find_columns_by_ids, ColumnMapping}; -use crate::store::{PrefetchOptions, ReadOptions, StateStoreIter}; +use crate::store::{ChangeLogValue, PrefetchOptions, ReadLogOptions, ReadOptions, StateStoreIter}; use crate::table::merge_sort::merge_sort; use crate::table::{KeyedRow, TableDistribution, TableIter}; use crate::StateStore; @@ -234,6 +235,7 @@ impl StorageTableInner { let (output_columns, output_indices) = find_columns_by_ids(&table_columns, &output_column_ids); + let mut value_output_indices = vec![]; let mut key_output_indices = vec![]; @@ -414,8 +416,7 @@ impl StorageTableInner { pub trait PkAndRowStream = Stream>> + Send; /// The row iterator of the storage table. -/// The wrapper of [`StorageTableInnerIter`] if pk is not persisted. -pub type StorageTableInnerIter = impl PkAndRowStream; +/// The wrapper of stream item `StorageResult>` if pk is not persisted. 
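// [Editor's aside — illustrative sketch, not part of the patch.] What the scheduler
// changes for `BatchLogSeqScan` in stage.rs/local.rs above do: the stage's table
// partition (if any) is written into the scan node's vnode bitmap before the plan is
// shipped to a compute node; if no partition is present (local mode), the field stays
// empty and the executor scans all vnodes. Plain structs stand in for the pb types.
struct TablePartitionInfo {
    vnode_bitmap: Vec<u8>,
}

struct LogRowSeqScanNodeSketch {
    vnode_bitmap: Option<Vec<u8>>,
}

fn fill_partition(
    mut scan_node: LogRowSeqScanNodeSketch,
    partition: Option<TablePartitionInfo>,
) -> LogRowSeqScanNodeSketch {
    if let Some(partition) = partition {
        // Distributed mode: each task scans only the vnodes assigned to it.
        scan_node.vnode_bitmap = Some(partition.vnode_bitmap);
    }
    scan_node
}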
#[async_trait::async_trait] impl TableIter for S { @@ -429,7 +430,7 @@ impl TableIter for S { /// Iterators impl StorageTableInner { - /// Get multiple [`StorageTableInnerIter`] based on the specified vnodes of this table with + /// Get multiple stream item `StorageResult>` based on the specified vnodes of this table with /// `vnode_hint`, and merge or concat them by given `ordered`. async fn iter_with_encoded_key_range( &self, @@ -439,7 +440,7 @@ impl StorageTableInner { vnode_hint: Option, ordered: bool, prefetch_options: PrefetchOptions, - ) -> StorageResult> { + ) -> StorageResult>> + Send> { let cache_policy = match ( encoded_key_range.start_bound(), encoded_key_range.end_bound(), @@ -516,77 +517,67 @@ impl StorageTableInner { Ok(iter) } - /// Iterates on the table with the given prefix of the pk in `pk_prefix` and the range bounds. - async fn iter_with_pk_bounds( + // TODO: directly use `prefixed_range`. + fn serialize_pk_bound( &self, - epoch: HummockReadEpoch, pk_prefix: impl Row, - range_bounds: impl RangeBounds, - ordered: bool, - prefetch_options: PrefetchOptions, - ) -> StorageResult> { - // TODO: directly use `prefixed_range`. - fn serialize_pk_bound( - pk_serializer: &OrderedRowSerde, - pk_prefix: impl Row, - range_bound: Bound<&OwnedRow>, - is_start_bound: bool, - ) -> Bound { - match range_bound { - Included(k) => { - let pk_prefix_serializer = pk_serializer.prefix(pk_prefix.len() + k.len()); - let key = pk_prefix.chain(k); - let serialized_key = serialize_pk(&key, &pk_prefix_serializer); - if is_start_bound { - Included(serialized_key) - } else { - // Should use excluded next key for end bound. - // Otherwise keys starting with the bound is not included. - end_bound_of_prefix(&serialized_key) - } + range_bound: Bound<&OwnedRow>, + is_start_bound: bool, + ) -> Bound { + match range_bound { + Included(k) => { + let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len() + k.len()); + let key = pk_prefix.chain(k); + let serialized_key = serialize_pk(&key, &pk_prefix_serializer); + if is_start_bound { + Included(serialized_key) + } else { + // Should use excluded next key for end bound. + // Otherwise keys starting with the bound is not included. + end_bound_of_prefix(&serialized_key) } - Excluded(k) => { - let pk_prefix_serializer = pk_serializer.prefix(pk_prefix.len() + k.len()); - let key = pk_prefix.chain(k); - let serialized_key = serialize_pk(&key, &pk_prefix_serializer); - if is_start_bound { - // Storage doesn't support excluded begin key yet, so transform it to - // included. - // We always serialize a u8 for null of datum which is not equal to '\xff', - // so we can assert that the next_key would never be empty. - let next_serialized_key = next_key(&serialized_key); - assert!(!next_serialized_key.is_empty()); - Included(Bytes::from(next_serialized_key)) - } else { - Excluded(serialized_key) - } + } + Excluded(k) => { + let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len() + k.len()); + let key = pk_prefix.chain(k); + let serialized_key = serialize_pk(&key, &pk_prefix_serializer); + if is_start_bound { + // Storage doesn't support excluded begin key yet, so transform it to + // included. + // We always serialize a u8 for null of datum which is not equal to '\xff', + // so we can assert that the next_key would never be empty. 
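// [Editor's aside — illustrative sketch, not part of the patch.] The end-bound rule
// in `serialize_pk_bound` above: an inclusive end bound on a key prefix becomes
// "everything strictly below the next key after that prefix", otherwise keys that
// merely start with the bound would be cut off. These are simplified stand-ins for
// the real `next_key`/`end_bound_of_prefix` helpers.
use std::ops::Bound;

fn next_key_sketch(key: &[u8]) -> Vec<u8> {
    // Drop trailing 0xff bytes, then increment the last remaining byte (simplified).
    let mut out = key.to_vec();
    while let Some(last) = out.last_mut() {
        if *last < 0xff {
            *last += 1;
            return out;
        }
        out.pop();
    }
    // An empty result means "no upper bound" to the caller.
    out
}

fn end_bound_of_prefix_sketch(serialized_prefix: &[u8]) -> Bound<Vec<u8>> {
    let next = next_key_sketch(serialized_prefix);
    if next.is_empty() {
        Bound::Unbounded
    } else {
        Bound::Excluded(next)
    }
}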
+ let next_serialized_key = next_key(&serialized_key); + assert!(!next_serialized_key.is_empty()); + Included(Bytes::from(next_serialized_key)) + } else { + Excluded(serialized_key) } - Unbounded => { - let pk_prefix_serializer = pk_serializer.prefix(pk_prefix.len()); - let serialized_pk_prefix = serialize_pk(&pk_prefix, &pk_prefix_serializer); - if pk_prefix.is_empty() { - Unbounded - } else if is_start_bound { - Included(serialized_pk_prefix) - } else { - end_bound_of_prefix(&serialized_pk_prefix) - } + } + Unbounded => { + let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len()); + let serialized_pk_prefix = serialize_pk(&pk_prefix, &pk_prefix_serializer); + if pk_prefix.is_empty() { + Unbounded + } else if is_start_bound { + Included(serialized_pk_prefix) + } else { + end_bound_of_prefix(&serialized_pk_prefix) } } } + } - let start_key = serialize_pk_bound( - &self.pk_serializer, - &pk_prefix, - range_bounds.start_bound(), - true, - ); - let end_key = serialize_pk_bound( - &self.pk_serializer, - &pk_prefix, - range_bounds.end_bound(), - false, - ); + /// Iterates on the table with the given prefix of the pk in `pk_prefix` and the range bounds. + async fn iter_with_pk_bounds( + &self, + epoch: HummockReadEpoch, + pk_prefix: impl Row, + range_bounds: impl RangeBounds, + ordered: bool, + prefetch_options: PrefetchOptions, + ) -> StorageResult>> + Send> { + let start_key = self.serialize_pk_bound(&pk_prefix, range_bounds.start_bound(), true); + let end_key = self.serialize_pk_bound(&pk_prefix, range_bounds.end_bound(), false); assert!(pk_prefix.len() <= self.pk_indices.len()); let pk_prefix_indices = (0..pk_prefix.len()) @@ -636,7 +627,7 @@ impl StorageTableInner { .await } - /// Construct a [`StorageTableInnerIter`] for batch executors. + /// Construct a stream item `StorageResult>` for batch executors. /// Differs from the streaming one, this iterator will wait for the epoch before iteration pub async fn batch_iter_with_pk_bounds( &self, @@ -645,7 +636,7 @@ impl StorageTableInner { range_bounds: impl RangeBounds, ordered: bool, prefetch_options: PrefetchOptions, - ) -> StorageResult> { + ) -> StorageResult>> + Send> { self.iter_with_pk_bounds(epoch, pk_prefix, range_bounds, ordered, prefetch_options) .await } @@ -656,10 +647,64 @@ impl StorageTableInner { epoch: HummockReadEpoch, ordered: bool, prefetch_options: PrefetchOptions, - ) -> StorageResult> { + ) -> StorageResult>> + Send> { self.batch_iter_with_pk_bounds(epoch, row::empty(), .., ordered, prefetch_options) .await } + + pub async fn batch_iter_log_with_pk_bounds( + &self, + satrt_epoch: HummockReadEpoch, + end_epoch: HummockReadEpoch, + pk_prefix: impl Row, + range_bounds: impl RangeBounds, + ) -> StorageResult)>> + Send> { + let start_key = self.serialize_pk_bound(&pk_prefix, range_bounds.start_bound(), true); + let end_key = self.serialize_pk_bound(&pk_prefix, range_bounds.end_bound(), false); + + assert!(pk_prefix.len() <= self.pk_indices.len()); + let table_key_ranges = { + // Vnodes that are set and should be accessed. + let vnodes = match self.distribution.try_compute_vnode_by_pk_prefix(pk_prefix) { + // If `vnode_hint` is set, we can only access this single vnode. + Some(vnode) => Either::Left(std::iter::once(vnode)), + // Otherwise, we need to access all vnodes of this table. 
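// [Editor's aside — illustrative sketch, not part of the patch; assumes a recent
// `futures` 0.3.] How `batch_iter_log_with_pk_bounds` (begun above, continued below)
// combines its per-vnode change-log streams: since log output need not be globally
// ordered, the streams are flattened with a bounded `flatten_unordered`, mirroring the
// `flatten_unordered(1024)` call that follows. Plain `Vec<u64>` streams stand in for
// the keyed-row iterators.
use futures::stream::{self, StreamExt};

async fn merge_vnode_streams(per_vnode: Vec<Vec<u64>>) -> Vec<u64> {
    stream::iter(per_vnode.into_iter().map(|rows| Box::pin(stream::iter(rows))))
        .flatten_unordered(1024)
        .collect()
        .await
}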
+ None => Either::Right(self.distribution.vnodes().iter_vnodes()), + }; + vnodes + .map(|vnode| prefixed_range_with_vnode((start_key.clone(), end_key.clone()), vnode)) + }; + + let iterators: Vec<_> = try_join_all(table_key_ranges.map(|table_key_range| async move { + let read_options = ReadLogOptions { + table_id: self.table_id, + }; + let iter = StorageTableInnerIterLogInner::::new( + &self.store, + self.mapping.clone(), + self.row_serde.clone(), + table_key_range, + read_options, + satrt_epoch, + end_epoch, + ) + .await? + .into_stream(); + Ok::<_, StorageError>(iter) + })) + .await?; + + #[auto_enum(futures03::Stream)] + let iter = match iterators.len() { + 0 => unreachable!(), + 1 => iterators.into_iter().next().unwrap(), + // Concat all iterators if not to preserve order. + _ => futures::stream::iter(iterators.into_iter().map(Box::pin).collect_vec()) + .flatten_unordered(1024), + }; + + Ok(iter) + } } /// [`StorageTableInnerIterInner`] iterates on the storage table. @@ -787,3 +832,119 @@ impl StorageTableInnerIterInner { } } } + +/// [`StorageTableInnerIterLogInner`] iterates on the storage table. +struct StorageTableInnerIterLogInner { + /// An iterator that returns raw bytes from storage. + iter: S::ChangeLogIter, + + mapping: Arc, + + row_deserializer: Arc, +} + +impl StorageTableInnerIterLogInner { + /// If `wait_epoch` is true, it will wait for the given epoch to be committed before iteration. + #[allow(clippy::too_many_arguments)] + async fn new( + store: &S, + mapping: Arc, + row_deserializer: Arc, + table_key_range: TableKeyRange, + read_options: ReadLogOptions, + satrt_epoch: HummockReadEpoch, + end_epoch: HummockReadEpoch, + ) -> StorageResult { + let raw_satrt_epoch = satrt_epoch.get_epoch(); + let raw_end_epoch = end_epoch.get_epoch(); + store.try_wait_epoch(end_epoch).await?; + let iter = store + .iter_log( + (raw_satrt_epoch, raw_end_epoch), + table_key_range, + read_options, + ) + .await?; + let iter = Self { + iter, + mapping, + row_deserializer, + }; + Ok(iter) + } + + /// Yield a row with its primary key. + #[try_stream(ok = (Op, KeyedRow), error = StorageError)] + async fn into_stream(mut self) { + while let Some((k, v)) = self + .iter + .try_next() + .verbose_instrument_await("storage_table_iter_next") + .await? 
+ { + match v { + ChangeLogValue::Insert(value) => { + let full_row = self.row_deserializer.deserialize(value)?; + let row = self + .mapping + .project(OwnedRow::new(full_row)) + .into_owned_row(); + // TODO: may optimize the key clone + yield ( + Op::Insert, + KeyedRow:: { + vnode_prefixed_key: k.copy_into(), + row, + }, + ); + } + ChangeLogValue::Update { + new_value, + old_value, + } => { + let full_row = self.row_deserializer.deserialize(old_value)?; + let row = self + .mapping + .project(OwnedRow::new(full_row)) + .into_owned_row(); + // TODO: may optimize the key clone + yield ( + Op::UpdateDelete, + KeyedRow:: { + vnode_prefixed_key: k.copy_into(), + row, + }, + ); + let full_row = self.row_deserializer.deserialize(new_value)?; + let row = self + .mapping + .project(OwnedRow::new(full_row)) + .into_owned_row(); + // TODO: may optimize the key clone + yield ( + Op::UpdateInsert, + KeyedRow:: { + vnode_prefixed_key: k.copy_into(), + row, + }, + ); + } + ChangeLogValue::Delete(value) => { + let full_row = self.row_deserializer.deserialize(value)?; + let row = self + .mapping + .project(OwnedRow::new(full_row)) + .into_owned_row(); + // TODO: may optimize the key clone + yield ( + Op::Delete, + KeyedRow:: { + vnode_prefixed_key: k.copy_into(), + row, + }, + ); + } + } + } + } +} diff --git a/src/storage/src/table/mod.rs b/src/storage/src/table/mod.rs index 9be20b2cce538..d245e4bde3790 100644 --- a/src/storage/src/table/mod.rs +++ b/src/storage/src/table/mod.rs @@ -122,6 +122,10 @@ impl> KeyedRow { self.row } + pub fn into_owned_row_key(self) -> (TableKey, OwnedRow) { + (self.vnode_prefixed_key, self.row) + } + pub fn vnode(&self) -> VirtualNode { self.vnode_prefixed_key.vnode_part() }
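// [Editor's aside — illustrative sketch, not part of the patch.] The mapping performed
// by `into_stream` above: one change-log entry expands into one or two (op, row) pairs,
// and `Op::to_i16` (added in stream_chunk.rs earlier in this patch) encodes the op into
// the scan's trailing Int16 column. Plain stand-in types replace risingwave's
// ChangeLogValue / Op / OwnedRow.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Op { Insert, Delete, UpdateInsert, UpdateDelete }

enum ChangeLogValue<R> {
    Insert(R),
    Update { old_value: R, new_value: R },
    Delete(R),
}

fn expand<R>(value: ChangeLogValue<R>) -> Vec<(Op, R)> {
    match value {
        ChangeLogValue::Insert(row) => vec![(Op::Insert, row)],
        // An update is emitted as the old row image first, then the new one.
        ChangeLogValue::Update { old_value, new_value } => vec![
            (Op::UpdateDelete, old_value),
            (Op::UpdateInsert, new_value),
        ],
        ChangeLogValue::Delete(row) => vec![(Op::Delete, row)],
    }
}

// Same encoding as the patch's `Op::to_i16`.
fn op_code(op: Op) -> i16 {
    match op {
        Op::Insert => 1,
        Op::Delete => 2,
        Op::UpdateInsert => 3,
        Op::UpdateDelete => 4,
    }
}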