From cd362011f45c079a39f9d7e1e9cbd1690a32f78d Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Sun, 28 Apr 2024 21:21:41 +0800
Subject: [PATCH] fmt

---
 src/batch/src/executor/log_row_seq_scan.rs  | 82 +++++++------------
 src/common/src/array/stream_chunk.rs        |  9 ++
 .../optimizer/plan_node/generic/log_scan.rs | 21 ++---
 .../src/table/batch_table/storage_table.rs  | 36 +++-----
 4 files changed, 53 insertions(+), 95 deletions(-)

diff --git a/src/batch/src/executor/log_row_seq_scan.rs b/src/batch/src/executor/log_row_seq_scan.rs
index a214e127c2f72..1bc0c1e53c109 100644
--- a/src/batch/src/executor/log_row_seq_scan.rs
+++ b/src/batch/src/executor/log_row_seq_scan.rs
@@ -33,9 +33,7 @@ use risingwave_storage::table::batch_table::storage_table::StorageTable;
 use risingwave_storage::table::{collect_data_chunk, KeyedRow, TableDistribution};
 use risingwave_storage::{dispatch_state_store, StateStore};
 
-use super::{
-    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, ScanRange,
-};
+use super::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
 use crate::error::{BatchError, Result};
 use crate::monitor::BatchMetricsWithTaskLabels;
 use crate::task::BatchTaskContext;
@@ -51,7 +49,6 @@ pub struct LogRowSeqScanExecutor<S: StateStore> {
     metrics: Option<BatchMetricsWithTaskLabels>,
 
     table: StorageTable<S>,
-    scan_ranges: Vec<ScanRange>,
     old_epoch: BatchQueryEpoch,
     new_epoch: BatchQueryEpoch,
 }
@@ -59,7 +56,6 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
     pub fn new(
         table: StorageTable<S>,
-        scan_ranges: Vec<ScanRange>,
         old_epoch: BatchQueryEpoch,
         new_epoch: BatchQueryEpoch,
         chunk_size: usize,
@@ -77,7 +73,6 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
             schema,
             metrics,
             table,
-            scan_ranges,
             old_epoch,
             new_epoch,
         }
@@ -115,7 +110,6 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder {
             // Or it's single distribution, e.g., distinct agg. We scan in a single executor.
             None => Some(TableDistribution::all_vnodes()),
         };
-        let scan_ranges = vec![ScanRange::full()];
         let chunk_size = source.context.get_config().developer.chunk_size as u32;
         let metrics = source.context().batch_metrics();
 
@@ -124,7 +118,6 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder {
             let table = StorageTable::new_partial(state_store, column_ids, vnodes, table_desc);
             Ok(Box::new(LogRowSeqScanExecutor::new(
                 table,
-                scan_ranges,
                 log_store_seq_scan_node.old_epoch.clone().unwrap(),
                 log_store_seq_scan_node.new_epoch.clone().unwrap(),
                 chunk_size as usize,
@@ -156,7 +149,6 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
             identity,
             metrics,
             table,
-            scan_ranges,
             old_epoch,
             new_epoch,
             schema,
@@ -173,42 +165,33 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
         // Range Scan
         // WARN: DO NOT use `select` to execute range scans concurrently
        // it can consume too much memory if there're too many ranges.
-        for range in scan_ranges {
-            let stream = Self::execute_range(
-                table.clone(),
-                range,
-                old_epoch.clone(),
-                new_epoch.clone(),
-                chunk_size,
-                histogram.clone(),
-                Arc::new(schema.clone()),
-            );
-            #[for_await]
-            for chunk in stream {
-                let chunk = chunk?;
-                yield chunk;
-            }
+        let stream = Self::execute_range(
+            table.clone(),
+            old_epoch.clone(),
+            new_epoch.clone(),
+            chunk_size,
+            histogram.clone(),
+            Arc::new(schema.clone()),
+        );
+        #[for_await]
+        for chunk in stream {
+            let chunk = chunk?;
+            yield chunk;
         }
     }
 
     #[try_stream(ok = DataChunk, error = BatchError)]
     async fn execute_range(
         table: Arc<StorageTable<S>>,
-        scan_range: ScanRange,
         old_epoch: BatchQueryEpoch,
         new_epoch: BatchQueryEpoch,
         chunk_size: usize,
         histogram: Option<impl Deref<Target = Histogram>>,
         schema: Arc<Schema>,
     ) {
-        let ScanRange {
-            pk_prefix,
-            next_col_bounds,
-        } = scan_range;
+        let pk_prefix = OwnedRow::default();
 
         let order_type = table.pk_serializer().get_order_types()[pk_prefix.len()];
-        let (start_bound, end_bound) = (next_col_bounds.0, next_col_bounds.1);
-
-        assert!(pk_prefix.len() < table.pk_indices().len());
 
         // Range Scan.
         let iter = table
             .batch_iter_log_with_pk_bounds(
                 old_epoch.into(),
                 new_epoch.into(),
                 &pk_prefix,
                 (
-                    match start_bound {
-                        Bound::Unbounded => {
-                            if order_type.nulls_are_first() {
-                                // `NULL`s are at the start bound side, we should exclude them to meet SQL semantics.
-                                Bound::Excluded(OwnedRow::new(vec![None]))
-                            } else {
-                                // Both start and end are unbounded, so we need to select all rows.
-                                Bound::Unbounded
-                            }
-                        }
-                        _ => unimplemented!("Log iter range need full"),
+                    if order_type.nulls_are_first() {
+                        // `NULL`s are at the start bound side, we should exclude them to meet SQL semantics.
+                        Bound::Excluded(OwnedRow::new(vec![None]))
+                    } else {
+                        // Both start and end are unbounded, so we need to select all rows.
+                        Bound::Unbounded
                     },
-                    match end_bound {
-                        Bound::Unbounded => {
-                            if order_type.nulls_are_last() {
-                                // `NULL`s are at the end bound side, we should exclude them to meet SQL semantics.
-                                Bound::Excluded(OwnedRow::new(vec![None]))
-                            } else {
-                                // Both start and end are unbounded, so we need to select all rows.
-                                Bound::Unbounded
-                            }
-                        }
-                        _ => unimplemented!("Log iter range need full"),
+                    if order_type.nulls_are_last() {
+                        // `NULL`s are at the end bound side, we should exclude them to meet SQL semantics.
+                        Bound::Excluded(OwnedRow::new(vec![None]))
+                    } else {
+                        // Both start and end are unbounded, so we need to select all rows.
+                        Bound::Unbounded
                     },
                 ),
             )
@@ -251,9 +224,10 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
         let mut iter = iter.as_mut().map(|r| match r {
            Ok((op, value)) => {
                let (k, row) = value.into_owned_row_key();
+                // TODO: avoid creating a full row.
                let full_row = row
                    .into_iter()
-                    .chain(vec![Some(ScalarImpl::Int16(op))])
+                    .chain(vec![Some(ScalarImpl::Int16(op.to_i16()))])
                    .collect_vec();
                let row = OwnedRow::new(full_row);
                Ok(KeyedRow::<Bytes>::new(k, row))
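A note on the executor change above: instead of iterating caller-supplied scan ranges, the executor now always performs a full-range log scan and appends each change's operation to the row as a trailing Int16 column, encoded with the `Op::to_i16` helper introduced in the next file. The following standalone sketch (not part of the patch) mirrors that mapping and adds a hypothetical `from_i16` inverse that a consumer of the op column could use; it compiles on its own:

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Op {
    Insert,
    Delete,
    UpdateInsert,
    UpdateDelete,
}

impl Op {
    // Same mapping as the `Op::to_i16` added in stream_chunk.rs below.
    fn to_i16(self) -> i16 {
        match self {
            Op::Insert => 1,
            Op::Delete => 2,
            Op::UpdateInsert => 3,
            Op::UpdateDelete => 4,
        }
    }

    // Hypothetical inverse; the patch itself only adds `to_i16`.
    fn from_i16(v: i16) -> Option<Self> {
        match v {
            1 => Some(Op::Insert),
            2 => Some(Op::Delete),
            3 => Some(Op::UpdateInsert),
            4 => Some(Op::UpdateDelete),
            _ => None,
        }
    }
}

fn main() {
    // The encoding must round-trip so readers of the op column can decode it.
    for op in [Op::Insert, Op::Delete, Op::UpdateInsert, Op::UpdateDelete] {
        assert_eq!(Op::from_i16(op.to_i16()), Some(op));
    }
}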
diff --git a/src/common/src/array/stream_chunk.rs b/src/common/src/array/stream_chunk.rs
index 157e98a429544..9d76786b6cec6 100644
--- a/src/common/src/array/stream_chunk.rs
+++ b/src/common/src/array/stream_chunk.rs
@@ -79,6 +79,15 @@ impl Op {
             Op::UpdateInsert => Op::Insert,
         }
     }
+
+    pub fn to_i16(self) -> i16 {
+        match self {
+            Op::Insert => 1,
+            Op::Delete => 2,
+            Op::UpdateInsert => 3,
+            Op::UpdateDelete => 4,
+        }
+    }
 }
 
 pub type Ops<'a> = &'a [Op];

diff --git a/src/frontend/src/optimizer/plan_node/generic/log_scan.rs b/src/frontend/src/optimizer/plan_node/generic/log_scan.rs
index d420c9a835978..cd5ddebdc0724 100644
--- a/src/frontend/src/optimizer/plan_node/generic/log_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/generic/log_scan.rs
@@ -17,12 +17,11 @@ use std::rc::Rc;
 
 use educe::Educe;
 use pretty_xmlish::Pretty;
-use risingwave_common::catalog::{ColumnDesc, Field, Schema, TableDesc};
+use risingwave_common::catalog::{Field, Schema, TableDesc};
 use risingwave_common::types::DataType;
 use risingwave_common::util::sort_util::ColumnOrder;
 
 use crate::catalog::ColumnId;
-use crate::expr::{ExprRewriter, ExprVisitor};
 use crate::optimizer::optimizer_context::OptimizerContextRef;
 
 const OP_NAME: &str = "op";
@@ -48,15 +47,11 @@ pub struct LogScan {
 }
 
 impl LogScan {
-    pub fn rewrite_exprs(&self, _rewriter: &mut dyn ExprRewriter) {}
-
-    pub fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {}
-
     // Used for create batch exec, without op
     pub fn output_column_ids(&self) -> Vec<ColumnId> {
         self.output_col_idx
             .iter()
-            .map(|i| self.get_table_columns()[*i].column_id)
+            .map(|i| self.table_desc.columns[*i].column_id)
             .collect()
     }
 
@@ -64,11 +59,11 @@ impl LogScan {
         &self.table_desc.pk
     }
 
-    pub(crate) fn column_names_with_table_prefix(&self) -> Vec<String> {
+    fn column_names_with_table_prefix(&self) -> Vec<String> {
         let mut out_column_names: Vec<_> = self
             .output_col_idx
             .iter()
-            .map(|&i| format!("{}.{}", self.table_name, self.get_table_columns()[i].name))
+            .map(|&i| format!("{}.{}", self.table_name, self.table_desc.columns[i].name))
             .collect();
         out_column_names.push(format!("{}.{}", self.table_name, OP_NAME));
         out_column_names
@@ -78,7 +73,7 @@ impl LogScan {
         let mut out_column_names: Vec<_> = self
             .output_col_idx
             .iter()
-            .map(|&i| self.get_table_columns()[i].name.clone())
+            .map(|&i| self.table_desc.columns[i].name.clone())
             .collect();
         out_column_names.push(OP_NAME.to_string());
         out_column_names
@@ -135,7 +130,7 @@ impl LogScan {
             .output_col_idx
             .iter()
             .map(|tb_idx| {
-                let col = &self.get_table_columns()[*tb_idx];
+                let col = &self.table_desc.columns[*tb_idx];
                 Field::from_with_table_name_prefix(col, &self.table_name)
             })
             .collect();
@@ -149,8 +144,4 @@ impl LogScan {
     pub(crate) fn ctx(&self) -> OptimizerContextRef {
         self.ctx.clone()
    }
-
-    pub fn get_table_columns(&self) -> Vec<ColumnDesc> {
-        self.table_desc.columns.clone()
-    }
 }
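For context on the `log_scan.rs` hunks above: the log scan's output is the selected table columns plus one trailing `op` column (`OP_NAME`). A standalone sketch of the naming scheme used by `column_names_with_table_prefix`, with plain strings standing in for `LogScan`'s fields:

const OP_NAME: &str = "op";

// Mirrors `LogScan::column_names_with_table_prefix`: prefix every selected
// column with the table name, then append the synthetic `op` column.
fn column_names_with_table_prefix(table_name: &str, columns: &[&str]) -> Vec<String> {
    let mut out_column_names: Vec<String> = columns
        .iter()
        .map(|name| format!("{}.{}", table_name, name))
        .collect();
    out_column_names.push(format!("{}.{}", table_name, OP_NAME));
    out_column_names
}

fn main() {
    assert_eq!(
        column_names_with_table_prefix("t", &["id", "v"]),
        ["t.id", "t.v", "t.op"]
    );
}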
diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs
index 1324e7dd68d65..791e958b28774 100644
--- a/src/storage/src/table/batch_table/storage_table.rs
+++ b/src/storage/src/table/batch_table/storage_table.rs
@@ -25,6 +25,7 @@ use futures::future::try_join_all;
 use futures::{Stream, StreamExt};
 use futures_async_stream::try_stream;
 use itertools::{Either, Itertools};
+use risingwave_common::array::Op;
 use risingwave_common::buffer::Bitmap;
 use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption};
 use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
@@ -415,7 +416,7 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
 pub trait PkAndRowStream = Stream<Item = StorageResult<KeyedRow<Bytes>>> + Send;
 
 /// The row iterator of the storage table.
-/// The wrapper of [`StorageTableInnerIter`] if pk is not persisted.
+/// The wrapper of a stream of `StorageResult<KeyedRow<Bytes>>` items if the pk is not persisted.
 #[async_trait::async_trait]
 impl<S: PkAndRowStream + Unpin> TableIter for S {
@@ -429,7 +430,7 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
 
 /// Iterators
 impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
-    /// Get multiple [`StorageTableInnerIter`] based on the specified vnodes of this table with
+    /// Get multiple streams of `StorageResult<KeyedRow<Bytes>>` based on the specified vnodes of this table with
     /// `vnode_hint`, and merge or concat them by given `ordered`.
     async fn iter_with_encoded_key_range(
         &self,
@@ -626,7 +627,7 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
             .await
     }
 
-    /// Construct a [`StorageTableInnerIter`] for batch executors.
+    /// Construct a stream of `StorageResult<KeyedRow<Bytes>>` items for batch executors.
     /// Differs from the streaming one, this iterator will wait for the epoch before iteration
     pub async fn batch_iter_with_pk_bounds(
         &self,
@@ -657,18 +658,7 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
     pub async fn batch_iter_log_with_pk_bounds(
         &self,
         start_epoch: HummockReadEpoch,
         end_epoch: HummockReadEpoch,
         pk_prefix: impl Row,
         range_bounds: impl RangeBounds<OwnedRow>,
-    ) -> StorageResult<impl Stream<Item = StorageResult<(i16, KeyedRow<Bytes>)>> + Send> {
-        self.iter_log_with_pk_bounds(start_epoch, end_epoch, pk_prefix, range_bounds)
-            .await
-    }
-
-    async fn iter_log_with_pk_bounds(
-        &self,
-        start_epoch: HummockReadEpoch,
-        end_epoch: HummockReadEpoch,
-        pk_prefix: impl Row,
-        range_bounds: impl RangeBounds<OwnedRow>,
-    ) -> StorageResult<impl Stream<Item = StorageResult<(i16, KeyedRow<Bytes>)>> + Send> {
+    ) -> StorageResult<impl Stream<Item = StorageResult<(Op, KeyedRow<Bytes>)>> + Send> {
         let start_key = self.serialize_pk_bound(&pk_prefix, range_bounds.start_bound(), true);
         let end_key = self.serialize_pk_bound(&pk_prefix, range_bounds.end_bound(), false);
@@ -875,12 +865,6 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterLogInner<S, SD> {
                 read_options,
             )
             .await?;
-        // For `HummockStorage`, a cluster recovery will clear storage data and make subsequent
-        // `HummockReadEpoch::Current` read incomplete.
-        // `validate_read_epoch` is a safeguard against that incorrect read. It rejects the read
-        // result if any recovery has happened after `try_wait_epoch`.
-        store.validate_read_epoch(end_epoch)?;
-        store.validate_read_epoch(start_epoch)?;
         let iter = Self {
             iter,
             mapping,
@@ -890,7 +874,7 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterLogInner<S, SD> {
     }
 
     /// Yield a row with its primary key.
-    #[try_stream(ok = (i16, KeyedRow<Bytes>), error = StorageError)]
+    #[try_stream(ok = (Op, KeyedRow<Bytes>), error = StorageError)]
     async fn into_stream(mut self) {
         while let Some((k, v)) = self
             .iter
@@ -907,7 +891,7 @@
                         .into_owned_row();
                     // TODO: may optimize the key clone
                     yield (
-                        1,
+                        Op::Insert,
                         KeyedRow::<Bytes> {
                             vnode_prefixed_key: k.copy_into(),
                             row,
@@ -925,7 +909,7 @@
                         .into_owned_row();
                     // TODO: may optimize the key clone
                     yield (
-                        3,
+                        Op::UpdateDelete,
                         KeyedRow::<Bytes> {
                             vnode_prefixed_key: k.copy_into(),
                             row,
@@ -938,7 +922,7 @@
                        .into_owned_row();
                    // TODO: may optimize the key clone
                    yield (
-                        4,
+                        Op::UpdateInsert,
                        KeyedRow::<Bytes> {
                            vnode_prefixed_key: k.copy_into(),
                            row,
@@ -953,7 +937,7 @@
                        .into_owned_row();
                    // TODO: may optimize the key clone
                    yield (
-                        2,
+                        Op::Delete,
                        KeyedRow::<Bytes> {
                            vnode_prefixed_key: k.copy_into(),
                            row,
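With the storage change above, `batch_iter_log_with_pk_bounds` yields typed `(Op, KeyedRow<Bytes>)` pairs instead of raw `i16` codes, and an update is emitted as an `UpdateDelete` (old row) followed by an `UpdateInsert` (new row). A standalone sketch of how a caller might fold such a log stream, with a plain `Vec` standing in for the storage stream and row types:

#[derive(Clone, Copy, Debug)]
enum Op {
    Insert,
    Delete,
    UpdateInsert,
    UpdateDelete,
}

// Stand-in for `KeyedRow<Bytes>`.
type Row = Vec<i64>;

// Count how many rows a slice of change-log entries inserts and deletes.
fn summarize_log(entries: &[(Op, Row)]) -> (usize, usize) {
    let (mut inserted, mut deleted) = (0, 0);
    for (op, _row) in entries {
        match op {
            // UpdateInsert/UpdateDelete are the two halves of an update.
            Op::Insert | Op::UpdateInsert => inserted += 1,
            Op::Delete | Op::UpdateDelete => deleted += 1,
        }
    }
    (inserted, deleted)
}

fn main() {
    let log = vec![
        (Op::Insert, vec![1]),
        (Op::UpdateDelete, vec![1]),
        (Op::UpdateInsert, vec![2]),
    ];
    // One plain insert plus one update: two "inserts", one "delete".
    assert_eq!(summarize_log(&log), (2, 1));
}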