diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index fa35f71975063..d97cec4d17c69 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -427,6 +427,10 @@ impl LocalStateStore for LocalHummockStorage { }) .expect("should be able to send") } + + fn is_committed(&self) -> bool { + self.read_version.read().staging().is_empty() + } } impl LocalHummockStorage { diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 9b47935fa282e..6d7a6449a175c 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -192,6 +192,10 @@ impl StagingVersion { }); (overlapped_imms, overlapped_ssts) } + + pub fn is_empty(&self) -> bool { + self.imm.is_empty() && self.sst.is_empty() + } } #[derive(Clone)] diff --git a/src/storage/src/mem_table.rs b/src/storage/src/mem_table.rs index 68893c317754f..ff77421554c86 100644 --- a/src/storage/src/mem_table.rs +++ b/src/storage/src/mem_table.rs @@ -653,6 +653,11 @@ impl LocalStateStore for MemtableLocalState async fn try_flush(&mut self) -> StorageResult<()> { Ok(()) } + + // There is no staging data for memtable state store + fn is_committed(&self) -> bool { + true + } } #[cfg(test)] diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs index 1bea7f6742c68..8585f4dc06e0b 100644 --- a/src/storage/src/monitor/monitored_store.rs +++ b/src/storage/src/monitor/monitored_store.rs @@ -286,6 +286,10 @@ impl LocalStateStore for MonitoredStateStore { .try_flush() .verbose_instrument_await("store_try_flush") } + + fn is_committed(&self) -> bool { + self.inner.is_committed() + } } impl StateStore for MonitoredStateStore { diff --git a/src/storage/src/monitor/traced_store.rs b/src/storage/src/monitor/traced_store.rs index 8cf96a231ead0..70369d7341e47 100644 --- a/src/storage/src/monitor/traced_store.rs +++ b/src/storage/src/monitor/traced_store.rs @@ -219,6 +219,11 @@ impl LocalStateStore for TracedStateStore { span.may_send_result(OperationResult::TryFlush(res.as_ref().map(|o| *o).into())); res } + + // TODO: is it necessary to introduce a trace span for this call? + fn is_committed(&self) -> bool { + self.inner.is_committed() + } } impl StateStore for TracedStateStore { diff --git a/src/storage/src/panic_store.rs b/src/storage/src/panic_store.rs index 5299cac9fe085..938febcedbae5 100644 --- a/src/storage/src/panic_store.rs +++ b/src/storage/src/panic_store.rs @@ -134,6 +134,10 @@ impl LocalStateStore for PanicStateStore { async fn try_flush(&mut self) -> StorageResult<()> { panic!("should not operate on the panic state store!"); } + + fn is_committed(&self) -> bool { + panic!("should not operate on the panic state store!"); + } } impl StateStore for PanicStateStore { diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 7c8353dc0f30f..90cd4659545e7 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -241,8 +241,12 @@ pub trait LocalStateStore: StaticSendSync { fn try_flush(&mut self) -> impl Future> + Send + '_; fn epoch(&self) -> u64; + /// Checks whether there is any unflushed data in local state store. fn is_dirty(&self) -> bool; + /// Checks whether there is any flushed but uncommitted data in local state store. + fn is_committed(&self) -> bool; + /// Initializes the state store with given `epoch` pair. /// Typically we will use `epoch.curr` as the initialized epoch, /// Since state table will begin as empty. diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index f1316fe7e20c8..d53760a2a5206 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -459,6 +459,14 @@ pub mod verify { } ret } + + fn is_committed(&self) -> bool { + let ret = self.actual.is_committed(); + if let Some(expected) = &self.expected { + assert_eq!(ret, expected.is_committed()); + } + ret + } } impl StateStore for VerifyStateStore { @@ -781,6 +789,8 @@ pub mod boxed_state_store { async fn init(&mut self, epoch: InitOptions) -> StorageResult<()>; fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions); + + fn is_committed(&self) -> bool; } #[async_trait::async_trait] @@ -845,6 +855,10 @@ pub mod boxed_state_store { fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) { self.seal_current_epoch(next_epoch, opts) } + + fn is_committed(&self) -> bool { + self.is_committed() + } } pub type BoxDynamicDispatchedLocalStateStore = Box; @@ -905,6 +919,10 @@ pub mod boxed_state_store { self.deref().is_dirty() } + fn is_committed(&self) -> bool { + self.deref().is_committed() + } + fn init( &mut self, options: InitOptions, diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 81ab622d3e61e..519e8d9f61ac4 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -780,6 +780,10 @@ where !self.is_dirty(), "vnode bitmap should only be updated when state table is clean" ); + assert!( + self.local_store.is_committed(), + "vnode bitmap should only be updated when data in state table are all committed" + ); if self.distribution.is_singleton() { assert_eq!( &new_vnodes,