diff --git a/src/storage/hummock_test/src/hummock_read_version_tests.rs b/src/storage/hummock_test/src/hummock_read_version_tests.rs index 81f16490bae47..52cb35388cf0b 100644 --- a/src/storage/hummock_test/src/hummock_read_version_tests.rs +++ b/src/storage/hummock_test/src/hummock_read_version_tests.rs @@ -18,7 +18,9 @@ use std::sync::Arc; use bytes::Bytes; use itertools::Itertools; use parking_lot::RwLock; +use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::TableId; +use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::key::{key_with_epoch, map_table_key_range}; use risingwave_hummock_sdk::LocalSstableInfo; use risingwave_meta::hummock::test_utils::setup_compute_env; @@ -265,6 +267,9 @@ async fn test_read_filter_basic() { TableId::from(table_id), pinned_version, ))); + read_version + .write() + .update_vnode_bitmap(Arc::new(Bitmap::ones(VirtualNode::COUNT))); { // single imm diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index c8f55a0f62641..04ff5c456557e 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -1840,7 +1840,7 @@ async fn test_table_watermark() { local1.init_for_test(epoch1).await.unwrap(); local1.update_vnode_bitmap(vnode_bitmap1.clone()); local2.init_for_test(epoch1).await.unwrap(); - local1.update_vnode_bitmap(vnode_bitmap2.clone()); + local2.update_vnode_bitmap(vnode_bitmap2.clone()); fn gen_inner_key(index: usize) -> Bytes { Bytes::copy_from_slice(format!("key_{:05}", index).as_bytes()) @@ -1934,10 +1934,7 @@ async fn test_table_watermark() { SealCurrentEpochOptions { table_watermarks: Some(( WatermarkDirection::Ascending, - vec![VnodeWatermark::new( - vnode_bitmap, - gen_inner_key(watermark1), - )], + vec![VnodeWatermark::new(vnode_bitmap, gen_inner_key(watermark1))], )), switch_op_consistency_level: None, }, @@ -2140,6 +2137,8 @@ async fn test_table_watermark() { test_env.commit_epoch(epoch1).await; test_env.storage.try_wait_epoch_for_test(epoch1).await; + let (local1, local2) = test_after_epoch2(local1, local2).await; + let test_global_read = |storage: HummockStorage, epoch: u64| async move { // inner vnode read for vnode in [vnode1, vnode2] { @@ -2243,10 +2242,7 @@ async fn test_table_watermark() { SealCurrentEpochOptions { table_watermarks: Some(( WatermarkDirection::Ascending, - vec![VnodeWatermark::new( - vnode_bitmap, - gen_inner_key(5), - )], + vec![VnodeWatermark::new(vnode_bitmap, gen_inner_key(5))], )), switch_op_consistency_level: None, }, diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index dafde3122e9a7..7c6a28582b30c 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -118,10 +118,9 @@ pub fn get_committed_read_version_tuple( mut key_range: TableKeyRange, epoch: HummockEpoch, ) -> (TableKeyRange, ReadVersionTuple) { - version - .table_watermark_index() - .get(&table_id) - .map(|index| index.rewrite_range_with_table_watermark(epoch, &mut key_range)); + if let Some(index) = version.table_watermark_index().get(&table_id) { + index.rewrite_range_with_table_watermark(epoch, &mut key_range) + } (key_range, (vec![], vec![], version)) } diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 4c6ce94d17606..a089bade1d7ad 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -617,10 +617,9 @@ pub fn read_filter_for_version( let committed_version = read_version_guard.committed().clone(); - read_version_guard - .table_watermarks - .as_ref() - .map(|watermark| watermark.rewrite_range_with_table_watermark(epoch, &mut table_key_range)); + if let Some(watermark) = read_version_guard.table_watermarks.as_ref() { + watermark.rewrite_range_with_table_watermark(epoch, &mut table_key_range) + } let (imm_iter, sst_iter) = read_version_guard diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index 9b77a9a7bf096..4accff0f6fe4e 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -1375,8 +1375,10 @@ mod tests { let chunk_ids = check_reader(&mut reader, [(epoch3, None)].iter()).await; assert_eq!(0, chunk_ids.len()); - // Recovery happens. Test rewind while consuming persisted log. No new data written - + // Recovery happens. Test rewind while consuming persisted log. No new data written. + // Writer must be dropped first to ensure vnode assignment is exclusive. + drop(reader); + drop(writer); let factory = KvLogStoreFactory::new( test_env.storage.clone(), table.clone(), @@ -1432,7 +1434,9 @@ mod tests { assert_eq!(1, chunk_ids.len()); // Recovery happens again. Test rewind with some new data written and flushed. - + // Writer must be dropped first to ensure vnode assignment is exclusive. + drop(reader); + drop(writer); let factory = KvLogStoreFactory::new( test_env.storage.clone(), table.clone(),