Skip to content

Commit

Permalink
fix all UTs
Browse files Browse the repository at this point in the history
  • Loading branch information
hzxa21 committed Mar 1, 2024
1 parent 85e0048 commit e832693
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 20 deletions.
5 changes: 5 additions & 0 deletions src/storage/hummock_test/src/hummock_read_version_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ use std::sync::Arc;
use bytes::Bytes;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_hummock_sdk::key::{key_with_epoch, map_table_key_range};
use risingwave_hummock_sdk::LocalSstableInfo;
use risingwave_meta::hummock::test_utils::setup_compute_env;
Expand Down Expand Up @@ -265,6 +267,9 @@ async fn test_read_filter_basic() {
TableId::from(table_id),
pinned_version,
)));
read_version
.write()
.update_vnode_bitmap(Arc::new(Bitmap::ones(VirtualNode::COUNT)));

{
// single imm
Expand Down
14 changes: 5 additions & 9 deletions src/storage/hummock_test/src/hummock_storage_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1840,7 +1840,7 @@ async fn test_table_watermark() {
local1.init_for_test(epoch1).await.unwrap();
local1.update_vnode_bitmap(vnode_bitmap1.clone());
local2.init_for_test(epoch1).await.unwrap();
local1.update_vnode_bitmap(vnode_bitmap2.clone());
local2.update_vnode_bitmap(vnode_bitmap2.clone());

fn gen_inner_key(index: usize) -> Bytes {
Bytes::copy_from_slice(format!("key_{:05}", index).as_bytes())
Expand Down Expand Up @@ -1934,10 +1934,7 @@ async fn test_table_watermark() {
SealCurrentEpochOptions {
table_watermarks: Some((
WatermarkDirection::Ascending,
vec![VnodeWatermark::new(
vnode_bitmap,
gen_inner_key(watermark1),
)],
vec![VnodeWatermark::new(vnode_bitmap, gen_inner_key(watermark1))],
)),
switch_op_consistency_level: None,
},
Expand Down Expand Up @@ -2140,6 +2137,8 @@ async fn test_table_watermark() {
test_env.commit_epoch(epoch1).await;
test_env.storage.try_wait_epoch_for_test(epoch1).await;

let (local1, local2) = test_after_epoch2(local1, local2).await;

let test_global_read = |storage: HummockStorage, epoch: u64| async move {
// inner vnode read
for vnode in [vnode1, vnode2] {
Expand Down Expand Up @@ -2243,10 +2242,7 @@ async fn test_table_watermark() {
SealCurrentEpochOptions {
table_watermarks: Some((
WatermarkDirection::Ascending,
vec![VnodeWatermark::new(
vnode_bitmap,
gen_inner_key(5),
)],
vec![VnodeWatermark::new(vnode_bitmap, gen_inner_key(5))],
)),
switch_op_consistency_level: None,
},
Expand Down
7 changes: 3 additions & 4 deletions src/storage/src/hummock/store/hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,9 @@ pub fn get_committed_read_version_tuple(
mut key_range: TableKeyRange,
epoch: HummockEpoch,
) -> (TableKeyRange, ReadVersionTuple) {
version
.table_watermark_index()
.get(&table_id)
.map(|index| index.rewrite_range_with_table_watermark(epoch, &mut key_range));
if let Some(index) = version.table_watermark_index().get(&table_id) {
index.rewrite_range_with_table_watermark(epoch, &mut key_range)
}
(key_range, (vec![], vec![], version))
}

Expand Down
7 changes: 3 additions & 4 deletions src/storage/src/hummock/store/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,10 +617,9 @@ pub fn read_filter_for_version(

let committed_version = read_version_guard.committed().clone();

read_version_guard
.table_watermarks
.as_ref()
.map(|watermark| watermark.rewrite_range_with_table_watermark(epoch, &mut table_key_range));
if let Some(watermark) = read_version_guard.table_watermarks.as_ref() {
watermark.rewrite_range_with_table_watermark(epoch, &mut table_key_range)
}

let (imm_iter, sst_iter) =
read_version_guard
Expand Down
10 changes: 7 additions & 3 deletions src/stream/src/common/log_store_impl/kv_log_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1375,8 +1375,10 @@ mod tests {
let chunk_ids = check_reader(&mut reader, [(epoch3, None)].iter()).await;
assert_eq!(0, chunk_ids.len());

// Recovery happens. Test rewind while consuming persisted log. No new data written

// Recovery happens. Test rewind while consuming persisted log. No new data written.
// Writer must be dropped first to ensure vnode assignment is exclusive.
drop(reader);
drop(writer);
let factory = KvLogStoreFactory::new(
test_env.storage.clone(),
table.clone(),
Expand Down Expand Up @@ -1432,7 +1434,9 @@ mod tests {
assert_eq!(1, chunk_ids.len());

// Recovery happens again. Test rewind with some new data written and flushed.

// Writer must be dropped first to ensure vnode assignment is exclusive.
drop(reader);
drop(writer);
let factory = KvLogStoreFactory::new(
test_env.storage.clone(),
table.clone(),
Expand Down

0 comments on commit e832693

Please sign in to comment.