diff --git a/src/common/src/catalog/physical_table.rs b/src/common/src/catalog/physical_table.rs index 1386cc7db1cec..ae8a8e1ae6a9f 100644 --- a/src/common/src/catalog/physical_table.rs +++ b/src/common/src/catalog/physical_table.rs @@ -16,10 +16,13 @@ use std::collections::HashMap; use fixedbitset::FixedBitSet; use itertools::Itertools; +use risingwave_pb::catalog::Table; use risingwave_pb::common::PbColumnOrder; use risingwave_pb::plan_common::StorageTableDesc; use super::{ColumnDesc, ColumnId, TableId}; +use crate::catalog::hummock::TABLE_OPTION_DUMMY_RETENTION_SECOND; +use crate::catalog::TableOption; use crate::util::sort_util::ColumnOrder; /// Includes necessary information for compute node to access data of the table. @@ -136,4 +139,28 @@ impl TableDesc { }); id_to_idx } + + pub fn from_pb_table(table: &Table) -> Self { + let table_options = TableOption::build_table_option(&table.properties); + Self { + table_id: TableId::new(table.id), + pk: table.pk.iter().map(ColumnOrder::from_protobuf).collect(), + columns: table + .columns + .iter() + .map(|col| ColumnDesc::from(col.column_desc.as_ref().unwrap())) + .collect(), + distribution_key: table.distribution_key.iter().map(|i| *i as _).collect(), + stream_key: table.stream_key.iter().map(|i| *i as _).collect(), + vnode_col_index: table.vnode_col_index.map(|i| i as _), + append_only: table.append_only, + retention_seconds: table_options + .retention_seconds + .unwrap_or(TABLE_OPTION_DUMMY_RETENTION_SECOND), + value_indices: table.value_indices.iter().map(|i| *i as _).collect(), + read_prefix_len_hint: table.read_prefix_len_hint as _, + watermark_columns: table.watermark_indices.iter().map(|i| *i as _).collect(), + versioned: table.version.is_some(), + } + } } diff --git a/src/common/src/hash/consistent_hash/vnode.rs b/src/common/src/hash/consistent_hash/vnode.rs index 19b8975f775af..9bc49a9372ac0 100644 --- a/src/common/src/hash/consistent_hash/vnode.rs +++ b/src/common/src/hash/consistent_hash/vnode.rs @@ -18,7 +18,7 @@ use parse_display::Display; use crate::array::{Array, ArrayImpl, DataChunk}; use crate::hash::Crc32HashCode; use crate::row::{Row, RowExt}; -use crate::types::{DataType, DatumRef, ScalarRefImpl}; +use crate::types::{DataType, Datum, DatumRef, ScalarImpl, ScalarRefImpl}; use crate::util::hash_util::Crc32FastBuilder; use crate::util::row_id::extract_vnode_id_from_row_id; @@ -96,6 +96,10 @@ impl VirtualNode { self.0 as _ } + pub const fn to_datum(self) -> Datum { + Some(ScalarImpl::Int16(self.to_scalar())) + } + /// Creates a virtual node from the given big-endian bytes representation. pub const fn from_be_bytes(bytes: [u8; Self::SIZE]) -> Self { let inner = VirtualNodeInner::from_be_bytes(bytes); diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs index 922f0c22060e2..0c55bf79ffa4b 100644 --- a/src/storage/src/table/batch_table/storage_table.rs +++ b/src/storage/src/table/batch_table/storage_table.rs @@ -335,6 +335,10 @@ impl StorageTableInner { pub fn table_id(&self) -> TableId { self.table_id } + + pub fn vnodes(&self) -> &Arc { + self.distribution.vnodes() + } } /// Point get impl StorageTableInner { diff --git a/src/storage/src/table/mod.rs b/src/storage/src/table/mod.rs index f2822ca88022f..105a2f1f18604 100644 --- a/src/storage/src/table/mod.rs +++ b/src/storage/src/table/mod.rs @@ -115,11 +115,15 @@ impl TableDistribution { Self::singleton_vnode_bitmap_ref().clone() } - pub fn all_vnodes() -> Arc { + pub fn all_vnodes_ref() -> &'static Arc { /// A bitmap that all vnodes are set. static ALL_VNODES: LazyLock> = LazyLock::new(|| Bitmap::ones(VirtualNode::COUNT).into()); - ALL_VNODES.clone() + &ALL_VNODES + } + + pub fn all_vnodes() -> Arc { + Self::all_vnodes_ref().clone() } /// Distribution that accesses all vnodes, mainly used for tests. @@ -272,10 +276,9 @@ pub fn compute_vnode(row: impl Row, indices: &[usize], vnodes: &Bitmap) -> Virtu vnode } -pub fn get_vnode_from_row(row: impl Row, index: usize, _vnodes: &Bitmap) -> VirtualNode { +pub fn get_vnode_from_row(row: impl Row, index: usize, vnodes: &Bitmap) -> VirtualNode { let vnode = VirtualNode::from_datum(row.datum_at(index)); - // TODO: enable this check when `WatermarkFilterExecutor` use `StorageTable` to read global max watermark - // check_vnode_is_set(vnode, vnodes); + check_vnode_is_set(vnode, vnodes); tracing::debug!(target: "events::storage::storage_table", "get vnode from row: {:?} vnode column index {:?} => {}", row, index, vnode); diff --git a/src/stream/src/executor/watermark_filter.rs b/src/stream/src/executor/watermark_filter.rs index 454f69582981b..9be1a94280280 100644 --- a/src/stream/src/executor/watermark_filter.rs +++ b/src/stream/src/executor/watermark_filter.rs @@ -13,12 +13,13 @@ // limitations under the License. use std::cmp; +use std::ops::Deref; +use std::sync::Arc; -use futures::future::join_all; +use futures::future::{try_join, try_join_all}; use futures::StreamExt; use futures_async_stream::try_stream; -use itertools::Itertools; -use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; +use risingwave_common::hash::VnodeBitmapExt; use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::types::{DataType, DefaultOrd, ScalarImpl}; use risingwave_common::{bail, row}; @@ -27,7 +28,10 @@ use risingwave_expr::expr::{ NonStrictExpression, }; use risingwave_expr::Result as ExprResult; +use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_pb::expr::expr_node::Type; +use risingwave_storage::table::batch_table::storage_table::StorageTable; +use risingwave_storage::table::TableDistribution; use risingwave_storage::StateStore; use super::error::StreamExecutorError; @@ -52,6 +56,7 @@ pub struct WatermarkFilterExecutor { /// The column we should generate watermark and filter on. event_time_col_idx: usize, table: StateTable, + global_watermark_table: StorageTable, } impl WatermarkFilterExecutor { @@ -62,6 +67,7 @@ impl WatermarkFilterExecutor { watermark_expr: NonStrictExpression, event_time_col_idx: usize, table: StateTable, + global_watermark_table: StorageTable, ) -> Self { Self { ctx, @@ -70,6 +76,7 @@ impl WatermarkFilterExecutor { watermark_expr, event_time_col_idx, table, + global_watermark_table, } } } @@ -106,6 +113,7 @@ impl WatermarkFilterExecutor { ctx, info, mut table, + mut global_watermark_table, } = *self; let eval_error_report = ActorEvalErrorReport { @@ -126,7 +134,8 @@ impl WatermarkFilterExecutor { yield Message::Barrier(first_barrier); // Initiate and yield the first watermark. - let mut current_watermark = Self::get_global_max_watermark(&table).await?; + let mut current_watermark = + Self::get_global_max_watermark(&table, &global_watermark_table).await?; let mut last_checkpoint_watermark = None; @@ -231,12 +240,19 @@ impl WatermarkFilterExecutor { Message::Barrier(barrier) => { // Update the vnode bitmap for state tables of all agg calls if asked. if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(ctx.id) { + let other_vnodes_bitmap = Arc::new( + (!(*vnode_bitmap).clone()) + & TableDistribution::all_vnodes_ref().deref(), + ); + let _ = global_watermark_table.update_vnode_bitmap(other_vnodes_bitmap); let (previous_vnode_bitmap, _cache_may_stale) = table.update_vnode_bitmap(vnode_bitmap.clone()); // Take the global max watermark when scaling happens. if previous_vnode_bitmap != vnode_bitmap { - current_watermark = Self::get_global_max_watermark(&table).await?; + current_watermark = + Self::get_global_max_watermark(&table, &global_watermark_table) + .await?; } } @@ -262,7 +278,8 @@ impl WatermarkFilterExecutor { if idle_input { // Align watermark let global_max_watermark = - Self::get_global_max_watermark(&table).await?; + Self::get_global_max_watermark(&table, &global_watermark_table) + .await?; current_watermark = if let Some(global_max_watermark) = global_max_watermark.clone() @@ -314,29 +331,45 @@ impl WatermarkFilterExecutor { /// If the returned if `Ok(None)`, it means there is no global max watermark. async fn get_global_max_watermark( table: &StateTable, + global_watermark_table: &StorageTable, ) -> StreamExecutorResult> { - let watermark_iter_futures = (0..VirtualNode::COUNT).map(|vnode| async move { - let pk = row::once(Some(ScalarImpl::Int16(vnode as _))); - let watermark_row: Option = table.get_row(pk).await?; - match watermark_row { - Some(row) => { - if row.len() == 1 { - Ok::<_, StreamExecutorError>(row[0].to_owned()) - } else { - bail!("The watermark row should only contains 1 datum"); - } + let epoch = table.epoch(); + let handle_watermark_row = |watermark_row: Option| match watermark_row { + Some(row) => { + if row.len() == 1 { + Ok::<_, StreamExecutorError>(row[0].to_owned()) + } else { + bail!("The watermark row should only contains 1 datum"); } - _ => Ok(None), } + _ => Ok(None), + }; + let global_watermark_iter_futures = + global_watermark_table + .vnodes() + .iter_vnodes() + .map(|vnode| async move { + let pk = row::once(vnode.to_datum()); + let watermark_row: Option = global_watermark_table + .get_row(pk, HummockReadEpoch::NoWait(epoch)) + .await?; + handle_watermark_row(watermark_row) + }); + let local_watermark_iter_futures = table.vnodes().iter_vnodes().map(|vnode| async move { + let pk = row::once(vnode.to_datum()); + let watermark_row: Option = table.get_row(pk).await?; + handle_watermark_row(watermark_row) }); - let watermarks: Vec<_> = join_all(watermark_iter_futures) - .await - .into_iter() - .try_collect()?; + let (global_watermarks, local_watermarks) = try_join( + try_join_all(global_watermark_iter_futures), + try_join_all(local_watermark_iter_futures), + ) + .await?; // Return the minimal value if the remote max watermark is Null. - let watermark = watermarks + let watermark = global_watermarks .into_iter() + .chain(local_watermarks.into_iter()) .flatten() .max_by(DefaultOrd::default_cmp); @@ -346,11 +379,15 @@ impl WatermarkFilterExecutor { #[cfg(test)] mod tests { + use itertools::Itertools; use risingwave_common::array::StreamChunk; - use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId}; + use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableDesc}; use risingwave_common::test_prelude::StreamChunkTestExt; use risingwave_common::types::Date; use risingwave_common::util::sort_util::OrderType; + use risingwave_pb::catalog::Table; + use risingwave_pb::common::ColumnOrder; + use risingwave_pb::plan_common::PbColumnCatalog; use risingwave_storage::memory::MemoryStateStore; use risingwave_storage::table::TableDistribution; @@ -368,24 +405,54 @@ mod tests { pk_indices: &[usize], val_indices: &[usize], table_id: u32, - ) -> StateTable { - let column_descs = data_types - .iter() - .enumerate() - .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone())) - .collect_vec(); + ) -> (StorageTable, StateTable) { + let table = Table { + id: table_id, + columns: data_types + .iter() + .enumerate() + .map(|(id, data_type)| PbColumnCatalog { + column_desc: Some( + ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()) + .to_protobuf(), + ), + is_hidden: false, + }) + .collect(), + pk: pk_indices + .iter() + .zip_eq(order_types.iter()) + .map(|(pk, order)| ColumnOrder { + column_index: *pk as _, + order_type: Some(order.to_protobuf()), + }) + .collect(), + distribution_key: vec![], + stream_key: vec![0], + append_only: false, + vnode_col_index: Some(0), + value_indices: val_indices.iter().map(|i| *i as _).collect(), + read_prefix_len_hint: 0, + ..Default::default() + }; // TODO: use consistent operations for watermark filter after we have upsert. - StateTable::new_with_distribution_inconsistent_op( - mem_state, - TableId::new(table_id), - column_descs, - order_types.to_vec(), - pk_indices.to_vec(), - TableDistribution::all(vec![0]), - Some(val_indices.to_vec()), + let state_table = StateTable::from_table_catalog_inconsistent_op( + &table, + mem_state.clone(), + Some(TableDistribution::all_vnodes()), ) - .await + .await; + + let desc = TableDesc::from_pb_table(&table).to_protobuf(); + + let storage_table = StorageTable::new_partial( + mem_state, + val_indices.iter().map(|i| ColumnId::new(*i as _)).collect(), + Some(TableDistribution::all_vnodes()), + &desc, + ); + (storage_table, state_table) } async fn create_watermark_filter_executor( @@ -400,7 +467,7 @@ mod tests { let watermark_expr = build_from_pretty("(subtract:timestamp $1:timestamp 1day:interval)"); - let table = create_in_memory_state_table( + let (storage_table, table) = create_in_memory_state_table( mem_state, &[DataType::Int16, WATERMARK_TYPE], &[OrderType::ascending()], @@ -426,6 +493,7 @@ mod tests { watermark_expr, 1, table, + storage_table, ) .boxed(), tx, diff --git a/src/stream/src/from_proto/watermark_filter.rs b/src/stream/src/from_proto/watermark_filter.rs index ed44a90480a6d..0956fdd9c3d5a 100644 --- a/src/stream/src/from_proto/watermark_filter.rs +++ b/src/stream/src/from_proto/watermark_filter.rs @@ -12,10 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::ops::Deref; use std::sync::Arc; +use risingwave_common::catalog::{ColumnId, TableDesc}; use risingwave_expr::expr::build_non_strict_from_prost; use risingwave_pb::stream_plan::WatermarkFilterNode; +use risingwave_storage::table::batch_table::storage_table::StorageTable; +use risingwave_storage::table::TableDistribution; use super::*; use crate::common::table::state_table::StateTable; @@ -46,6 +50,17 @@ impl ExecutorBuilder for WatermarkFilterBuilder { // TODO: may use consistent op for watermark filter after we have upsert. let [table]: [_; 1] = node.get_tables().clone().try_into().unwrap(); + let desc = TableDesc::from_pb_table(&table).to_protobuf(); + let column_ids = desc + .value_indices + .iter() + .map(|i| ColumnId::new(*i as _)) + .collect_vec(); + let other_vnodes = + Arc::new((!(*vnodes).clone()) & TableDistribution::all_vnodes_ref().deref()); + let global_watermark_table = + StorageTable::new_partial(store.clone(), column_ids, Some(other_vnodes), &desc); + let table = StateTable::from_table_catalog_inconsistent_op(&table, store, Some(vnodes)).await; @@ -56,6 +71,7 @@ impl ExecutorBuilder for WatermarkFilterBuilder { watermark_expr, event_time_col_idx, table, + global_watermark_table, ) .boxed()) }