From 47ad484a89d0c236e6a0e0a419ed5a5f81ae3817 Mon Sep 17 00:00:00 2001 From: William Wen <44139337+wenym1@users.noreply.github.com> Date: Fri, 19 Apr 2024 16:29:52 +0800 Subject: [PATCH] refactor(storage): pass sync result in commit epoch in test (#16403) --- .../src/hummock/mock_hummock_meta_client.rs | 55 +++++++------------ src/meta/src/stream/stream_manager.rs | 9 +-- src/rpc_client/src/hummock_meta_client.rs | 8 +-- src/rpc_client/src/meta_client.rs | 10 +--- src/storage/hummock_sdk/src/lib.rs | 14 +++++ .../src/bin/replay/replay_impl.rs | 4 +- .../hummock_test/src/compactor_tests.rs | 40 ++++---------- .../hummock_test/src/failpoint_tests.rs | 16 ++---- .../hummock_test/src/hummock_storage_tests.rs | 26 ++++----- .../hummock_test/src/snapshot_tests.rs | 32 +++-------- .../hummock_test/src/state_store_tests.rs | 34 ++++-------- src/storage/hummock_test/src/test_utils.rs | 8 +-- .../event_handler/hummock_event_handler.rs | 3 +- src/storage/src/hummock/event_handler/mod.rs | 4 +- .../src/hummock/hummock_meta_client.rs | 8 +-- .../src/hummock/store/hummock_storage.rs | 2 +- src/storage/src/memory.rs | 2 +- src/storage/src/monitor/monitored_store.rs | 2 +- src/storage/src/monitor/traced_store.rs | 2 +- src/storage/src/panic_store.rs | 2 +- src/storage/src/store.rs | 17 +----- src/storage/src/store_impl.rs | 4 +- .../common/log_store_impl/kv_log_store/mod.rs | 3 +- .../src/common/table/test_state_table.rs | 24 ++------ src/stream/src/task/barrier_manager.rs | 3 +- .../src/task/barrier_manager/managed_state.rs | 2 +- .../src/delete_range_runner.rs | 2 +- 27 files changed, 114 insertions(+), 222 deletions(-) diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index 74c760e25776a..9a888e27bc78e 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use std::time::SystemTime; @@ -22,13 +21,11 @@ use async_trait::async_trait; use fail::fail_point; use futures::stream::BoxStream; use futures::{Stream, StreamExt}; -use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; -use risingwave_hummock_sdk::table_watermark::TableWatermarks; use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{ HummockContextId, HummockEpoch, HummockSstableObjectId, HummockVersionId, LocalSstableInfo, - SstObjectIdRange, + SstObjectIdRange, SyncResult, }; use risingwave_pb::common::{HostAddress, WorkerType}; use risingwave_pb::hummock::compact_task::TaskStatus; @@ -93,31 +90,6 @@ impl MockHummockMetaClient { .await .unwrap_or(None) } - - pub async fn commit_epoch_with_watermark( - &self, - epoch: HummockEpoch, - sstables: Vec, - new_table_watermarks: HashMap, - ) -> Result<()> { - let sst_to_worker = sstables - .iter() - .map(|LocalSstableInfo { sst_info, .. }| (sst_info.get_object_id(), self.context_id)) - .collect(); - self.hummock_manager - .commit_epoch( - epoch, - CommitEpochInfo::new( - sstables.into_iter().map(Into::into).collect(), - new_table_watermarks, - sst_to_worker, - None, - ), - ) - .await - .map_err(mock_err)?; - Ok(()) - } } fn mock_err(error: super::error::Error) -> RpcError { @@ -183,17 +155,28 @@ impl HummockMetaClient for MockHummockMetaClient { }) } - async fn commit_epoch( - &self, - epoch: HummockEpoch, - sstables: Vec, - ) -> Result<()> { - let sst_to_worker = sstables + async fn commit_epoch(&self, epoch: HummockEpoch, sync_result: SyncResult) -> Result<()> { + let sst_to_worker = sync_result + .uncommitted_ssts .iter() .map(|LocalSstableInfo { sst_info, .. }| (sst_info.get_object_id(), self.context_id)) .collect(); + let new_table_watermark = sync_result.table_watermarks; + self.hummock_manager - .commit_epoch(epoch, CommitEpochInfo::for_test(sstables, sst_to_worker)) + .commit_epoch( + epoch, + CommitEpochInfo::new( + sync_result + .uncommitted_ssts + .into_iter() + .map(|sst| sst.into()) + .collect(), + new_table_watermark, + sst_to_worker, + None, + ), + ) .await .map_err(mock_err)?; Ok(()) diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 94b97abcdc007..6a801932785c7 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -872,14 +872,7 @@ mod tests { let _ = tx.send(Ok(StreamingControlStreamResponse { response: Some( streaming_control_stream_response::Response::CompleteBarrier( - BarrierCompleteResponse { - request_id: "".to_string(), - status: None, - create_mview_progress: vec![], - synced_sstables: vec![], - worker_id: 0, - table_watermarks: Default::default(), - }, + BarrierCompleteResponse::default(), ), ), })); diff --git a/src/rpc_client/src/hummock_meta_client.rs b/src/rpc_client/src/hummock_meta_client.rs index 573ebfb2698d8..6e1dfec3b7be3 100644 --- a/src/rpc_client/src/hummock_meta_client.rs +++ b/src/rpc_client/src/hummock_meta_client.rs @@ -16,7 +16,7 @@ use async_trait::async_trait; use futures::stream::BoxStream; use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{ - HummockEpoch, HummockSstableObjectId, HummockVersionId, LocalSstableInfo, SstObjectIdRange, + HummockEpoch, HummockSstableObjectId, HummockVersionId, SstObjectIdRange, SyncResult, }; use risingwave_pb::hummock::{ HummockSnapshot, SubscribeCompactionEventRequest, SubscribeCompactionEventResponse, VacuumTask, @@ -37,11 +37,7 @@ pub trait HummockMetaClient: Send + Sync + 'static { async fn get_snapshot(&self) -> Result; async fn get_new_sst_ids(&self, number: u32) -> Result; // We keep `commit_epoch` only for test/benchmark. - async fn commit_epoch( - &self, - epoch: HummockEpoch, - sstables: Vec, - ) -> Result<()>; + async fn commit_epoch(&self, epoch: HummockEpoch, sync_result: SyncResult) -> Result<()>; async fn update_current_epoch(&self, epoch: HummockEpoch) -> Result<()>; async fn report_vacuum_task(&self, vacuum_task: VacuumTask) -> Result<()>; async fn trigger_manual_compaction( diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index d438c13ba7a00..fbb8dff1f5a98 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -37,8 +37,8 @@ use risingwave_common::RW_VERSION; use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; use risingwave_hummock_sdk::{ - CompactionGroupId, HummockEpoch, HummockSstableObjectId, HummockVersionId, LocalSstableInfo, - SstObjectIdRange, + CompactionGroupId, HummockEpoch, HummockSstableObjectId, HummockVersionId, SstObjectIdRange, + SyncResult, }; use risingwave_pb::backup_service::backup_service_client::BackupServiceClient; use risingwave_pb::backup_service::*; @@ -1392,11 +1392,7 @@ impl HummockMetaClient for MetaClient { Ok(SstObjectIdRange::new(resp.start_id, resp.end_id)) } - async fn commit_epoch( - &self, - _epoch: HummockEpoch, - _sstables: Vec, - ) -> Result<()> { + async fn commit_epoch(&self, _epoch: HummockEpoch, _sync_result: SyncResult) -> Result<()> { panic!("Only meta service can commit_epoch in production.") } diff --git a/src/storage/hummock_sdk/src/lib.rs b/src/storage/hummock_sdk/src/lib.rs index aa095b6c66322..1fbd627e13f02 100644 --- a/src/storage/hummock_sdk/src/lib.rs +++ b/src/storage/hummock_sdk/src/lib.rs @@ -27,6 +27,7 @@ mod key_cmp; use std::cmp::Ordering; +use std::collections::HashMap; pub use key_cmp::*; use risingwave_common::util::epoch::EPOCH_SPILL_TIME_MASK; @@ -48,6 +49,9 @@ pub mod table_watermark; pub mod version; pub use compact::*; +use risingwave_common::catalog::TableId; + +use crate::table_watermark::TableWatermarks; pub type HummockSstableObjectId = u64; pub type HummockSstableId = u64; @@ -89,6 +93,16 @@ macro_rules! info_in_release { } } +#[derive(Default, Debug)] +pub struct SyncResult { + /// The size of all synced shared buffers. + pub sync_size: usize, + /// The `sst_info` of sync. + pub uncommitted_ssts: Vec, + /// The collected table watermarks written by state tables. + pub table_watermarks: HashMap, +} + #[derive(Debug, Clone)] pub struct LocalSstableInfo { pub compaction_group_id: CompactionGroupId, diff --git a/src/storage/hummock_test/src/bin/replay/replay_impl.rs b/src/storage/hummock_test/src/bin/replay/replay_impl.rs index 43899fa7e256c..3b7f4bc74740e 100644 --- a/src/storage/hummock_test/src/bin/replay/replay_impl.rs +++ b/src/storage/hummock_test/src/bin/replay/replay_impl.rs @@ -20,7 +20,7 @@ use futures_async_stream::try_stream; use risingwave_common::util::addr::HostAddr; use risingwave_common_service::observer_manager::{Channel, NotificationClient, ObserverError}; use risingwave_hummock_sdk::key::TableKey; -use risingwave_hummock_sdk::HummockReadEpoch; +use risingwave_hummock_sdk::{HummockReadEpoch, SyncResult}; use risingwave_hummock_trace::{ GlobalReplay, LocalReplay, LocalReplayRead, ReplayItem, ReplayRead, ReplayStateStore, ReplayWrite, Result, TraceError, TracedBytes, TracedInitOptions, TracedNewLocalOptions, @@ -33,7 +33,7 @@ use risingwave_pb::meta::{SubscribeResponse, SubscribeType}; use risingwave_storage::hummock::store::LocalHummockStorage; use risingwave_storage::hummock::HummockStorage; use risingwave_storage::store::{ - to_owned_item, LocalStateStore, StateStoreIterExt, StateStoreRead, SyncResult, + to_owned_item, LocalStateStore, StateStoreIterExt, StateStoreRead, }; use risingwave_storage::{StateStore, StateStoreIter, StateStoreReadIter}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 3718b06f00fe5..7b8da8c92f077 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -186,13 +186,9 @@ pub(crate) mod tests { } else { local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); } - let ssts = storage - .seal_and_sync_epoch(epoch) - .await - .unwrap() - .uncommitted_ssts; + let res = storage.seal_and_sync_epoch(epoch).await.unwrap(); - hummock_meta_client.commit_epoch(epoch, ssts).await.unwrap(); + hummock_meta_client.commit_epoch(epoch, res).await.unwrap(); } } @@ -537,12 +533,8 @@ pub(crate) mod tests { storage: &HummockStorage, epoch: u64, ) { - let ssts = storage - .seal_and_sync_epoch(epoch) - .await - .unwrap() - .uncommitted_ssts; - hummock_meta_client.commit_epoch(epoch, ssts).await.unwrap(); + let res = storage.seal_and_sync_epoch(epoch).await.unwrap(); + hummock_meta_client.commit_epoch(epoch, res).await.unwrap(); } async fn prepare_data( @@ -766,12 +758,8 @@ pub(crate) mod tests { storage.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); other.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); - let ssts = global_storage - .seal_and_sync_epoch(epoch) - .await - .unwrap() - .uncommitted_ssts; - hummock_meta_client.commit_epoch(epoch, ssts).await.unwrap(); + let res = global_storage.seal_and_sync_epoch(epoch).await.unwrap(); + hummock_meta_client.commit_epoch(epoch, res).await.unwrap(); } // Mimic dropping table @@ -959,12 +947,8 @@ pub(crate) mod tests { local.flush().await.unwrap(); local.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); - let ssts = storage - .seal_and_sync_epoch(epoch) - .await - .unwrap() - .uncommitted_ssts; - hummock_meta_client.commit_epoch(epoch, ssts).await.unwrap(); + let res = storage.seal_and_sync_epoch(epoch).await.unwrap(); + hummock_meta_client.commit_epoch(epoch, res).await.unwrap(); } let manual_compcation_option = ManualCompactionOption { @@ -1158,12 +1142,8 @@ pub(crate) mod tests { .unwrap(); local.flush().await.unwrap(); local.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); - let ssts = storage - .seal_and_sync_epoch(epoch) - .await - .unwrap() - .uncommitted_ssts; - hummock_meta_client.commit_epoch(epoch, ssts).await.unwrap(); + let res = storage.seal_and_sync_epoch(epoch).await.unwrap(); + hummock_meta_client.commit_epoch(epoch, res).await.unwrap(); } let manual_compcation_option = ManualCompactionOption { diff --git a/src/storage/hummock_test/src/failpoint_tests.rs b/src/storage/hummock_test/src/failpoint_tests.rs index 98fe2759abfcb..d2ab797cb634a 100644 --- a/src/storage/hummock_test/src/failpoint_tests.rs +++ b/src/storage/hummock_test/src/failpoint_tests.rs @@ -140,12 +140,8 @@ async fn test_failpoints_state_store_read_upload() { ); // sync epoch1 test the read_error - let ssts = hummock_storage - .seal_and_sync_epoch(1) - .await - .unwrap() - .uncommitted_ssts; - meta_client.commit_epoch(1, ssts).await.unwrap(); + let res = hummock_storage.seal_and_sync_epoch(1).await.unwrap(); + meta_client.commit_epoch(1, res).await.unwrap(); hummock_storage .try_wait_epoch(HummockReadEpoch::Committed(1)) .await @@ -216,12 +212,8 @@ async fn test_failpoints_state_store_read_upload() { assert!(result.is_err()); fail::remove(mem_upload_err); - let ssts = hummock_storage - .seal_and_sync_epoch(3) - .await - .unwrap() - .uncommitted_ssts; - meta_client.commit_epoch(3, ssts).await.unwrap(); + let res = hummock_storage.seal_and_sync_epoch(3).await.unwrap(); + meta_client.commit_epoch(3, res).await.unwrap(); hummock_storage .try_wait_epoch(HummockReadEpoch::Committed(3)) .await diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index 940eeeaabbc29..f7711b7fcdf13 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -534,7 +534,7 @@ async fn test_state_store_sync() { let res = test_env.storage.seal_and_sync_epoch(epoch1).await.unwrap(); test_env .meta_client - .commit_epoch(epoch1, res.uncommitted_ssts) + .commit_epoch(epoch1, res) .await .unwrap(); test_env.storage.try_wait_epoch_for_test(epoch1).await; @@ -576,7 +576,7 @@ async fn test_state_store_sync() { let res = test_env.storage.seal_and_sync_epoch(epoch2).await.unwrap(); test_env .meta_client - .commit_epoch(epoch2, res.uncommitted_ssts) + .commit_epoch(epoch2, res) .await .unwrap(); test_env.storage.try_wait_epoch_for_test(epoch2).await; @@ -780,7 +780,7 @@ async fn test_delete_get() { let res = test_env.storage.seal_and_sync_epoch(epoch1).await.unwrap(); test_env .meta_client - .commit_epoch(epoch1, res.uncommitted_ssts) + .commit_epoch(epoch1, res) .await .unwrap(); let epoch2 = epoch1.next_epoch(); @@ -802,7 +802,7 @@ async fn test_delete_get() { let res = test_env.storage.seal_and_sync_epoch(epoch2).await.unwrap(); test_env .meta_client - .commit_epoch(epoch2, res.uncommitted_ssts) + .commit_epoch(epoch2, res) .await .unwrap(); test_env.storage.try_wait_epoch_for_test(epoch2).await; @@ -960,13 +960,13 @@ async fn test_multiple_epoch_sync() { test_env .meta_client - .commit_epoch(epoch2, sync_result2.uncommitted_ssts) + .commit_epoch(epoch2, sync_result2) .await .unwrap(); test_env .meta_client - .commit_epoch(epoch3, sync_result3.uncommitted_ssts) + .commit_epoch(epoch3, sync_result3) .await .unwrap(); test_env.storage.try_wait_epoch_for_test(epoch3).await; @@ -1114,12 +1114,12 @@ async fn test_iter_with_min_epoch() { let sync_result2 = test_env.storage.seal_and_sync_epoch(epoch2).await.unwrap(); test_env .meta_client - .commit_epoch(epoch1, sync_result1.uncommitted_ssts) + .commit_epoch(epoch1, sync_result1) .await .unwrap(); test_env .meta_client - .commit_epoch(epoch2, sync_result2.uncommitted_ssts) + .commit_epoch(epoch2, sync_result2) .await .unwrap(); test_env.storage.try_wait_epoch_for_test(epoch2).await; @@ -1390,7 +1390,7 @@ async fn test_hummock_version_reader() { let sync_result1 = test_env.storage.seal_and_sync_epoch(epoch1).await.unwrap(); test_env .meta_client - .commit_epoch(epoch1, sync_result1.uncommitted_ssts) + .commit_epoch(epoch1, sync_result1) .await .unwrap(); test_env.storage.try_wait_epoch_for_test(epoch1).await; @@ -1398,7 +1398,7 @@ async fn test_hummock_version_reader() { let sync_result2 = test_env.storage.seal_and_sync_epoch(epoch2).await.unwrap(); test_env .meta_client - .commit_epoch(epoch2, sync_result2.uncommitted_ssts) + .commit_epoch(epoch2, sync_result2) .await .unwrap(); test_env.storage.try_wait_epoch_for_test(epoch2).await; @@ -1406,7 +1406,7 @@ async fn test_hummock_version_reader() { let sync_result3 = test_env.storage.seal_and_sync_epoch(epoch3).await.unwrap(); test_env .meta_client - .commit_epoch(epoch3, sync_result3.uncommitted_ssts) + .commit_epoch(epoch3, sync_result3) .await .unwrap(); test_env.storage.try_wait_epoch_for_test(epoch3).await; @@ -1777,12 +1777,12 @@ async fn test_get_with_min_epoch() { let sync_result2 = test_env.storage.seal_and_sync_epoch(epoch2).await.unwrap(); test_env .meta_client - .commit_epoch(epoch1, sync_result1.uncommitted_ssts) + .commit_epoch(epoch1, sync_result1) .await .unwrap(); test_env .meta_client - .commit_epoch(epoch2, sync_result2.uncommitted_ssts) + .commit_epoch(epoch2, sync_result2) .await .unwrap(); diff --git a/src/storage/hummock_test/src/snapshot_tests.rs b/src/storage/hummock_test/src/snapshot_tests.rs index c019f12a0268b..981ed3687bd40 100644 --- a/src/storage/hummock_test/src/snapshot_tests.rs +++ b/src/storage/hummock_test/src/snapshot_tests.rs @@ -137,14 +137,10 @@ async fn test_snapshot_inner( let epoch2 = epoch1.next_epoch(); local.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); if enable_sync { - let ssts = hummock_storage - .seal_and_sync_epoch(epoch1) - .await - .unwrap() - .uncommitted_ssts; + let res = hummock_storage.seal_and_sync_epoch(epoch1).await.unwrap(); if enable_commit { mock_hummock_meta_client - .commit_epoch(epoch1, ssts) + .commit_epoch(epoch1, res) .await .unwrap(); hummock_storage @@ -181,14 +177,10 @@ async fn test_snapshot_inner( let epoch3 = epoch2.next_epoch(); local.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()); if enable_sync { - let ssts = hummock_storage - .seal_and_sync_epoch(epoch2) - .await - .unwrap() - .uncommitted_ssts; + let res = hummock_storage.seal_and_sync_epoch(epoch2).await.unwrap(); if enable_commit { mock_hummock_meta_client - .commit_epoch(epoch2, ssts) + .commit_epoch(epoch2, res) .await .unwrap(); hummock_storage @@ -225,14 +217,10 @@ async fn test_snapshot_inner( .unwrap(); local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); if enable_sync { - let ssts = hummock_storage - .seal_and_sync_epoch(epoch3) - .await - .unwrap() - .uncommitted_ssts; + let res = hummock_storage.seal_and_sync_epoch(epoch3).await.unwrap(); if enable_commit { mock_hummock_meta_client - .commit_epoch(epoch3, ssts) + .commit_epoch(epoch3, res) .await .unwrap(); hummock_storage @@ -287,14 +275,10 @@ async fn test_snapshot_range_scan_inner( .unwrap(); local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); if enable_sync { - let ssts = hummock_storage - .seal_and_sync_epoch(epoch) - .await - .unwrap() - .uncommitted_ssts; + let res = hummock_storage.seal_and_sync_epoch(epoch).await.unwrap(); if enable_commit { mock_hummock_meta_client - .commit_epoch(epoch, ssts) + .commit_epoch(epoch, res) .await .unwrap(); hummock_storage diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index ff6385e35ab1e..40dd23d78d987 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -24,7 +24,9 @@ use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::{test_epoch, EpochExt}; use risingwave_hummock_sdk::key::prefixed_range_with_vnode; -use risingwave_hummock_sdk::{HummockReadEpoch, HummockSstableObjectId, LocalSstableInfo}; +use risingwave_hummock_sdk::{ + HummockReadEpoch, HummockSstableObjectId, LocalSstableInfo, SyncResult, +}; use risingwave_meta::hummock::test_utils::setup_compute_env; use risingwave_meta::hummock::MockHummockMetaClient; use risingwave_rpc_client::HummockMetaClient; @@ -372,12 +374,8 @@ async fn test_basic_inner( .unwrap(); let len = count_stream(iter).await; assert_eq!(len, 4); - let ssts = hummock_storage - .seal_and_sync_epoch(epoch1) - .await - .unwrap() - .uncommitted_ssts; - meta_client.commit_epoch(epoch1, ssts).await.unwrap(); + let res = hummock_storage.seal_and_sync_epoch(epoch1).await.unwrap(); + meta_client.commit_epoch(epoch1, res).await.unwrap(); hummock_storage .try_wait_epoch(HummockReadEpoch::Committed(epoch1)) .await @@ -1062,12 +1060,8 @@ async fn test_delete_get_inner( ) .await .unwrap(); - let ssts = hummock_storage - .seal_and_sync_epoch(epoch1) - .await - .unwrap() - .uncommitted_ssts; - meta_client.commit_epoch(epoch1, ssts).await.unwrap(); + let res = hummock_storage.seal_and_sync_epoch(epoch1).await.unwrap(); + meta_client.commit_epoch(epoch1, res).await.unwrap(); let epoch2 = epoch1.next_epoch(); local.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); @@ -1086,12 +1080,8 @@ async fn test_delete_get_inner( .await .unwrap(); local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); - let ssts = hummock_storage - .seal_and_sync_epoch(epoch2) - .await - .unwrap() - .uncommitted_ssts; - meta_client.commit_epoch(epoch2, ssts).await.unwrap(); + let res = hummock_storage.seal_and_sync_epoch(epoch2).await.unwrap(); + meta_client.commit_epoch(epoch2, res).await.unwrap(); hummock_storage .try_wait_epoch(HummockReadEpoch::Committed(epoch2)) .await @@ -1242,12 +1232,12 @@ async fn test_multiple_epoch_sync_inner( test_get().await; meta_client - .commit_epoch(epoch2, sync_result2.uncommitted_ssts) + .commit_epoch(epoch2, sync_result2) .await .unwrap(); meta_client - .commit_epoch(epoch3, sync_result3.uncommitted_ssts) + .commit_epoch(epoch3, sync_result3) .await .unwrap(); hummock_storage @@ -1340,7 +1330,7 @@ async fn test_gc_watermark_and_clear_shared_buffer() { min_object_id_epoch1, ); meta_client - .commit_epoch(epoch1, sync_result1.uncommitted_ssts) + .commit_epoch(epoch1, sync_result1) .await .unwrap(); hummock_storage diff --git a/src/storage/hummock_test/src/test_utils.rs b/src/storage/hummock_test/src/test_utils.rs index f5d1a10a18839..82b01bccaa12f 100644 --- a/src/storage/hummock_test/src/test_utils.rs +++ b/src/storage/hummock_test/src/test_utils.rs @@ -21,6 +21,8 @@ use risingwave_common_service::observer_manager::ObserverManager; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::key::TableKey; pub use risingwave_hummock_sdk::key::{gen_key_from_bytes, gen_key_from_str}; +#[cfg(test)] +use risingwave_hummock_sdk::SyncResult; use risingwave_meta::hummock::test_utils::{ register_table_ids_to_compaction_group, setup_compute_env, }; @@ -28,6 +30,7 @@ use risingwave_meta::hummock::{HummockManagerRef, MockHummockMetaClient}; use risingwave_meta::manager::MetaSrvEnv; use risingwave_pb::catalog::{PbTable, Table}; use risingwave_pb::common::WorkerNode; +use risingwave_rpc_client::HummockMetaClient; use risingwave_storage::error::StorageResult; use risingwave_storage::filter_key_extractor::{ FilterKeyExtractorImpl, FilterKeyExtractorManager, FullKeyFilterKeyExtractor, @@ -252,10 +255,7 @@ impl HummockTestEnv { // On completion of this function call, the provided epoch should be committed and visible. pub async fn commit_epoch(&self, epoch: u64) { let res = self.storage.seal_and_sync_epoch(epoch).await.unwrap(); - self.meta_client - .commit_epoch_with_watermark(epoch, res.uncommitted_ssts, res.table_watermarks) - .await - .unwrap(); + self.meta_client.commit_epoch(epoch, res).await.unwrap(); self.storage.try_wait_epoch_for_test(epoch).await; } diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index fcfcac53a478e..0b96aa19cb837 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -28,7 +28,7 @@ use prometheus::core::{AtomicU64, GenericGauge}; use prometheus::{Histogram, IntGauge}; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo; -use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo}; +use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo, SyncResult}; use thiserror_ext::AsReport; use tokio::spawn; use tokio::sync::mpsc::error::SendError; @@ -60,7 +60,6 @@ use crate::hummock::{ }; use crate::monitor::HummockStateStoreMetrics; use crate::opts::StorageOpts; -use crate::store::SyncResult; #[derive(Clone)] pub struct BufferTracker { diff --git a/src/storage/src/hummock/event_handler/mod.rs b/src/storage/src/hummock/event_handler/mod.rs index 04532cc9b9c10..9e98fac4ee3b1 100644 --- a/src/storage/src/hummock/event_handler/mod.rs +++ b/src/storage/src/hummock/event_handler/mod.rs @@ -18,14 +18,14 @@ use std::sync::Arc; use parking_lot::{RwLock, RwLockReadGuard}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::TableId; -use risingwave_hummock_sdk::HummockEpoch; +use risingwave_hummock_sdk::{HummockEpoch, SyncResult}; use thiserror_ext::AsReport; use tokio::sync::oneshot; use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatch; use crate::hummock::HummockResult; use crate::mem_table::ImmutableMemtable; -use crate::store::{SealCurrentEpochOptions, SyncResult}; +use crate::store::SealCurrentEpochOptions; pub mod hummock_event_handler; pub mod refiller; diff --git a/src/storage/src/hummock/hummock_meta_client.rs b/src/storage/src/hummock/hummock_meta_client.rs index 50c60ce5f11be..5cf380285cf1e 100644 --- a/src/storage/src/hummock/hummock_meta_client.rs +++ b/src/storage/src/hummock/hummock_meta_client.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use async_trait::async_trait; use futures::stream::BoxStream; use risingwave_hummock_sdk::version::HummockVersion; -use risingwave_hummock_sdk::{HummockSstableObjectId, LocalSstableInfo, SstObjectIdRange}; +use risingwave_hummock_sdk::{HummockSstableObjectId, SstObjectIdRange, SyncResult}; use risingwave_pb::hummock::{HummockSnapshot, SubscribeCompactionEventRequest, VacuumTask}; use risingwave_rpc_client::error::Result; use risingwave_rpc_client::{CompactionEventItem, HummockMetaClient, MetaClient}; @@ -78,11 +78,7 @@ impl HummockMetaClient for MonitoredHummockMetaClient { res } - async fn commit_epoch( - &self, - _epoch: HummockEpoch, - _sstables: Vec, - ) -> Result<()> { + async fn commit_epoch(&self, _epoch: HummockEpoch, _sync_result: SyncResult) -> Result<()> { panic!("Only meta service can commit_epoch in production.") } diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 2bdbd70c17d56..8c18b647ca819 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -28,7 +28,7 @@ use risingwave_hummock_sdk::key::{ is_empty_key_range, vnode, vnode_range, TableKey, TableKeyRange, }; use risingwave_hummock_sdk::table_watermark::TableWatermarksIndex; -use risingwave_hummock_sdk::HummockReadEpoch; +use risingwave_hummock_sdk::{HummockReadEpoch, SyncResult}; use risingwave_pb::hummock::SstableInfo; use risingwave_rpc_client::HummockMetaClient; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; diff --git a/src/storage/src/memory.rs b/src/storage/src/memory.rs index db8eafcf4bc1d..5e645aa7ea57f 100644 --- a/src/storage/src/memory.rs +++ b/src/storage/src/memory.rs @@ -22,7 +22,7 @@ use bytes::Bytes; use parking_lot::RwLock; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange, UserKey}; -use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch}; +use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch, SyncResult}; use crate::error::StorageResult; use crate::mem_table::MemtableLocalStateStore; diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs index 9486256181f9f..79f5bf3bdcf1b 100644 --- a/src/storage/src/monitor/monitored_store.rs +++ b/src/storage/src/monitor/monitored_store.rs @@ -20,7 +20,7 @@ use futures::{Future, TryFutureExt}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; -use risingwave_hummock_sdk::HummockReadEpoch; +use risingwave_hummock_sdk::{HummockReadEpoch, SyncResult}; use thiserror_ext::AsReport; use tokio::time::Instant; use tracing::{error, Instrument}; diff --git a/src/storage/src/monitor/traced_store.rs b/src/storage/src/monitor/traced_store.rs index 51c15b944c2ba..47c0de67729b6 100644 --- a/src/storage/src/monitor/traced_store.rs +++ b/src/storage/src/monitor/traced_store.rs @@ -17,7 +17,7 @@ use bytes::Bytes; use futures::{Future, TryFutureExt}; use risingwave_common::buffer::Bitmap; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; -use risingwave_hummock_sdk::HummockReadEpoch; +use risingwave_hummock_sdk::{HummockReadEpoch, SyncResult}; use risingwave_hummock_trace::{ init_collector, should_use_trace, ConcurrentId, MayTraceSpan, OperationResult, StorageType, TraceResult, TraceSpan, TracedBytes, TracedSealCurrentEpochOptions, LOCAL_ID, diff --git a/src/storage/src/panic_store.rs b/src/storage/src/panic_store.rs index 546dd5a399c75..7c9645697caa1 100644 --- a/src/storage/src/panic_store.rs +++ b/src/storage/src/panic_store.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use bytes::Bytes; use risingwave_common::buffer::Bitmap; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; -use risingwave_hummock_sdk::HummockReadEpoch; +use risingwave_hummock_sdk::{HummockReadEpoch, SyncResult}; use crate::error::StorageResult; use crate::storage_value::StorageValue; diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 8b15e581796a8..95705d154af6c 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::cmp::min; -use std::collections::HashMap; use std::default::Default; use std::fmt::{Debug, Formatter}; use std::future::Future; @@ -30,10 +29,8 @@ use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::{Epoch, EpochPair}; use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange}; -use risingwave_hummock_sdk::table_watermark::{ - TableWatermarks, VnodeWatermark, WatermarkDirection, -}; -use risingwave_hummock_sdk::{HummockReadEpoch, LocalSstableInfo}; +use risingwave_hummock_sdk::table_watermark::{VnodeWatermark, WatermarkDirection}; +use risingwave_hummock_sdk::{HummockReadEpoch, SyncResult}; use risingwave_hummock_trace::{ TracedInitOptions, TracedNewLocalOptions, TracedOpConsistencyLevel, TracedPrefetchOptions, TracedReadOptions, TracedSealCurrentEpochOptions, TracedWriteOptions, @@ -311,16 +308,6 @@ pub trait StateStoreWrite: StaticSendSync { ) -> StorageResult; } -#[derive(Default, Debug)] -pub struct SyncResult { - /// The size of all synced shared buffers. - pub sync_size: usize, - /// The `sst_info` of sync. - pub uncommitted_ssts: Vec, - /// The collected table watermarks written by state tables. - pub table_watermarks: HashMap, -} - pub trait StateStore: StateStoreRead + StaticSendSync + Clone { type Local: LocalStateStore; diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index 1d67e2fa9da2f..7fdb838654b70 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -212,7 +212,7 @@ pub mod verify { use bytes::Bytes; use risingwave_common::buffer::Bitmap; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; - use risingwave_hummock_sdk::HummockReadEpoch; + use risingwave_hummock_sdk::{HummockReadEpoch, SyncResult}; use tracing::log::warn; use crate::error::StorageResult; @@ -733,7 +733,7 @@ pub mod boxed_state_store { use dyn_clone::{clone_trait_object, DynClone}; use risingwave_common::buffer::Bitmap; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; - use risingwave_hummock_sdk::HummockReadEpoch; + use risingwave_hummock_sdk::{HummockReadEpoch, SyncResult}; use crate::error::StorageResult; use crate::hummock::HummockStorage; diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index 5d7c1d021a6b1..3d28ccdcb7e6c 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -411,10 +411,9 @@ mod tests { use risingwave_connector::sink::log_store::{ ChunkId, LogReader, LogStoreFactory, LogStoreReadItem, LogWriter, TruncateOffset, }; - use risingwave_hummock_sdk::HummockReadEpoch; + use risingwave_hummock_sdk::{HummockReadEpoch, SyncResult}; use risingwave_hummock_test::test_utils::prepare_hummock_test_env; use risingwave_storage::hummock::HummockStorage; - use risingwave_storage::store::SyncResult; use risingwave_storage::StateStore; use crate::common::log_store_impl::kv_log_store::reader::KvLogStoreReader; diff --git a/src/stream/src/common/table/test_state_table.rs b/src/stream/src/common/table/test_state_table.rs index b04c9f1dd6378..4ef3381871cbf 100644 --- a/src/stream/src/common/table/test_state_table.rs +++ b/src/stream/src/common/table/test_state_table.rs @@ -1356,11 +1356,7 @@ async fn test_state_table_may_exist() { // test may_exist with data only in uncommitted ssts (e1) check_may_exist(&state_table, vec![1, 4], vec![2, 3, 6, 12]).await; - test_env - .meta_client - .commit_epoch(e1, e1_res.uncommitted_ssts) - .await - .unwrap(); + test_env.meta_client.commit_epoch(e1, e1_res).await.unwrap(); test_env.storage.try_wait_epoch_for_test(e1).await; // test may_exist with data only in committed ssts (e1) @@ -1434,11 +1430,7 @@ async fn test_state_table_may_exist() { // (e2), committed ssts (e1) check_may_exist(&state_table, vec![1, 3, 4, 6], vec![12]).await; - test_env - .meta_client - .commit_epoch(e2, e2_res.uncommitted_ssts) - .await - .unwrap(); + test_env.meta_client.commit_epoch(e2, e2_res).await.unwrap(); test_env.storage.try_wait_epoch_for_test(e2).await; epoch.inc_for_test(); @@ -1451,18 +1443,10 @@ async fn test_state_table_may_exist() { // test may_exist with data in uncommitted ssts (e3, e4), committed ssts (e1, e2, e3, e4) check_may_exist(&state_table, vec![1, 3, 4, 6], vec![12]).await; - test_env - .meta_client - .commit_epoch(e3, e3_res.uncommitted_ssts) - .await - .unwrap(); + test_env.meta_client.commit_epoch(e3, e3_res).await.unwrap(); test_env.storage.try_wait_epoch_for_test(e3).await; - test_env - .meta_client - .commit_epoch(e4, e4_res.uncommitted_ssts) - .await - .unwrap(); + test_env.meta_client.commit_epoch(e4, e4_res).await.unwrap(); test_env.storage.try_wait_epoch_for_test(e4).await; // test may_exist with data in committed ssts (e1, e2, e3, e4) diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 9d006a426d6b7..6e7b6f70e2421 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -48,7 +48,7 @@ mod tests; pub use progress::CreateMviewProgress; use risingwave_common::util::runtime::BackgroundShutdownRuntime; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; -use risingwave_hummock_sdk::LocalSstableInfo; +use risingwave_hummock_sdk::{LocalSstableInfo, SyncResult}; use risingwave_pb::common::ActorInfo; use risingwave_pb::stream_plan; use risingwave_pb::stream_plan::barrier::BarrierKind; @@ -58,7 +58,6 @@ use risingwave_pb::stream_service::{ streaming_control_stream_response, BarrierCompleteResponse, StreamingControlStreamRequest, StreamingControlStreamResponse, }; -use risingwave_storage::store::SyncResult; use crate::executor::exchange::permit::Receiver; use crate::executor::monitor::StreamingMetrics; diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index ed192b39dfe5f..4bb39ddabff1e 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -25,9 +25,9 @@ use futures::stream::FuturesOrdered; use futures::{FutureExt, StreamExt}; use prometheus::HistogramTimer; use risingwave_common::must_match; +use risingwave_hummock_sdk::SyncResult; use risingwave_pb::stream_plan::barrier::BarrierKind; use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress; -use risingwave_storage::store::SyncResult; use risingwave_storage::{dispatch_state_store, StateStore, StateStoreImpl}; use rw_futures_util::pending_on_none; use thiserror_ext::AsReport; diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index 66abdf4286e37..341e89a58af89 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -377,7 +377,7 @@ async fn run_compare_result( // let checkpoint = epoch % 10 == 0; let ret = hummock.seal_and_sync_epoch(epoch).await.unwrap(); meta_client - .commit_epoch(epoch, ret.uncommitted_ssts) + .commit_epoch(epoch, ret) .await .map_err(|e| format!("{:?}", e))?; if (epoch / test_epoch(1)) % 200 == 0 {