Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Sep 20, 2024
1 parent f5f78d4 commit 902c065
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 34 deletions.
41 changes: 30 additions & 11 deletions src/storage/hummock_test/src/state_store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use risingwave_hummock_sdk::{
HummockReadEpoch, HummockSstableObjectId, LocalSstableInfo, SyncResult,
};
use risingwave_meta::hummock::test_utils::setup_compute_env;
use risingwave_meta::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use risingwave_rpc_client::HummockMetaClient;
use risingwave_storage::hummock::iterator::change_log::test_utils::{
apply_test_log_data, gen_test_data,
Expand Down Expand Up @@ -1420,7 +1421,31 @@ async fn test_gc_watermark_and_clear_shared_buffer() {
async fn test_replicated_local_hummock_storage() {
const TEST_TABLE_ID: TableId = TableId { table_id: 233 };

let (hummock_storage, _meta_client) = with_hummock_storage_v2(Default::default()).await;
let (hummock_storage, meta_client) = with_hummock_storage_v2(Default::default()).await;

let epoch0 = meta_client
.hummock_manager_ref()
.on_current_version(|version| version.visible_table_committed_epoch())
.await;

let epoch0 = epoch0.next_epoch();

meta_client
.hummock_manager_ref()
.commit_epoch(CommitEpochInfo {
sstables: vec![],
new_table_watermarks: Default::default(),
sst_to_context: Default::default(),
new_table_fragment_info: NewTableFragmentInfo::NewCompactionGroup {
table_ids: HashSet::from_iter([TEST_TABLE_ID]),
},
change_log_delta: Default::default(),
committed_epoch: epoch0,
tables_to_commit: Default::default(),
is_visible_table_committed_epoch: true,
})
.await
.unwrap();

let read_options = ReadOptions {
table_id: TableId {
Expand All @@ -1441,12 +1466,6 @@ async fn test_replicated_local_hummock_storage() {
))
.await;

let epoch0 = local_hummock_storage
.read_version()
.read()
.committed()
.max_committed_epoch();

let epoch1 = epoch0.next_epoch();

local_hummock_storage.init_for_test(epoch1).await.unwrap();
Expand Down Expand Up @@ -1496,13 +1515,13 @@ async fn test_replicated_local_hummock_storage() {
[
Ok(
(
FullKey { UserKey { 233, TableKey { 000061616161 } }, epoch: 65536, epoch_with_gap: 65536, spill_offset: 0},
FullKey { UserKey { 233, TableKey { 000061616161 } }, epoch: 131072, epoch_with_gap: 131072, spill_offset: 0},
b"1111",
),
),
Ok(
(
FullKey { UserKey { 233, TableKey { 000062626262 } }, epoch: 65536, epoch_with_gap: 65536, spill_offset: 0},
FullKey { UserKey { 233, TableKey { 000062626262 } }, epoch: 131072, epoch_with_gap: 131072, spill_offset: 0},
b"2222",
),
),
Expand Down Expand Up @@ -1564,13 +1583,13 @@ async fn test_replicated_local_hummock_storage() {
[
Ok(
(
FullKey { UserKey { 233, TableKey { 000063636363 } }, epoch: 131072, epoch_with_gap: 131072, spill_offset: 0},
FullKey { UserKey { 233, TableKey { 000063636363 } }, epoch: 196608, epoch_with_gap: 196608, spill_offset: 0},
b"3333",
),
),
Ok(
(
FullKey { UserKey { 233, TableKey { 000064646464 } }, epoch: 131072, epoch_with_gap: 131072, spill_offset: 0},
FullKey { UserKey { 233, TableKey { 000064646464 } }, epoch: 196608, epoch_with_gap: 196608, spill_offset: 0},
b"4444",
),
),
Expand Down
38 changes: 15 additions & 23 deletions src/storage/src/hummock/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use risingwave_hummock_sdk::key::{
};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use tokio::sync::oneshot::{channel, Receiver, Sender};
use tracing::warn;

use super::{HummockError, SstableStoreRef};
use crate::error::StorageResult;
Expand Down Expand Up @@ -582,23 +581,19 @@ pub(crate) async fn wait_for_epoch(
options: TryWaitEpochOptions,
) -> StorageResult<()> {
let mut receiver = notifier.subscribe();
{
let mut committed_epoch = {
// avoid unnecessary check in the loop if the value does not change
let committed_epoch = receiver
.borrow_and_update()
.version()
.table_committed_epoch(options.table_id);
if let Some(committed_epoch) = committed_epoch {
if committed_epoch >= wait_epoch {
return Ok(());
}
} else {
warn!(
table_id = options.table_id.table_id,
"table id not exist yet. wait for table creation"
);
if let Some(committed_epoch) = committed_epoch
&& committed_epoch >= wait_epoch
{
return Ok(());
}
}
committed_epoch
};
let start_time = Instant::now();
loop {
match tokio::time::timeout(Duration::from_secs(30), receiver.changed()).await {
Expand All @@ -614,6 +609,8 @@ pub(crate) async fn wait_for_epoch(
// See #3845 for more details.
tracing::warn!(
epoch = wait_epoch,
?committed_epoch,
table_id = options.table_id.table_id,
elapsed = ?start_time.elapsed(),
"wait_epoch timeout when waiting for version update",
);
Expand All @@ -624,21 +621,16 @@ pub(crate) async fn wait_for_epoch(
}
Ok(Ok(_)) => {
// TODO: should handle the corner case of drop table
let committed_epoch = receiver
let new_committed_epoch = receiver
.borrow()
.version()
.table_committed_epoch(options.table_id);
if let Some(committed_epoch) = committed_epoch {
if committed_epoch >= wait_epoch {
return Ok(());
}
} else {
warn!(
table_id = options.table_id.table_id,
elapsed = ?start_time.elapsed(),
"table id not exist yet. wait for table creation"
);
if let Some(committed_epoch) = new_committed_epoch
&& committed_epoch >= wait_epoch
{
return Ok(());
}
committed_epoch = new_committed_epoch;
}
}
}
Expand Down

0 comments on commit 902c065

Please sign in to comment.