diff --git a/src/storage/hummock_test/benches/bench_hummock_iter.rs b/src/storage/hummock_test/benches/bench_hummock_iter.rs index 1a04a83730d88..4ecc094eb8d7e 100644 --- a/src/storage/hummock_test/benches/bench_hummock_iter.rs +++ b/src/storage/hummock_test/benches/bench_hummock_iter.rs @@ -99,7 +99,7 @@ fn criterion_benchmark(c: &mut Criterion) { )) .unwrap(); } - hummock_storage.seal_current_epoch(MAX_EPOCH); + hummock_storage.seal_current_epoch(MAX_EPOCH, SealCurrentEpochOptions::for_test()); c.bench_function("bench-hummock-iter", move |b| { b.iter(|| { diff --git a/src/storage/hummock_test/src/bin/replay/replay_impl.rs b/src/storage/hummock_test/src/bin/replay/replay_impl.rs index 84dc71d88599f..67d299ec34cc2 100644 --- a/src/storage/hummock_test/src/bin/replay/replay_impl.rs +++ b/src/storage/hummock_test/src/bin/replay/replay_impl.rs @@ -25,7 +25,7 @@ use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_hummock_trace::{ GlobalReplay, LocalReplay, LocalReplayRead, ReplayItem, ReplayRead, ReplayStateStore, ReplayWrite, Result, TraceError, TracedBytes, TracedInitOptions, TracedNewLocalOptions, - TracedReadOptions, TracedSubResp, + TracedReadOptions, TracedSealCurrentEpochOptions, TracedSubResp, }; use risingwave_meta::manager::{MessageStatus, MetaSrvEnv, NotificationManagerRef, WorkerKey}; use risingwave_pb::common::WorkerNode; @@ -207,8 +207,11 @@ impl LocalReplay for LocalReplayImpl { .map_err(|_| TraceError::Other("init failed")) } - fn seal_current_epoch(&mut self, next_epoch: u64) { - self.0.seal_current_epoch(next_epoch); + fn seal_current_epoch(&mut self, next_epoch: u64, opts: TracedSealCurrentEpochOptions) { + self.0.seal_current_epoch( + next_epoch, + opts.try_into().expect("should not fail to convert"), + ); } fn epoch(&self) -> u64 { diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 294acd39df31f..8af2907c5f781 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -165,9 +165,9 @@ pub(crate) mod tests { .await .unwrap(); if i + 1 < epochs.len() { - local.seal_current_epoch(epochs[i + 1]); + local.seal_current_epoch(epochs[i + 1], SealCurrentEpochOptions::for_test()); } else { - local.seal_current_epoch(u64::MAX); + local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); } let ssts = storage .seal_and_sync_epoch(epoch) @@ -549,7 +549,7 @@ pub(crate) mod tests { .unwrap(); } local.flush(Vec::new()).await.unwrap(); - local.seal_current_epoch(epoch + 1); + local.seal_current_epoch(epoch + 1, SealCurrentEpochOptions::for_test()); flush_and_commit(&hummock_meta_client, storage, epoch).await; } @@ -734,8 +734,8 @@ pub(crate) mod tests { .insert(TableKey(prefix.freeze()), val.clone(), None) .unwrap(); storage.flush(Vec::new()).await.unwrap(); - storage.seal_current_epoch(next_epoch); - other.seal_current_epoch(next_epoch); + storage.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); + other.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); let ssts = global_storage .seal_and_sync_epoch(epoch) @@ -925,7 +925,7 @@ pub(crate) mod tests { .insert(TableKey(prefix.freeze()), val.clone(), None) .unwrap(); local.flush(Vec::new()).await.unwrap(); - local.seal_current_epoch(next_epoch); + local.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); let ssts = storage .seal_and_sync_epoch(epoch) @@ -1123,7 +1123,7 @@ pub(crate) mod tests { .insert(TableKey(Bytes::from(ramdom_key)), val.clone(), None) .unwrap(); local.flush(Vec::new()).await.unwrap(); - local.seal_current_epoch(next_epoch); + local.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); let ssts = storage .seal_and_sync_epoch(epoch) .await @@ -1294,7 +1294,7 @@ pub(crate) mod tests { .flush(vec![prefix_key_range(1u16), prefix_key_range(2u16)]) .await .unwrap(); - local.seal_current_epoch(u64::MAX); + local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); flush_and_commit(&hummock_meta_client, &storage, 130).await; diff --git a/src/storage/hummock_test/src/failpoint_tests.rs b/src/storage/hummock_test/src/failpoint_tests.rs index 114355654d1c8..cbfec13e354fe 100644 --- a/src/storage/hummock_test/src/failpoint_tests.rs +++ b/src/storage/hummock_test/src/failpoint_tests.rs @@ -95,7 +95,10 @@ async fn test_failpoints_state_store_read_upload() { .await .unwrap(); - local.seal_current_epoch(3); + local.seal_current_epoch( + 3, + risingwave_storage::store::SealCurrentEpochOptions::for_test(), + ); // Get the value after flushing to remote. let anchor_prefix_hint = { @@ -131,7 +134,10 @@ async fn test_failpoints_state_store_read_upload() { .await .unwrap(); - local.seal_current_epoch(u64::MAX); + local.seal_current_epoch( + u64::MAX, + risingwave_storage::store::SealCurrentEpochOptions::for_test(), + ); // sync epoch1 test the read_error let ssts = hummock_storage diff --git a/src/storage/hummock_test/src/hummock_read_version_tests.rs b/src/storage/hummock_test/src/hummock_read_version_tests.rs index 89ee5bb826f3f..d444b05802807 100644 --- a/src/storage/hummock_test/src/hummock_read_version_tests.rs +++ b/src/storage/hummock_test/src/hummock_read_version_tests.rs @@ -43,9 +43,9 @@ async fn test_read_version_basic() { let (pinned_version, _, _) = prepare_first_valid_version(env, hummock_manager_ref, worker_node).await; - let mut read_version = HummockReadVersion::new(pinned_version); let mut epoch = 1; let table_id = 0; + let mut read_version = HummockReadVersion::new(pinned_version); { // single imm @@ -266,9 +266,9 @@ async fn test_read_filter_basic() { let (pinned_version, _, _) = prepare_first_valid_version(env, hummock_manager_ref, worker_node).await; - let read_version = Arc::new(RwLock::new(HummockReadVersion::new(pinned_version))); let epoch = 1; let table_id = 0; + let read_version = Arc::new(RwLock::new(HummockReadVersion::new(pinned_version))); { // single imm diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index 0df89516e863b..de120365156b7 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -156,7 +156,7 @@ async fn test_storage_basic() { assert_eq!(value, None); let epoch2 = epoch1 + 1; - hummock_storage.seal_current_epoch(epoch2); + hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); hummock_storage .ingest_batch( batch2, @@ -189,7 +189,7 @@ async fn test_storage_basic() { // Write the third batch. let epoch3 = epoch2 + 1; - hummock_storage.seal_current_epoch(epoch3); + hummock_storage.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()); hummock_storage .ingest_batch( batch3, @@ -502,7 +502,7 @@ async fn test_state_store_sync() { .unwrap(); let epoch2 = epoch1 + 1; - hummock_storage.seal_current_epoch(epoch2); + hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); // ingest more 8B then will trigger a sync behind the scene let mut batch3 = vec![( @@ -734,7 +734,7 @@ async fn test_delete_get() { .await .unwrap(); let epoch2 = initial_epoch + 2; - hummock_storage.seal_current_epoch(epoch2); + hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); let batch2 = vec![( gen_key_from_str(VirtualNode::ZERO, "bb"), StorageValue::new_delete(), @@ -814,7 +814,7 @@ async fn test_multiple_epoch_sync() { .unwrap(); let epoch2 = initial_epoch + 2; - hummock_storage.seal_current_epoch(epoch2); + hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); let batch2 = vec![( gen_key_from_str(VirtualNode::ZERO, "bb"), StorageValue::new_delete(), @@ -832,7 +832,7 @@ async fn test_multiple_epoch_sync() { .unwrap(); let epoch3 = initial_epoch + 3; - hummock_storage.seal_current_epoch(epoch3); + hummock_storage.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()); let batch3 = vec![ ( gen_key_from_str(VirtualNode::ZERO, "bb"), @@ -965,7 +965,7 @@ async fn test_iter_with_min_epoch() { .unwrap(); let epoch2 = (32 * 1000) << 16; - hummock_storage.seal_current_epoch(epoch2); + hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); // epoch 2 write let batch_epoch2: Vec<(TableKey, StorageValue)> = (20..30) .map(|index| (gen_key(index), StorageValue::new_put(gen_val(index)))) @@ -1186,7 +1186,7 @@ async fn test_hummock_version_reader() { .await .unwrap(); - hummock_storage.seal_current_epoch(epoch2); + hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); hummock_storage .ingest_batch( batch_epoch2, @@ -1199,7 +1199,7 @@ async fn test_hummock_version_reader() { .await .unwrap(); - hummock_storage.seal_current_epoch(epoch3); + hummock_storage.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()); hummock_storage .ingest_batch( batch_epoch3, @@ -1601,7 +1601,7 @@ async fn test_get_with_min_epoch() { .unwrap(); let epoch2 = (32 * 1000) << 16; - hummock_storage.seal_current_epoch(epoch2); + hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); // epoch 2 write let batch_epoch2: Vec<(TableKey, StorageValue)> = (20..30) .map(|index| (gen_key(index), StorageValue::new_put(gen_val(index)))) diff --git a/src/storage/hummock_test/src/snapshot_tests.rs b/src/storage/hummock_test/src/snapshot_tests.rs index da11db48491e5..e93d2adb661b2 100644 --- a/src/storage/hummock_test/src/snapshot_tests.rs +++ b/src/storage/hummock_test/src/snapshot_tests.rs @@ -24,7 +24,8 @@ use risingwave_rpc_client::HummockMetaClient; use risingwave_storage::hummock::CachePolicy; use risingwave_storage::storage_value::StorageValue; use risingwave_storage::store::{ - LocalStateStore, NewLocalOptions, PrefetchOptions, ReadOptions, WriteOptions, + LocalStateStore, NewLocalOptions, PrefetchOptions, ReadOptions, SealCurrentEpochOptions, + WriteOptions, }; use crate::local_state_store_test_utils::LocalStateStoreTestExt; @@ -123,7 +124,7 @@ async fn test_snapshot_inner( .await .unwrap(); let epoch2 = epoch1 + 1; - local.seal_current_epoch(epoch2); + local.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); if enable_sync { let ssts = hummock_storage .seal_and_sync_epoch(epoch1) @@ -159,7 +160,7 @@ async fn test_snapshot_inner( .await .unwrap(); let epoch3 = epoch2 + 1; - local.seal_current_epoch(epoch3); + local.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()); if enable_sync { let ssts = hummock_storage .seal_and_sync_epoch(epoch2) @@ -195,7 +196,7 @@ async fn test_snapshot_inner( ) .await .unwrap(); - local.seal_current_epoch(u64::MAX); + local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); if enable_sync { let ssts = hummock_storage .seal_and_sync_epoch(epoch3) @@ -246,7 +247,7 @@ async fn test_snapshot_range_scan_inner( ) .await .unwrap(); - local.seal_current_epoch(u64::MAX); + local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); if enable_sync { let ssts = hummock_storage .seal_and_sync_epoch(epoch) diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index 328d8a83738e1..5fb64e8522fad 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -161,7 +161,7 @@ async fn test_basic_inner( .unwrap(); let epoch2 = epoch1 + 1; - local.seal_current_epoch(epoch2); + local.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); // Get the value after flushing to remote. let value = hummock_storage @@ -219,7 +219,7 @@ async fn test_basic_inner( .unwrap(); let epoch3 = epoch2 + 1; - local.seal_current_epoch(epoch3); + local.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()); // Get the value after flushing to remote. let value = hummock_storage @@ -250,7 +250,7 @@ async fn test_basic_inner( .await .unwrap(); - local.seal_current_epoch(u64::MAX); + local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); // Get the value after flushing to remote. let value = hummock_storage @@ -482,7 +482,7 @@ async fn test_state_store_sync_inner( // ); epoch += 1; - local.seal_current_epoch(epoch); + local.seal_current_epoch(epoch, SealCurrentEpochOptions::for_test()); // ingest more 8B then will trigger a sync behind the scene let mut batch3 = vec![( @@ -510,7 +510,7 @@ async fn test_state_store_sync_inner( // hummock_storage.shared_buffer_manager().size() as u64 // ); - local.seal_current_epoch(u64::MAX); + local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); // trigger a sync hummock_storage @@ -976,7 +976,7 @@ async fn test_write_anytime_inner( assert_new_value(epoch1).await; let epoch2 = epoch1 + 1; - local.seal_current_epoch(epoch2); + local.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); // Write to epoch2 local @@ -990,7 +990,7 @@ async fn test_write_anytime_inner( ) .await .unwrap(); - local.seal_current_epoch(u64::MAX); + local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); // Assert epoch 1 unchanged assert_new_value(epoch1).await; // Assert epoch 2 correctness @@ -1059,7 +1059,7 @@ async fn test_delete_get_inner( meta_client.commit_epoch(epoch1, ssts).await.unwrap(); let epoch2 = initial_epoch + 2; - local.seal_current_epoch(epoch2); + local.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); let batch2 = vec![( gen_key_from_str(VirtualNode::ZERO, "bb"), StorageValue::new_delete(), @@ -1075,7 +1075,7 @@ async fn test_delete_get_inner( ) .await .unwrap(); - local.seal_current_epoch(u64::MAX); + local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); let ssts = hummock_storage .seal_and_sync_epoch(epoch2) .await @@ -1138,7 +1138,7 @@ async fn test_multiple_epoch_sync_inner( .unwrap(); let epoch2 = initial_epoch + 2; - local.seal_current_epoch(epoch2); + local.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); let batch2 = vec![( gen_key_from_str(VirtualNode::ZERO, "bb"), StorageValue::new_delete(), @@ -1166,7 +1166,7 @@ async fn test_multiple_epoch_sync_inner( StorageValue::new_put("555"), ), ]; - local.seal_current_epoch(epoch3); + local.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()); local .ingest_batch( batch3, @@ -1178,7 +1178,7 @@ async fn test_multiple_epoch_sync_inner( ) .await .unwrap(); - local.seal_current_epoch(u64::MAX); + local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); let test_get = || { let hummock_storage_clone = &hummock_storage; async move { @@ -1290,7 +1290,7 @@ async fn test_gc_watermark_and_clear_shared_buffer() { ); let epoch2 = initial_epoch + 2; - local_hummock_storage.seal_current_epoch(epoch2); + local_hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); local_hummock_storage .delete( gen_key_from_str(VirtualNode::ZERO, "bb"), @@ -1313,7 +1313,7 @@ async fn test_gc_watermark_and_clear_shared_buffer() { .min() .unwrap() }; - local_hummock_storage.seal_current_epoch(u64::MAX); + local_hummock_storage.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); let sync_result1 = hummock_storage.seal_and_sync_epoch(epoch1).await.unwrap(); let min_object_id_epoch1 = min_object_id(&sync_result1); assert_eq!( diff --git a/src/storage/hummock_test/src/sync_point_tests.rs b/src/storage/hummock_test/src/sync_point_tests.rs index 1b7d6d80029a5..aa862d80085f7 100644 --- a/src/storage/hummock_test/src/sync_point_tests.rs +++ b/src/storage/hummock_test/src/sync_point_tests.rs @@ -298,7 +298,10 @@ async fn test_syncpoints_get_in_delete_range_boundary() { ) .unwrap(); local.flush(Vec::new()).await.unwrap(); - local.seal_current_epoch(101); + local.seal_current_epoch( + 101, + risingwave_storage::store::SealCurrentEpochOptions::for_test(), + ); flush_and_commit(&hummock_meta_client, &storage, 100).await; compact_once( hummock_manager_ref.clone(), @@ -329,7 +332,10 @@ async fn test_syncpoints_get_in_delete_range_boundary() { )]) .await .unwrap(); - local.seal_current_epoch(102); + local.seal_current_epoch( + 102, + risingwave_storage::store::SealCurrentEpochOptions::for_test(), + ); flush_and_commit(&hummock_meta_client, &storage, 101).await; compact_once( hummock_manager_ref.clone(), @@ -360,7 +366,10 @@ async fn test_syncpoints_get_in_delete_range_boundary() { )]) .await .unwrap(); - local.seal_current_epoch(103); + local.seal_current_epoch( + 103, + risingwave_storage::store::SealCurrentEpochOptions::for_test(), + ); flush_and_commit(&hummock_meta_client, &storage, 102).await; // move this two file to the same level. compact_once( @@ -386,7 +395,10 @@ async fn test_syncpoints_get_in_delete_range_boundary() { ) .unwrap(); local.flush(Vec::new()).await.unwrap(); - local.seal_current_epoch(u64::MAX); + local.seal_current_epoch( + u64::MAX, + risingwave_storage::store::SealCurrentEpochOptions::for_test(), + ); flush_and_commit(&hummock_meta_client, &storage, 103).await; // move this two file to the same level. compact_once( diff --git a/src/storage/hummock_trace/src/collector.rs b/src/storage/hummock_trace/src/collector.rs index aaa18205a90e6..1d2d32577fa9f 100644 --- a/src/storage/hummock_trace/src/collector.rs +++ b/src/storage/hummock_trace/src/collector.rs @@ -34,7 +34,8 @@ use tokio::task_local; use crate::write::{TraceWriter, TraceWriterImpl}; use crate::{ ConcurrentIdGenerator, Operation, OperationResult, Record, RecordId, RecordIdGenerator, - TracedInitOptions, TracedNewLocalOptions, TracedReadOptions, TracedSubResp, UniqueIdGenerator, + TracedInitOptions, TracedNewLocalOptions, TracedReadOptions, TracedSealCurrentEpochOptions, + TracedSubResp, UniqueIdGenerator, }; // Global collector instance used for trace collection @@ -205,8 +206,12 @@ impl TraceSpan { Self::new_global_op(Operation::LocalStorageIsDirty, storage_type) } - pub fn new_seal_current_epoch_span(epoch: u64, storage_type: StorageType) -> MayTraceSpan { - Self::new_global_op(Operation::SealCurrentEpoch(epoch), storage_type) + pub fn new_seal_current_epoch_span( + epoch: u64, + opts: TracedSealCurrentEpochOptions, + storage_type: StorageType, + ) -> MayTraceSpan { + Self::new_global_op(Operation::SealCurrentEpoch { epoch, opts }, storage_type) } pub fn new_clear_shared_buffer_span() -> MayTraceSpan { diff --git a/src/storage/hummock_trace/src/opts.rs b/src/storage/hummock_trace/src/opts.rs index 8d9fa20af6ebf..9dcc0fd22e4e5 100644 --- a/src/storage/hummock_trace/src/opts.rs +++ b/src/storage/hummock_trace/src/opts.rs @@ -213,3 +213,6 @@ impl From for EpochPair { pub struct TracedInitOptions { pub epoch: TracedEpochPair, } + +#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)] +pub struct TracedSealCurrentEpochOptions {} diff --git a/src/storage/hummock_trace/src/record.rs b/src/storage/hummock_trace/src/record.rs index ace14b0c0412c..43929fdda1d96 100644 --- a/src/storage/hummock_trace/src/record.rs +++ b/src/storage/hummock_trace/src/record.rs @@ -23,7 +23,7 @@ use risingwave_pb::meta::SubscribeResponse; use crate::{ LocalStorageId, StorageType, TracedHummockReadEpoch, TracedInitOptions, TracedNewLocalOptions, - TracedReadOptions, + TracedReadOptions, TracedSealCurrentEpochOptions, }; pub type RecordId = u64; @@ -173,7 +173,10 @@ pub enum Operation { ClearSharedBuffer, /// Seal current epoch - SealCurrentEpoch(u64), + SealCurrentEpoch { + epoch: u64, + opts: TracedSealCurrentEpochOptions, + }, /// validate read epoch ValidateReadEpoch(TracedHummockReadEpoch), diff --git a/src/storage/hummock_trace/src/replay/mod.rs b/src/storage/hummock_trace/src/replay/mod.rs index 41372f5fb2b5e..df6c191f31764 100644 --- a/src/storage/hummock_trace/src/replay/mod.rs +++ b/src/storage/hummock_trace/src/replay/mod.rs @@ -33,7 +33,7 @@ use crate::error::Result; use crate::TraceError; use crate::{ LocalStorageId, Record, TracedBytes, TracedInitOptions, TracedNewLocalOptions, - TracedReadOptions, + TracedReadOptions, TracedSealCurrentEpochOptions, }; pub type ReplayItem = (TracedBytes, TracedBytes); @@ -60,7 +60,7 @@ pub(crate) enum WorkerId { #[async_trait::async_trait] pub trait LocalReplay: LocalReplayRead + ReplayWrite + Send + Sync { async fn init(&mut self, options: TracedInitOptions) -> Result<()>; - fn seal_current_epoch(&mut self, next_epoch: u64); + fn seal_current_epoch(&mut self, next_epoch: u64, opts: TracedSealCurrentEpochOptions); fn is_dirty(&self) -> bool; fn epoch(&self) -> u64; async fn flush( @@ -185,7 +185,7 @@ mock! { #[async_trait::async_trait] impl LocalReplay for LocalReplayInterface{ async fn init(&mut self, options: TracedInitOptions) -> Result<()>; - fn seal_current_epoch(&mut self, next_epoch: u64); + fn seal_current_epoch(&mut self, next_epoch: u64, opts: TracedSealCurrentEpochOptions); fn is_dirty(&self) -> bool; fn epoch(&self) -> u64; async fn flush(&mut self, delete_ranges: Vec<(Bound, Bound)>) -> Result; diff --git a/src/storage/hummock_trace/src/replay/worker.rs b/src/storage/hummock_trace/src/replay/worker.rs index a4d99707d3516..f77543cf92b9d 100644 --- a/src/storage/hummock_trace/src/replay/worker.rs +++ b/src/storage/hummock_trace/src/replay/worker.rs @@ -344,10 +344,10 @@ impl ReplayWorker { ); } } - Operation::SealCurrentEpoch(epoch) => { + Operation::SealCurrentEpoch { epoch, opts } => { assert_ne!(storage_type, StorageType::Global); let local_storage = local_storages.get_mut(&storage_type).unwrap(); - local_storage.seal_current_epoch(epoch); + local_storage.seal_current_epoch(epoch, opts); } Operation::ValidateReadEpoch(epoch) => { assert_eq!(storage_type, StorageType::Global); diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 1e0dde692bed0..b19464e0322cf 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -578,6 +578,9 @@ impl HummockEventHandler { self.uploader.start_merge_imms(epoch); } } + + HummockEvent::LocalSealEpoch { .. } => {} + #[cfg(any(test, feature = "test"))] HummockEvent::FlushEvent(sender) => { let _ = sender.send(()).inspect_err(|e| { diff --git a/src/storage/src/hummock/event_handler/mod.rs b/src/storage/src/hummock/event_handler/mod.rs index 4a80ff1a51033..a6722b0d77116 100644 --- a/src/storage/src/hummock/event_handler/mod.rs +++ b/src/storage/src/hummock/event_handler/mod.rs @@ -24,7 +24,7 @@ use tokio::sync::{mpsc, oneshot}; use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatch; use crate::hummock::HummockResult; use crate::mem_table::ImmutableMemtable; -use crate::store::SyncResult; +use crate::store::{SealCurrentEpochOptions, SyncResult}; pub mod hummock_event_handler; pub mod refiller; @@ -67,6 +67,13 @@ pub enum HummockEvent { is_checkpoint: bool, }, + LocalSealEpoch { + instance_id: LocalInstanceId, + table_id: TableId, + epoch: HummockEpoch, + opts: SealCurrentEpochOptions, + }, + #[cfg(any(test, feature = "test"))] /// Flush all previous event. When all previous events has been consumed, the event handler /// will notify @@ -114,6 +121,19 @@ impl HummockEvent { "SealEpoch epoch {:?} is_checkpoint {:?}", epoch, is_checkpoint ), + + HummockEvent::LocalSealEpoch { + epoch, + instance_id, + table_id, + opts, + } => { + format!( + "LocalSealEpoch epoch: {}, table_id: {}, instance_id: {}, opts: {:?}", + epoch, table_id.table_id, instance_id, opts + ) + } + HummockEvent::RegisterReadVersion { table_id, new_read_version_sender: _, diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index 26730dcaeac5b..2fd8c5eb667da 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -409,7 +409,7 @@ impl LocalStateStore for LocalHummockStorage { Ok(()) } - fn seal_current_epoch(&mut self, next_epoch: u64) { + fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) { assert!(!self.is_dirty()); let prev_epoch = self .epoch @@ -422,6 +422,14 @@ impl LocalStateStore for LocalHummockStorage { next_epoch, prev_epoch ); + self.event_sender + .send(HummockEvent::LocalSealEpoch { + instance_id: self.instance_id(), + table_id: self.table_id, + epoch: prev_epoch, + opts, + }) + .expect("should be able to send") } } diff --git a/src/storage/src/mem_table.rs b/src/storage/src/mem_table.rs index 79a7fb50f381b..25342f2b163e2 100644 --- a/src/storage/src/mem_table.rs +++ b/src/storage/src/mem_table.rs @@ -600,7 +600,7 @@ impl LocalStateStore for MemtableLocalState Ok(()) } - fn seal_current_epoch(&mut self, next_epoch: u64) { + fn seal_current_epoch(&mut self, next_epoch: u64, _opts: SealCurrentEpochOptions) { assert!(!self.is_dirty()); let prev_epoch = self .epoch diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs index 024946b20bbc3..a4944a7a99195 100644 --- a/src/storage/src/monitor/monitored_store.rs +++ b/src/storage/src/monitor/monitored_store.rs @@ -280,9 +280,9 @@ impl LocalStateStore for MonitoredStateStore { self.inner.init(options).await } - fn seal_current_epoch(&mut self, next_epoch: u64) { + fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) { // TODO: may collect metrics - self.inner.seal_current_epoch(next_epoch) + self.inner.seal_current_epoch(next_epoch, opts) } fn try_flush(&mut self) -> impl Future> + Send + '_ { diff --git a/src/storage/src/monitor/traced_store.rs b/src/storage/src/monitor/traced_store.rs index 2a7c87a738174..cd65931a7e602 100644 --- a/src/storage/src/monitor/traced_store.rs +++ b/src/storage/src/monitor/traced_store.rs @@ -21,7 +21,7 @@ use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_hummock_trace::{ init_collector, should_use_trace, ConcurrentId, MayTraceSpan, OperationResult, StorageType, - TraceResult, TraceSpan, TracedBytes, LOCAL_ID, + TraceResult, TraceSpan, TracedBytes, TracedSealCurrentEpochOptions, LOCAL_ID, }; use super::identity; @@ -208,9 +208,13 @@ impl LocalStateStore for TracedStateStore { self.inner.init(options).await } - fn seal_current_epoch(&mut self, next_epoch: u64) { - let _span = TraceSpan::new_seal_current_epoch_span(next_epoch, self.storage_type); - self.inner.seal_current_epoch(next_epoch) + fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) { + let _span = TraceSpan::new_seal_current_epoch_span( + next_epoch, + TracedSealCurrentEpochOptions::from(opts.clone()), + self.storage_type, + ); + self.inner.seal_current_epoch(next_epoch, opts) } async fn try_flush(&mut self) -> StorageResult<()> { diff --git a/src/storage/src/panic_store.rs b/src/storage/src/panic_store.rs index 0457bd1828c2c..ffeb35e77bf5d 100644 --- a/src/storage/src/panic_store.rs +++ b/src/storage/src/panic_store.rs @@ -130,7 +130,7 @@ impl LocalStateStore for PanicStateStore { panic!("should not operate on the panic state store!"); } - fn seal_current_epoch(&mut self, _next_epoch: u64) { + fn seal_current_epoch(&mut self, _next_epoch: u64, _opts: SealCurrentEpochOptions) { panic!("should not operate on the panic state store!") } diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 004ff7937ea92..f869ab833578b 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -26,7 +26,7 @@ use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange}; use risingwave_hummock_sdk::{HummockReadEpoch, LocalSstableInfo}; use risingwave_hummock_trace::{ TracedInitOptions, TracedNewLocalOptions, TracedPrefetchOptions, TracedReadOptions, - TracedWriteOptions, + TracedSealCurrentEpochOptions, TracedWriteOptions, }; use crate::error::{StorageError, StorageResult}; @@ -244,12 +244,12 @@ pub trait LocalStateStore: StaticSendSync { /// In some cases like replicated state table, state table may not be empty initially, /// as such we need to wait for `epoch.prev` checkpoint to complete, /// hence this interface is made async. - fn init(&mut self, epoch: InitOptions) -> impl Future> + Send + '_; + fn init(&mut self, opts: InitOptions) -> impl Future> + Send + '_; /// Updates the monotonically increasing write epoch to `new_epoch`. /// All writes after this function is called will be tagged with `new_epoch`. In other words, /// the previous write epoch is sealed. - fn seal_current_epoch(&mut self, next_epoch: u64); + fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions); /// Check existence of a given `key_range`. /// It is better to provide `prefix_hint` in `read_options`, which will be used @@ -483,3 +483,32 @@ impl From for InitOptions { } } } + +#[derive(Clone, Debug)] +pub struct SealCurrentEpochOptions {} + +impl From for TracedSealCurrentEpochOptions { + fn from(_value: SealCurrentEpochOptions) -> Self { + TracedSealCurrentEpochOptions {} + } +} + +impl TryInto for TracedSealCurrentEpochOptions { + type Error = anyhow::Error; + + fn try_into(self) -> Result { + Ok(SealCurrentEpochOptions {}) + } +} + +impl SealCurrentEpochOptions { + #[expect(clippy::new_without_default)] + pub fn new() -> Self { + Self {} + } + + #[cfg(any(test, feature = "test"))] + pub fn for_test() -> Self { + Self::new() + } +} diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index 20957c96ba4ce..df8190b42fb74 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -445,11 +445,11 @@ pub mod verify { Ok(()) } - fn seal_current_epoch(&mut self, next_epoch: u64) { - self.actual.seal_current_epoch(next_epoch); + fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) { if let Some(expected) = &mut self.expected { - expected.seal_current_epoch(next_epoch); + expected.seal_current_epoch(next_epoch, opts.clone()); } + self.actual.seal_current_epoch(next_epoch, opts); } fn epoch(&self) -> u64 { @@ -796,7 +796,7 @@ pub mod boxed_state_store { async fn init(&mut self, epoch: InitOptions) -> StorageResult<()>; - fn seal_current_epoch(&mut self, next_epoch: u64); + fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions); } #[async_trait::async_trait] @@ -861,8 +861,8 @@ pub mod boxed_state_store { self.init(options).await } - fn seal_current_epoch(&mut self, next_epoch: u64) { - self.seal_current_epoch(next_epoch) + fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) { + self.seal_current_epoch(next_epoch, opts) } } @@ -934,8 +934,8 @@ pub mod boxed_state_store { self.deref_mut().init(options) } - fn seal_current_epoch(&mut self, next_epoch: u64) { - self.deref_mut().seal_current_epoch(next_epoch) + fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) { + self.deref_mut().seal_current_epoch(next_epoch, opts) } } diff --git a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs index 1e6e8681fcd77..061bd61be81dd 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs @@ -23,7 +23,7 @@ use risingwave_common::estimate_size::EstimateSize; use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; use risingwave_common::util::epoch::EpochPair; use risingwave_connector::sink::log_store::{LogStoreResult, LogWriter}; -use risingwave_storage::store::{InitOptions, LocalStateStore}; +use risingwave_storage::store::{InitOptions, LocalStateStore, SealCurrentEpochOptions}; use crate::common::log_store_impl::kv_log_store::buffer::LogStoreBufferSender; use crate::common::log_store_impl::kv_log_store::serde::LogStoreRowSerde; @@ -143,7 +143,8 @@ impl LogWriter for KvLogStoreWriter { } } self.state_store.flush(delete_range).await?; - self.state_store.seal_current_epoch(next_epoch); + self.state_store + .seal_current_epoch(next_epoch, SealCurrentEpochOptions::new()); self.tx.barrier(epoch, is_checkpoint, next_epoch); self.seq_id = FIRST_SEQ_ID; Ok(()) diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 83c722d8683f9..a51cef284939b 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -48,7 +48,7 @@ use risingwave_storage::row_serde::row_serde_util::{ use risingwave_storage::row_serde::value_serde::ValueRowSerde; use risingwave_storage::store::{ InitOptions, LocalStateStore, NewLocalOptions, PrefetchOptions, ReadOptions, - StateStoreIterItemStream, + SealCurrentEpochOptions, StateStoreIterItemStream, }; use risingwave_storage::table::merge_sort::merge_sort; use risingwave_storage::table::{compute_chunk_vnode, compute_vnode, Distribution, KeyedRow}; @@ -910,7 +910,8 @@ where self.watermark_buffer_strategy.tick(); if !self.is_dirty() { // If the state table is not modified, go fast path. - self.local_store.seal_current_epoch(new_epoch.curr); + self.local_store + .seal_current_epoch(new_epoch.curr, SealCurrentEpochOptions::new()); return Ok(()); } else { self.seal_current_epoch(new_epoch.curr) @@ -978,7 +979,8 @@ where // Tick the watermark buffer here because state table is expected to be committed once // per epoch. self.watermark_buffer_strategy.tick(); - self.local_store.seal_current_epoch(new_epoch.curr); + self.local_store + .seal_current_epoch(new_epoch.curr, SealCurrentEpochOptions::new()); } /// Write to state store. @@ -1087,7 +1089,8 @@ where } self.local_store.flush(delete_ranges).await?; - self.local_store.seal_current_epoch(next_epoch); + self.local_store + .seal_current_epoch(next_epoch, SealCurrentEpochOptions::new()); Ok(()) } diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index 27e8ada89f55d..f6129ce60a84f 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -55,7 +55,9 @@ use risingwave_storage::hummock::{ }; use risingwave_storage::monitor::{CompactorMetrics, HummockStateStoreMetrics}; use risingwave_storage::opts::StorageOpts; -use risingwave_storage::store::{LocalStateStore, NewLocalOptions, PrefetchOptions, ReadOptions}; +use risingwave_storage::store::{ + LocalStateStore, NewLocalOptions, PrefetchOptions, ReadOptions, SealCurrentEpochOptions, +}; use risingwave_storage::StateStore; use crate::CompactionTestOpts; @@ -419,7 +421,8 @@ impl NormalState { .flush(delete_ranges) .await .map_err(|e| format!("{:?}", e))?; - self.storage.seal_current_epoch(next_epoch); + self.storage + .seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); Ok(()) }