diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index 0a96f45de80a5..f5b27d293bb3d 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -21,6 +21,7 @@ use await_tree::InstrumentAwait; use bytes::Bytes; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{TableId, TableOption}; +use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::MAX_SPILL_TIMES; use risingwave_hummock_sdk::key::{is_empty_key_range, vnode_range, TableKey, TableKeyRange}; use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch}; @@ -346,6 +347,10 @@ impl LocalStateStore for LocalHummockStorage { .await } + fn get_table_watermark(&self, vnode: VirtualNode) -> Option { + self.read_version.read().latest_watermark(vnode) + } + fn insert( &mut self, key: TableKey, diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 7db6e1edf5e99..51f52be6d669e 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -410,6 +410,12 @@ impl HummockReadVersion { } } + pub fn latest_watermark(&self, vnode: VirtualNode) -> Option { + self.table_watermarks + .as_ref() + .and_then(|watermark_index| watermark_index.latest_watermark(vnode)) + } + pub fn is_replicated(&self) -> bool { self.is_replicated } diff --git a/src/storage/src/mem_table.rs b/src/storage/src/mem_table.rs index bd4f13b8291d2..3561010b8a328 100644 --- a/src/storage/src/mem_table.rs +++ b/src/storage/src/mem_table.rs @@ -26,7 +26,7 @@ use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{TableId, TableOption}; -use risingwave_common::hash::VnodeBitmapExt; +use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; use risingwave_common_estimate_size::{EstimateSize, KvSize}; use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, FullKey, TableKey, TableKeyRange}; use risingwave_hummock_sdk::table_watermark::WatermarkDirection; @@ -761,6 +761,11 @@ impl LocalStateStore for MemtableLocalState fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { std::mem::replace(&mut self.vnodes, vnodes) } + + fn get_table_watermark(&self, _vnode: VirtualNode) -> Option { + // TODO: may store the written table watermark and have a correct implementation + None + } } #[cfg(test)] diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs index 2dcb2fb30e7b4..8c00435541d2c 100644 --- a/src/storage/src/monitor/monitored_store.rs +++ b/src/storage/src/monitor/monitored_store.rs @@ -21,6 +21,7 @@ use bytes::Bytes; use futures::{Future, TryFutureExt}; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::TableId; +use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; use risingwave_hummock_sdk::HummockReadEpoch; use thiserror_ext::AsReport; @@ -285,6 +286,10 @@ impl LocalStateStore for MonitoredStateStore { fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { self.inner.update_vnode_bitmap(vnodes) } + + fn get_table_watermark(&self, vnode: VirtualNode) -> Option { + self.inner.get_table_watermark(vnode) + } } impl StateStore for MonitoredStateStore { diff --git a/src/storage/src/monitor/traced_store.rs b/src/storage/src/monitor/traced_store.rs index bdd9ce90406fe..b31c8fd0d73e8 100644 --- a/src/storage/src/monitor/traced_store.rs +++ b/src/storage/src/monitor/traced_store.rs @@ -19,6 +19,7 @@ use bytes::Bytes; use futures::{Future, FutureExt}; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::TableId; +use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_hummock_trace::{ @@ -232,6 +233,10 @@ impl LocalStateStore for TracedStateStore { fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { self.inner.update_vnode_bitmap(vnodes) } + + fn get_table_watermark(&self, vnode: VirtualNode) -> Option { + self.inner.get_table_watermark(vnode) + } } impl StateStore for TracedStateStore { diff --git a/src/storage/src/panic_store.rs b/src/storage/src/panic_store.rs index 7ec0249a3427f..a9e10c6553c54 100644 --- a/src/storage/src/panic_store.rs +++ b/src/storage/src/panic_store.rs @@ -20,6 +20,7 @@ use std::sync::Arc; use bytes::Bytes; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::TableId; +use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; use risingwave_hummock_sdk::HummockReadEpoch; @@ -162,6 +163,10 @@ impl LocalStateStore for PanicStateStore { fn update_vnode_bitmap(&mut self, _vnodes: Arc) -> Arc { panic!("should not operate on the panic state store!"); } + + fn get_table_watermark(&self, _vnode: VirtualNode) -> Option { + panic!("should not operate on the panic state store!"); + } } impl StateStore for PanicStateStore { diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 5653927891bd8..9c98271022c5b 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -383,6 +383,8 @@ pub trait LocalStateStore: StaticSendSync { read_options: ReadOptions, ) -> impl Future>> + Send + '_; + fn get_table_watermark(&self, vnode: VirtualNode) -> Option; + /// Inserts a key-value entry associated with a given `epoch` into the state store. fn insert( &mut self, diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index 20a146ce04f10..7e1c247586241 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -216,6 +216,7 @@ pub mod verify { use bytes::Bytes; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::TableId; + use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; use risingwave_hummock_sdk::HummockReadEpoch; use tracing::log::warn; @@ -537,6 +538,14 @@ pub mod verify { } ret } + + fn get_table_watermark(&self, vnode: VirtualNode) -> Option { + let ret = self.actual.get_table_watermark(vnode); + if let Some(expected) = &self.expected { + assert_eq!(ret, expected.get_table_watermark(vnode)); + } + ret + } } impl StateStore for VerifyStateStore { @@ -827,6 +836,7 @@ pub mod boxed_state_store { use futures::FutureExt; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::TableId; + use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; use risingwave_hummock_sdk::{HummockReadEpoch, SyncResult}; @@ -979,6 +989,8 @@ pub mod boxed_state_store { fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions); fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc; + + fn get_table_watermark(&self, vnode: VirtualNode) -> Option; } #[async_trait::async_trait] @@ -1047,6 +1059,10 @@ pub mod boxed_state_store { fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { self.update_vnode_bitmap(vnodes) } + + fn get_table_watermark(&self, vnode: VirtualNode) -> Option { + self.get_table_watermark(vnode) + } } pub type BoxDynamicDispatchedLocalStateStore = Box; @@ -1079,6 +1095,10 @@ pub mod boxed_state_store { self.deref().rev_iter(key_range, read_options) } + fn get_table_watermark(&self, vnode: VirtualNode) -> Option { + self.deref().get_table_watermark(vnode) + } + fn insert( &mut self, key: TableKey,