diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index 9bf5437857196..4adff202775de 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -74,14 +74,12 @@ message FilterNode { message LogRowSeqScanNode { plan_common.StorageTableDesc table_desc = 1; repeated int32 column_ids = 2; - // All the ranges need to be read. i.e., they are OR'ed. - // - // Empty `scan_ranges` means full table scan. - repeated ScanRange scan_ranges = 3; // The partition to read for scan tasks. // // Will be filled by the scheduler. - common.Buffer vnode_bitmap = 4; + common.Buffer vnode_bitmap = 3; + common.BatchQueryEpoch old_epoch = 4; + common.BatchQueryEpoch new_epoch = 5; } message InsertNode { diff --git a/src/batch/src/executor/generic_exchange.rs b/src/batch/src/executor/generic_exchange.rs index a0c1d46d00298..24ff47958dd3f 100644 --- a/src/batch/src/executor/generic_exchange.rs +++ b/src/batch/src/executor/generic_exchange.rs @@ -151,7 +151,6 @@ impl BoxedExecutorBuilder for GenericExchangeExecutorBuilder { source: &ExecutorBuilder<'_, C>, inputs: Vec<BoxedExecutor>, ) -> Result<BoxedExecutor> { - println!("333"); ensure!( inputs.is_empty(), "Exchange executor should not have children!" diff --git a/src/batch/src/executor/log_row_seq_scan.rs b/src/batch/src/executor/log_row_seq_scan.rs index e381a45b4117f..07e65b50856ef 100644 --- a/src/batch/src/executor/log_row_seq_scan.rs +++ b/src/batch/src/executor/log_row_seq_scan.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::ops::{Bound, Deref, RangeBounds}; +use std::ops::{Bound, Deref}; use std::sync::Arc; use futures::prelude::stream::StreamExt; @@ -23,9 +23,6 @@ use risingwave_common::array::DataChunk; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{ColumnId, Schema}; use risingwave_common::row::{OwnedRow, Row}; -use risingwave_common::types::DataType; -use risingwave_common::util::chunk_coalesce::DataChunkBuilder; -use risingwave_common::util::epoch::{MAX_EPOCH, Epoch}; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::common::BatchQueryEpoch; use risingwave_pb::plan_common::StorageTableDesc; @@ -34,8 +31,7 @@ use risingwave_storage::table::{collect_data_chunk, TableDistribution}; use risingwave_storage::{dispatch_state_store, StateStore}; use super::{ - BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, - RowSeqScanExecutor, ScanRange, + BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, ScanRange, }; use crate::error::{BatchError, Result}; use crate::monitor::BatchMetricsWithTaskLabels; @@ -51,14 +47,16 @@ pub struct LogRowSeqScanExecutor<S: StateStore> { table: StorageTable<S>, scan_ranges: Vec<ScanRange>, - epoch: BatchQueryEpoch, + old_epoch: BatchQueryEpoch, + new_epoch: BatchQueryEpoch, } impl<S: StateStore> LogRowSeqScanExecutor<S> { pub fn new( table: StorageTable<S>, scan_ranges: Vec<ScanRange>, - epoch: BatchQueryEpoch, + old_epoch: BatchQueryEpoch, + new_epoch: BatchQueryEpoch, chunk_size: usize, identity: String, metrics: Option<BatchMetricsWithTaskLabels>, @@ -69,7 +67,8 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> { metrics, table, scan_ranges, - epoch, + old_epoch, + new_epoch, } } } @@ -82,7 +81,6 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder { source: &ExecutorBuilder<'_, C>, inputs: Vec<BoxedExecutor>, ) -> Result<BoxedExecutor> { - println!("111"); ensure!( inputs.is_empty(), "LogStore row sequential scan should not have input executor!" 
@@ -93,6 +91,12 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder { )?; let table_desc: &StorageTableDesc = log_store_seq_scan_node.get_table_desc()?; + let op_id = log_store_seq_scan_node + .column_ids + .iter() + .max() + .map(ColumnId::from) + .unwrap(); let column_ids = log_store_seq_scan_node .column_ids .iter() @@ -106,39 +110,24 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder { // Or it's single distribution, e.g., distinct agg. We scan in a single executor. None => Some(TableDistribution::all_vnodes()), }; + let scan_ranges = vec![ScanRange::full()]; - let scan_ranges = { - let scan_ranges = &log_store_seq_scan_node.scan_ranges; - if scan_ranges.is_empty() { - vec![ScanRange::full()] - } else { - scan_ranges - .iter() - .map(|scan_range| { - let pk_types = table_desc.pk.iter().map(|order| { - DataType::from( - table_desc.columns[order.column_index as usize] - .column_type - .as_ref() - .unwrap(), - ) - }); - ScanRange::new(scan_range.clone(), pk_types) - }) - .try_collect()? - } - }; - - let epoch = source.epoch.clone(); let chunk_size = source.context.get_config().developer.chunk_size as u32; let metrics = source.context().batch_metrics(); dispatch_state_store!(source.context().state_store(), state_store, { - let table = StorageTable::new_partial(state_store, column_ids, vnodes, table_desc); + let table = StorageTable::new_partial_inner( + state_store, + column_ids, + vnodes, + table_desc, + vec![op_id], + ); Ok(Box::new(LogRowSeqScanExecutor::new( table, scan_ranges, - epoch, + log_store_seq_scan_node.old_epoch.clone().unwrap(), + log_store_seq_scan_node.new_epoch.clone().unwrap(), chunk_size as usize, source.plan_node().get_identity().clone(), metrics, @@ -169,7 +158,8 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> { metrics, table, scan_ranges, - epoch, + old_epoch, + new_epoch, } = *self; let table = std::sync::Arc::new(table); @@ -180,44 +170,22 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> { .row_seq_scan_next_duration .with_guarded_label_values(&metrics.executor_labels(&identity)) }); - - // if ordered { - // // Currently we execute range-scans concurrently so the order is not guaranteed if - // // there're multiple ranges. - // // TODO: reserve the order for multiple ranges. - // assert_eq!(scan_ranges.len(), 1); - // } - - // the number of rows have been returned as execute result - let mut returned = 0; - - let mut data_chunk_builder = DataChunkBuilder::new(table.schema().data_types(), chunk_size); - // Range Scan // WARN: DO NOT use `select` to execute range scans concurrently // it can consume too much memory if there're too many ranges. 
for range in scan_ranges { - println!("test"); let stream = Self::execute_range( table.clone(), range, - // ordered, - epoch.clone(), + old_epoch.clone(), + new_epoch.clone(), chunk_size, - // limit, histogram.clone(), ); #[for_await] for chunk in stream { - println!("{:?}", chunk); let chunk = chunk?; - returned += chunk.cardinality() as u64; yield chunk; - // if let Some(limit) = &limit - // && returned >= *limit - // { - // return Ok(()); - // } } } } @@ -226,41 +194,29 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> { async fn execute_range( table: Arc<StorageTable<S>>, scan_range: ScanRange, - // ordered: bool, - epoch: BatchQueryEpoch, + old_epoch: BatchQueryEpoch, + new_epoch: BatchQueryEpoch, chunk_size: usize, - // limit: Option<u64>, histogram: Option<Arc<Histogram>>, ) { - let epoch1 = BatchQueryEpoch{ epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::Committed(65536))}; - let epoch2 = BatchQueryEpoch{ epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::Committed(MAX_EPOCH - 1))}; - let ScanRange { pk_prefix, next_col_bounds, } = scan_range; let order_type = table.pk_serializer().get_order_types()[pk_prefix.len()]; - let (start_bound, end_bound) = if order_type.is_ascending() { - (next_col_bounds.0, next_col_bounds.1) - } else { - (next_col_bounds.1, next_col_bounds.0) - }; - - let start_bound_is_bounded = !matches!(start_bound, Bound::Unbounded); - let end_bound_is_bounded = !matches!(end_bound, Bound::Unbounded); - - // Range Scan. + let (start_bound, end_bound) = (next_col_bounds.0, next_col_bounds.1); assert!(pk_prefix.len() < table.pk_indices().len()); + // Range Scan. let iter = table .batch_iter_log_with_pk_bounds( - epoch1.into(), - epoch2.into(), + old_epoch.into(), + new_epoch.into(), &pk_prefix, ( match start_bound { Bound::Unbounded => { - if end_bound_is_bounded && order_type.nulls_are_first() { + if order_type.nulls_are_first() { // `NULL`s are at the start bound side, we should exclude them to meet SQL semantics. Bound::Excluded(OwnedRow::new(vec![None])) } else { @@ -268,12 +224,11 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> { Bound::Unbounded } } - Bound::Included(x) => Bound::Included(OwnedRow::new(vec![x])), - Bound::Excluded(x) => Bound::Excluded(OwnedRow::new(vec![x])), + _ => unimplemented!("log scan only supports full-range bounds"), }, match end_bound { Bound::Unbounded => { - if start_bound_is_bounded && order_type.nulls_are_last() { + if order_type.nulls_are_last() { // `NULL`s are at the end bound side, we should exclude them to meet SQL semantics. 
Bound::Excluded(OwnedRow::new(vec![None])) } else { @@ -281,8 +236,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> { Bound::Unbounded } } - Bound::Included(x) => Bound::Included(OwnedRow::new(vec![x])), - Bound::Excluded(x) => Bound::Excluded(OwnedRow::new(vec![x])), + _ => unimplemented!("log scan only supports full-range bounds"), }, ), ) @@ -298,7 +252,6 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> { if let Some(timer) = timer { timer.observe_duration() } - println!("chunk...{:?}", chunk); if let Some(chunk) = chunk { yield chunk diff --git a/src/batch/src/executor/mod.rs b/src/batch/src/executor/mod.rs index 8e732f8a5ee08..66c9058cc52b4 100644 --- a/src/batch/src/executor/mod.rs +++ b/src/batch/src/executor/mod.rs @@ -203,7 +203,6 @@ impl<'a, C: BatchTaskContext> ExecutorBuilder<'a, C> { #[async_recursion] async fn try_build(&self) -> Result<BoxedExecutor> { let mut inputs = Vec::with_capacity(self.plan_node.children.len()); - println!("input_node: {:?}", self.plan_node); for input_node in &self.plan_node.children { let input = self.clone_for_plan(input_node).build().await?; inputs.push(input); diff --git a/src/batch/src/executor/row_seq_scan.rs b/src/batch/src/executor/row_seq_scan.rs index 5dc280b1420d2..1dc52c1c9bb6a 100644 --- a/src/batch/src/executor/row_seq_scan.rs +++ b/src/batch/src/executor/row_seq_scan.rs @@ -161,7 +161,6 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder { source: &ExecutorBuilder<'_, C>, inputs: Vec<BoxedExecutor>, ) -> Result<BoxedExecutor> { - println!("222"); ensure!( inputs.is_empty(), "Row sequential scan should not have input executor!" @@ -178,7 +177,6 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder { .copied() .map(ColumnId::from) .collect(); - let vnodes = match &seq_scan_node.vnode_bitmap { Some(vnodes) => Some(Bitmap::from(vnodes).into()), // This is possible for dml. vnode_bitmap is not filled by scheduler. diff --git a/src/common/src/catalog/column.rs b/src/common/src/catalog/column.rs index 82d2f22f41cb4..05871a3ff4457 100644 --- a/src/common/src/catalog/column.rs +++ b/src/common/src/catalog/column.rs @@ -27,7 +27,7 @@ use crate::types::DataType; /// Column ID is the unique identifier of a column in a table. Different from table ID, column ID is /// not globally unique. 
-#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)] pub struct ColumnId(i32); impl std::fmt::Debug for ColumnId { diff --git a/src/frontend/src/binder/relation/mod.rs b/src/frontend/src/binder/relation/mod.rs index fdc822991c4f4..f88f58ff6d84f 100644 --- a/src/frontend/src/binder/relation/mod.rs +++ b/src/frontend/src/binder/relation/mod.rs @@ -18,7 +18,8 @@ use std::ops::Deref; use either::Either; use itertools::{EitherOrBoth, Itertools}; use risingwave_common::bail; -use risingwave_common::catalog::{Field, TableId, DEFAULT_SCHEMA_NAME}; +use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Field, TableId, DEFAULT_SCHEMA_NAME}; +use risingwave_common::session_config::USER_NAME_WILD_CARD; use risingwave_sqlparser::ast::{ AsOf, Expr as ParserExpr, FunctionArg, FunctionArgExpr, Ident, ObjectName, TableAlias, TableFactor, @@ -31,8 +32,11 @@ use super::bind_context::ColumnBinding; use super::statement::RewriteExprsRecursive; use crate::binder::bind_context::{BindingCte, BindingCteState}; use crate::binder::Binder; +use crate::catalog::root_catalog::SchemaPath; +use crate::catalog::CatalogError; use crate::error::{ErrorCode, Result, RwError}; use crate::expr::{ExprImpl, InputRef}; +use crate::TableCatalog; mod cte_ref; mod join; @@ -343,7 +347,6 @@ impl Binder { name: ObjectName, alias: Option<TableAlias>, as_of: Option<AsOf>, - query_log: bool, ) -> Result<Relation> { let (schema_name, table_name) = Self::resolve_schema_qualified_name(&self.db_name, name)?; @@ -401,16 +404,95 @@ impl Binder { } } } else { - self.bind_relation_by_name_inner( - schema_name.as_deref(), - &table_name, - alias, - as_of, - query_log, - ) + self.bind_relation_by_name_inner(schema_name.as_deref(), &table_name, alias, as_of) } } + pub fn bind_log_table_relation_by_name( + &mut self, + name: ObjectName, + old_epoch: u64, + new_epoch: u64, + ) -> Result<Relation> { + let resolve_log_table_relation = |log_table_catalog: &std::sync::Arc<TableCatalog>| { + let log_table_catalog = &*log_table_catalog.clone(); + let mut log_table_catalog = log_table_catalog.clone(); + let next_column_id = log_table_catalog + .columns + .iter() + .max_by(|a, b| { + a.column_desc + .column_id + .get_id() + .cmp(&b.column_desc.column_id.get_id()) + }) + .map(|c| c.column_desc.column_id) + .unwrap_or_default() + .next(); + + log_table_catalog.columns.push(ColumnCatalog { + column_desc: ColumnDesc::named( + "op", + next_column_id, + risingwave_common::types::DataType::Int16, + ), + is_hidden: false, + }); + let log_table_catalog = std::sync::Arc::new(log_table_catalog); + let table = BoundLogTable { + table_id: log_table_catalog.id(), + table_catalog: log_table_catalog.clone(), + old_epoch, + new_epoch, + }; + ( + Relation::LogTable(Box::new(table)), + log_table_catalog + .columns + .iter() + .map(|c| (c.is_hidden, Field::from(&c.column_desc))) + .collect_vec(), + ) + }; + + let (schema_name, table_name) = Self::resolve_schema_qualified_name(&self.db_name, name)?; + let (ret, columns) = match schema_name { + Some(name) => { + let schema_path = SchemaPath::Name(&name); + if let Ok((table_catalog, _)) = + self.catalog + .get_table_by_name(&self.db_name, schema_path, &table_name) + { + resolve_log_table_relation(table_catalog) + } else { + return Err(CatalogError::NotFound("table", table_name.to_string()).into()); + } + } + None => (|| { + let user_name = &self.auth_context.user_name; + for path in self.search_path.path() { + let schema_name = if path == USER_NAME_WILD_CARD { + user_name + } else { + path + }; + if 
let Ok(schema) = self.catalog.get_schema_by_name(&self.db_name, schema_name) { if let Some(table_catalog) = schema.get_table_by_name(&table_name) { return Ok(resolve_log_table_relation(table_catalog)); } } } Err(Into::<RwError>::into(CatalogError::NotFound( "table", table_name.to_string(), ))) })()?, }; self.bind_table_to_context(columns, table_name.to_string(), None)?; Ok(ret) } + // Bind a relation provided a function arg. fn bind_relation_by_function_arg( &mut self, @@ -427,7 +509,7 @@ impl Binder { }?; Ok(( - self.bind_relation_by_name(table_name.clone(), None, None, false)?, + self.bind_relation_by_name(table_name.clone(), None, None)?, table_name, )) } @@ -477,17 +559,14 @@ impl Binder { .map_or(DEFAULT_SCHEMA_NAME.to_string(), |arg| arg.to_string()); let table_name = self.catalog.get_table_name_by_id(table_id)?; - self.bind_relation_by_name_inner(Some(&schema), &table_name, alias, None, false) + self.bind_relation_by_name_inner(Some(&schema), &table_name, alias, None) } pub(super) fn bind_table_factor(&mut self, table_factor: TableFactor) -> Result<Relation> { match table_factor { - TableFactor::Table { - name, - alias, - as_of, - query_log, - } => self.bind_relation_by_name(name, alias, as_of, query_log), + TableFactor::Table { name, alias, as_of } => { + self.bind_relation_by_name(name, alias, as_of) + } TableFactor::TableFunction { name, alias, @@ -528,6 +607,11 @@ impl Binder { self.pop_and_merge_lateral_context()?; Ok(bound_join) } + TableFactor::LogTable { + name, + old_epoch, + new_epoch, + } => self.bind_log_table_relation_by_name(name, old_epoch, new_epoch), } } } diff --git a/src/frontend/src/binder/relation/table_function.rs b/src/frontend/src/binder/relation/table_function.rs index 804243d3cbaf5..9189e176e0d55 100644 --- a/src/frontend/src/binder/relation/table_function.rs +++ b/src/frontend/src/binder/relation/table_function.rs @@ -73,7 +73,6 @@ impl Binder { PG_KEYWORDS_TABLE_NAME, alias, None, - false, ); } } diff --git a/src/frontend/src/binder/relation/table_or_source.rs b/src/frontend/src/binder/relation/table_or_source.rs index 49966790af818..606ae0bd331b5 100644 --- a/src/frontend/src/binder/relation/table_or_source.rs +++ b/src/frontend/src/binder/relation/table_or_source.rs @@ -46,6 +46,8 @@ pub struct BoundBaseTable { pub struct BoundLogTable { pub table_id: TableId, pub table_catalog: Arc<TableCatalog>, + pub new_epoch: u64, + pub old_epoch: u64, } #[derive(Debug, Clone)] @@ -78,7 +80,6 @@ impl Binder { table_name: &str, alias: Option<TableAlias>, as_of: Option<AsOf>, - query_log: bool, ) -> Result<Relation> { // define some helper functions converting catalog to bound relation let resolve_sys_table_relation = |sys_table_catalog: &Arc<SystemTableCatalog>| { @@ -96,21 +97,6 @@ impl Binder { ) }; - let resolve_log_table_relation = |log_table_catalog: &Arc<TableCatalog>| { - let table = BoundLogTable { - table_id: log_table_catalog.id(), - table_catalog: log_table_catalog.clone(), - }; - ( - Relation::LogTable(Box::new(table)), - log_table_catalog - .columns - .iter() - .map(|c| (c.is_hidden, Field::from(&c.column_desc))) - .collect_vec(), - ) - }; - // start to bind let (ret, columns) = { match schema_name { @@ -145,13 +131,7 @@ impl Binder { self.catalog .get_table_by_name(&self.db_name, schema_path, table_name) { - if query_log{ - resolve_log_table_relation( - table_catalog - ) - }else{ - self.resolve_table_relation(table_catalog.clone(), schema_name, as_of)? - } + self.resolve_table_relation(table_catalog.clone(), schema_name, as_of)? 
} else if let Ok((source_catalog, _)) = self.catalog .get_source_by_name(&self.db_name, schema_path, table_name) @@ -190,20 +170,12 @@ impl Binder { if let Ok(schema) = self.catalog.get_schema_by_name(&self.db_name, schema_name) { - if let Some(table_catalog) = - schema.get_table_by_name(table_name) - { - if query_log{ - return Ok(resolve_log_table_relation( - table_catalog - )); - }else{ - return self.resolve_table_relation( - table_catalog.clone(), - &schema_name.clone(), - as_of, - ); - } + if let Some(table_catalog) = schema.get_table_by_name(table_name) { + return self.resolve_table_relation( + table_catalog.clone(), + &schema_name.clone(), + as_of, + ); } else if let Some(source_catalog) = schema.get_source_by_name(table_name) { diff --git a/src/frontend/src/handler/declare_cursor.rs b/src/frontend/src/handler/declare_cursor.rs index aa3eb1a16d3ba..27f029bf3e4a5 100644 --- a/src/frontend/src/handler/declare_cursor.rs +++ b/src/frontend/src/handler/declare_cursor.rs @@ -12,17 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use futures::StreamExt; use pgwire::pg_field_descriptor::PgFieldDescriptor; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::util::epoch::Epoch; use risingwave_sqlparser::ast::{DeclareCursorStatement, ObjectName, Query, Since, Statement}; use super::query::{gen_batch_plan_by_statement, gen_batch_plan_fragmenter}; -use super::util::{ - convert_epoch_to_logstore_i64, convert_unix_millis_to_logstore_i64, - gen_query_from_logstore_ge_rw_timestamp, -}; +use super::util::{convert_epoch_to_logstore_i64, convert_unix_millis_to_logstore_i64}; use super::RwPgResponse; use crate::error::{ErrorCode, Result}; use crate::handler::query::create_stream; @@ -62,14 +58,6 @@ async fn handle_declare_subscription_cursor( let cursor_from_subscription_name = sub_name.0.last().unwrap().real_value().clone(); let subscription = session.get_subscription_by_name(schema_name, &cursor_from_subscription_name)?; - let a = gen_query_from_logstore_ge_rw_timestamp(&subscription.subscription_from_name, 0); - let mut a = create_stream_for_cursor(handle_args.clone(), Statement::Query(Box::new(a))) - .await - .unwrap(); - println!("23156{:?}", a.1); - while let Some(a) = a.0.next().await { - println!("{:?}", a); - } // Start the first query of cursor, which includes querying the table and querying the subscription's logstore let start_rw_timestamp = match rw_timestamp { Some(risingwave_sqlparser::ast::Since::TimestampMsNum(start_rw_timestamp)) => { diff --git a/src/frontend/src/handler/describe.rs b/src/frontend/src/handler/describe.rs index 238a1fe4cefc8..0cb2bf02304a7 100644 --- a/src/frontend/src/handler/describe.rs +++ b/src/frontend/src/handler/describe.rs @@ -35,125 +35,124 @@ pub fn handle_describe(handler_args: HandlerArgs, object_name: ObjectName) -> Re CatalogError::NotFound("table, source, sink or view", object_name.to_string()); // Vec<ColumnCatalog>, Vec<ColumnDesc>, Vec<ColumnDesc>, Vec<Arc<IndexCatalog>>, String, Option<String> - let (columns, pk_columns, dist_columns, indices, relname, description) = if let Ok(relation) = - binder.bind_relation_by_name(object_name.clone(), None, None, false) - { - match relation { - Relation::Source(s) => { - let pk_column_catalogs = s - .catalog - .pk_col_ids - .iter() - .map(|&column_id| { - s.catalog - .columns - .iter() - .filter(|x| x.column_id() == column_id) - .map(|x| x.column_desc.clone()) - .exactly_one() - .unwrap() - }) - .collect_vec(); - ( - s.catalog.columns, - pk_column_catalogs, - vec![], - vec![], - 
s.catalog.name, - None, // Description - ) - } - Relation::BaseTable(t) => { - let pk_column_catalogs = t - .table_catalog - .pk() - .iter() - .map(|x| t.table_catalog.columns[x.column_index].column_desc.clone()) - .collect_vec(); - let dist_columns = t - .table_catalog - .distribution_key() - .iter() - .map(|idx| t.table_catalog.columns[*idx].column_desc.clone()) - .collect_vec(); - ( - t.table_catalog.columns.clone(), - pk_column_catalogs, - dist_columns, - t.table_indexes, - t.table_catalog.name.clone(), - t.table_catalog.description.clone(), - ) - } - Relation::SystemTable(t) => { - let pk_column_catalogs = t - .sys_table_catalog - .pk - .iter() - .map(|idx| t.sys_table_catalog.columns[*idx].column_desc.clone()) - .collect_vec(); - ( - t.sys_table_catalog.columns.clone(), - pk_column_catalogs, - vec![], - vec![], - t.sys_table_catalog.name.clone(), - None, // Description - ) - } - Relation::Share(_) => { - if let Ok(view) = binder.bind_view_by_name(object_name.clone()) { - let columns = view - .view_catalog - .columns + let (columns, pk_columns, dist_columns, indices, relname, description) = + if let Ok(relation) = binder.bind_relation_by_name(object_name.clone(), None, None) { + match relation { + Relation::Source(s) => { + let pk_column_catalogs = s + .catalog + .pk_col_ids .iter() - .enumerate() - .map(|(idx, field)| ColumnCatalog { - column_desc: ColumnDesc::from_field_with_column_id(field, idx as _), - is_hidden: false, + .map(|&column_id| { + s.catalog + .columns + .iter() + .filter(|x| x.column_id() == column_id) + .map(|x| x.column_desc.clone()) + .exactly_one() + .unwrap() }) - .collect(); + .collect_vec(); ( - columns, + s.catalog.columns, + pk_column_catalogs, vec![], vec![], + s.catalog.name, + None, // Description + ) + } + Relation::BaseTable(t) => { + let pk_column_catalogs = t + .table_catalog + .pk() + .iter() + .map(|x| t.table_catalog.columns[x.column_index].column_desc.clone()) + .collect_vec(); + let dist_columns = t + .table_catalog + .distribution_key() + .iter() + .map(|idx| t.table_catalog.columns[*idx].column_desc.clone()) + .collect_vec(); + ( + t.table_catalog.columns.clone(), + pk_column_catalogs, + dist_columns, + t.table_indexes, + t.table_catalog.name.clone(), + t.table_catalog.description.clone(), + ) + } + Relation::SystemTable(t) => { + let pk_column_catalogs = t + .sys_table_catalog + .pk + .iter() + .map(|idx| t.sys_table_catalog.columns[*idx].column_desc.clone()) + .collect_vec(); + ( + t.sys_table_catalog.columns.clone(), + pk_column_catalogs, vec![], - view.view_catalog.name.clone(), - None, + vec![], + t.sys_table_catalog.name.clone(), + None, // Description ) - } else { + } + Relation::Share(_) => { + if let Ok(view) = binder.bind_view_by_name(object_name.clone()) { + let columns = view + .view_catalog + .columns + .iter() + .enumerate() + .map(|(idx, field)| ColumnCatalog { + column_desc: ColumnDesc::from_field_with_column_id(field, idx as _), + is_hidden: false, + }) + .collect(); + ( + columns, + vec![], + vec![], + vec![], + view.view_catalog.name.clone(), + None, + ) + } else { + return Err(not_found_err.into()); + } + } + _ => { return Err(not_found_err.into()); } } - _ => { - return Err(not_found_err.into()); - } - } - } else if let Ok(sink) = binder.bind_sink_by_name(object_name.clone()) { - let columns = sink.sink_catalog.full_columns().to_vec(); - let pk_columns = sink - .sink_catalog - .downstream_pk_indices() - .into_iter() - .map(|idx| columns[idx].column_desc.clone()) - .collect_vec(); - let dist_columns = sink - .sink_catalog - 
.distribution_key - .iter() - .map(|idx| columns[*idx].column_desc.clone()) - .collect_vec(); - ( - columns, - pk_columns, - dist_columns, - vec![], - sink.sink_catalog.name.clone(), - None, - ) - } else { - return Err(not_found_err.into()); - }; + } else if let Ok(sink) = binder.bind_sink_by_name(object_name.clone()) { + let columns = sink.sink_catalog.full_columns().to_vec(); + let pk_columns = sink + .sink_catalog + .downstream_pk_indices() + .into_iter() + .map(|idx| columns[idx].column_desc.clone()) + .collect_vec(); + let dist_columns = sink + .sink_catalog + .distribution_key + .iter() + .map(|idx| columns[*idx].column_desc.clone()) + .collect_vec(); + ( + columns, + pk_columns, + dist_columns, + vec![], + sink.sink_catalog.name.clone(), + None, + ) + } else { + return Err(not_found_err.into()); + }; // Convert all column descs to rows let mut rows = columns diff --git a/src/frontend/src/handler/query.rs b/src/frontend/src/handler/query.rs index 81577626a090f..68f0653a3588c 100644 --- a/src/frontend/src/handler/query.rs +++ b/src/frontend/src/handler/query.rs @@ -173,6 +173,7 @@ fn gen_batch_query_plan( let mut logical = planner.plan(bound)?; let schema = logical.schema(); let batch_plan = logical.gen_batch_plan()?; + let dependent_relations = RelationCollectorVisitor::collect_with(dependent_relations, batch_plan.clone()); @@ -199,7 +200,6 @@ fn gen_batch_query_plan( QueryMode::Local => logical.gen_batch_local_plan(batch_plan)?, QueryMode::Distributed => logical.gen_batch_distributed_plan(batch_plan)?, }; - println!("batch_plan {:?}",physical); Ok(BatchQueryPlanResult { plan: physical, @@ -468,7 +468,6 @@ async fn distribute_execute( query: Query, can_timeout_cancel: bool, ) -> Result<DistributedQueryStream> { - println!("distribute_execute,{:?}",query); let timeout = if cfg!(madsim) { None } else if can_timeout_cancel { diff --git a/src/frontend/src/handler/show.rs b/src/frontend/src/handler/show.rs index aaa5c0e72b62f..dfeeb0965ad64 100644 --- a/src/frontend/src/handler/show.rs +++ b/src/frontend/src/handler/show.rs @@ -43,7 +43,7 @@ pub fn get_columns_from_table( table_name: ObjectName, ) -> Result<Vec<ColumnCatalog>> { let mut binder = Binder::new_for_system(session); - let relation = binder.bind_relation_by_name(table_name.clone(), None, None, false)?; + let relation = binder.bind_relation_by_name(table_name.clone(), None, None)?; let column_catalogs = match relation { Relation::Source(s) => s.catalog.columns, Relation::BaseTable(t) => t.table_catalog.columns.clone(), @@ -89,7 +89,7 @@ pub fn get_indexes_from_table( table_name: ObjectName, ) -> Result<Vec<Arc<IndexCatalog>>> { let mut binder = Binder::new_for_system(session); - let relation = binder.bind_relation_by_name(table_name.clone(), None, None, false)?; + let relation = binder.bind_relation_by_name(table_name.clone(), None, None)?; let indexes = match relation { Relation::BaseTable(t) => t.table_indexes, _ => { diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs index fc83118d6dd48..011b078958946 100644 --- a/src/frontend/src/handler/util.rs +++ b/src/frontend/src/handler/util.rs @@ -201,7 +201,6 @@ pub fn gen_query_from_table_name(from_name: ObjectName) -> Query { name: from_name, alias: None, as_of: None, - query_log: false, }; let from = vec![TableWithJoins { relation: table_factor, @@ -228,7 +227,6 @@ pub fn gen_query_from_logstore_ge_rw_timestamp(logstore_name: &str, rw_timestamp name: ObjectName(vec![logstore_name.into()]), alias: None, as_of: None, - query_log: true, }; let from = vec![TableWithJoins { relation: table_factor, @@ -245,9 +243,8 @@ 
pub fn gen_query_from_logstore_ge_rw_timestamp(logstore_name: &str, rw_timestamp ]; let select = Select { from, - // projection: vec![SelectItem::Wildcard(Some(except_columns))], - projection: vec![SelectItem::Wildcard(None)], - // selection, + projection: vec![SelectItem::Wildcard(Some(except_columns))], + selection, ..Default::default() }; let order_by = vec![OrderByExpr { @@ -259,7 +256,7 @@ pub fn gen_query_from_logstore_ge_rw_timestamp(logstore_name: &str, rw_timestamp Query { with: None, body, - order_by: vec![], + order_by, limit: None, offset: None, fetch: None, diff --git a/src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs b/src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs index 2278b55bbc744..89cb162cbd65f 100644 --- a/src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs +++ b/src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs @@ -12,21 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::ops::Bound; - -use itertools::Itertools; use pretty_xmlish::{Pretty, XmlNode}; -use risingwave_common::types::ScalarImpl; -use risingwave_common::util::scan_range::{is_full_range, ScanRange}; use risingwave_pb::batch_plan::plan_node::NodeBody; -use risingwave_pb::batch_plan::{LogRowSeqScanNode, SysRowSeqScanNode}; -use risingwave_pb::plan_common::PbColumnDesc; +use risingwave_pb::batch_plan::LogRowSeqScanNode; +use risingwave_pb::common::BatchQueryEpoch; use super::batch::prelude::*; use super::utils::{childless_record, Distill}; -use super::{ - generic, ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch, TryToBatchPb, -}; +use super::{generic, ExprRewritable, PlanBase, PlanRef, ToDistributedBatch, TryToBatchPb}; use crate::catalog::ColumnId; use crate::error::Result; use crate::expr::{ExprRewriter, ExprVisitor}; @@ -40,50 +33,37 @@ use crate::scheduler::SchedulerResult; pub struct BatchLogSeqScan { pub base: PlanBase<Batch>, core: generic::LogScan, - scan_ranges: Vec<ScanRange>, } impl BatchLogSeqScan { - fn new_inner(core: generic::LogScan, dist: Distribution, scan_ranges: Vec<ScanRange>) -> Self { - // let order = if scan_ranges.len() > 1 { - // Order::any() - // } else { - // core.get_out_column_index_order() - // }; + fn new_inner(core: generic::LogScan, dist: Distribution) -> Self { let order = Order::any(); let base = PlanBase::new_batch_with_core(&core, dist, order); - { - // validate scan_range - scan_ranges.iter().for_each(|scan_range| { - assert!(!scan_range.is_full_table_scan()); - let scan_pk_prefix_len = scan_range.eq_conds.len(); - let order_len = core.table_desc.order_column_indices().len(); - assert!( - scan_pk_prefix_len < order_len - || (scan_pk_prefix_len == order_len && is_full_range(&scan_range.range)), - "invalid scan_range", - ); - }) - } - - Self { - base, - core, - scan_ranges, - } + Self { base, core } } - pub fn new(core: generic::LogScan, scan_ranges: Vec<ScanRange>) -> Self { + pub fn new(core: generic::LogScan) -> Self { // Use `Single` by default, will be updated later with `clone_with_dist`. 
- Self::new_inner(core, Distribution::Single, scan_ranges) + Self::new_inner(core, Distribution::Single) } fn clone_with_dist(&self) -> Self { Self::new_inner( self.core.clone(), - Distribution::Single, - self.scan_ranges.clone(), + match self.core.distribution_key() { + None => Distribution::SomeShard, + Some(distribution_key) => { + if distribution_key.is_empty() { + Distribution::Single + } else { + Distribution::UpstreamHashShard( + distribution_key, + self.core.table_desc.table_id, + ) + } + } + }, ) } @@ -92,72 +72,10 @@ impl BatchLogSeqScan { pub fn core(&self) -> &generic::LogScan { &self.core } - - pub fn scan_ranges(&self) -> &[ScanRange] { - &self.scan_ranges - } - - fn scan_ranges_as_strs(&self, verbose: bool) -> Vec<String> { - let order_names = match verbose { - true => self.core.order_names_with_table_prefix(), - false => self.core.order_names(), - }; - let mut range_strs = vec![]; - - let explain_max_range = 20; - for scan_range in self.scan_ranges.iter().take(explain_max_range) { - #[expect(clippy::disallowed_methods)] - let mut range_str = scan_range - .eq_conds - .iter() - .zip(order_names.iter()) - .map(|(v, name)| match v { - Some(v) => format!("{} = {:?}", name, v), - None => format!("{} IS NULL", name), - }) - .collect_vec(); - if !is_full_range(&scan_range.range) { - let i = scan_range.eq_conds.len(); - range_str.push(range_to_string(&order_names[i], &scan_range.range)) - } - range_strs.push(range_str.join(" AND ")); - } - if self.scan_ranges.len() > explain_max_range { - range_strs.push("...".to_string()); - } - range_strs - } } impl_plan_tree_node_for_leaf! { BatchLogSeqScan } -fn lb_to_string(name: &str, lb: &Bound<ScalarImpl>) -> String { - let (op, v) = match lb { - Bound::Included(v) => (">=", v), - Bound::Excluded(v) => (">", v), - Bound::Unbounded => unreachable!(), - }; - format!("{} {} {:?}", name, op, v) } -fn ub_to_string(name: &str, ub: &Bound<ScalarImpl>) -> String { - let (op, v) = match ub { - Bound::Included(v) => ("<=", v), - Bound::Excluded(v) => ("<", v), - Bound::Unbounded => unreachable!(), - }; - format!("{} {} {:?}", name, op, v) } -fn range_to_string(name: &str, range: &(Bound<ScalarImpl>, Bound<ScalarImpl>)) -> String { - match (&range.0, &range.1) { - (Bound::Unbounded, Bound::Unbounded) => unreachable!(), - (Bound::Unbounded, ub) => ub_to_string(name, ub), - (lb, Bound::Unbounded) => lb_to_string(name, lb), - (lb, ub) => { - format!("{} AND {}", lb_to_string(name, lb), ub_to_string(name, ub)) - } - } -} - impl Distill for BatchLogSeqScan { fn distill<'a>(&self) -> XmlNode<'a> { let verbose = self.base.ctx().is_explain_verbose(); @@ -165,14 +83,6 @@ impl Distill for BatchLogSeqScan { vec.push(("table", Pretty::from(self.core.table_name.clone()))); vec.push(("columns", self.core.columns_pretty(verbose))); - if !self.scan_ranges.is_empty() { - let range_strs = self.scan_ranges_as_strs(verbose); - vec.push(( - "scan_ranges", - Pretty::Array(range_strs.into_iter().map(Pretty::from).collect()), - )); - } - if verbose { let dist = Pretty::display(&DistributionDisplay { distribution: self.distribution(), @@ -201,21 +111,31 @@ impl TryToBatchPb for BatchLogSeqScan { .iter() .map(ColumnId::get_id) .collect(), - scan_ranges: self.scan_ranges.iter().map(|r| r.to_protobuf()).collect(), - // To be filled by the scheduler. 
vnode_bitmap: None, + old_epoch: Some(BatchQueryEpoch { + epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::Committed( + self.core.old_epoch, + )), + }), + new_epoch: Some(BatchQueryEpoch { + epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::Committed( + self.core.new_epoch, + )), + }), })) } } impl ToLocalBatch for BatchLogSeqScan { fn to_local(&self) -> Result<PlanRef> { - Ok(Self::new_inner( - self.core.clone(), - Distribution::Single, - self.scan_ranges.clone(), - ) - .into()) + let dist = if let Some(distribution_key) = self.core.distribution_key() + && !distribution_key.is_empty() + { + Distribution::UpstreamHashShard(distribution_key, self.core.table_desc.table_id) + } else { + Distribution::SomeShard + }; + Ok(Self::new_inner(self.core.clone(), dist).into()) } } @@ -225,9 +145,9 @@ impl ExprRewritable for BatchLogSeqScan { } fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { - let mut core = self.core.clone(); + let core = self.core.clone(); core.rewrite_exprs(r); - Self::new(core, self.scan_ranges.clone()).into() + Self::new(core).into() } } diff --git a/src/frontend/src/optimizer/plan_node/generic/log_scan.rs b/src/frontend/src/optimizer/plan_node/generic/log_scan.rs index 328c4e5652e32..42650f26b532a 100644 --- a/src/frontend/src/optimizer/plan_node/generic/log_scan.rs +++ b/src/frontend/src/optimizer/plan_node/generic/log_scan.rs @@ -41,6 +41,9 @@ pub struct LogScan { #[educe(PartialEq(ignore))] #[educe(Hash(ignore))] pub ctx: OptimizerContextRef, + + pub old_epoch: u64, + pub new_epoch: u64, } impl LogScan { @@ -74,6 +77,20 @@ impl LogScan { .collect() } + pub fn distribution_key(&self) -> Option<Vec<usize>> { + let tb_idx_to_op_idx = self + .output_col_idx + .iter() + .enumerate() + .map(|(op_idx, tb_idx)| (*tb_idx, op_idx)) + .collect::<HashMap<_, _>>(); + self.table_desc + .distribution_key + .iter() + .map(|&tb_idx| tb_idx_to_op_idx.get(&tb_idx).cloned()) + .collect() + } + /// get the Mapping of columnIndex from internal column index to output column index pub fn i2o_col_mapping(&self) -> ColIndexMapping { ColIndexMapping::with_remaining_columns( @@ -94,12 +111,14 @@ impl LogScan { ids } - /// Create a logical scan node for CDC backfill + /// Create a logical scan node for log table scan pub(crate) fn new( table_name: String, output_col_idx: Vec<usize>, table_desc: Rc<TableDesc>, ctx: OptimizerContextRef, + old_epoch: u64, + new_epoch: u64, ) -> Self { Self { table_name, @@ -107,6 +126,8 @@ impl LogScan { output_col_idx, table_desc, chunk_size: None, ctx, + old_epoch, + new_epoch, } } @@ -122,22 +143,6 @@ impl LogScan { ) } - pub(crate) fn order_names(&self) -> Vec<String> { - self.table_desc - .order_column_indices() - .iter() - .map(|&i| self.get_table_columns()[i].name.clone()) - .collect() - } - - pub(crate) fn order_names_with_table_prefix(&self) -> Vec<String> { - self.table_desc - .order_column_indices() - .iter() - .map(|&i| format!("{}.{}", self.table_name, self.get_table_columns()[i].name)) - .collect() - } - pub(crate) fn get_id_to_op_idx_mapping( output_col_idx: &[usize], table_desc: &Rc<TableDesc>, @@ -154,11 +159,8 @@ impl LogScan { } } -// TODO: extend for cdc table impl GenericPlanNode for LogScan { fn schema(&self) -> Schema { - println!("{:?}",self - .output_col_idx); let fields = self .output_col_idx .iter() @@ -167,7 +169,6 @@ impl GenericPlanNode for LogScan { Field::from_with_table_name_prefix(col, &self.table_name) }) .collect(); - println!("{:?}",fields); Schema { fields } } @@ -190,7 +191,6 @@ impl GenericPlanNode for LogScan { fn functional_dependency(&self) -> FunctionalDependencySet { let pk_indices = 
self.stream_key(); - println!("pk_indices {:?}",pk_indices); let col_num = self.output_col_idx.len(); match &pk_indices { Some(pk_indices) => FunctionalDependencySet::with_key(col_num, pk_indices), diff --git a/src/frontend/src/optimizer/plan_node/generic/table_scan.rs b/src/frontend/src/optimizer/plan_node/generic/table_scan.rs index 24547f7c1cc6d..e8ac52cb7fa8e 100644 --- a/src/frontend/src/optimizer/plan_node/generic/table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/generic/table_scan.rs @@ -369,7 +369,6 @@ impl GenericPlanNode for TableScan { fn functional_dependency(&self) -> FunctionalDependencySet { let pk_indices = self.stream_key(); - println!("pk_indices111 {:?}",pk_indices); let col_num = self.output_col_idx.len(); match &pk_indices { Some(pk_indices) => FunctionalDependencySet::with_key(col_num, pk_indices), diff --git a/src/frontend/src/optimizer/plan_node/logical_log_scan.rs b/src/frontend/src/optimizer/plan_node/logical_log_scan.rs index 632cf8b581975..d841e77689ca2 100644 --- a/src/frontend/src/optimizer/plan_node/logical_log_scan.rs +++ b/src/frontend/src/optimizer/plan_node/logical_log_scan.rs @@ -16,7 +16,7 @@ use std::rc::Rc; use pretty_xmlish::{Pretty, XmlNode}; use risingwave_common::bail_not_implemented; -use risingwave_common::catalog::{CdcTableDesc, ColumnDesc, TableDesc}; +use risingwave_common::catalog::{ColumnDesc, TableDesc}; use super::batch_log_seq_scan::BatchLogSeqScan; use super::generic::GenericPlanRef; @@ -31,13 +31,14 @@ use crate::expr::{ExprRewriter, ExprVisitor}; use crate::optimizer::optimizer_context::OptimizerContextRef; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::{ - ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, StreamCdcTableScan, - ToStreamContext, + ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext, }; use crate::optimizer::property::Order; use crate::utils::{ColIndexMapping, Condition}; -/// `LogicalCdcScan` reads rows of a table from an external upstream database +/// `LogicalLogScan` reads the change log of a table or materialized view. +/// Note: although it goes through the regular query pipeline, it currently supports only +/// simple full-table scans; ordering and pk-range filtering are not supported. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct LogicalLogScan { pub base: PlanBase<Logical>, @@ -62,12 +63,16 @@ impl LogicalLogScan { table_name: String, // explain-only table_desc: Rc<TableDesc>, ctx: OptimizerContextRef, + old_epoch: u64, + new_epoch: u64, ) -> Self { generic::LogScan::new( table_name, (0..table_desc.columns.len()).collect(), table_desc, ctx, + old_epoch, + new_epoch, ) .into() } @@ -76,16 +81,14 @@ impl LogicalLogScan { &self.core.table_name } - pub fn cdc_table_desc(&self) -> &TableDesc { + pub fn log_table_desc(&self) -> &TableDesc { self.core.table_desc.as_ref() } - /// Get the descs of the output columns. pub fn column_descs(&self) -> Vec<ColumnDesc> { self.core.column_descs() } - /// Get the ids of the output columns. 
pub fn output_column_ids(&self) -> Vec<ColumnId> { self.core.output_column_ids() } @@ -96,6 +99,8 @@ impl LogicalLogScan { output_col_idx, self.core.table_desc.clone(), self.base.ctx().clone(), + self.core.old_epoch, + self.core.new_epoch, ) .into() } @@ -126,7 +131,7 @@ impl Distill for LogicalLogScan { self.output_col_idx() .iter() .map(|i| { - let col_name = &self.cdc_table_desc().columns[*i].name; + let col_name = &self.log_table_desc().columns[*i].name; Pretty::from(if verbose { format!("{}.{}", self.table_name(), col_name) } else { @@ -138,7 +143,7 @@ impl Distill for LogicalLogScan { )); } - childless_record("LogicalCdcScan", vec) + childless_record("LogicalLogScan", vec) } } @@ -151,7 +156,6 @@ impl ColPrunable for LogicalLogScan { assert!(output_col_idx .iter() .all(|i| self.output_col_idx().contains(i))); - self.clone_with_output_indices(output_col_idx).into() } } @@ -194,8 +198,7 @@ impl ToBatch for LogicalLogScan { } fn to_batch_with_order_required(&self, required_order: &Order) -> Result<PlanRef> { - required_order - .enforce_if_not_satisfies(BatchLogSeqScan::new(self.core.clone(), vec![]).into()) + required_order.enforce_if_not_satisfies(BatchLogSeqScan::new(self.core.clone()).into()) } } diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index 4b464b06995e4..56adca0446cdc 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -79,6 +79,8 @@ impl Planner { log_table.table_catalog.name().to_string(), Rc::new(log_table.table_catalog.table_desc()), self.ctx(), + log_table.old_epoch, + log_table.new_epoch, ) .into()) } diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs index 4c63f516f7c42..59294169220e7 100644 --- a/src/frontend/src/scheduler/distributed/stage.rs +++ b/src/frontend/src/scheduler/distributed/stage.rs @@ -997,7 +997,6 @@ impl StageRunner { .into_table() .expect("PartitionInfo should be TablePartitionInfo"); scan_node.vnode_bitmap = Some(partition.vnode_bitmap); - scan_node.scan_ranges = partition.scan_ranges; PlanNodePb { children: vec![], identity, diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs index 8c55221d0e7c0..fa7fd85735113 100644 --- a/src/frontend/src/scheduler/local.rs +++ b/src/frontend/src/scheduler/local.rs @@ -469,7 +469,6 @@ impl LocalQueryExecution { .into_table() .expect("PartitionInfo should be TablePartitionInfo here"); scan_node.vnode_bitmap = Some(partition.vnode_bitmap); - scan_node.scan_ranges = partition.scan_ranges; } } _ => unreachable!(), diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index 79a7187c992d8..06237c5756fba 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -1064,9 +1064,7 @@ impl BatchPlanFragmenter { /// If there are multiple scan nodes in this stage, they must have the same distribution, but /// maybe different vnodes partition. We just use the same partition for all the scan nodes. 
fn collect_stage_table_scan(&self, node: PlanRef) -> SchedulerResult<Option<TableScanInfo>> { - let build_table_scan_info = |name,table_desc:&TableDesc,scan_range| { - // let name = scan_node.core().table_name.to_owned(); - // let table_desc = &*scan_node.core().table_desc; + let build_table_scan_info = |name, table_desc: &TableDesc, scan_range| { let table_catalog = self .catalog_reader .read_guard() @@ -1076,10 +1074,9 @@ impl BatchPlanFragmenter { let vnode_mapping = self .worker_node_manager .fragment_mapping(table_catalog.fragment_id)?; - let partitions = - derive_partitions(scan_range, table_desc, &vnode_mapping)?; + let partitions = derive_partitions(scan_range, table_desc, &vnode_mapping)?; let info = TableScanInfo::new(name, partitions); - return Ok(Some(info)) + Ok(Some(info)) }; if node.node_type() == PlanNodeType::BatchExchange { // Do not visit next stage. @@ -1088,10 +1085,18 @@ impl BatchPlanFragmenter { if let Some(scan_node) = node.as_batch_sys_seq_scan() { let name = scan_node.core().table_name.to_owned(); Ok(Some(TableScanInfo::system_table(name))) - }else if let Some(scan_node) = node.as_batch_log_seq_scan(){ - build_table_scan_info(scan_node.core().table_name.to_owned(),&scan_node.core().table_desc,scan_node.scan_ranges()) - }else if let Some(scan_node) = node.as_batch_seq_scan() { - build_table_scan_info(scan_node.core().table_name.to_owned(),&scan_node.core().table_desc,scan_node.scan_ranges()) + } else if let Some(scan_node) = node.as_batch_log_seq_scan() { + build_table_scan_info( + scan_node.core().table_name.to_owned(), + &scan_node.core().table_desc, + &[], + ) + } else if let Some(scan_node) = node.as_batch_seq_scan() { + build_table_scan_info( + scan_node.core().table_name.to_owned(), + &scan_node.core().table_desc, + scan_node.scan_ranges(), + ) } else { node.inputs() .into_iter() diff --git a/src/meta/src/controller/rename.rs b/src/meta/src/controller/rename.rs index 860981762cde4..520e7cc9e106d 100644 --- a/src/meta/src/controller/rename.rs +++ b/src/meta/src/controller/rename.rs @@ -196,6 +196,7 @@ impl QueryRewriter<'_> { TableFactor::NestedJoin(table_with_joins) => { self.visit_table_with_joins(table_with_joins); } + TableFactor::LogTable { .. 
} => {} } } diff --git a/src/sqlparser/src/ast/query.rs b/src/sqlparser/src/ast/query.rs index 7bd78b1ded73d..8ce9bfd5b06d6 100644 --- a/src/sqlparser/src/ast/query.rs +++ b/src/sqlparser/src/ast/query.rs @@ -381,7 +381,11 @@ pub enum TableFactor { name: ObjectName, alias: Option<TableAlias>, as_of: Option<AsOf>, - query_log: bool, + }, + LogTable { + name: ObjectName, + old_epoch: u64, + new_epoch: u64, }, Derived { lateral: bool, @@ -452,6 +456,15 @@ impl fmt::Display for TableFactor { Ok(()) } TableFactor::NestedJoin(table_reference) => write!(f, "({})", table_reference), + TableFactor::LogTable { + name, + old_epoch, + new_epoch, + } => { + write!(f, "{}", name)?; + write!(f, " Epoch FROM {} TO {}", old_epoch, new_epoch)?; + Ok(()) + } } } } diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index a971b70a0348f..2cc74c8d0a392 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -4982,12 +4982,7 @@ impl Parser { } else { let as_of = self.parse_as_of()?; let alias = self.parse_optional_table_alias(keywords::RESERVED_FOR_TABLE_ALIAS)?; - Ok(TableFactor::Table { - name, - alias, - as_of, - query_log: false, - }) + Ok(TableFactor::Table { name, alias, as_of }) } } } diff --git a/src/sqlparser/src/test_utils.rs b/src/sqlparser/src/test_utils.rs index 42c28d5bbe486..57ff2d0f0efb8 100644 --- a/src/sqlparser/src/test_utils.rs +++ b/src/sqlparser/src/test_utils.rs @@ -140,7 +140,6 @@ pub fn table(name: impl Into<String>) -> TableFactor { name: ObjectName(vec![Ident::new_unchecked(name.into())]), as_of: None, alias: None, - query_log: false, } } diff --git a/src/storage/hummock_sdk/src/lib.rs b/src/storage/hummock_sdk/src/lib.rs index 79a18330503df..1fbd627e13f02 100644 --- a/src/storage/hummock_sdk/src/lib.rs +++ b/src/storage/hummock_sdk/src/lib.rs @@ -310,7 +310,7 @@ impl EpochWithGap { if risingwave_common::util::epoch::is_max_epoch(epoch) { EpochWithGap::new_max_epoch() } else { - // debug_assert!((epoch & EPOCH_SPILL_TIME_MASK) == 0); + debug_assert!((epoch & EPOCH_SPILL_TIME_MASK) == 0); EpochWithGap(epoch + spill_offset as u64) } } diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs index f9d8af7ae67b5..e6bc3deedd6d5 100644 --- a/src/storage/src/table/batch_table/storage_table.rs +++ b/src/storage/src/table/batch_table/storage_table.rs @@ -110,6 +110,15 @@ impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for StorageTableInner<S, SD> impl<S: StateStore> StorageTableInner<S, EitherSerde> { + pub fn new_partial( + store: S, + output_column_ids: Vec<ColumnId>, + vnodes: Option<Arc<Bitmap>>, + table_desc: &StorageTableDesc, + ) -> Self { + Self::new_partial_inner(store, output_column_ids, vnodes, table_desc, vec![]) + } + /// Create a [`StorageTableInner`] given a complete set of `columns` and a partial /// set of `output_column_ids`. /// When reading from the storage table, @@ -120,11 +129,13 @@ impl<S: StateStore> StorageTableInner<S, EitherSerde> { /// from those supplied to associated executors. /// These `output_column_ids` may have `pk` appended, since they will be needed to scan from /// storage. The associated executors may not have these `pk` fields. 
- pub fn new_partial( + pub fn new_partial_inner( store: S, output_column_ids: Vec<ColumnId>, vnodes: Option<Arc<Bitmap>>, table_desc: &StorageTableDesc, + // Used for the log iter's `op` column. + excluded_indices: Vec<ColumnId>, ) -> Self { let table_id = TableId { table_id: table_desc.table_id, }; @@ -168,6 +179,7 @@ impl<S: StateStore> StorageTableInner<S, EitherSerde> { distribution, table_option, value_indices, + excluded_indices, prefix_hint_len, versioned, ) @@ -192,6 +204,7 @@ impl<S: StateStore> StorageTableInner<S, EitherSerde> { TableDistribution::singleton(), Default::default(), value_indices, + vec![], 0, false, ) @@ -228,6 +241,7 @@ impl<S: StateStore> StorageTableInner<S, EitherSerde> { distribution: TableDistribution, table_option: TableOption, value_indices: Vec<usize>, + excluded_column_ids: Vec<ColumnId>, read_prefix_len_hint: usize, versioned: bool, ) -> Self { @@ -235,11 +249,19 @@ impl<S: StateStore> StorageTableInner<S, EitherSerde> { let (output_columns, output_indices) = find_columns_by_ids(&table_columns, &output_column_ids); + let (excluded_columns, excluded_indices) = + find_columns_by_ids(&table_columns, &excluded_column_ids); + if !excluded_columns.is_empty() { + // Currently only the `op` column can be excluded. + assert_eq!(excluded_columns.first().unwrap().name, "op") + } let mut value_output_indices = vec![]; let mut key_output_indices = vec![]; for idx in &output_indices { - if value_indices.contains(idx) { + if excluded_indices.contains(idx) { + continue; + } else if value_indices.contains(idx) { value_output_indices.push(*idx); } else { key_output_indices.push(*idx); } @@ -416,8 +438,6 @@ pub trait PkAndRowStream = Stream<Item = StorageResult<KeyedRow<Bytes>>> + Send; /// The row iterator of the storage table. /// The wrapper of [`StorageTableInnerIter`] if pk is not persisted. -// pub type StorageTableInnerIter = impl PkAndRowStream; -// pub type StorageTableInnerIter2 = impl PkAndRowStream; #[async_trait::async_trait] impl<S: PkAndRowStream> TableIter for S { @@ -659,7 +679,6 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> { end_epoch: HummockReadEpoch, pk_prefix: impl Row, range_bounds: impl RangeBounds<OwnedRow>, - // ordered: bool, ) -> StorageResult<impl Stream<Item = StorageResult<KeyedRow<Bytes>>> + Send> { self.iter_log_with_pk_bounds(satrt_epoch, end_epoch, pk_prefix, range_bounds) .await } @@ -671,7 +690,6 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> { end_epoch: HummockReadEpoch, pk_prefix: impl Row, range_bounds: impl RangeBounds<OwnedRow>, - // ordered: bool, ) -> StorageResult<impl Stream<Item = StorageResult<KeyedRow<Bytes>>> + Send> { let start_key = self.serialize_pk_bound(&pk_prefix, range_bounds.start_bound(), true); let end_key = self.serialize_pk_bound(&pk_prefix, range_bounds.end_bound(), false); @@ -693,14 +711,9 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> { let read_options = ReadLogOptions { table_id: self.table_id, }; - let pk_serializer = match self.output_row_in_key_indices.is_empty() { - true => None, - false => Some(Arc::new(self.pk_serializer.clone())), - }; let iter = StorageTableInnerIterLogInner::<S, SD>::new( &self.store, self.mapping.clone(), - pk_serializer, self.row_serde.clone(), table_key_range, read_options, @@ -713,18 +726,13 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> { })) .await?; - println!("test444"); #[auto_enum(futures03::Stream)] let iter = match iterators.len() { 0 => unreachable!(), 1 => iterators.into_iter().next().unwrap(), // Concat all iterators if not to preserve order. - // _ if !ordered => { - // futures::stream::iter(iterators.into_iter().map(Box::pin).collect_vec()) - // .flatten_unordered(1024) - // } - // Merge all iterators if to preserve order. - _ => merge_sort(iterators.into_iter().map(Box::pin).collect()), + _ => futures::stream::iter(iterators.into_iter().map(Box::pin).collect_vec()) + .flatten_unordered(1024), }; Ok(iter) } @@ -857,7 +865,7 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterInner<S, SD> { } } -/// [`StorageTableInnerIterInner`] iterates on the storage table. 
+/// [`StorageTableInnerIterLogInner`] iterates on the storage table. struct StorageTableInnerIterLogInner<S: StateStore, SD: ValueRowSerde> { /// An iterator that returns raw bytes from storage. iter: S::ChangeLogIter, @@ -865,9 +873,6 @@ struct StorageTableInnerIterLogInner<S: StateStore, SD: ValueRowSerde> { mapping: Arc<ColumnMapping>, row_deserializer: Arc<SD>, - - /// Used for serializing and deserializing the primary key. - pk_serializer: Option<Arc<OrderedRowSerde>>, } impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterLogInner<S, SD> { @@ -876,7 +881,6 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterLogInner<S, SD> { async fn new( store: &S, mapping: Arc<ColumnMapping>, - pk_serializer: Option<Arc<OrderedRowSerde>>, row_deserializer: Arc<SD>, table_key_range: TableKeyRange, read_options: ReadLogOptions, @@ -885,7 +889,7 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterLogInner<S, SD> { ) -> StorageResult<Self> { let raw_satrt_epoch = satrt_epoch.get_epoch(); let raw_end_epoch = end_epoch.get_epoch(); - // store.try_wait_epoch(end_epoch).await?; + store.try_wait_epoch(end_epoch).await?; let iter = store .iter_log( (raw_satrt_epoch, raw_end_epoch), table_key_range, read_options, ) .await?; - // // For `HummockStorage`, a cluster recovery will clear storage data and make subsequent - // // `HummockReadEpoch::Current` read incomplete. - // // `validate_read_epoch` is a safeguard against that incorrect read. It rejects the read - // // result if any recovery has happened after `try_wait_epoch`. - // store.validate_read_epoch(epoch)?; + // For `HummockStorage`, a cluster recovery will clear storage data and make subsequent + // `HummockReadEpoch::Current` read incomplete. + // `validate_read_epoch` is a safeguard against that incorrect read. It rejects the read + // result if any recovery has happened after `try_wait_epoch`. + store.validate_read_epoch(end_epoch)?; + store.validate_read_epoch(satrt_epoch)?; let iter = Self { iter, mapping, row_deserializer, - pk_serializer, }; Ok(iter) } @@ -911,14 +915,15 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterLogInner<S, SD> { #[try_stream(ok = KeyedRow<Bytes>, error = StorageError)] async fn into_stream(mut self) { let build_value_with_op = |value: &[u8], op: i16| -> Result<Vec<Datum>, StorageError> { - let mut result_row_vec = vec![]; let full_row = self.row_deserializer.deserialize(value)?; let result_row_in_value = self .mapping .project(OwnedRow::new(full_row)) .into_owned_row(); - result_row_vec.push(Some(ScalarImpl::Int16(4))); - result_row_vec.extend(result_row_in_value.into_iter()); + let result_row_vec = result_row_in_value + .into_iter() + .chain(vec![Some(ScalarImpl::Int16(op))]) + .collect_vec(); Ok(result_row_vec) }; diff --git a/src/tests/sqlsmith/src/reducer.rs b/src/tests/sqlsmith/src/reducer.rs index a636c2ec4413e..f4fcf5f524634 100644 --- a/src/tests/sqlsmith/src/reducer.rs +++ b/src/tests/sqlsmith/src/reducer.rs @@ -184,6 +184,7 @@ fn find_ddl_references_for_query_in_table_factor( TableFactor::NestedJoin(table_with_joins) => { find_ddl_references_for_query_in_table_with_joins(table_with_joins, ddl_references); } + TableFactor::LogTable { .. } => unimplemented!(), } } diff --git a/src/tests/sqlsmith/src/sql_gen/relation.rs b/src/tests/sqlsmith/src/sql_gen/relation.rs index f5ed7dbe0591d..6e6db4e40493d 100644 --- a/src/tests/sqlsmith/src/sql_gen/relation.rs +++ b/src/tests/sqlsmith/src/sql_gen/relation.rs @@ -68,7 +68,6 @@ impl<'a, R: Rng> SqlGenerator<'a, R> { columns: vec![], }), as_of: None, - query_log: false, }; table.name = alias; // Rename the table. 
(table_factor, table) diff --git a/src/tests/sqlsmith/src/sql_gen/utils.rs b/src/tests/sqlsmith/src/sql_gen/utils.rs index 40e3b5cf9c0db..0e36507e86169 100644 --- a/src/tests/sqlsmith/src/sql_gen/utils.rs +++ b/src/tests/sqlsmith/src/sql_gen/utils.rs @@ -75,7 +75,6 @@ pub(crate) fn create_table_factor_from_table(table: &Table) -> TableFactor { name: ObjectName(vec![Ident::new_unchecked(&table.name)]), alias: None, as_of: None, - query_log: false, } }
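
Illustrative sketch (appendix, not part of the diff): the index remapping performed by the `distribution_key` method added to `LogScan` above, restated as a standalone function. The table's distribution key is expressed in table column indices, while the plan node outputs a subset or reordering of columns, so each key index must be translated into its output position. The `None`-on-miss behavior is inferred from the `Option`-collecting code in the diff; the function and variable names here are illustrative.

use std::collections::HashMap;

fn distribution_key(output_col_idx: &[usize], dist_key: &[usize]) -> Option<Vec<usize>> {
    // Map each table column index to its position in the output.
    let tb_idx_to_op_idx: HashMap<usize, usize> = output_col_idx
        .iter()
        .enumerate()
        .map(|(op_idx, tb_idx)| (*tb_idx, op_idx))
        .collect();
    dist_key
        .iter()
        .map(|tb_idx| tb_idx_to_op_idx.get(tb_idx).copied())
        .collect() // collecting Option<usize> items yields None on the first missing column
}

fn main() {
    // Output columns are table columns [2, 0, 3]; the distribution key is table
    // column 0, which sits at output position 1.
    assert_eq!(distribution_key(&[2, 0, 3], &[0]), Some(vec![1]));
    // Table column 1 is not output at all, so no hash distribution can be derived.
    assert_eq!(distribution_key(&[2, 0, 3], &[1]), None);
}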
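
A second sketch, showing the shape of the `build_value_with_op` fix in `storage_table.rs`: project the deserialized row to the output columns, then append the change operation as a trailing Int16 `op` column (matching the `op` column the binder appends after the highest existing column id). `Scalar`/`Datum` and the `Op` discriminant values below are simplified stand-ins, not RisingWave's actual types; only the projection pattern mirrors the diff.

#[derive(Debug, Clone, PartialEq)]
enum Scalar {
    Int16(i16),
    Int64(i64),
}

type Datum = Option<Scalar>;

#[allow(dead_code)]
#[derive(Clone, Copy)]
enum Op {
    Insert = 1,
    Delete = 2,
    UpdateDelete = 3,
    UpdateInsert = 4,
}

fn build_value_with_op(full_row: &[Datum], output_indices: &[usize], op: Op) -> Vec<Datum> {
    output_indices
        .iter()
        .map(|&i| full_row[i].clone())
        // The op tag goes last, so existing column positions stay stable.
        .chain(std::iter::once(Some(Scalar::Int16(op as i16))))
        .collect()
}

fn main() {
    let row = vec![Some(Scalar::Int64(42)), None, Some(Scalar::Int64(7))];
    // Keep columns 0 and 2, tag the row as an insert.
    let projected = build_value_with_op(&row, &[0, 2], Op::Insert);
    assert_eq!(projected.len(), 3); // two data columns + the trailing `op`
    assert_eq!(projected[2], Some(Scalar::Int16(1)));
}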