From 902c065a7635f3c989651b4471c771d30c51c877 Mon Sep 17 00:00:00 2001 From: William Wen Date: Fri, 20 Sep 2024 18:20:24 +0800 Subject: [PATCH] fix test --- .../hummock_test/src/state_store_tests.rs | 41 ++++++++++++++----- src/storage/src/hummock/utils.rs | 38 +++++++---------- 2 files changed, 45 insertions(+), 34 deletions(-) diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index 468bc91a13e3a..1018a72e7d274 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -31,6 +31,7 @@ use risingwave_hummock_sdk::{ HummockReadEpoch, HummockSstableObjectId, LocalSstableInfo, SyncResult, }; use risingwave_meta::hummock::test_utils::setup_compute_env; +use risingwave_meta::hummock::{CommitEpochInfo, NewTableFragmentInfo}; use risingwave_rpc_client::HummockMetaClient; use risingwave_storage::hummock::iterator::change_log::test_utils::{ apply_test_log_data, gen_test_data, @@ -1420,7 +1421,31 @@ async fn test_gc_watermark_and_clear_shared_buffer() { async fn test_replicated_local_hummock_storage() { const TEST_TABLE_ID: TableId = TableId { table_id: 233 }; - let (hummock_storage, _meta_client) = with_hummock_storage_v2(Default::default()).await; + let (hummock_storage, meta_client) = with_hummock_storage_v2(Default::default()).await; + + let epoch0 = meta_client + .hummock_manager_ref() + .on_current_version(|version| version.visible_table_committed_epoch()) + .await; + + let epoch0 = epoch0.next_epoch(); + + meta_client + .hummock_manager_ref() + .commit_epoch(CommitEpochInfo { + sstables: vec![], + new_table_watermarks: Default::default(), + sst_to_context: Default::default(), + new_table_fragment_info: NewTableFragmentInfo::NewCompactionGroup { + table_ids: HashSet::from_iter([TEST_TABLE_ID]), + }, + change_log_delta: Default::default(), + committed_epoch: epoch0, + tables_to_commit: Default::default(), + is_visible_table_committed_epoch: true, + }) + .await + .unwrap(); let read_options = ReadOptions { table_id: TableId { @@ -1441,12 +1466,6 @@ async fn test_replicated_local_hummock_storage() { )) .await; - let epoch0 = local_hummock_storage - .read_version() - .read() - .committed() - .max_committed_epoch(); - let epoch1 = epoch0.next_epoch(); local_hummock_storage.init_for_test(epoch1).await.unwrap(); @@ -1496,13 +1515,13 @@ async fn test_replicated_local_hummock_storage() { [ Ok( ( - FullKey { UserKey { 233, TableKey { 000061616161 } }, epoch: 65536, epoch_with_gap: 65536, spill_offset: 0}, + FullKey { UserKey { 233, TableKey { 000061616161 } }, epoch: 131072, epoch_with_gap: 131072, spill_offset: 0}, b"1111", ), ), Ok( ( - FullKey { UserKey { 233, TableKey { 000062626262 } }, epoch: 65536, epoch_with_gap: 65536, spill_offset: 0}, + FullKey { UserKey { 233, TableKey { 000062626262 } }, epoch: 131072, epoch_with_gap: 131072, spill_offset: 0}, b"2222", ), ), @@ -1564,13 +1583,13 @@ async fn test_replicated_local_hummock_storage() { [ Ok( ( - FullKey { UserKey { 233, TableKey { 000063636363 } }, epoch: 131072, epoch_with_gap: 131072, spill_offset: 0}, + FullKey { UserKey { 233, TableKey { 000063636363 } }, epoch: 196608, epoch_with_gap: 196608, spill_offset: 0}, b"3333", ), ), Ok( ( - FullKey { UserKey { 233, TableKey { 000064646464 } }, epoch: 131072, epoch_with_gap: 131072, spill_offset: 0}, + FullKey { UserKey { 233, TableKey { 000064646464 } }, epoch: 196608, epoch_with_gap: 196608, spill_offset: 0}, b"4444", ), ), diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index b8761da23143e..235edc884ae5b 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -32,7 +32,6 @@ use risingwave_hummock_sdk::key::{ }; use risingwave_hummock_sdk::sstable_info::SstableInfo; use tokio::sync::oneshot::{channel, Receiver, Sender}; -use tracing::warn; use super::{HummockError, SstableStoreRef}; use crate::error::StorageResult; @@ -582,23 +581,19 @@ pub(crate) async fn wait_for_epoch( options: TryWaitEpochOptions, ) -> StorageResult<()> { let mut receiver = notifier.subscribe(); - { + let mut committed_epoch = { // avoid unnecessary check in the loop if the value does not change let committed_epoch = receiver .borrow_and_update() .version() .table_committed_epoch(options.table_id); - if let Some(committed_epoch) = committed_epoch { - if committed_epoch >= wait_epoch { - return Ok(()); - } - } else { - warn!( - table_id = options.table_id.table_id, - "table id not exist yet. wait for table creation" - ); + if let Some(committed_epoch) = committed_epoch + && committed_epoch >= wait_epoch + { + return Ok(()); } - } + committed_epoch + }; let start_time = Instant::now(); loop { match tokio::time::timeout(Duration::from_secs(30), receiver.changed()).await { @@ -614,6 +609,8 @@ pub(crate) async fn wait_for_epoch( // See #3845 for more details. tracing::warn!( epoch = wait_epoch, + ?committed_epoch, + table_id = options.table_id.table_id, elapsed = ?start_time.elapsed(), "wait_epoch timeout when waiting for version update", ); @@ -624,21 +621,16 @@ pub(crate) async fn wait_for_epoch( } Ok(Ok(_)) => { // TODO: should handle the corner case of drop table - let committed_epoch = receiver + let new_committed_epoch = receiver .borrow() .version() .table_committed_epoch(options.table_id); - if let Some(committed_epoch) = committed_epoch { - if committed_epoch >= wait_epoch { - return Ok(()); - } - } else { - warn!( - table_id = options.table_id.table_id, - elapsed = ?start_time.elapsed(), - "table id not exist yet. wait for table creation" - ); + if let Some(committed_epoch) = new_committed_epoch + && committed_epoch >= wait_epoch + { + return Ok(()); } + committed_epoch = new_committed_epoch; } } }