diff --git a/src/storage/hummock_test/src/local_state_store_test_utils.rs b/src/storage/hummock_test/src/local_state_store_test_utils.rs index 59472dec68b56..d9f2a503c39e3 100644 --- a/src/storage/hummock_test/src/local_state_store_test_utils.rs +++ b/src/storage/hummock_test/src/local_state_store_test_utils.rs @@ -13,16 +13,20 @@ // limitations under the License. use std::future::Future; +use std::sync::Arc; +use risingwave_common::buffer::Bitmap; +use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::EpochPair; use risingwave_storage::error::StorageResult; use risingwave_storage::store::{InitOptions, LocalStateStore}; pub trait LocalStateStoreTestExt: LocalStateStore { fn init_for_test(&mut self, epoch: u64) -> impl Future> + Send + '_ { - self.init(InitOptions::new_with_epoch(EpochPair::new_test_epoch( - epoch, - ))) + self.init(InitOptions::new( + EpochPair::new_test_epoch(epoch), + Arc::new(Bitmap::ones(VirtualNode::COUNT)), + )) } } impl LocalStateStoreTestExt for T {} diff --git a/src/storage/hummock_trace/src/opts.rs b/src/storage/hummock_trace/src/opts.rs index 241a8e26769d0..f5b749c83fb51 100644 --- a/src/storage/hummock_trace/src/opts.rs +++ b/src/storage/hummock_trace/src/opts.rs @@ -13,10 +13,12 @@ // limitations under the License. 
use bincode::{Decode, Encode}; +use risingwave_common::buffer::Bitmap; use risingwave_common::cache::CachePriority; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::util::epoch::EpochPair; use risingwave_hummock_sdk::HummockReadEpoch; +use risingwave_pb::common::PbBuffer; use crate::TracedBytes; @@ -58,7 +60,7 @@ impl From for CachePriority { } } -#[derive(Copy, Encode, Decode, PartialEq, Eq, Debug, Clone, Hash)] +#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)] pub struct TracedTableId { pub table_id: u32, } @@ -114,7 +116,7 @@ pub struct TracedWriteOptions { pub table_id: TracedTableId, } -#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone, Copy, Hash)] +#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)] pub struct TracedTableOption { pub retention_seconds: Option, } @@ -135,13 +137,13 @@ impl From for TableOption { } } -#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone, Copy, Hash)] +#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)] pub enum TracedOpConsistencyLevel { Inconsistent, ConsistentOldValue, } -#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone, Copy, Hash)] +#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)] pub struct TracedNewLocalOptions { pub table_id: TracedTableId, pub op_consistency_level: TracedOpConsistencyLevel, @@ -165,7 +167,7 @@ impl TracedNewLocalOptions { pub type TracedHummockEpoch = u64; -#[derive(Debug, Clone, Copy, PartialEq, Eq, Decode, Encode)] +#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)] pub enum TracedHummockReadEpoch { Committed(TracedHummockEpoch), Current(TracedHummockEpoch), @@ -195,7 +197,7 @@ impl From for HummockReadEpoch { } } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Decode, Encode)] +#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)] pub struct TracedEpochPair { pub curr: TracedHummockEpoch, pub prev: TracedHummockEpoch, @@ -219,9 +221,10 @@ impl From for EpochPair { } } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Decode, Encode)] 
+#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)] pub struct TracedInitOptions { pub epoch: TracedEpochPair, + pub vnodes: TracedBitmap, } #[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)] @@ -230,3 +233,29 @@ pub struct TracedSealCurrentEpochOptions { pub table_watermarks: Option<(bool, Vec>)>, pub switch_op_consistency_level: Option, } + +#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)] +pub struct TracedBitmap { + pub compression: i32, + pub body: Vec, +} + +impl From for TracedBitmap { + fn from(value: Bitmap) -> Self { + let pb = value.to_protobuf(); + Self { + compression: pb.compression, + body: pb.body, + } + } +} + +impl From for Bitmap { + fn from(value: TracedBitmap) -> Self { + let pb = PbBuffer { + compression: value.compression, + body: value.body, + }; + Bitmap::from(&pb) + } +} diff --git a/src/storage/hummock_trace/src/replay/runner.rs b/src/storage/hummock_trace/src/replay/runner.rs index 02bfd0277662f..2683b0ddcecc1 100644 --- a/src/storage/hummock_trace/src/replay/runner.rs +++ b/src/storage/hummock_trace/src/replay/runner.rs @@ -104,7 +104,7 @@ mod tests { let storage_type4 = StorageType::Global; let actor_1 = vec![ - (0, Operation::NewLocalStorage(opts1, 1)), + (0, Operation::NewLocalStorage(opts1.clone(), 1)), ( 1, Operation::get( @@ -135,7 +135,7 @@ mod tests { .map(|(record_id, op)| Ok(Record::new(storage_type1, record_id, op))); let actor_2 = vec![ - (4, Operation::NewLocalStorage(opts2, 2)), + (4, Operation::NewLocalStorage(opts2.clone(), 2)), ( 5, Operation::get( @@ -166,7 +166,7 @@ mod tests { .map(|(record_id, op)| Ok(Record::new(storage_type2, record_id, op))); let actor_3 = vec![ - (8, Operation::NewLocalStorage(opts3, 3)), + (8, Operation::NewLocalStorage(opts3.clone(), 3)), ( 9, Operation::get( diff --git a/src/storage/hummock_trace/src/replay/worker.rs b/src/storage/hummock_trace/src/replay/worker.rs index 4d37708420d48..29e814c3f9ade 100644 --- a/src/storage/hummock_trace/src/replay/worker.rs +++ 
b/src/storage/hummock_trace/src/replay/worker.rs @@ -284,7 +284,7 @@ impl ReplayWorker { } Operation::NewLocalStorage(new_local_opts, id) => { assert_ne!(storage_type, StorageType::Global); - local_storage_opts_map.insert(id, new_local_opts); + local_storage_opts_map.insert(id, new_local_opts.clone()); let local_storage = replay.new_local(new_local_opts).await; local_storages.insert(storage_type, local_storage); } diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index fa35f71975063..b4e4f041b02f3 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use await_tree::InstrumentAwait; use bytes::Bytes; +use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::util::epoch::MAX_SPILL_TIMES; use risingwave_hummock_sdk::key::{is_empty_key_range, TableKey, TableKeyRange}; @@ -386,6 +387,14 @@ impl LocalStateStore for LocalHummockStorage { "local state store of table id {:?} is init for more than once", self.table_id ); + let prev_vnodes = self + .read_version + .write() + .update_vnode_bitmap(options.vnodes); + assert!( + prev_vnodes.is_none(), + "Vnode bitmap should be empty during init" + ); Ok(()) } @@ -427,6 +436,15 @@ impl LocalStateStore for LocalHummockStorage { }) .expect("should be able to send") } + + fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { + let mut read_version = self.read_version.write(); + assert!(read_version.staging().is_empty(), "There is uncommitted staging data in read version table_id {:?} instance_id {:?} on vnode bitmap update", + self.table_id(), self.instance_id() + ); + let prev_vnodes = read_version.update_vnode_bitmap(vnodes); + prev_vnodes.expect("Previous vnode bitmap should not be none") + } } impl LocalHummockStorage { diff --git a/src/storage/src/hummock/store/version.rs 
b/src/storage/src/hummock/store/version.rs index 9b47935fa282e..513451b848521 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -23,6 +23,7 @@ use await_tree::InstrumentAwait; use bytes::Bytes; use itertools::Itertools; use parking_lot::RwLock; +use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::TableId; use risingwave_common::util::epoch::MAX_SPILL_TIMES; use risingwave_hummock_sdk::key::{ @@ -192,6 +193,10 @@ impl StagingVersion { }); (overlapped_imms, overlapped_ssts) } + + pub fn is_empty(&self) -> bool { + self.imm.is_empty() && self.sst.is_empty() + } } #[derive(Clone)] @@ -212,6 +217,10 @@ pub struct HummockReadVersion { is_replicated: bool, table_watermarks: Option, + + // Vnode bitmap corresponding to the read version + // It will be initialized after local state store init + vnodes: Option>, } impl HummockReadVersion { @@ -238,6 +247,7 @@ impl HummockReadVersion { committed: committed_version, is_replicated, + vnodes: None, } } @@ -499,6 +509,10 @@ impl HummockReadVersion { pub fn is_replicated(&self) -> bool { self.is_replicated } + + pub fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Option> { + self.vnodes.replace(vnodes) + } } pub fn read_filter_for_batch( diff --git a/src/storage/src/mem_table.rs b/src/storage/src/mem_table.rs index 68893c317754f..0e94071d492a0 100644 --- a/src/storage/src/mem_table.rs +++ b/src/storage/src/mem_table.rs @@ -18,11 +18,13 @@ use std::collections::BTreeMap; use std::future::Future; use std::ops::Bound::{Excluded, Included, Unbounded}; use std::ops::RangeBounds; +use std::sync::Arc; use bytes::Bytes; use futures::{pin_mut, StreamExt}; use futures_async_stream::try_stream; use itertools::Itertools; +use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::estimate_size::{EstimateSize, KvSize}; use risingwave_common::hash::VnodeBitmapExt; @@ -434,6 +436,7 @@ pub struct 
MemtableLocalStateStore { table_id: TableId, op_consistency_level: OpConsistencyLevel, table_option: TableOption, + vnodes: Option>, } impl MemtableLocalStateStore { @@ -445,6 +448,7 @@ impl MemtableLocalStateStore { table_id: option.table_id, op_consistency_level: option.op_consistency_level, table_option: option.table_option, + vnodes: None, } } @@ -594,9 +598,15 @@ impl LocalStateStore for MemtableLocalState async fn init(&mut self, options: InitOptions) -> StorageResult<()> { assert!( self.epoch.replace(options.epoch.curr).is_none(), - "local state store of table id {:?} is init for more than once", + "epoch in local state store of table id {:?} is init for more than once", self.table_id ); + assert!( + self.vnodes.replace(options.vnodes).is_none(), + "vnodes in local state store of table id {:?} is init for more than once", + self.table_id + ); + Ok(()) } @@ -653,6 +663,10 @@ impl LocalStateStore for MemtableLocalState async fn try_flush(&mut self) -> StorageResult<()> { Ok(()) } + + fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { + self.vnodes.replace(vnodes).unwrap() + } } #[cfg(test)] diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs index 1bea7f6742c68..e1110df630595 100644 --- a/src/storage/src/monitor/monitored_store.rs +++ b/src/storage/src/monitor/monitored_store.rs @@ -18,6 +18,7 @@ use await_tree::InstrumentAwait; use bytes::Bytes; use futures::{Future, TryFutureExt, TryStreamExt}; use futures_async_stream::try_stream; +use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; use risingwave_hummock_sdk::HummockReadEpoch; @@ -286,6 +287,10 @@ impl LocalStateStore for MonitoredStateStore { .try_flush() .verbose_instrument_await("store_try_flush") } + + fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { + self.inner.update_vnode_bitmap(vnodes) + } } impl StateStore for MonitoredStateStore { diff --git 
a/src/storage/src/monitor/traced_store.rs b/src/storage/src/monitor/traced_store.rs index 8cf96a231ead0..de55143dd7d73 100644 --- a/src/storage/src/monitor/traced_store.rs +++ b/src/storage/src/monitor/traced_store.rs @@ -11,10 +11,12 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; use bytes::Bytes; use futures::{Future, TryFutureExt, TryStreamExt}; use futures_async_stream::try_stream; +use risingwave_common::buffer::Bitmap; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_hummock_trace::{ @@ -219,6 +221,11 @@ impl LocalStateStore for TracedStateStore { span.may_send_result(OperationResult::TryFlush(res.as_ref().map(|o| *o).into())); res } + + // TODO: add trace span + fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { + self.inner.update_vnode_bitmap(vnodes) + } } impl StateStore for TracedStateStore { diff --git a/src/storage/src/panic_store.rs b/src/storage/src/panic_store.rs index 5299cac9fe085..7e5985bb6aefe 100644 --- a/src/storage/src/panic_store.rs +++ b/src/storage/src/panic_store.rs @@ -14,10 +14,12 @@ use std::ops::Bound; use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; use bytes::Bytes; use futures::Stream; +use risingwave_common::buffer::Bitmap; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; use risingwave_hummock_sdk::HummockReadEpoch; @@ -134,6 +136,10 @@ impl LocalStateStore for PanicStateStore { async fn try_flush(&mut self) -> StorageResult<()> { panic!("should not operate on the panic state store!"); } + + fn update_vnode_bitmap(&mut self, _vnodes: Arc) -> Arc { + panic!("should not operate on the panic state store!"); + } } impl StateStore for PanicStateStore { diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 7c8353dc0f30f..e5b096de47196 100644 --- 
a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -23,6 +23,7 @@ use bytes::Bytes; use futures::{Stream, StreamExt, TryStreamExt}; use futures_async_stream::try_stream; use prost::Message; +use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::util::epoch::{Epoch, EpochPair}; use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange}; @@ -31,8 +32,8 @@ use risingwave_hummock_sdk::table_watermark::{ }; use risingwave_hummock_sdk::{HummockReadEpoch, LocalSstableInfo}; use risingwave_hummock_trace::{ - TracedInitOptions, TracedNewLocalOptions, TracedOpConsistencyLevel, TracedPrefetchOptions, - TracedReadOptions, TracedSealCurrentEpochOptions, TracedWriteOptions, + TracedBitmap, TracedInitOptions, TracedNewLocalOptions, TracedOpConsistencyLevel, + TracedPrefetchOptions, TracedReadOptions, TracedSealCurrentEpochOptions, TracedWriteOptions, }; use crate::error::{StorageError, StorageResult}; @@ -270,6 +271,10 @@ pub trait LocalStateStore: StaticSendSync { key_range: TableKeyRange, read_options: ReadOptions, ) -> impl Future> + Send + '_; + + // Updates the vnode bitmap corresponding to the local state store + // Returns the previous vnode bitmap + fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc; } /// If `prefetch` is true, prefetch will be enabled. 
Prefetching may increase the memory @@ -534,17 +539,14 @@ impl NewLocalOptions { #[derive(Clone)] pub struct InitOptions { pub epoch: EpochPair, -} -impl InitOptions { - pub fn new_with_epoch(epoch: EpochPair) -> Self { - Self { epoch } - } + /// The vnode bitmap for the local state store instance + pub vnodes: Arc, } -impl From for InitOptions { - fn from(value: EpochPair) -> Self { - Self { epoch: value } +impl InitOptions { + pub fn new(epoch: EpochPair, vnodes: Arc) -> Self { + Self { epoch, vnodes } } } @@ -552,6 +554,7 @@ impl From for TracedInitOptions { fn from(value: InitOptions) -> Self { TracedInitOptions { epoch: value.epoch.into(), + vnodes: TracedBitmap::from(value.vnodes.as_ref().clone()), } } } @@ -560,6 +563,7 @@ impl From for InitOptions { fn from(value: TracedInitOptions) -> Self { InitOptions { epoch: value.epoch.into(), + vnodes: Arc::new(Bitmap::from(value.vnodes)), } } } diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index f1316fe7e20c8..50fe81d53ed54 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -203,10 +203,12 @@ pub mod verify { use std::fmt::Debug; use std::future::Future; use std::ops::{Bound, Deref}; + use std::sync::Arc; use bytes::Bytes; use futures::{pin_mut, TryStreamExt}; use futures_async_stream::try_stream; + use risingwave_common::buffer::Bitmap; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; use risingwave_hummock_sdk::HummockReadEpoch; use tracing::log::warn; @@ -459,6 +461,14 @@ pub mod verify { } ret } + + fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { + let ret = self.actual.update_vnode_bitmap(vnodes.clone()); + if let Some(expected) = &mut self.expected { + assert_eq!(ret, expected.update_vnode_bitmap(vnodes)); + } + ret + } } impl StateStore for VerifyStateStore { @@ -683,11 +693,13 @@ impl AsHummock for SledStateStore { pub mod boxed_state_store { use std::future::Future; use std::ops::{Deref, DerefMut}; + use std::sync::Arc; use 
bytes::Bytes; use dyn_clone::{clone_trait_object, DynClone}; use futures::stream::BoxStream; use futures::StreamExt; + use risingwave_common::buffer::Bitmap; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; use risingwave_hummock_sdk::HummockReadEpoch; @@ -781,6 +793,8 @@ pub mod boxed_state_store { async fn init(&mut self, epoch: InitOptions) -> StorageResult<()>; fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions); + + fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc; } #[async_trait::async_trait] @@ -845,6 +859,10 @@ pub mod boxed_state_store { fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) { self.seal_current_epoch(next_epoch, opts) } + + fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { + self.update_vnode_bitmap(vnodes) + } } pub type BoxDynamicDispatchedLocalStateStore = Box; @@ -915,6 +933,10 @@ pub mod boxed_state_store { fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) { self.deref_mut().seal_current_epoch(next_epoch, opts) } + + fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { + self.deref_mut().update_vnode_bitmap(vnodes) + } } // For global StateStore diff --git a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs index 5a3f525b0dacd..b4f54d17ed1d1 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs @@ -46,7 +46,9 @@ pub struct KvLogStoreWriter { metrics: KvLogStoreMetrics, - is_paused: watch::Sender, + paused_notifier: watch::Sender, + + is_paused: bool, identity: String, } @@ -58,7 +60,7 @@ impl KvLogStoreWriter { serde: LogStoreRowSerde, tx: LogStoreBufferSender, metrics: KvLogStoreMetrics, - is_paused: watch::Sender, + paused_notifier: watch::Sender, identity: String, ) -> Self { Self { @@ -68,8 +70,9 @@ impl KvLogStoreWriter { serde, tx, metrics, - is_paused, + 
paused_notifier, identity, + is_paused: false, } } } @@ -81,7 +84,7 @@ impl LogWriter for KvLogStoreWriter { pause_read_on_bootstrap: bool, ) -> LogStoreResult<()> { self.state_store - .init(InitOptions::new_with_epoch(epoch)) + .init(InitOptions::new(epoch, self.serde.vnodes().clone())) .await?; if pause_read_on_bootstrap { self.pause()?; @@ -93,6 +96,9 @@ impl LogWriter for KvLogStoreWriter { } async fn write_chunk(&mut self, chunk: StreamChunk) -> LogStoreResult<()> { + // No data is expected when the stream is paused. + assert!(!self.is_paused); + if chunk.cardinality() == 0 { return Ok(()); } @@ -133,10 +139,15 @@ impl LogWriter for KvLogStoreWriter { ) -> LogStoreResult<()> { let epoch = self.state_store.epoch(); let mut flush_info = FlushInfo::new(); - for vnode in self.serde.vnodes().iter_vnodes() { - let (key, value) = self.serde.serialize_barrier(epoch, vnode, is_checkpoint); - flush_info.flush_one(key.estimated_size() + value.estimated_size()); - self.state_store.insert(key, value, None)?; + + // When the stream is paused, do not flush barrier to ensure there is no dirty data in state store. + // Besides, barrier on a paused stream is useless in log store because it won't change the log store state. + if !self.is_paused { + for vnode in self.serde.vnodes().iter_vnodes() { + let (key, value) = self.serde.serialize_barrier(epoch, vnode, is_checkpoint); + flush_info.flush_one(key.estimated_size() + value.estimated_size()); + self.state_store.insert(key, value, None)?; + } } self.tx .flush_all_unflushed(|chunk, epoch, start_seq_id, end_seq_id| { @@ -149,7 +160,15 @@ impl LogWriter for KvLogStoreWriter { } Ok(()) })?; + + // No data is expected when the stream is paused. 
+ if self.is_paused { + assert_eq!(flush_info.flush_count, 0); + assert_eq!(flush_info.flush_size, 0); + assert!(!self.state_store.is_dirty()); + } flush_info.report(&self.metrics); + let watermark = self.tx.pop_truncation(epoch).map(|truncation_offset| { VnodeWatermark::new( self.serde.vnodes().clone(), @@ -173,20 +192,25 @@ impl LogWriter for KvLogStoreWriter { async fn update_vnode_bitmap(&mut self, new_vnodes: Arc) -> LogStoreResult<()> { self.serde.update_vnode_bitmap(new_vnodes.clone()); + self.state_store.update_vnode_bitmap(new_vnodes.clone()); self.tx.update_vnode(self.state_store.epoch(), new_vnodes); Ok(()) } fn pause(&mut self) -> LogStoreResult<()> { info!("KvLogStore of {} is paused", self.identity); - self.is_paused + assert!(!self.is_paused); + self.is_paused = true; + self.paused_notifier .send(true) .map_err(|_| anyhow!("unable to set pause")) } fn resume(&mut self) -> LogStoreResult<()> { info!("KvLogStore of {} is resumed", self.identity); - self.is_paused + assert!(self.is_paused); + self.is_paused = false; + self.paused_notifier .send(false) .map_err(|_| anyhow!("unable to set resume")) } diff --git a/src/stream/src/common/log_store_impl/subscription_log_store.rs b/src/stream/src/common/log_store_impl/subscription_log_store.rs index 500818d8832a8..999400771a30c 100644 --- a/src/stream/src/common/log_store_impl/subscription_log_store.rs +++ b/src/stream/src/common/log_store_impl/subscription_log_store.rs @@ -61,7 +61,7 @@ impl SubscriptionLogStoreWriter { _pause_read_on_bootstrap: bool, ) -> LogStoreResult<()> { self.state_store - .init(InitOptions::new_with_epoch(epoch)) + .init(InitOptions::new(epoch, self.serde.vnodes().clone())) .await?; self.seq_id = FIRST_SEQ_ID; Ok(()) @@ -116,6 +116,7 @@ impl SubscriptionLogStoreWriter { pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc) -> LogStoreResult<()> { self.serde.update_vnode_bitmap(new_vnodes.clone()); + self.state_store.update_vnode_bitmap(new_vnodes); Ok(()) } } diff --git 
a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 81ab622d3e61e..68a590cda1cbb 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -178,7 +178,7 @@ where /// as it needs to wait for prev epoch to be committed. pub async fn init_epoch(&mut self, epoch: EpochPair) -> StorageResult<()> { self.local_store - .init(InitOptions::new_with_epoch(epoch)) + .init(InitOptions::new(epoch, self.vnodes().clone())) .await } } @@ -195,7 +195,7 @@ where /// No need to `wait_for_epoch`, so it should complete immediately. pub fn init_epoch(&mut self, epoch: EpochPair) { self.local_store - .init(InitOptions::new_with_epoch(epoch)) + .init(InitOptions::new(epoch, self.vnodes().clone())) .now_or_never() .expect("non-replicated state store should start immediately.") .expect("non-replicated state store should not wait_for_epoch, and fail because of it.") @@ -780,6 +780,13 @@ where !self.is_dirty(), "vnode bitmap should only be updated when state table is clean" ); + let prev_vnodes = self.local_store.update_vnode_bitmap(new_vnodes.clone()); + assert_eq!( + &prev_vnodes, + self.vnodes(), + "state table and state store vnode bitmap mismatches" + ); + if self.distribution.is_singleton() { assert_eq!( &new_vnodes, diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 7928308c3f94a..eb857e92a9d7f 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -201,6 +201,8 @@ impl SinkExecutor { .init(epoch_pair, barrier.is_pause_on_startup()) .await?; + let mut is_paused = false; + // Propagate the first barrier yield Message::Barrier(barrier); @@ -209,21 +211,29 @@ impl SinkExecutor { match msg? 
{ Message::Watermark(w) => yield Message::Watermark(w), Message::Chunk(chunk) => { + assert!(!is_paused, "Should not receive any data after pause"); log_writer.write_chunk(chunk.clone()).await?; yield Message::Chunk(chunk); } Message::Barrier(barrier) => { + log_writer + .flush_current_epoch(barrier.epoch.curr, barrier.kind.is_checkpoint()) + .await?; + if let Some(mutation) = barrier.mutation.as_deref() { match mutation { - Mutation::Pause => log_writer.pause()?, - Mutation::Resume => log_writer.resume()?, + Mutation::Pause => { + log_writer.pause()?; + is_paused = true; + } + Mutation::Resume => { + log_writer.resume()?; + is_paused = false; + } _ => (), } } - log_writer - .flush_current_epoch(barrier.epoch.curr, barrier.kind.is_checkpoint()) - .await?; if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_id) { log_writer.update_vnode_bitmap(vnode_bitmap).await?; }