Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: ensure there is no staging data in state table on vnode bitmap update #15175

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/storage/src/hummock/store/local_hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,10 @@ impl LocalStateStore for LocalHummockStorage {
})
.expect("should be able to send")
}

fn is_committed(&self) -> bool {
self.read_version.read().staging().is_empty()
}
}

impl LocalHummockStorage {
Expand Down
4 changes: 4 additions & 0 deletions src/storage/src/hummock/store/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ impl StagingVersion {
});
(overlapped_imms, overlapped_ssts)
}

pub fn is_empty(&self) -> bool {
self.imm.is_empty() && self.sst.is_empty()
}
}

#[derive(Clone)]
Expand Down
5 changes: 5 additions & 0 deletions src/storage/src/mem_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,11 @@ impl<S: StateStoreWrite + StateStoreRead> LocalStateStore for MemtableLocalState
async fn try_flush(&mut self) -> StorageResult<()> {
Ok(())
}

// There is no staging data for memtable state store
fn is_committed(&self) -> bool {
true
}
}

#[cfg(test)]
Expand Down
4 changes: 4 additions & 0 deletions src/storage/src/monitor/monitored_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,10 @@ impl<S: LocalStateStore> LocalStateStore for MonitoredStateStore<S> {
.try_flush()
.verbose_instrument_await("store_try_flush")
}

fn is_committed(&self) -> bool {
self.inner.is_committed()
}
}

impl<S: StateStore> StateStore for MonitoredStateStore<S> {
Expand Down
5 changes: 5 additions & 0 deletions src/storage/src/monitor/traced_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,11 @@ impl<S: LocalStateStore> LocalStateStore for TracedStateStore<S> {
span.may_send_result(OperationResult::TryFlush(res.as_ref().map(|o| *o).into()));
res
}

// TODO: is it necessary to introduce a trace span for this call?
fn is_committed(&self) -> bool {
self.inner.is_committed()
}
}

impl<S: StateStore> StateStore for TracedStateStore<S> {
Expand Down
4 changes: 4 additions & 0 deletions src/storage/src/panic_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ impl LocalStateStore for PanicStateStore {
async fn try_flush(&mut self) -> StorageResult<()> {
panic!("should not operate on the panic state store!");
}

fn is_committed(&self) -> bool {
panic!("should not operate on the panic state store!");
}
}

impl StateStore for PanicStateStore {
Expand Down
4 changes: 4 additions & 0 deletions src/storage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,12 @@ pub trait LocalStateStore: StaticSendSync {
fn try_flush(&mut self) -> impl Future<Output = StorageResult<()>> + Send + '_;
fn epoch(&self) -> u64;

/// Checks whether there is any unflushed data in local state store.
fn is_dirty(&self) -> bool;

/// Checks whether there is any flushed but uncommitted data in local state store.
fn is_committed(&self) -> bool;

/// Initializes the state store with given `epoch` pair.
/// Typically we will use `epoch.curr` as the initialized epoch,
/// Since state table will begin as empty.
Expand Down
18 changes: 18 additions & 0 deletions src/storage/src/store_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,14 @@ pub mod verify {
}
ret
}

fn is_committed(&self) -> bool {
let ret = self.actual.is_committed();
if let Some(expected) = &self.expected {
assert_eq!(ret, expected.is_committed());
}
ret
}
}

impl<A: StateStore, E: StateStore> StateStore for VerifyStateStore<A, E> {
Expand Down Expand Up @@ -781,6 +789,8 @@ pub mod boxed_state_store {
async fn init(&mut self, epoch: InitOptions) -> StorageResult<()>;

fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);

fn is_committed(&self) -> bool;
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -845,6 +855,10 @@ pub mod boxed_state_store {
fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
self.seal_current_epoch(next_epoch, opts)
}

fn is_committed(&self) -> bool {
self.is_committed()
}
}

pub type BoxDynamicDispatchedLocalStateStore = Box<dyn DynamicDispatchedLocalStateStore>;
Expand Down Expand Up @@ -905,6 +919,10 @@ pub mod boxed_state_store {
self.deref().is_dirty()
}

fn is_committed(&self) -> bool {
self.deref().is_committed()
}

fn init(
&mut self,
options: InitOptions,
Expand Down
4 changes: 4 additions & 0 deletions src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,10 @@ where
!self.is_dirty(),
"vnode bitmap should only be updated when state table is clean"
);
assert!(
self.local_store.is_committed(),
"vnode bitmap should only be updated when data in state table are all committed"
);
if self.distribution.is_singleton() {
assert_eq!(
&new_vnodes,
Expand Down
Loading