Skip to content

Commit

Permalink
feat(storage): support get_table_watermark for LocalStateStore (#17767)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Jul 23, 2024
1 parent 69caf47 commit 489aaea
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 1 deletion.
5 changes: 5 additions & 0 deletions src/storage/src/hummock/store/local_hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use await_tree::InstrumentAwait;
use bytes::Bytes;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::MAX_SPILL_TIMES;
use risingwave_hummock_sdk::key::{is_empty_key_range, vnode_range, TableKey, TableKeyRange};
use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch};
Expand Down Expand Up @@ -346,6 +347,10 @@ impl LocalStateStore for LocalHummockStorage {
.await
}

fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
self.read_version.read().latest_watermark(vnode)
}

fn insert(
&mut self,
key: TableKey<Bytes>,
Expand Down
6 changes: 6 additions & 0 deletions src/storage/src/hummock/store/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,12 @@ impl HummockReadVersion {
}
}

pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
self.table_watermarks
.as_ref()
.and_then(|watermark_index| watermark_index.latest_watermark(vnode))
}

pub fn is_replicated(&self) -> bool {
self.is_replicated
}
Expand Down
7 changes: 6 additions & 1 deletion src/storage/src/mem_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common_estimate_size::{EstimateSize, KvSize};
use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, FullKey, TableKey, TableKeyRange};
use risingwave_hummock_sdk::table_watermark::WatermarkDirection;
Expand Down Expand Up @@ -761,6 +761,11 @@ impl<S: StateStoreWrite + StateStoreRead> LocalStateStore for MemtableLocalState
fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
std::mem::replace(&mut self.vnodes, vnodes)
}

fn get_table_watermark(&self, _vnode: VirtualNode) -> Option<Bytes> {
// TODO: may store the written table watermark and have a correct implementation
None
}
}

#[cfg(test)]
Expand Down
5 changes: 5 additions & 0 deletions src/storage/src/monitor/monitored_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use bytes::Bytes;
use futures::{Future, TryFutureExt};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
use risingwave_hummock_sdk::HummockReadEpoch;
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -285,6 +286,10 @@ impl<S: LocalStateStore> LocalStateStore for MonitoredStateStore<S> {
fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
self.inner.update_vnode_bitmap(vnodes)
}

fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
self.inner.get_table_watermark(vnode)
}
}

impl<S: StateStore> StateStore for MonitoredStateStore<S> {
Expand Down
5 changes: 5 additions & 0 deletions src/storage/src/monitor/traced_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use bytes::Bytes;
use futures::{Future, FutureExt};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_trace::{
Expand Down Expand Up @@ -232,6 +233,10 @@ impl<S: LocalStateStore> LocalStateStore for TracedStateStore<S> {
fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
self.inner.update_vnode_bitmap(vnodes)
}

fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
self.inner.get_table_watermark(vnode)
}
}

impl<S: StateStore> StateStore for TracedStateStore<S> {
Expand Down
5 changes: 5 additions & 0 deletions src/storage/src/panic_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::Arc;
use bytes::Bytes;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
use risingwave_hummock_sdk::HummockReadEpoch;

Expand Down Expand Up @@ -162,6 +163,10 @@ impl LocalStateStore for PanicStateStore {
fn update_vnode_bitmap(&mut self, _vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
panic!("should not operate on the panic state store!");
}

fn get_table_watermark(&self, _vnode: VirtualNode) -> Option<Bytes> {
panic!("should not operate on the panic state store!");
}
}

impl StateStore for PanicStateStore {
Expand Down
2 changes: 2 additions & 0 deletions src/storage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,8 @@ pub trait LocalStateStore: StaticSendSync {
read_options: ReadOptions,
) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_;

fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;

/// Inserts a key-value entry associated with a given `epoch` into the state store.
fn insert(
&mut self,
Expand Down
20 changes: 20 additions & 0 deletions src/storage/src/store_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ pub mod verify {
use bytes::Bytes;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
use risingwave_hummock_sdk::HummockReadEpoch;
use tracing::log::warn;
Expand Down Expand Up @@ -537,6 +538,14 @@ pub mod verify {
}
ret
}

fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
let ret = self.actual.get_table_watermark(vnode);
if let Some(expected) = &self.expected {
assert_eq!(ret, expected.get_table_watermark(vnode));
}
ret
}
}

impl<A: StateStore, E: StateStore> StateStore for VerifyStateStore<A, E> {
Expand Down Expand Up @@ -827,6 +836,7 @@ pub mod boxed_state_store {
use futures::FutureExt;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
use risingwave_hummock_sdk::{HummockReadEpoch, SyncResult};

Expand Down Expand Up @@ -979,6 +989,8 @@ pub mod boxed_state_store {
fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);

fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap>;

fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -1047,6 +1059,10 @@ pub mod boxed_state_store {
fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
self.update_vnode_bitmap(vnodes)
}

fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
self.get_table_watermark(vnode)
}
}

pub type BoxDynamicDispatchedLocalStateStore = Box<dyn DynamicDispatchedLocalStateStore>;
Expand Down Expand Up @@ -1079,6 +1095,10 @@ pub mod boxed_state_store {
self.deref().rev_iter(key_range, read_options)
}

fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
self.deref().get_table_watermark(vnode)
}

fn insert(
&mut self,
key: TableKey<Bytes>,
Expand Down

0 comments on commit 489aaea

Please sign in to comment.