Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Sep 12, 2024
1 parent 56cb908 commit 0d50ccf
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 31 deletions.
40 changes: 26 additions & 14 deletions src/storage/hummock_test/src/hummock_storage_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use risingwave_hummock_sdk::table_stats::TableStats;
use risingwave_hummock_sdk::table_watermark::{
TableWatermarksIndex, VnodeWatermark, WatermarkDirection,
};
use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo};
use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo};
use risingwave_meta::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use risingwave_rpc_client::HummockMetaClient;
use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion;
Expand Down Expand Up @@ -654,6 +654,7 @@ async fn test_state_store_sync() {
ReadOptions {
table_id: TEST_TABLE_ID,
cache_policy: CachePolicy::Fill(CacheContext::Default),
read_committed: true,
..Default::default()
},
)
Expand All @@ -673,6 +674,7 @@ async fn test_state_store_sync() {
ReadOptions {
table_id: TEST_TABLE_ID,
cache_policy: CachePolicy::Fill(CacheContext::Default),
read_committed: true,
..Default::default()
},
)
Expand Down Expand Up @@ -996,7 +998,7 @@ async fn test_multiple_epoch_sync() {
)
.await
.unwrap();
let test_get = || {
let test_get = |read_committed: bool| {
let hummock_storage_clone = &test_env.storage;
async move {
assert_eq!(
Expand All @@ -1006,6 +1008,7 @@ async fn test_multiple_epoch_sync() {
epoch1,
ReadOptions {
table_id: TEST_TABLE_ID,
read_committed,
cache_policy: CachePolicy::Fill(CacheContext::Default),
..Default::default()
},
Expand All @@ -1021,7 +1024,7 @@ async fn test_multiple_epoch_sync() {
epoch2,
ReadOptions {
table_id: TEST_TABLE_ID,

read_committed,
cache_policy: CachePolicy::Fill(CacheContext::Default),
..Default::default()
},
Expand All @@ -1036,6 +1039,7 @@ async fn test_multiple_epoch_sync() {
epoch3,
ReadOptions {
table_id: TEST_TABLE_ID,
read_committed,
cache_policy: CachePolicy::Fill(CacheContext::Default),
..Default::default()
},
Expand All @@ -1047,16 +1051,23 @@ async fn test_multiple_epoch_sync() {
);
}
};
test_get().await;
test_get(false).await;

let epoch4 = epoch3.next_epoch();
test_env
.storage
.start_epoch(epoch4, HashSet::from_iter([TEST_TABLE_ID]));
hummock_storage.seal_current_epoch(epoch4, SealCurrentEpochOptions::for_test());
let sync_result1 = test_env.storage.seal_and_sync_epoch(epoch1).await.unwrap();
let sync_result2 = test_env.storage.seal_and_sync_epoch(epoch2).await.unwrap();
let sync_result3 = test_env.storage.seal_and_sync_epoch(epoch3).await.unwrap();
test_get().await;
test_get(false).await;

test_env
.meta_client
.commit_epoch(epoch1, sync_result1)
.await
.unwrap();

test_env
.meta_client
Expand All @@ -1070,7 +1081,7 @@ async fn test_multiple_epoch_sync() {
.await
.unwrap();
test_env.storage.try_wait_epoch_for_test(epoch3).await;
test_get().await;
test_get(true).await;
}

#[tokio::test]
Expand Down Expand Up @@ -1249,6 +1260,7 @@ async fn test_iter_with_min_epoch() {
table_id: TEST_TABLE_ID,
prefetch_options: PrefetchOptions::default(),
cache_policy: CachePolicy::Fill(CacheContext::Default),
read_committed: true,
..Default::default()
},
)
Expand Down Expand Up @@ -1939,6 +1951,7 @@ async fn test_get_with_min_epoch() {
ReadOptions {
table_id: TEST_TABLE_ID,
cache_policy: CachePolicy::Fill(CacheContext::Default),
read_committed: true,
..Default::default()
},
)
Expand All @@ -1955,15 +1968,14 @@ async fn test_get_with_min_epoch() {
epoch1,
ReadOptions {
table_id: TEST_TABLE_ID,

read_committed: true,
prefix_hint: Some(Bytes::from(prefix_hint.clone())),
cache_policy: CachePolicy::Fill(CacheContext::Default),
..Default::default()
},
)
.await
.unwrap();

assert!(v.is_some());
}

Expand Down Expand Up @@ -2329,7 +2341,7 @@ async fn test_table_watermark() {

let (local1, local2) = test_after_epoch2(local1, local2).await;

let check_version_table_watermark = |version: PinnedVersion| {
let check_version_table_watermark = |version: PinnedVersion, epoch: HummockEpoch| {
let table_watermarks = TableWatermarksIndex::new_committed(
version
.version()
Expand All @@ -2342,11 +2354,11 @@ async fn test_table_watermark() {
assert_eq!(WatermarkDirection::Ascending, table_watermarks.direction());
assert_eq!(
gen_inner_key(watermark1),
table_watermarks.read_watermark(vnode1, epoch1).unwrap()
table_watermarks.read_watermark(vnode1, epoch).unwrap()
);
assert_eq!(
gen_inner_key(watermark1),
table_watermarks.read_watermark(vnode2, epoch1).unwrap()
table_watermarks.read_watermark(vnode2, epoch).unwrap()
);
};

Expand Down Expand Up @@ -2435,7 +2447,7 @@ async fn test_table_watermark() {

test_global_read(test_env.storage.clone(), epoch2).await;

check_version_table_watermark(test_env.storage.get_pinned_version());
check_version_table_watermark(test_env.storage.get_pinned_version(), epoch1);

let (local1, local2) = test_after_epoch2(local1, local2).await;

Expand All @@ -2444,7 +2456,7 @@ async fn test_table_watermark() {

test_global_read(test_env.storage.clone(), epoch2).await;

check_version_table_watermark(test_env.storage.get_pinned_version());
check_version_table_watermark(test_env.storage.get_pinned_version(), epoch2);

let (mut local1, mut local2) = test_after_epoch2(local1, local2).await;

Expand Down Expand Up @@ -2477,7 +2489,7 @@ async fn test_table_watermark() {
test_env.commit_epoch(epoch3).await;
test_env.storage.try_wait_epoch_for_test(epoch3).await;

check_version_table_watermark(test_env.storage.get_pinned_version());
check_version_table_watermark(test_env.storage.get_pinned_version(), epoch3);

let (_local1, _local2) = test_after_epoch2(local1, local2).await;

Expand Down
85 changes: 72 additions & 13 deletions src/storage/hummock_test/src/snapshot_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,14 @@ use crate::local_state_store_test_utils::LocalStateStoreTestExt;
use crate::test_utils::{gen_key_from_bytes, with_hummock_storage_v2, TestIngestBatch};

macro_rules! assert_count_range_scan {
($storage:expr, $vnode:expr, $range:expr, $expect_count:expr, $epoch:expr) => {{
(
$storage:expr,
$vnode:expr,
$range:expr,
$expect_count:expr,
$epoch:expr,
$read_committed:expr
) => {{
use std::ops::RangeBounds;

use risingwave_storage::StateStoreIter;
Expand All @@ -53,6 +60,7 @@ macro_rules! assert_count_range_scan {
ReadOptions {
prefetch_options: PrefetchOptions::prefetch_for_large_range_scan(),
cache_policy: CachePolicy::Fill(CacheContext::Default),
read_committed: $read_committed,
..Default::default()
},
)
Expand Down Expand Up @@ -151,7 +159,7 @@ async fn test_snapshot_inner(
.unwrap();
}
}
assert_count_range_scan!(hummock_storage, VirtualNode::ZERO, .., 2, epoch1);
assert_count_range_scan!(hummock_storage, VirtualNode::ZERO, .., 2, epoch1, false);

local
.ingest_batch(
Expand Down Expand Up @@ -192,8 +200,15 @@ async fn test_snapshot_inner(
.unwrap();
}
}
assert_count_range_scan!(hummock_storage, VirtualNode::ZERO, .., 3, epoch2);
assert_count_range_scan!(hummock_storage, VirtualNode::ZERO, .., 2, epoch1);
assert_count_range_scan!(hummock_storage, VirtualNode::ZERO, .., 3, epoch2, false);
assert_count_range_scan!(
hummock_storage,
VirtualNode::ZERO,
..,
2,
epoch1,
enable_commit
);

local
.ingest_batch(
Expand Down Expand Up @@ -232,9 +247,30 @@ async fn test_snapshot_inner(
.unwrap();
}
}
assert_count_range_scan!(hummock_storage, VirtualNode::ZERO, .., 0, epoch3);
assert_count_range_scan!(hummock_storage, VirtualNode::ZERO, .., 3, epoch2);
assert_count_range_scan!(hummock_storage, VirtualNode::ZERO, .., 2, epoch1);
assert_count_range_scan!(
hummock_storage,
VirtualNode::ZERO,
..,
0,
epoch3,
enable_commit
);
assert_count_range_scan!(
hummock_storage,
VirtualNode::ZERO,
..,
3,
epoch2,
enable_commit
);
assert_count_range_scan!(
hummock_storage,
VirtualNode::ZERO,
..,
2,
epoch1,
enable_commit
);
}

async fn test_snapshot_range_scan_inner(
Expand Down Expand Up @@ -302,19 +338,42 @@ async fn test_snapshot_range_scan_inner(
VirtualNode::ZERO,
key!(2)..=key!(3),
2,
epoch
epoch,
false
);
assert_count_range_scan!(
hummock_storage,
VirtualNode::ZERO,
key!(2)..key!(3),
1,
epoch
epoch,
false
);
assert_count_range_scan!(
hummock_storage,
VirtualNode::ZERO,
key!(2)..,
3,
epoch,
false
);
assert_count_range_scan!(
hummock_storage,
VirtualNode::ZERO,
..=key!(3),
3,
epoch,
false
);
assert_count_range_scan!(
hummock_storage,
VirtualNode::ZERO,
..key!(3),
2,
epoch,
false
);
assert_count_range_scan!(hummock_storage, VirtualNode::ZERO, key!(2).., 3, epoch);
assert_count_range_scan!(hummock_storage, VirtualNode::ZERO, ..=key!(3), 3, epoch);
assert_count_range_scan!(hummock_storage, VirtualNode::ZERO, ..key!(3), 2, epoch);
assert_count_range_scan!(hummock_storage, VirtualNode::ZERO, .., 4, epoch);
assert_count_range_scan!(hummock_storage, VirtualNode::ZERO, .., 4, epoch, false);
}

#[tokio::test]
Expand Down
17 changes: 13 additions & 4 deletions src/storage/hummock_test/src/state_store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1169,7 +1169,7 @@ async fn test_multiple_epoch_sync_v2() {
.await
.unwrap();
local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test());
let test_get = || {
let test_get = |read_committed: bool| {
let hummock_storage_clone = &hummock_storage;
async move {
assert_eq!(
Expand All @@ -1178,6 +1178,7 @@ async fn test_multiple_epoch_sync_v2() {
gen_key_from_str(VirtualNode::ZERO, "bb"),
epoch1,
ReadOptions {
read_committed,
cache_policy: CachePolicy::Fill(CacheContext::Default),
..Default::default()
}
Expand All @@ -1192,6 +1193,7 @@ async fn test_multiple_epoch_sync_v2() {
gen_key_from_str(VirtualNode::ZERO, "bb"),
epoch2,
ReadOptions {
read_committed,
cache_policy: CachePolicy::Fill(CacheContext::Default),
..Default::default()
}
Expand All @@ -1205,6 +1207,7 @@ async fn test_multiple_epoch_sync_v2() {
gen_key_from_str(VirtualNode::ZERO, "bb"),
epoch3,
ReadOptions {
read_committed,
cache_policy: CachePolicy::Fill(CacheContext::Default),
..Default::default()
}
Expand All @@ -1216,10 +1219,16 @@ async fn test_multiple_epoch_sync_v2() {
);
}
};
test_get().await;
test_get(false).await;
let sync_result1 = hummock_storage.seal_and_sync_epoch(epoch1).await.unwrap();
let sync_result2 = hummock_storage.seal_and_sync_epoch(epoch2).await.unwrap();
let sync_result3 = hummock_storage.seal_and_sync_epoch(epoch3).await.unwrap();
test_get().await;
test_get(false).await;

meta_client
.commit_epoch(epoch1, sync_result1)
.await
.unwrap();

meta_client
.commit_epoch(epoch2, sync_result2)
Expand All @@ -1234,7 +1243,7 @@ async fn test_multiple_epoch_sync_v2() {
.try_wait_epoch(HummockReadEpoch::Committed(epoch3))
.await
.unwrap();
test_get().await;
test_get(true).await;
}

#[tokio::test]
Expand Down

0 comments on commit 0d50ccf

Please sign in to comment.