From 8d7a2ac9922847db12faf7e90128b74bb9e26668 Mon Sep 17 00:00:00 2001 From: William Wen <44139337+wenym1@users.noreply.github.com> Date: Wed, 26 Jun 2024 15:37:03 +0800 Subject: [PATCH] refactor(storage): pass table ids to state store sync (#17332) --- proto/stream_service.proto | 1 + src/meta/src/barrier/rpc.rs | 6 + .../src/bin/replay/replay_impl.rs | 5 +- .../hummock_test/src/compactor_tests.rs | 4 + .../hummock_test/src/snapshot_tests.rs | 13 +- src/storage/hummock_test/src/test_utils.rs | 28 +-- src/storage/hummock_trace/src/collector.rs | 16 +- src/storage/hummock_trace/src/record.rs | 2 +- src/storage/hummock_trace/src/replay/mod.rs | 4 +- .../hummock_trace/src/replay/runner.rs | 6 +- .../hummock_trace/src/replay/worker.rs | 4 +- .../event_handler/hummock_event_handler.rs | 11 +- src/storage/src/hummock/event_handler/mod.rs | 6 +- .../src/hummock/event_handler/uploader.rs | 54 ++++-- .../src/hummock/store/hummock_storage.rs | 15 +- src/storage/src/memory.rs | 4 +- src/storage/src/monitor/monitored_store.rs | 8 +- src/storage/src/monitor/traced_store.rs | 8 +- src/storage/src/panic_store.rs | 4 +- src/storage/src/store.rs | 3 +- src/storage/src/store_impl.rs | 30 +++- .../common/log_store_impl/kv_log_store/mod.rs | 5 +- src/stream/src/task/barrier_manager.rs | 9 +- .../src/task/barrier_manager/managed_state.rs | 170 +++++++++--------- src/stream/src/task/barrier_manager/tests.rs | 2 + 25 files changed, 261 insertions(+), 157 deletions(-) diff --git a/proto/stream_service.proto b/proto/stream_service.proto index 99c3f68fa34bf..85b12d8ed5fa1 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -57,6 +57,7 @@ message InjectBarrierRequest { stream_plan.Barrier barrier = 2; repeated uint32 actor_ids_to_send = 3; repeated uint32 actor_ids_to_collect = 4; + repeated uint32 table_ids_to_sync = 5; } message BarrierCompleteResponse { diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 23d2d0b5d3c2a..0a7e6d4e1e950 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -294,6 +294,12 @@ impl ControlStreamManager { barrier: Some(barrier), actor_ids_to_send, actor_ids_to_collect, + table_ids_to_sync: command_context + .info + .existing_table_ids() + .iter() + .map(|table_id| table_id.table_id) + .collect(), }, ), ), diff --git a/src/storage/hummock_test/src/bin/replay/replay_impl.rs b/src/storage/hummock_test/src/bin/replay/replay_impl.rs index 3b7f4bc74740e..d8a2a2f0c24bd 100644 --- a/src/storage/hummock_test/src/bin/replay/replay_impl.rs +++ b/src/storage/hummock_test/src/bin/replay/replay_impl.rs @@ -17,6 +17,7 @@ use std::ops::Bound; use futures::stream::BoxStream; use futures::{Stream, StreamExt, TryStreamExt}; use futures_async_stream::try_stream; +use risingwave_common::catalog::TableId; use risingwave_common::util::addr::HostAddr; use risingwave_common_service::observer_manager::{Channel, NotificationClient, ObserverError}; use risingwave_hummock_sdk::key::TableKey; @@ -137,10 +138,10 @@ impl ReplayRead for GlobalReplayImpl { #[async_trait::async_trait] impl ReplayStateStore for GlobalReplayImpl { - async fn sync(&self, id: u64) -> Result { + async fn sync(&self, id: u64, table_ids: Vec) -> Result { let result: SyncResult = self .store - .sync(id) + .sync(id, table_ids.into_iter().map(TableId::new).collect()) .await .map_err(|e| TraceError::SyncFailed(format!("{e}")))?; Ok(result.sync_size) diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 
a10b8963a6d7b..2e2f8123be958 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -722,6 +722,10 @@ pub(crate) mod tests { ) .await; + global_storage + .wait_version(hummock_manager_ref.get_current_version().await) + .await; + let vnode = VirtualNode::from_index(1); for index in 0..kv_count { epoch.inc_epoch(); diff --git a/src/storage/hummock_test/src/snapshot_tests.rs b/src/storage/hummock_test/src/snapshot_tests.rs index b85457f309f73..402952dd09680 100644 --- a/src/storage/hummock_test/src/snapshot_tests.rs +++ b/src/storage/hummock_test/src/snapshot_tests.rs @@ -22,17 +22,16 @@ use risingwave_hummock_sdk::key::prefixed_range_with_vnode; use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_meta::hummock::MockHummockMetaClient; use risingwave_rpc_client::HummockMetaClient; -use risingwave_storage::hummock::CachePolicy; +use risingwave_storage::hummock::{CachePolicy, HummockStorage}; use risingwave_storage::storage_value::StorageValue; use risingwave_storage::store::{ LocalStateStore, NewLocalOptions, PrefetchOptions, ReadOptions, SealCurrentEpochOptions, - WriteOptions, + StateStoreRead, WriteOptions, }; +use risingwave_storage::StateStore; use crate::local_state_store_test_utils::LocalStateStoreTestExt; -use crate::test_utils::{ - gen_key_from_bytes, with_hummock_storage_v2, HummockStateStoreTestTrait, TestIngestBatch, -}; +use crate::test_utils::{gen_key_from_bytes, with_hummock_storage_v2, TestIngestBatch}; macro_rules! assert_count_range_scan { ($storage:expr, $vnode:expr, $range:expr, $expect_count:expr, $epoch:expr) => {{ @@ -104,7 +103,7 @@ macro_rules! assert_count_backward_range_scan { } async fn test_snapshot_inner( - hummock_storage: impl HummockStateStoreTestTrait, + hummock_storage: HummockStorage, mock_hummock_meta_client: Arc, enable_sync: bool, enable_commit: bool, @@ -235,7 +234,7 @@ async fn test_snapshot_inner( } async fn test_snapshot_range_scan_inner( - hummock_storage: impl HummockStateStoreTestTrait, + hummock_storage: HummockStorage, mock_hummock_meta_client: Arc, enable_sync: bool, enable_commit: bool, diff --git a/src/storage/hummock_test/src/test_utils.rs b/src/storage/hummock_test/src/test_utils.rs index 7c6be2eb041ca..5a2034facdbfa 100644 --- a/src/storage/hummock_test/src/test_utils.rs +++ b/src/storage/hummock_test/src/test_utils.rs @@ -21,8 +21,6 @@ use risingwave_common_service::observer_manager::ObserverManager; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::key::TableKey; pub use risingwave_hummock_sdk::key::{gen_key_from_bytes, gen_key_from_str}; -#[cfg(test)] -use risingwave_hummock_sdk::SyncResult; use risingwave_meta::hummock::test_utils::{ register_table_ids_to_compaction_group, setup_compute_env, }; @@ -114,24 +112,6 @@ impl TestIngestBatch for S { } } -#[cfg(test)] -#[async_trait::async_trait] -pub(crate) trait HummockStateStoreTestTrait: StateStore { - #[allow(dead_code)] - fn get_pinned_version(&self) -> PinnedVersion; - async fn seal_and_sync_epoch(&self, epoch: u64) -> StorageResult { - self.seal_epoch(epoch, true); - self.sync(epoch).await - } -} - -#[cfg(test)] -impl HummockStateStoreTestTrait for HummockStorage { - fn get_pinned_version(&self) -> PinnedVersion { - self.get_pinned_version() - } -} - pub async fn with_hummock_storage_v2( table_id: TableId, ) -> (HummockStorage, Arc) { @@ -234,6 +214,12 @@ pub struct HummockTestEnv { } impl HummockTestEnv { + async fn wait_version_sync(&self) { + self.storage + 
.wait_version(self.manager.get_current_version().await) + .await + } + pub async fn register_table_id(&self, table_id: TableId) { register_tables_with_id_for_test( self.storage.filter_key_extractor_manager(), @@ -241,6 +227,7 @@ impl HummockTestEnv { &[table_id.table_id()], ) .await; + self.wait_version_sync().await; } pub async fn register_table(&self, table: PbTable) { @@ -250,6 +237,7 @@ impl HummockTestEnv { &[table], ) .await; + self.wait_version_sync().await; } // Seal, sync and commit a epoch. diff --git a/src/storage/hummock_trace/src/collector.rs b/src/storage/hummock_trace/src/collector.rs index b1a269a4620ee..c6eb360ad1116 100644 --- a/src/storage/hummock_trace/src/collector.rs +++ b/src/storage/hummock_trace/src/collector.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::env; use std::fs::{create_dir_all, OpenOptions}; use std::io::BufWriter; @@ -23,6 +24,7 @@ use std::sync::LazyLock; use bincode::{Decode, Encode}; use bytes::Bytes; use parking_lot::Mutex; +use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_pb::meta::SubscribeResponse; use tokio::runtime::Runtime; @@ -286,8 +288,18 @@ impl TraceSpan { ) } - pub fn new_sync_span(epoch: u64, storage_type: StorageType) -> MayTraceSpan { - Self::new_global_op(Operation::Sync(epoch), storage_type) + pub fn new_sync_span( + epoch: u64, + table_ids: &HashSet, + storage_type: StorageType, + ) -> MayTraceSpan { + Self::new_global_op( + Operation::Sync( + epoch, + table_ids.iter().map(|table_id| table_id.table_id).collect(), + ), + storage_type, + ) } pub fn new_seal_span( diff --git a/src/storage/hummock_trace/src/record.rs b/src/storage/hummock_trace/src/record.rs index 916a54a2530c0..4aced4e023d38 100644 --- a/src/storage/hummock_trace/src/record.rs +++ b/src/storage/hummock_trace/src/record.rs @@ -146,7 +146,7 @@ pub enum Operation { IterNext(RecordId), /// Sync operation of Hummock. - Sync(u64), + Sync(u64, Vec), /// Seal operation of Hummock. Seal(u64, bool), diff --git a/src/storage/hummock_trace/src/replay/mod.rs b/src/storage/hummock_trace/src/replay/mod.rs index 046ab67b18607..9e8b586640b5f 100644 --- a/src/storage/hummock_trace/src/replay/mod.rs +++ b/src/storage/hummock_trace/src/replay/mod.rs @@ -115,7 +115,7 @@ pub trait ReplayWrite { #[cfg_attr(test, automock)] #[async_trait::async_trait] pub trait ReplayStateStore { - async fn sync(&self, id: u64) -> Result; + async fn sync(&self, id: u64, table_ids: Vec) -> Result; fn seal_epoch(&self, epoch_id: u64, is_checkpoint: bool); async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64) -> Result; async fn new_local(&self, opts: TracedNewLocalOptions) -> Box; @@ -146,7 +146,7 @@ mock! 
{ } #[async_trait::async_trait] impl ReplayStateStore for GlobalReplayInterface{ - async fn sync(&self, id: u64) -> Result; + async fn sync(&self, id: u64, table_ids: Vec) -> Result; fn seal_epoch(&self, epoch_id: u64, is_checkpoint: bool); async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64, ) -> Result; diff --git a/src/storage/hummock_trace/src/replay/runner.rs b/src/storage/hummock_trace/src/replay/runner.rs index 2683b0ddcecc1..cb9ddacda1763 100644 --- a/src/storage/hummock_trace/src/replay/runner.rs +++ b/src/storage/hummock_trace/src/replay/runner.rs @@ -199,7 +199,7 @@ mod tests { let mut non_local: Vec> = vec![ (12, Operation::Seal(seal_id, seal_checkpoint)), (12, Operation::Finish), - (13, Operation::Sync(sync_id)), + (13, Operation::Sync(sync_id, vec![1, 2, 3])), ( 13, Operation::Result(OperationResult::Sync(TraceResult::Ok(0))), @@ -247,9 +247,9 @@ mod tests { mock_replay .expect_sync() - .with(predicate::eq(sync_id)) + .with(predicate::eq(sync_id), predicate::eq(vec![1, 2, 3])) .times(1) - .returning(|_| Ok(0)); + .returning(|_, _| Ok(0)); mock_replay .expect_seal_epoch() diff --git a/src/storage/hummock_trace/src/replay/worker.rs b/src/storage/hummock_trace/src/replay/worker.rs index 2c2cef690b19d..622d7cb833190 100644 --- a/src/storage/hummock_trace/src/replay/worker.rs +++ b/src/storage/hummock_trace/src/replay/worker.rs @@ -257,9 +257,9 @@ impl ReplayWorker { panic!("expect iter result, but got {:?}", res); } } - Operation::Sync(epoch_id) => { + Operation::Sync(epoch_id, table_ids) => { assert_eq!(storage_type, StorageType::Global); - let sync_result = replay.sync(epoch_id).await.unwrap(); + let sync_result = replay.sync(epoch_id, table_ids).await.unwrap(); let res = res_rx.recv().await.expect("recv result failed"); if let OperationResult::Sync(expected) = res { assert_eq!(TraceResult::Ok(sync_result), expected, "sync failed"); diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 486afc3d38dc6..cbb0044e2f798 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::pin::pin; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::Relaxed; @@ -466,6 +466,7 @@ impl HummockEventHandler { &mut self, new_sync_epoch: HummockEpoch, sync_result_sender: oneshot::Sender>, + table_ids: HashSet, ) { debug!( "awaiting for epoch to be synced: {}, max_synced_epoch: {}", @@ -473,7 +474,7 @@ impl HummockEventHandler { self.uploader.max_synced_epoch() ); self.uploader - .start_sync_epoch(new_sync_epoch, sync_result_sender); + .start_sync_epoch(new_sync_epoch, sync_result_sender, table_ids); } async fn handle_clear(&mut self, notifier: oneshot::Sender<()>, prev_epoch: u64) { @@ -729,8 +730,9 @@ impl HummockEventHandler { HummockEvent::SyncEpoch { new_sync_epoch, sync_result_sender, + table_ids, } => { - self.handle_sync_epoch(new_sync_epoch, sync_result_sender); + self.handle_sync_epoch(new_sync_epoch, sync_result_sender, table_ids); } HummockEvent::Clear(_, _) => { unreachable!("clear is handled in separated async context") @@ -917,6 +919,7 @@ impl SyncedData { #[cfg(test)] mod tests { + use std::collections::HashSet; use std::future::{poll_fn, Future}; use std::sync::Arc; use std::task::Poll; @@ -1188,12 +1191,14 @@ mod tests { send_event(HummockEvent::SyncEpoch { new_sync_epoch: epoch1, sync_result_sender: tx1, + table_ids: HashSet::from_iter([TEST_TABLE_ID]), }); assert!(poll_fn(|cx| Poll::Ready(rx1.poll_unpin(cx).is_pending())).await); let (tx2, mut rx2) = oneshot::channel(); send_event(HummockEvent::SyncEpoch { new_sync_epoch: epoch2, sync_result_sender: tx2, + table_ids: HashSet::from_iter([TEST_TABLE_ID]), }); assert!(poll_fn(|cx| Poll::Ready(rx2.poll_unpin(cx).is_pending())).await); diff --git a/src/storage/src/hummock/event_handler/mod.rs b/src/storage/src/hummock/event_handler/mod.rs index c88611cd0888a..1d0935c0ac485 100644 --- a/src/storage/src/hummock/event_handler/mod.rs +++ b/src/storage/src/hummock/event_handler/mod.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use parking_lot::{RwLock, RwLockReadGuard}; @@ -61,6 +61,7 @@ pub enum HummockEvent { SyncEpoch { new_sync_epoch: HummockEpoch, sync_result_sender: oneshot::Sender>, + table_ids: HashSet, }, /// Clear shared buffer and reset all states @@ -109,7 +110,8 @@ impl HummockEvent { HummockEvent::SyncEpoch { new_sync_epoch, sync_result_sender: _, - } => format!("AwaitSyncEpoch epoch {} ", new_sync_epoch), + table_ids, + } => format!("AwaitSyncEpoch epoch {} {:?}", new_sync_epoch, table_ids), HummockEvent::Clear(_, prev_epoch) => format!("Clear {:?}", prev_epoch), diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index ee0e975e5a3ea..f341937d8795f 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -14,7 +14,7 @@ use std::cmp::Ordering; use std::collections::btree_map::Entry; -use std::collections::{BTreeMap, HashMap, VecDeque}; +use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; use std::fmt::{Debug, Display, Formatter}; use std::future::{poll_fn, Future}; use std::mem::{replace, swap, take}; @@ -730,6 +730,17 @@ impl LocalInstanceUnsyncData { } ret } + + fn assert_after_epoch(&self, epoch: HummockEpoch) { + if let Some(oldest_sealed_data) = self.sealed_data.back() { + assert!(!oldest_sealed_data.imms.is_empty()); + assert_gt!(oldest_sealed_data.epoch, epoch); + } else if let Some(current_data) = &self.current_epoch_data { + if current_data.epoch <= epoch { + assert!(current_data.imms.is_empty() && !current_data.has_spilled); + } + } + } } struct TableUnsyncData { @@ -778,6 +789,17 @@ impl TableUnsyncData { }), ) } + + fn assert_after_epoch(&self, epoch: HummockEpoch) { + self.instance_data + .values() + .for_each(|instance_data| instance_data.assert_after_epoch(epoch)); + if let Some((_, watermarks)) = &self.table_watermarks + && let Some((oldest_epoch, _)) = watermarks.first_key_value() + { + assert_gt!(*oldest_epoch, epoch); + } + } } #[derive(Default)] @@ -874,7 +896,12 @@ impl UnsyncData { } } - fn sync(&mut self, epoch: HummockEpoch, context: &UploaderContext) -> SyncDataBuilder { + fn sync( + &mut self, + epoch: HummockEpoch, + context: &UploaderContext, + table_ids: HashSet, + ) -> SyncDataBuilder { let sync_epoch_data = take_before_epoch(&mut self.epoch_data, epoch); let mut sync_data = SyncDataBuilder::default(); @@ -884,6 +911,10 @@ impl UnsyncData { let mut flush_payload = HashMap::new(); for (table_id, table_data) in &mut self.table_data { + if !table_ids.contains(table_id) { + table_data.assert_after_epoch(epoch); + continue; + } let (unflushed_payload, table_watermarks) = table_data.sync(epoch); for (instance_id, payload) in unflushed_payload { if !payload.is_empty() { @@ -1089,6 +1120,7 @@ impl HummockUploader { &mut self, epoch: HummockEpoch, sync_result_sender: oneshot::Sender>, + table_ids: HashSet, ) { let data = match &mut self.state { UploaderState::Working(data) => data, @@ -1114,7 +1146,7 @@ impl HummockUploader { self.max_syncing_epoch = epoch; - let sync_data = data.unsync_data.sync(epoch, &self.context); + let sync_data = data.unsync_data.sync(epoch, &self.context, table_ids); let SyncDataBuilder { spilled_data: @@ -1394,7 +1426,7 @@ impl HummockUploader { #[cfg(test)] pub(crate) mod tests { - use std::collections::{HashMap, VecDeque}; + use std::collections::{HashMap, HashSet, VecDeque}; use std::future::{poll_fn, Future}; use 
std::ops::Deref; use std::sync::atomic::AtomicUsize; @@ -1680,7 +1712,7 @@ pub(crate) mod tests { uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1); let (sync_tx, sync_rx) = oneshot::channel(); - uploader.start_sync_epoch(epoch1, sync_tx); + uploader.start_sync_epoch(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID])); assert_eq!(epoch1 as HummockEpoch, uploader.max_syncing_epoch); assert_eq!(1, uploader.data().syncing_data.len()); let syncing_data = uploader.data().syncing_data.front().unwrap(); @@ -1736,7 +1768,7 @@ pub(crate) mod tests { let epoch1 = INITIAL_EPOCH.next_epoch(); let (sync_tx, sync_rx) = oneshot::channel(); - uploader.start_sync_epoch(epoch1, sync_tx); + uploader.start_sync_epoch(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID])); assert_eq!(epoch1, uploader.max_syncing_epoch); assert_uploader_pending(&mut uploader).await; @@ -1769,7 +1801,7 @@ pub(crate) mod tests { uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm); let (sync_tx, sync_rx) = oneshot::channel(); - uploader.start_sync_epoch(epoch1, sync_tx); + uploader.start_sync_epoch(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID])); assert_eq!(epoch1, uploader.max_syncing_epoch); assert_uploader_pending(&mut uploader).await; @@ -1834,7 +1866,7 @@ pub(crate) mod tests { assert_eq!(epoch3, uploader.max_syncing_epoch); let (sync_tx, sync_rx) = oneshot::channel(); - uploader.start_sync_epoch(epoch6, sync_tx); + uploader.start_sync_epoch(epoch6, sync_tx, HashSet::from_iter([TEST_TABLE_ID])); assert_eq!(epoch6, uploader.max_syncing_epoch); uploader.update_pinned_version(version4); assert_eq!(epoch4, uploader.max_synced_epoch); @@ -2002,7 +2034,7 @@ pub(crate) mod tests { new_task_notifier(get_payload_imm_ids(&epoch1_sync_payload)); uploader.local_seal_epoch_for_test(instance_id1, epoch1); let (sync_tx1, mut sync_rx1) = oneshot::channel(); - uploader.start_sync_epoch(epoch1, sync_tx1); + uploader.start_sync_epoch(epoch1, sync_tx1, HashSet::from_iter([TEST_TABLE_ID])); await_start1_4.await; let epoch3 = epoch2.next_epoch(); @@ -2088,7 +2120,7 @@ pub(crate) mod tests { // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1]) let (sync_tx2, sync_rx2) = oneshot::channel(); - uploader.start_sync_epoch(epoch2, sync_tx2); + uploader.start_sync_epoch(epoch2, sync_tx2, HashSet::from_iter([TEST_TABLE_ID])); uploader.local_seal_epoch_for_test(instance_id2, epoch3); let sst = uploader.next_uploaded_sst().await; assert_eq!(&get_payload_imm_ids(&epoch3_spill_payload1), sst.imm_ids()); @@ -2117,7 +2149,7 @@ pub(crate) mod tests { let (await_start4_with_3_3, finish_tx4_with_3_3) = new_task_notifier(get_payload_imm_ids(&epoch4_sync_payload)); let (sync_tx4, mut sync_rx4) = oneshot::channel(); - uploader.start_sync_epoch(epoch4, sync_tx4); + uploader.start_sync_epoch(epoch4, sync_tx4, HashSet::from_iter([TEST_TABLE_ID])); await_start4_with_3_3.await; // current uploader state: diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index b4a3c55bbf829..11aa643c3659a 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::collections::HashSet; use std::future::Future; use std::ops::{Bound, Deref}; use std::sync::atomic::{AtomicU64, Ordering as MemOrdering}; @@ -562,12 +563,13 @@ impl StateStore for HummockStorage { wait_for_epoch(&self.version_update_notifier_tx, wait_epoch).await } - fn sync(&self, epoch: u64) -> impl SyncFuture { + fn sync(&self, epoch: u64, table_ids: HashSet) -> impl SyncFuture { let (tx, rx) = oneshot::channel(); self.hummock_event_sender .send(HummockEvent::SyncEpoch { new_sync_epoch: epoch, sync_result_sender: tx, + table_ids, }) .expect("should send success"); rx.map(|recv_result| { @@ -651,7 +653,16 @@ impl HummockStorage { epoch: u64, ) -> StorageResult { self.seal_epoch(epoch, true); - self.sync(epoch).await + let table_ids = self + .pinned_version + .load() + .version() + .state_table_info + .info() + .keys() + .cloned() + .collect(); + self.sync(epoch, table_ids).await } /// Used in the compaction test tool diff --git a/src/storage/src/memory.rs b/src/storage/src/memory.rs index 261249d90ff8a..7a71a2309c3ef 100644 --- a/src/storage/src/memory.rs +++ b/src/storage/src/memory.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::cmp::Ordering; -use std::collections::{BTreeMap, BTreeSet, VecDeque}; +use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque}; use std::ops::Bound::{Excluded, Included, Unbounded}; use std::ops::{Bound, RangeBounds}; use std::sync::{Arc, LazyLock}; @@ -741,7 +741,7 @@ impl StateStore for RangeKvStateStore { } #[allow(clippy::unused_async)] - fn sync(&self, _epoch: u64) -> impl SyncFuture { + fn sync(&self, _epoch: u64, _table_ids: HashSet) -> impl SyncFuture { let result = self.inner.flush(); // memory backend doesn't need to push to S3, so this is a no-op async move { diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs index a571d47b799a2..95791714ef64a 100644 --- a/src/storage/src/monitor/monitored_store.rs +++ b/src/storage/src/monitor/monitored_store.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::marker::PhantomData; use std::sync::Arc; @@ -319,8 +320,11 @@ impl StateStore for MonitoredStateStore { .inspect_err(|e| error!(error = %e.as_report(), "Failed in wait_epoch")) } - fn sync(&self, epoch: u64) -> impl SyncFuture { - let future = self.inner.sync(epoch).instrument_await("store_sync"); + fn sync(&self, epoch: u64, table_ids: HashSet) -> impl SyncFuture { + let future = self + .inner + .sync(epoch, table_ids) + .instrument_await("store_sync"); let timer = self.storage_metrics.sync_duration.start_timer(); let sync_size = self.storage_metrics.sync_size.clone(); async move { diff --git a/src/storage/src/monitor/traced_store.rs b/src/storage/src/monitor/traced_store.rs index f22c854808114..d7b9bd66edcdd 100644 --- a/src/storage/src/monitor/traced_store.rs +++ b/src/storage/src/monitor/traced_store.rs @@ -12,11 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::collections::HashSet; use std::sync::Arc; use bytes::Bytes; use futures::{Future, FutureExt}; use risingwave_common::buffer::Bitmap; +use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_hummock_trace::{ @@ -253,10 +255,10 @@ impl StateStore for TracedStateStore { res } - fn sync(&self, epoch: u64) -> impl SyncFuture { - let span: MayTraceSpan = TraceSpan::new_sync_span(epoch, self.storage_type); + fn sync(&self, epoch: u64, table_ids: HashSet) -> impl SyncFuture { + let span: MayTraceSpan = TraceSpan::new_sync_span(epoch, &table_ids, self.storage_type); - let future = self.inner.sync(epoch); + let future = self.inner.sync(epoch, table_ids); future.map(move |sync_result| { span.may_send_result(OperationResult::Sync( diff --git a/src/storage/src/panic_store.rs b/src/storage/src/panic_store.rs index 9f3a428442040..831f1f09a1c56 100644 --- a/src/storage/src/panic_store.rs +++ b/src/storage/src/panic_store.rs @@ -12,12 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::marker::PhantomData; use std::ops::Bound; use std::sync::Arc; use bytes::Bytes; use risingwave_common::buffer::Bitmap; +use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; use risingwave_hummock_sdk::HummockReadEpoch; @@ -180,7 +182,7 @@ impl StateStore for PanicStateStore { } #[allow(clippy::unused_async)] - fn sync(&self, _epoch: u64) -> impl SyncFuture { + fn sync(&self, _epoch: u64, _table_ids: HashSet) -> impl SyncFuture { async { panic!("should not await sync epoch from the panic state store!") } } diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index beb19a0a98c5d..9e373d3069bb1 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::cmp::min; +use std::collections::HashSet; use std::default::Default; use std::fmt::{Debug, Formatter}; use std::future::Future; @@ -330,7 +331,7 @@ pub trait StateStore: StateStoreRead + StaticSendSync + Clone { epoch: HummockReadEpoch, ) -> impl Future> + Send + '_; - fn sync(&self, epoch: u64) -> impl SyncFuture; + fn sync(&self, epoch: u64, table_ids: HashSet) -> impl SyncFuture; /// update max current epoch in storage. fn seal_epoch(&self, epoch: u64, is_checkpoint: bool); diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index 2512f680b536e..3c8284cb55c90 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -206,6 +206,7 @@ macro_rules! 
dispatch_state_store { #[cfg(any(debug_assertions, test, feature = "test"))] pub mod verify { + use std::collections::HashSet; use std::fmt::Debug; use std::future::Future; use std::marker::PhantomData; @@ -214,6 +215,7 @@ pub mod verify { use bytes::Bytes; use risingwave_common::buffer::Bitmap; + use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; use risingwave_hummock_sdk::HummockReadEpoch; use tracing::log::warn; @@ -558,9 +560,12 @@ pub mod verify { self.actual.try_wait_epoch(epoch) } - fn sync(&self, epoch: u64) -> impl SyncFuture { - let expected_future = self.expected.as_ref().map(|expected| expected.sync(epoch)); - let actual_future = self.actual.sync(epoch); + fn sync(&self, epoch: u64, table_ids: HashSet) -> impl SyncFuture { + let expected_future = self + .expected + .as_ref() + .map(|expected| expected.sync(epoch, table_ids.clone())); + let actual_future = self.actual.sync(epoch, table_ids); async move { if let Some(expected_future) = expected_future { expected_future.await?; @@ -820,6 +825,7 @@ impl AsHummock for SledStateStore { #[cfg(debug_assertions)] pub mod boxed_state_store { + use std::collections::HashSet; use std::future::Future; use std::ops::{Deref, DerefMut}; use std::sync::Arc; @@ -829,6 +835,7 @@ pub mod boxed_state_store { use futures::future::BoxFuture; use futures::FutureExt; use risingwave_common::buffer::Bitmap; + use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; use risingwave_hummock_sdk::{HummockReadEpoch, SyncResult}; @@ -1154,7 +1161,11 @@ pub mod boxed_state_store { pub trait DynamicDispatchedStateStoreExt: StaticSendSync { async fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> StorageResult<()>; - fn sync(&self, epoch: u64) -> BoxFuture<'static, StorageResult>; + fn sync( + &self, + epoch: u64, + table_ids: HashSet, + ) -> BoxFuture<'static, StorageResult>; fn seal_epoch(&self, epoch: u64, is_checkpoint: bool); @@ -1171,8 +1182,12 @@ pub mod boxed_state_store { self.try_wait_epoch(epoch).await } - fn sync(&self, epoch: u64) -> BoxFuture<'static, StorageResult> { - self.sync(epoch).boxed() + fn sync( + &self, + epoch: u64, + table_ids: HashSet, + ) -> BoxFuture<'static, StorageResult> { + self.sync(epoch, table_ids).boxed() } fn seal_epoch(&self, epoch: u64, is_checkpoint: bool) { @@ -1268,8 +1283,9 @@ pub mod boxed_state_store { fn sync( &self, epoch: u64, + table_ids: HashSet, ) -> impl Future> + Send + 'static { - self.deref().sync(epoch) + self.deref().sync(epoch, table_ids) } fn clear_shared_buffer(&self, prev_epoch: u64) -> impl Future + Send + '_ { diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index 3db9723e2ba14..73dca8b1956bd 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -438,7 +438,7 @@ mod tests { use risingwave_connector::sink::log_store::{ ChunkId, LogReader, LogStoreFactory, LogStoreReadItem, LogWriter, TruncateOffset, }; - use risingwave_hummock_sdk::{HummockReadEpoch, SyncResult}; + use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_hummock_test::test_utils::prepare_hummock_test_env; use risingwave_storage::hummock::HummockStorage; use risingwave_storage::StateStore; @@ -507,8 +507,7 @@ mod tests { writer.flush_current_epoch(epoch3, true).await.unwrap(); test_env.storage.seal_epoch(epoch1, false); - test_env.storage.seal_epoch(epoch2, true); - let 
sync_result: SyncResult = test_env.storage.sync(epoch2).await.unwrap(); + let sync_result = test_env.storage.seal_and_sync_epoch(epoch2).await.unwrap(); assert!(!sync_result.uncommitted_ssts.is_empty()); reader.init().await.unwrap(); diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 8f807de9ff378..1098f505004c4 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -47,6 +47,7 @@ mod progress; mod tests; pub use progress::CreateMviewProgress; +use risingwave_common::catalog::TableId; use risingwave_common::util::runtime::BackgroundShutdownRuntime; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; use risingwave_hummock_sdk::{LocalSstableInfo, SyncResult}; @@ -489,6 +490,10 @@ impl LocalBarrierWorker { &barrier, req.actor_ids_to_send.into_iter().collect(), req.actor_ids_to_collect.into_iter().collect(), + req.table_ids_to_sync + .into_iter() + .map(TableId::new) + .collect(), )?; Ok(()) } @@ -666,6 +671,7 @@ impl LocalBarrierWorker { barrier: &Barrier, to_send: HashSet, to_collect: HashSet, + table_ids: HashSet, ) -> StreamResult<()> { #[cfg(not(test))] { @@ -713,7 +719,8 @@ impl LocalBarrierWorker { } } - self.state.transform_to_issued(barrier, to_collect); + self.state + .transform_to_issued(barrier, to_collect, table_ids); for actor_id in to_send { match self.barrier_senders.get(&actor_id) { diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index 0519be828f1f8..6c70525aa4b3a 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -22,9 +22,11 @@ use std::sync::Arc; use anyhow::anyhow; use await_tree::InstrumentAwait; +use futures::future::BoxFuture; use futures::stream::FuturesOrdered; use futures::{FutureExt, StreamExt, TryFutureExt}; use prometheus::HistogramTimer; +use risingwave_common::catalog::TableId; use risingwave_common::must_match; use risingwave_hummock_sdk::SyncResult; use risingwave_pb::stream_plan::barrier::BarrierKind; @@ -47,6 +49,10 @@ struct IssuedState { pub remaining_actors: BTreeSet, pub barrier_inflight_latency: HistogramTimer, + + pub table_ids: HashSet, + + pub kind: BarrierKind, } impl Debug for IssuedState { @@ -54,6 +60,8 @@ impl Debug for IssuedState { f.debug_struct("IssuedState") .field("mutation", &self.mutation) .field("remaining_actors", &self.remaining_actors) + .field("table_ids", &self.table_ids) + .field("kind", &self.kind) .finish() } } @@ -83,62 +91,33 @@ enum ManagedBarrierStateInner { pub(super) struct BarrierState { curr_epoch: u64, inner: ManagedBarrierStateInner, - kind: BarrierKind, } type AwaitEpochCompletedFuture = impl Future)> + 'static; -fn sync_epoch( - state_store: &StateStoreImpl, +fn sync_epoch( + state_store: &S, streaming_metrics: &StreamingMetrics, prev_epoch: u64, - kind: BarrierKind, -) -> impl Future>> + 'static { - let barrier_sync_latency = streaming_metrics.barrier_sync_latency.clone(); - - let sync_result_future = match kind { - BarrierKind::Unspecified => unreachable!(), - BarrierKind::Initial => { - if let Some(hummock) = state_store.as_hummock() { - let mce = hummock.get_pinned_version().max_committed_epoch(); - assert_eq!( - mce, prev_epoch, - "first epoch should match with the current version", - ); - } - tracing::info!(?prev_epoch, "ignored syncing data for the first barrier"); - None - } - BarrierKind::Barrier => None, - BarrierKind::Checkpoint => { - let timer = 
barrier_sync_latency.start_timer(); - let sync_result_future = dispatch_state_store!(state_store, store, { - store - .sync(prev_epoch) - .instrument_await(format!("sync_epoch (epoch {})", prev_epoch)) - .inspect_ok(move |_| { - timer.observe_duration(); - }) - .inspect_err(move |e| { - tracing::error!( - prev_epoch, - error = %e.as_report(), - "Failed to sync state store", - ); - }) - .boxed() - }); - Some(sync_result_future) - } - }; - async move { - if let Some(sync_result_future) = sync_result_future { - Ok(Some(sync_result_future.await?)) - } else { - Ok(None) - } - } + table_ids: HashSet, +) -> BoxFuture<'static, StreamResult> { + let timer = streaming_metrics.barrier_sync_latency.start_timer(); + let future = state_store.sync(prev_epoch, table_ids); + future + .instrument_await(format!("sync_epoch (epoch {})", prev_epoch)) + .inspect_ok(move |_| { + timer.observe_duration(); + }) + .map_err(move |e| { + tracing::error!( + prev_epoch, + error = %e.as_report(), + "Failed to sync state store", + ); + e.into() + }) + .boxed() } #[derive(Debug)] @@ -152,13 +131,13 @@ impl Display for ManagedBarrierStateDebugInfo<'_> { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let mut prev_epoch = 0u64; for (epoch, barrier_state) in self.epoch_barrier_state_map { - write!(f, "> Epoch {} ({:?}): ", epoch, barrier_state.kind)?; + write!(f, "> Epoch {}: ", epoch)?; match &barrier_state.inner { ManagedBarrierStateInner::Stashed { .. } => { write!(f, "Stashed")?; } ManagedBarrierStateInner::Issued(state) => { - write!(f, "Issued. Remaining actors: [")?; + write!(f, "Issued [{:?}]. Remaining actors: [", state.kind)?; let mut is_prev_epoch_issued = false; if prev_epoch != 0 { let bs = &self.epoch_barrier_state_map[&prev_epoch]; @@ -279,7 +258,6 @@ impl ManagedBarrierState { inner: ManagedBarrierStateInner::Stashed { mutation_senders: vec![sender], }, - kind: barrier.kind, }); } Entry::Occupied(mut o) => { @@ -332,11 +310,14 @@ impl ManagedBarrierState { ManagedBarrierStateInner::AllCollected, ); - must_match!(prev_state, ManagedBarrierStateInner::Issued(IssuedState { + let (kind, table_ids) = must_match!(prev_state, ManagedBarrierStateInner::Issued(IssuedState { barrier_inflight_latency: timer, + kind, + table_ids, .. 
}) => { timer.observe_duration(); + (kind, table_ids) }); let create_mview_progress = self @@ -358,33 +339,60 @@ impl ManagedBarrierState { }) .collect(); - let kind = barrier_state.kind; - match kind { + let complete_barrier_future = match kind { BarrierKind::Unspecified => unreachable!(), - BarrierKind::Initial => tracing::info!( - epoch = prev_epoch, - "ignore sealing data for the first barrier" - ), - BarrierKind::Barrier | BarrierKind::Checkpoint => { + BarrierKind::Initial => { + tracing::info!( + epoch = prev_epoch, + "ignore sealing data for the first barrier" + ); + if let Some(hummock) = self.state_store.as_hummock() { + let mce = hummock.get_pinned_version().max_committed_epoch(); + assert_eq!( + mce, prev_epoch, + "first epoch should match with the current version", + ); + } + tracing::info!(?prev_epoch, "ignored syncing data for the first barrier"); + None + } + BarrierKind::Barrier => { dispatch_state_store!(&self.state_store, state_store, { state_store.seal_epoch(prev_epoch, kind.is_checkpoint()); }); + None } - } + BarrierKind::Checkpoint => { + dispatch_state_store!(&self.state_store, state_store, { + state_store.seal_epoch(prev_epoch, kind.is_checkpoint()); + Some(sync_epoch( + state_store, + &self.streaming_metrics, + prev_epoch, + table_ids, + )) + }) + } + }; self.await_epoch_completed_futures.push_back({ - let future = - sync_epoch(&self.state_store, &self.streaming_metrics, prev_epoch, kind).map( - move |result| { - ( - prev_epoch, - result.map(move |sync_result| BarrierCompleteResult { - sync_result, - create_mview_progress, - }), - ) - }, - ); + let future = async move { + if let Some(future) = complete_barrier_future { + let result = future.await; + result.map(Some) + } else { + Ok(None) + } + } + .map(move |result| { + ( + prev_epoch, + result.map(|sync_result| BarrierCompleteResult { + sync_result, + create_mview_progress, + }), + ) + }); if let Some(reg) = &self.barrier_await_tree_reg { reg.register( await_tree_key::BarrierAwait { prev_epoch }, @@ -483,6 +491,7 @@ impl ManagedBarrierState { &mut self, barrier: &Barrier, actor_ids_to_collect: HashSet, + table_ids: HashSet, ) { let timer = self .streaming_metrics @@ -516,8 +525,9 @@ impl ManagedBarrierState { remaining_actors: BTreeSet::from_iter(actor_ids_to_collect), mutation: barrier.mutation.clone(), barrier_inflight_latency: timer, + kind: barrier.kind, + table_ids, }), - kind: barrier.kind, }, ); self.may_have_collected_all(barrier.epoch.prev); @@ -601,9 +611,9 @@ mod tests { let actor_ids_to_collect1 = HashSet::from([1, 2]); let actor_ids_to_collect2 = HashSet::from([1, 2]); let actor_ids_to_collect3 = HashSet::from([1, 2, 3]); - managed_barrier_state.transform_to_issued(&barrier1, actor_ids_to_collect1); - managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2); - managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3); + managed_barrier_state.transform_to_issued(&barrier1, actor_ids_to_collect1, HashSet::new()); + managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2, HashSet::new()); + managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3, HashSet::new()); managed_barrier_state.collect(1, &barrier1); managed_barrier_state.collect(2, &barrier1); assert_eq!( @@ -651,9 +661,9 @@ mod tests { let actor_ids_to_collect1 = HashSet::from([1, 2, 3, 4]); let actor_ids_to_collect2 = HashSet::from([1, 2, 3]); let actor_ids_to_collect3 = HashSet::from([1, 2]); - managed_barrier_state.transform_to_issued(&barrier1, 
actor_ids_to_collect1); - managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2); - managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3); + managed_barrier_state.transform_to_issued(&barrier1, actor_ids_to_collect1, HashSet::new()); + managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2, HashSet::new()); + managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3, HashSet::new()); managed_barrier_state.collect(1, &barrier1); managed_barrier_state.collect(1, &barrier2); diff --git a/src/stream/src/task/barrier_manager/tests.rs b/src/stream/src/task/barrier_manager/tests.rs index 5fbedcdfd0dcc..82a08e1d66117 100644 --- a/src/stream/src/task/barrier_manager/tests.rs +++ b/src/stream/src/task/barrier_manager/tests.rs @@ -87,6 +87,7 @@ async fn test_managed_barrier_collection() -> StreamResult<()> { barrier: Some(barrier.to_protobuf()), actor_ids_to_send: actor_ids.clone(), actor_ids_to_collect: actor_ids, + table_ids_to_sync: vec![], }, )), })) @@ -193,6 +194,7 @@ async fn test_managed_barrier_collection_separately() -> StreamResult<()> { barrier: Some(barrier.to_protobuf()), actor_ids_to_send, actor_ids_to_collect, + table_ids_to_sync: vec![], }, )), }))
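
Notes for reviewers follow, with sketches in plain Rust. First, the core API change in isolation: `StateStore::sync` now takes the set of tables that the epoch must be synced for, so the uploader can restrict the sync to relevant tables instead of flushing everything. A minimal sketch of the new signature, using stand-in types rather than the real `risingwave_storage` definitions:

use std::collections::HashSet;
use std::future::Future;

// Stand-ins for brevity; the real trait lives in src/storage/src/store.rs and
// returns `impl SyncFuture` (presumably an alias carrying the same
// Send + 'static bounds, as the boxed variant in store_impl.rs suggests).
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct TableId(pub u32);
pub struct SyncResult { pub sync_size: usize }
pub type StorageResult<T> = std::io::Result<T>;

pub trait StateStoreSketch {
    // Before this patch: fn sync(&self, epoch: u64) -> impl SyncFuture;
    fn sync(
        &self,
        epoch: u64,
        table_ids: HashSet<TableId>,
    ) -> impl Future<Output = StorageResult<SyncResult>> + Send + 'static;
}

Every implementor touched by the diff (Hummock, memory, monitored, traced, panic, verify, boxed) threads the extra argument through unchanged; only the Hummock uploader actually consumes it.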
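Where the ids come from: meta flattens the tracked table ids into the new `table_ids_to_sync` proto field when injecting a barrier, and the compute-node barrier worker rebuilds the typed set before handing it to the managed barrier state. A sketch of that round trip; only `TableId::new` / `.table_id` mirror the real conversions, the rest is simplified:

use std::collections::HashSet;

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct TableId(u32);
impl TableId {
    pub fn new(id: u32) -> Self { Self(id) }
    pub fn table_id(&self) -> u32 { self.0 }
}

// Meta side: flatten the existing table ids into the repeated uint32 field
// of InjectBarrierRequest.
pub fn to_proto(existing_table_ids: &HashSet<TableId>) -> Vec<u32> {
    existing_table_ids.iter().map(|t| t.table_id()).collect()
}

// Compute-node side: rebuild the typed set from the proto field.
pub fn from_proto(table_ids_to_sync: Vec<u32>) -> HashSet<TableId> {
    table_ids_to_sync.into_iter().map(TableId::new).collect()
}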
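On the storage side, `HummockStorage::sync` stays fire-and-await: it only gains the set as a payload of the `SyncEpoch` event. A sketch with stand-in channel and error types (the real event is `HummockEvent::SyncEpoch`, handled by `HummockEventHandler`, and the real error type is not a `String`):

use std::collections::HashSet;
use tokio::sync::{mpsc, oneshot};

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct TableId(pub u32);

pub enum Event {
    SyncEpoch {
        new_sync_epoch: u64,
        sync_result_sender: oneshot::Sender<Result<u64, String>>,
        table_ids: HashSet<TableId>,
    },
}

pub async fn sync(
    tx: &mpsc::UnboundedSender<Event>,
    epoch: u64,
    table_ids: HashSet<TableId>,
) -> Result<u64, String> {
    let (sync_result_sender, rx) = oneshot::channel();
    // Enqueue the sync request together with the tables it covers.
    tx.send(Event::SyncEpoch { new_sync_epoch: epoch, sync_result_sender, table_ids })
        .map_err(|_| "event handler dropped".to_string())?;
    // Await the handler's reply on the oneshot channel.
    rx.await.map_err(|_| "sync result sender dropped".to_string())?
}

This is also why the test-only `HummockStateStoreTestTrait` could be removed: `seal_and_sync_epoch` on `HummockStorage` now derives the full table set from the pinned version's `state_table_info`, so test call sites stay one-liners.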
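The behavioral core is in `UnsyncData::sync`: tables named in `table_ids` contribute their unflushed payload to the sync, while every other table is asserted to hold no data at or before the epoch (the new `assert_after_epoch` helpers). A simplified model of that loop, with the per-instance and watermark bookkeeping collapsed into a single field:

use std::collections::{HashMap, HashSet};

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct TableId(pub u32);

pub struct TableUnsyncData { max_unsynced_epoch: Option<u64> }

impl TableUnsyncData {
    // Tables excluded from the sync must not hold data at or before `epoch`;
    // otherwise that data would silently miss the checkpoint.
    fn assert_after_epoch(&self, epoch: u64) {
        if let Some(e) = self.max_unsynced_epoch {
            assert!(e > epoch, "unexpected unsynced data at epoch <= {epoch}");
        }
    }
    fn take_until(&mut self, _epoch: u64) { /* drain payload into the sync builder */ }
}

pub fn sync(
    table_data: &mut HashMap<TableId, TableUnsyncData>,
    epoch: u64,
    table_ids: &HashSet<TableId>,
) {
    for (table_id, data) in table_data.iter_mut() {
        if !table_ids.contains(table_id) {
            data.assert_after_epoch(epoch); // skipped, but must be empty
            continue;
        }
        data.take_until(epoch); // included in the sync payload
    }
}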
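Finally, on the stream side: `IssuedState` now records both the barrier kind and the table set, and the completion path in managed_state.rs decides per kind what to do, instead of passing the kind into `sync_epoch`. A sketch of that dispatch; the real code dispatches on the concrete state store, boxes the sync future, and registers it with await-tree:

#[derive(Clone, Copy)]
pub enum BarrierKind { Initial, Barrier, Checkpoint }

pub fn complete_barrier(kind: BarrierKind, prev_epoch: u64) {
    match kind {
        // First barrier: nothing to seal or sync; the code only asserts that
        // the pinned version's max committed epoch already equals prev_epoch.
        BarrierKind::Initial => {
            println!("ignored syncing data for the first barrier: {prev_epoch}");
        }
        // Plain barrier: seal only, no sync future is spawned.
        BarrierKind::Barrier => seal_epoch(prev_epoch, false),
        // Checkpoint: seal, then sync with the table ids carried in IssuedState.
        BarrierKind::Checkpoint => {
            seal_epoch(prev_epoch, true);
            // let future = sync_epoch(state_store, metrics, prev_epoch, table_ids);
        }
    }
}

fn seal_epoch(epoch: u64, is_checkpoint: bool) {
    println!("seal epoch {epoch}, checkpoint = {is_checkpoint}");
}

One visible simplification falls out of this: `sync_epoch` no longer needs `dispatch_state_store!` or a `BarrierKind` match of its own; it is generic over the state store and always returns a boxed sync future for checkpoints only.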