diff --git a/src/storage/hummock_sdk/src/key.rs b/src/storage/hummock_sdk/src/key.rs
index 0a6ea9d99378..088fa0af7894 100644
--- a/src/storage/hummock_sdk/src/key.rs
+++ b/src/storage/hummock_sdk/src/key.rs
@@ -78,6 +78,13 @@ pub fn vnode_range(range: &TableKeyRange) -> (usize, usize) {
     (left, right)
 }
 
+// Ensure there is only one vnode involved in table key range and return the vnode
+pub fn vnode(range: &TableKeyRange) -> VirtualNode {
+    let (l, r_exclusive) = vnode_range(range);
+    assert_eq!(r_exclusive - l, 1);
+    VirtualNode::from_index(l)
+}
+
 /// Converts user key to full key by appending `epoch` to the user key.
 pub fn key_with_epoch(mut user_key: Vec<u8>, epoch: HummockEpoch) -> Vec<u8> {
     let res = epoch.to_be();
diff --git a/src/storage/hummock_sdk/src/table_watermark.rs b/src/storage/hummock_sdk/src/table_watermark.rs
index 44bde5882310..aa835f3a0906 100644
--- a/src/storage/hummock_sdk/src/table_watermark.rs
+++ b/src/storage/hummock_sdk/src/table_watermark.rs
@@ -28,7 +28,7 @@ use risingwave_pb::hummock::table_watermarks::PbEpochNewWatermarks;
 use risingwave_pb::hummock::{PbTableWatermarks, PbVnodeWatermark};
 use tracing::{debug, warn};
 
-use crate::key::{prefix_slice_with_vnode, vnode_range, TableKey, TableKeyRange};
+use crate::key::{prefix_slice_with_vnode, vnode, TableKey, TableKeyRange};
 use crate::HummockEpoch;
 
 #[derive(Clone)]
@@ -102,79 +102,54 @@ impl TableWatermarksIndex {
         self.read_watermark(vnode, HummockEpoch::MAX)
     }
 
-    pub fn range_watermarks(
+    pub fn rewrite_range_with_table_watermark(
         &self,
         epoch: HummockEpoch,
         key_range: &mut TableKeyRange,
-    ) -> Option<ReadTableWatermark> {
-        let mut ret = BTreeMap::new();
-        let (left, right) = vnode_range(key_range);
-        if right - left == 1 {
-            // the table key range falls in a single vnode. No table watermark will be returned, and instead the key range
-            // will be modified.
- let vnode = VirtualNode::from_index(left); - if let Some(watermark) = self.read_watermark(vnode, epoch) { - match self.watermark_direction { - WatermarkDirection::Ascending => { - let overwrite_start_key = match &key_range.0 { - Included(start_key) | Excluded(start_key) => { - start_key.key_part() < watermark - } - Unbounded => true, + ) { + let vnode = vnode(key_range); + if let Some(watermark) = self.read_watermark(vnode, epoch) { + match self.watermark_direction { + WatermarkDirection::Ascending => { + let overwrite_start_key = match &key_range.0 { + Included(start_key) | Excluded(start_key) => { + start_key.key_part() < watermark + } + Unbounded => true, + }; + if overwrite_start_key { + let watermark_key = TableKey(prefix_slice_with_vnode(vnode, &watermark)); + let fully_filtered = match &key_range.1 { + Included(end_key) => end_key < &watermark_key, + Excluded(end_key) => end_key <= &watermark_key, + Unbounded => false, }; - if overwrite_start_key { - let watermark_key = - TableKey(prefix_slice_with_vnode(vnode, &watermark)); - let fully_filtered = match &key_range.1 { - Included(end_key) => end_key < &watermark_key, - Excluded(end_key) => end_key <= &watermark_key, - Unbounded => false, - }; - if fully_filtered { - key_range.1 = Excluded(watermark_key.clone()); - } - key_range.0 = Included(watermark_key); + if fully_filtered { + key_range.1 = Excluded(watermark_key.clone()); } + key_range.0 = Included(watermark_key); } - WatermarkDirection::Descending => { - let overwrite_end_key = match &key_range.1 { - Included(end_key) | Excluded(end_key) => end_key.key_part() > watermark, - Unbounded => true, + } + WatermarkDirection::Descending => { + let overwrite_end_key = match &key_range.1 { + Included(end_key) | Excluded(end_key) => end_key.key_part() > watermark, + Unbounded => true, + }; + if overwrite_end_key { + let watermark_key = TableKey(prefix_slice_with_vnode(vnode, &watermark)); + let fully_filtered = match &key_range.0 { + Included(start_key) => start_key > &watermark_key, + Excluded(start_key) => start_key >= &watermark_key, + Unbounded => false, }; - if overwrite_end_key { - let watermark_key = - TableKey(prefix_slice_with_vnode(vnode, &watermark)); - let fully_filtered = match &key_range.0 { - Included(start_key) => start_key > &watermark_key, - Excluded(start_key) => start_key >= &watermark_key, - Unbounded => false, - }; - if fully_filtered { - *key_range = - (Included(watermark_key.clone()), Excluded(watermark_key)); - } else { - key_range.1 = Included(watermark_key); - } + if fully_filtered { + *key_range = (Included(watermark_key.clone()), Excluded(watermark_key)); + } else { + key_range.1 = Included(watermark_key); } } } } - None - } else { - for i in left..right { - let vnode = VirtualNode::from_index(i); - if let Some(watermark) = self.read_watermark(vnode, epoch) { - assert!(ret.insert(vnode, watermark).is_none()); - } - } - if ret.is_empty() { - None - } else { - Some(ReadTableWatermark { - direction: self.direction(), - vnode_watermarks: ret, - }) - } } } @@ -606,10 +581,7 @@ mod tests { use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; - use crate::key::{ - is_empty_key_range, map_table_key_range, prefix_slice_with_vnode, - prefixed_range_with_vnode, TableKeyRange, - }; + use crate::key::{is_empty_key_range, prefixed_range_with_vnode, TableKeyRange}; use crate::table_watermark::{ merge_multiple_new_table_watermarks, TableWatermarks, TableWatermarksIndex, VnodeWatermark, WatermarkDirection, @@ -969,42 +941,12 @@ mod tests { 
Some(watermark2.clone()) ); - // test read from multiple vnodes - { - let range = map_table_key_range(( - Included(prefix_slice_with_vnode( - VirtualNode::from_index(1), - b"begin", - )), - Excluded(prefix_slice_with_vnode(VirtualNode::from_index(2), b"end")), - )); - let mut range_mut = range.clone(); - let read_watermarks = index.range_watermarks(EPOCH2, &mut range_mut).unwrap(); - assert_eq!(range_mut, range); - assert_eq!(direction, read_watermarks.direction); - assert_eq!(2, read_watermarks.vnode_watermarks.len()); - assert_eq!( - &watermark2, - read_watermarks - .vnode_watermarks - .get(&VirtualNode::from_index(1)) - .unwrap() - ); - assert_eq!( - &watermark2, - read_watermarks - .vnode_watermarks - .get(&VirtualNode::from_index(2)) - .unwrap() - ); - } - // watermark is watermark2 let check_watermark_range = |query_range: (Bound, Bound), output_range: Option<(Bound, Bound)>| { let mut range = build_watermark_range(direction, query_range); - assert!(index.range_watermarks(EPOCH2, &mut range).is_none()); + index.rewrite_range_with_table_watermark(EPOCH2, &mut range); if let Some(output_range) = output_range { assert_eq!(range, build_watermark_range(direction, output_range)); } else { diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 390d0c5991f0..8229ec2f84aa 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -33,7 +33,8 @@ pub(crate) mod tests { use risingwave_hummock_sdk::can_concat; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::key::{ - next_key, prefix_slice_with_vnode, FullKey, TableKey, TABLE_PREFIX_LEN, + next_key, prefix_slice_with_vnode, prefixed_range_with_vnode, FullKey, TableKey, + TABLE_PREFIX_LEN, }; use risingwave_hummock_sdk::prost_key_range::KeyRangeExt; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; @@ -156,7 +157,9 @@ pub(crate) mod tests { value_size: usize, epochs: Vec, ) { - let mut local = storage.new_local(Default::default()).await; + let mut local = storage + .new_local(NewLocalOptions::for_test(TableId::default())) + .await; // 1. add sstables let val = b"0"[..].repeat(value_size); local.init_for_test(epochs[0]).await.unwrap(); @@ -730,6 +733,8 @@ pub(crate) mod tests { StaticCompactionGroupId::StateDefault.into(), ) .await; + + let vnode = VirtualNode::from_index(1); for index in 0..kv_count { epoch += 1; let next_epoch = epoch + 1; @@ -746,7 +751,7 @@ pub(crate) mod tests { let mut prefix = BytesMut::default(); let random_key = rand::thread_rng().gen::<[u8; 32]>(); - prefix.put_u16(1); + prefix.extend_from_slice(&vnode.to_be_bytes()); prefix.put_slice(random_key.as_slice()); storage @@ -852,7 +857,10 @@ pub(crate) mod tests { // 7. 
scan kv to check key table_id let scan_result = global_storage .scan( - (Bound::Unbounded, Bound::Unbounded), + prefixed_range_with_vnode( + (Bound::::Unbounded, Bound::::Unbounded), + vnode, + ), epoch, None, ReadOptions { @@ -922,6 +930,7 @@ pub(crate) mod tests { let base_epoch = Epoch::now(); let mut epoch: u64 = base_epoch.0; let millisec_interval_epoch: u64 = (1 << 16) * 100; + let vnode = VirtualNode::from_index(1); let mut epoch_set = BTreeSet::new(); let mut local = storage @@ -936,7 +945,7 @@ pub(crate) mod tests { epoch_set.insert(epoch); let mut prefix = BytesMut::default(); let random_key = rand::thread_rng().gen::<[u8; 32]>(); - prefix.put_u16(1); + prefix.extend_from_slice(&vnode.to_be_bytes()); prefix.put_slice(random_key.as_slice()); local @@ -1047,7 +1056,10 @@ pub(crate) mod tests { // 6. scan kv to check key table_id let scan_result = storage .scan( - (Bound::Unbounded, Bound::Unbounded), + prefixed_range_with_vnode( + (Bound::::Unbounded, Bound::::Unbounded), + vnode, + ), epoch, None, ReadOptions { diff --git a/src/storage/hummock_test/src/failpoint_tests.rs b/src/storage/hummock_test/src/failpoint_tests.rs index 2b2f3da3f15a..8112e99358d8 100644 --- a/src/storage/hummock_test/src/failpoint_tests.rs +++ b/src/storage/hummock_test/src/failpoint_tests.rs @@ -61,7 +61,9 @@ async fn test_failpoints_state_store_read_upload() { .await .unwrap(); - let mut local = hummock_storage.new_local(NewLocalOptions::default()).await; + let mut local = hummock_storage + .new_local(NewLocalOptions::for_test(TableId::default())) + .await; let anchor = gen_key_from_str(VirtualNode::ZERO, "aa"); let mut batch1 = vec![ diff --git a/src/storage/hummock_test/src/hummock_read_version_tests.rs b/src/storage/hummock_test/src/hummock_read_version_tests.rs index bf841b5e4956..ed4f68a289e6 100644 --- a/src/storage/hummock_test/src/hummock_read_version_tests.rs +++ b/src/storage/hummock_test/src/hummock_read_version_tests.rs @@ -18,9 +18,11 @@ use std::sync::Arc; use bytes::Bytes; use itertools::Itertools; use parking_lot::RwLock; +use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::TableId; +use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::key::{key_with_epoch, map_table_key_range}; -use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo}; +use risingwave_hummock_sdk::LocalSstableInfo; use risingwave_meta::hummock::test_utils::setup_compute_env; use risingwave_pb::hummock::{KeyRange, SstableInfo}; use risingwave_storage::hummock::iterator::test_utils::{ @@ -28,10 +30,9 @@ use risingwave_storage::hummock::iterator::test_utils::{ }; use risingwave_storage::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatch; use risingwave_storage::hummock::store::version::{ - read_filter_for_batch, read_filter_for_local, HummockReadVersion, StagingData, - StagingSstableInfo, VersionUpdate, + read_filter_for_version, HummockReadVersion, StagingData, StagingSstableInfo, VersionUpdate, }; -use risingwave_storage::hummock::test_utils::{gen_dummy_batch, gen_dummy_sst_info}; +use risingwave_storage::hummock::test_utils::gen_dummy_batch; use crate::test_utils::prepare_first_valid_version; @@ -45,7 +46,8 @@ async fn test_read_version_basic() { let mut epoch = 1; let table_id = 0; - let mut read_version = HummockReadVersion::new(TableId::from(table_id), pinned_version); + let vnodes = Arc::new(Bitmap::ones(VirtualNode::COUNT)); + let mut read_version = HummockReadVersion::new(TableId::from(table_id), pinned_version, vnodes); { // single imm @@ -262,10 +264,13 @@ async 
fn test_read_filter_basic() { let epoch = 1; let table_id = 0; + let vnodes = Arc::new(Bitmap::ones(VirtualNode::COUNT)); let read_version = Arc::new(RwLock::new(HummockReadVersion::new( TableId::from(table_id), pinned_version, + vnodes.clone(), ))); + read_version.write().update_vnode_bitmap(vnodes); { // single imm @@ -306,11 +311,11 @@ async fn test_read_filter_basic() { assert_eq!(0, staging_sst.len()); assert!(staging_imm.iter().any(|imm| imm.min_epoch() <= epoch)); - // build for local + // test read_filter_for_version { let key_range = key_range.clone(); let (_, hummock_read_snapshot) = - read_filter_for_local(epoch, TableId::from(table_id), key_range, &read_version) + read_filter_for_version(epoch, TableId::from(table_id), key_range, &read_version) .unwrap(); assert_eq!(1, hummock_read_snapshot.0.len()); @@ -320,84 +325,71 @@ async fn test_read_filter_basic() { hummock_read_snapshot.2.max_committed_epoch() ); } - - // build for batch - { - let key_range = key_range.clone(); - let read_version_vec = vec![read_version]; - - let (_, hummock_read_snapshot) = - read_filter_for_batch(epoch, TableId::from(table_id), key_range, read_version_vec) - .unwrap(); - - assert_eq!(1, hummock_read_snapshot.0.len()); - assert_eq!(0, hummock_read_snapshot.1.len()); - } } } -#[tokio::test] -async fn test_read_filter_for_batch_issue_14659() { - use std::ops::Bound::Unbounded; - - let (env, hummock_manager_ref, _cluster_manager_ref, worker_node) = - setup_compute_env(8080).await; - - let (pinned_version, _, _) = - prepare_first_valid_version(env, hummock_manager_ref, worker_node).await; - - const NUM_SHARDS: u64 = 2; - let table_id = TableId::from(2); - let epoch = 1; - let mut read_version_vec = vec![]; - let mut imms = vec![]; - - // Populate IMMs - for i in 0..NUM_SHARDS { - let read_version = Arc::new(RwLock::new(HummockReadVersion::new( - table_id, - pinned_version.clone(), - ))); - - let items = SharedBufferBatch::build_shared_buffer_item_batches(gen_dummy_batch(i)); - let size = SharedBufferBatch::measure_batch_size(&items); - let imm = - SharedBufferBatch::build_shared_buffer_batch_for_test(epoch, 0, items, size, table_id); - - imms.push(imm.clone()); - - read_version - .write() - .update(VersionUpdate::Staging(StagingData::ImmMem(imm))); - - read_version_vec.push(read_version); - } - - // Update read version via staging SSTs - let sst_id = 233; - let staging_sst = gen_dummy_sst_info(sst_id, imms.clone(), table_id, epoch); - read_version_vec.iter().for_each(|v| { - v.write().update(VersionUpdate::Staging(StagingData::Sst( - StagingSstableInfo::new( - vec![LocalSstableInfo::for_test(staging_sst.clone())], - vec![epoch], - imms.iter().map(|imm| imm.batch_id()).collect_vec(), - imms.iter().map(|imm| imm.size()).sum(), - ), - ))); - }); - - // build for batch with max epoch - let (_, hummock_read_snapshot) = read_filter_for_batch( - HummockEpoch::MAX, - table_id, - (Unbounded, Unbounded), - read_version_vec, - ) - .unwrap(); - - // No imms should be proivided - assert_eq!(0, hummock_read_snapshot.0.len()); - // Only 1 staging sst is provided - assert_eq!(1, hummock_read_snapshot.1.len()); -} +// #[tokio::test] +// async fn test_read_filter_for_batch_issue_14659() { +// use std::ops::Bound::Unbounded; + +// let (env, hummock_manager_ref, _cluster_manager_ref, worker_node) = +// setup_compute_env(8080).await; + +// let (pinned_version, _, _) = +// prepare_first_valid_version(env, hummock_manager_ref, worker_node).await; + +// const NUM_SHARDS: u64 = 2; +// let table_id = TableId::from(2); +// let 
epoch = 1; +// let mut read_version_vec = vec![]; +// let mut imms = vec![]; + +// // Populate IMMs +// for i in 0..NUM_SHARDS { +// let read_version = Arc::new(RwLock::new(HummockReadVersion::new( +// table_id, +// pinned_version.clone(), +// ))); + +// let items = SharedBufferBatch::build_shared_buffer_item_batches(gen_dummy_batch(i)); +// let size = SharedBufferBatch::measure_batch_size(&items); +// let imm = +// SharedBufferBatch::build_shared_buffer_batch_for_test(epoch, 0, items, size, table_id); + +// imms.push(imm.clone()); + +// read_version +// .write() +// .update(VersionUpdate::Staging(StagingData::ImmMem(imm))); + +// read_version_vec.push(read_version); +// } + +// // Update read version via staging SSTs +// let sst_id = 233; +// let staging_sst = gen_dummy_sst_info(sst_id, imms.clone(), table_id, epoch); +// read_version_vec.iter().for_each(|v| { +// v.write().update(VersionUpdate::Staging(StagingData::Sst( +// StagingSstableInfo::new( +// vec![LocalSstableInfo::for_test(staging_sst.clone())], +// vec![epoch], +// imms.iter().map(|imm| imm.batch_id()).collect_vec(), +// imms.iter().map(|imm| imm.size()).sum(), +// ), +// ))); +// }); + +// // build for batch with max epoch +// let (_, hummock_read_snapshot) = read_filter_for_batch( +// HummockEpoch::MAX, +// table_id, +// (Unbounded, Unbounded), +// read_version_vec, +// ) +// .unwrap(); + +// // No imms should be proivided +// assert_eq!(0, hummock_read_snapshot.0.len()); +// // Only 1 staging sst is provided +// assert_eq!(1, hummock_read_snapshot.1.len()); +// } diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index b60481f69921..04ff5c456557 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -11,26 +11,25 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. 
-use std::ops::Bound::{Excluded, Included, Unbounded}; +use std::ops::Bound::{self, Excluded, Included, Unbounded}; use std::ops::Range; use std::sync::Arc; use bytes::{BufMut, Bytes}; use futures::TryStreamExt; use itertools::Itertools; -use parking_lot::RwLock; use risingwave_common::buffer::BitmapBuilder; use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_common::range::RangeBoundsExt; use risingwave_hummock_sdk::key::{ - gen_key_from_bytes, prefix_slice_with_vnode, FullKey, TableKey, UserKey, TABLE_PREFIX_LEN, + gen_key_from_bytes, prefixed_range_with_vnode, FullKey, TableKey, UserKey, TABLE_PREFIX_LEN, }; use risingwave_hummock_sdk::table_watermark::{VnodeWatermark, WatermarkDirection}; use risingwave_rpc_client::HummockMetaClient; use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion; -use risingwave_storage::hummock::store::version::{read_filter_for_batch, read_filter_for_local}; +use risingwave_storage::hummock::store::version::read_filter_for_version; use risingwave_storage::hummock::{CachePolicy, HummockStorage, LocalHummockStorage}; use risingwave_storage::storage_value::StorageValue; use risingwave_storage::store::*; @@ -984,7 +983,10 @@ async fn test_iter_with_min_epoch() { let iter = test_env .storage .iter( - (Unbounded, Unbounded), + prefixed_range_with_vnode( + (Bound::::Unbounded, Bound::::Unbounded), + VirtualNode::ZERO, + ), epoch1, ReadOptions { table_id: TEST_TABLE_ID, @@ -1006,7 +1008,10 @@ async fn test_iter_with_min_epoch() { let iter = test_env .storage .iter( - (Unbounded, Unbounded), + prefixed_range_with_vnode( + (Bound::::Unbounded, Bound::::Unbounded), + VirtualNode::ZERO, + ), epoch2, ReadOptions { table_id: TEST_TABLE_ID, @@ -1026,7 +1031,10 @@ async fn test_iter_with_min_epoch() { let iter = test_env .storage .iter( - (Unbounded, Unbounded), + prefixed_range_with_vnode( + (Bound::::Unbounded, Bound::::Unbounded), + VirtualNode::ZERO, + ), epoch2, ReadOptions { table_id: TEST_TABLE_ID, @@ -1067,7 +1075,10 @@ async fn test_iter_with_min_epoch() { let iter = test_env .storage .iter( - (Unbounded, Unbounded), + prefixed_range_with_vnode( + (Bound::::Unbounded, Bound::::Unbounded), + VirtualNode::ZERO, + ), epoch1, ReadOptions { table_id: TEST_TABLE_ID, @@ -1089,7 +1100,10 @@ async fn test_iter_with_min_epoch() { let iter = test_env .storage .iter( - (Unbounded, Unbounded), + prefixed_range_with_vnode( + (Bound::::Unbounded, Bound::::Unbounded), + VirtualNode::ZERO, + ), epoch2, ReadOptions { table_id: TEST_TABLE_ID, @@ -1111,7 +1125,10 @@ async fn test_iter_with_min_epoch() { let iter = test_env .storage .iter( - (Unbounded, Unbounded), + prefixed_range_with_vnode( + (Bound::::Unbounded, Bound::::Unbounded), + VirtualNode::ZERO, + ), epoch2, ReadOptions { table_id: TEST_TABLE_ID, @@ -1207,17 +1224,23 @@ async fn test_hummock_version_reader() { { // test before sync { - let (_, read_snapshot) = read_filter_for_local( + let (_, read_snapshot) = read_filter_for_version( epoch1, TEST_TABLE_ID, - (Unbounded, Unbounded), + prefixed_range_with_vnode( + (Bound::::Unbounded, Bound::::Unbounded), + VirtualNode::ZERO, + ), &hummock_storage.read_version(), ) .unwrap(); let iter = hummock_version_reader .iter( - (Unbounded, Unbounded), + prefixed_range_with_vnode( + (Bound::::Unbounded, Bound::::Unbounded), + VirtualNode::ZERO, + ), epoch1, ReadOptions { table_id: TEST_TABLE_ID, @@ -1235,17 +1258,23 @@ async fn test_hummock_version_reader() { } { - let (_, 
read_snapshot) = read_filter_for_local( + let (_, read_snapshot) = read_filter_for_version( epoch2, TEST_TABLE_ID, - (Unbounded, Unbounded), + prefixed_range_with_vnode( + (Bound::::Unbounded, Bound::::Unbounded), + VirtualNode::ZERO, + ), &hummock_storage.read_version(), ) .unwrap(); let iter = hummock_version_reader .iter( - (Unbounded, Unbounded), + prefixed_range_with_vnode( + (Bound::::Unbounded, Bound::::Unbounded), + VirtualNode::ZERO, + ), epoch2, ReadOptions { table_id: TEST_TABLE_ID, @@ -1263,17 +1292,23 @@ async fn test_hummock_version_reader() { } { - let (_, read_snapshot) = read_filter_for_local( + let (_, read_snapshot) = read_filter_for_version( epoch2, TEST_TABLE_ID, - (Unbounded, Unbounded), + prefixed_range_with_vnode( + (Bound::::Unbounded, Bound::::Unbounded), + VirtualNode::ZERO, + ), &hummock_storage.read_version(), ) .unwrap(); let iter = hummock_version_reader .iter( - (Unbounded, Unbounded), + prefixed_range_with_vnode( + (Bound::::Unbounded, Bound::::Unbounded), + VirtualNode::ZERO, + ), epoch2, ReadOptions { table_id: TEST_TABLE_ID, @@ -1293,9 +1328,6 @@ async fn test_hummock_version_reader() { } { - let basic_read_version = - Arc::new(RwLock::new(hummock_storage.read_version().read().clone())); - let sync_result1 = test_env.storage.seal_and_sync_epoch(epoch1).await.unwrap(); test_env .meta_client @@ -1311,8 +1343,6 @@ async fn test_hummock_version_reader() { .await .unwrap(); test_env.storage.try_wait_epoch_for_test(epoch2).await; - let read_version_2 = - Arc::new(RwLock::new(hummock_storage.read_version().read().clone())); let sync_result3 = test_env.storage.seal_and_sync_epoch(epoch3).await.unwrap(); test_env @@ -1321,25 +1351,24 @@ async fn test_hummock_version_reader() { .await .unwrap(); test_env.storage.try_wait_epoch_for_test(epoch3).await; - let read_version_3 = - Arc::new(RwLock::new(hummock_storage.read_version().read().clone())); - { - let (_, read_snapshot) = read_filter_for_batch( + let (_, read_snapshot) = read_filter_for_version( epoch1, TEST_TABLE_ID, - (Unbounded, Unbounded), - vec![ - basic_read_version.clone(), - read_version_2.clone(), - read_version_3.clone(), - ], + prefixed_range_with_vnode( + (Bound::::Unbounded, Bound::::Unbounded), + VirtualNode::ZERO, + ), + &hummock_storage.read_version(), ) .unwrap(); let iter = hummock_version_reader .iter( - (Unbounded, Unbounded), + prefixed_range_with_vnode( + (Bound::::Unbounded, Bound::::Unbounded), + VirtualNode::ZERO, + ), epoch1, ReadOptions { table_id: TEST_TABLE_ID, @@ -1357,26 +1386,32 @@ async fn test_hummock_version_reader() { } { - let (_, read_snapshot) = read_filter_for_batch( + let (_, read_snapshot) = read_filter_for_version( epoch2, TEST_TABLE_ID, - (Unbounded, Unbounded), - vec![ - basic_read_version.clone(), - read_version_2.clone(), - read_version_3.clone(), - ], + prefixed_range_with_vnode( + (Bound::::Unbounded, Bound::::Unbounded), + VirtualNode::ZERO, + ), + &hummock_storage.read_version(), ) .unwrap(); assert_eq!( - read_version_3.read().committed().max_committed_epoch(), + hummock_storage + .read_version() + .read() + .committed() + .max_committed_epoch(), read_snapshot.2.max_committed_epoch() ); let iter = hummock_version_reader .iter( - (Unbounded, Unbounded), + prefixed_range_with_vnode( + (Bound::::Unbounded, Bound::::Unbounded), + VirtualNode::ZERO, + ), epoch2, ReadOptions { table_id: TEST_TABLE_ID, @@ -1394,21 +1429,23 @@ async fn test_hummock_version_reader() { } { - let (_, read_snapshot) = read_filter_for_batch( + let (_, read_snapshot) = 
read_filter_for_version( epoch2, TEST_TABLE_ID, - (Unbounded, Unbounded), - vec![ - basic_read_version.clone(), - read_version_2.clone(), - read_version_3.clone(), - ], + prefixed_range_with_vnode( + (Bound::::Unbounded, Bound::::Unbounded), + VirtualNode::ZERO, + ), + &hummock_storage.read_version(), ) .unwrap(); let iter = hummock_version_reader .iter( - (Unbounded, Unbounded), + prefixed_range_with_vnode( + (Bound::::Unbounded, Bound::::Unbounded), + VirtualNode::ZERO, + ), epoch2, ReadOptions { table_id: TEST_TABLE_ID, @@ -1427,21 +1464,23 @@ async fn test_hummock_version_reader() { } { - let (_, read_snapshot) = read_filter_for_batch( + let (_, read_snapshot) = read_filter_for_version( epoch2, TEST_TABLE_ID, - (Unbounded, Unbounded), - vec![ - basic_read_version.clone(), - read_version_2.clone(), - read_version_3.clone(), - ], + prefixed_range_with_vnode( + (Bound::::Unbounded, Bound::::Unbounded), + VirtualNode::ZERO, + ), + &hummock_storage.read_version(), ) .unwrap(); let iter = hummock_version_reader .iter( - (Unbounded, Unbounded), + prefixed_range_with_vnode( + (Bound::::Unbounded, Bound::::Unbounded), + VirtualNode::ZERO, + ), epoch3, ReadOptions { table_id: TEST_TABLE_ID, @@ -1465,15 +1504,14 @@ async fn test_hummock_version_reader() { let key_range = (Included(start_key), Excluded(end_key)); { - let (_, read_snapshot) = read_filter_for_batch( + let (_, read_snapshot) = read_filter_for_version( epoch2, TEST_TABLE_ID, - (Unbounded, Unbounded), - vec![ - basic_read_version.clone(), - read_version_2.clone(), - read_version_3.clone(), - ], + prefixed_range_with_vnode( + (Bound::::Unbounded, Bound::::Unbounded), + VirtualNode::ZERO, + ), + &hummock_storage.read_version(), ) .unwrap(); @@ -1497,15 +1535,14 @@ async fn test_hummock_version_reader() { } { - let (_, read_snapshot) = read_filter_for_batch( + let (_, read_snapshot) = read_filter_for_version( epoch2, TEST_TABLE_ID, - (Unbounded, Unbounded), - vec![ - basic_read_version.clone(), - read_version_2.clone(), - read_version_3.clone(), - ], + prefixed_range_with_vnode( + (Bound::::Unbounded, Bound::::Unbounded), + VirtualNode::ZERO, + ), + &hummock_storage.read_version(), ) .unwrap(); @@ -1787,21 +1824,23 @@ async fn test_table_watermark() { .await; let vnode1 = VirtualNode::from_index(1); - let vnode_bitmap1 = { + let vnode_bitmap1 = Arc::new({ let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); builder.set(1, true); builder.finish() - }; + }); let vnode2 = VirtualNode::from_index(2); - let vnode_bitmap2 = { + let vnode_bitmap2 = Arc::new({ let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); builder.set(2, true); builder.finish() - }; + }); let epoch1 = (31 * 1000) << 16; local1.init_for_test(epoch1).await.unwrap(); + local1.update_vnode_bitmap(vnode_bitmap1.clone()); local2.init_for_test(epoch1).await.unwrap(); + local2.update_vnode_bitmap(vnode_bitmap2.clone()); fn gen_inner_key(index: usize) -> Bytes { Bytes::copy_from_slice(format!("key_{:05}", index).as_bytes()) @@ -1895,10 +1934,7 @@ async fn test_table_watermark() { SealCurrentEpochOptions { table_watermarks: Some(( WatermarkDirection::Ascending, - vec![VnodeWatermark::new( - Arc::new(vnode_bitmap), - gen_inner_key(watermark1), - )], + vec![VnodeWatermark::new(vnode_bitmap, gen_inner_key(watermark1))], )), switch_op_consistency_level: None, }, @@ -2101,6 +2137,8 @@ async fn test_table_watermark() { test_env.commit_epoch(epoch1).await; test_env.storage.try_wait_epoch_for_test(epoch1).await; + let (local1, local2) = test_after_epoch2(local1, 
local2).await; + let test_global_read = |storage: HummockStorage, epoch: u64| async move { // inner vnode read for vnode in [vnode1, vnode2] { @@ -2119,6 +2157,7 @@ async fn test_table_watermark() { if index < watermark1 { assert!(value.is_none()); } else { + println!("index {} vnode {}", index, vnode); assert_eq!(value.unwrap(), gen_val(index)); } } @@ -2174,47 +2213,6 @@ async fn test_table_watermark() { assert!(result.is_empty()); } } - - // cross vnode read - let result = storage - .iter( - ( - Included(TableKey(prefix_slice_with_vnode( - vnode1, - &gen_inner_key(gen_range().start), - ))), - Included(TableKey(prefix_slice_with_vnode( - vnode2, - &gen_inner_key(gen_range().end), - ))), - ), - epoch, - ReadOptions { - table_id: TEST_TABLE_ID, - ..Default::default() - }, - ) - .await - .unwrap() - .map_ok(|(full_key, value)| (full_key.user_key, value)) - .try_collect::>() - .await - .unwrap(); - let expected = [vnode1, vnode2] - .into_iter() - .flat_map(|vnode| { - gen_range() - .filter(|index| index % 3 == 0 || index % 3 == 1) - .filter(|index| index >= &watermark1) - .map(move |index| { - ( - UserKey::new(TEST_TABLE_ID, gen_key(vnode, index)), - gen_val(index), - ) - }) - }) - .collect_vec(); - assert_eq!(expected, result); }; test_global_read(test_env.storage.clone(), epoch2).await; @@ -2244,10 +2242,7 @@ async fn test_table_watermark() { SealCurrentEpochOptions { table_watermarks: Some(( WatermarkDirection::Ascending, - vec![VnodeWatermark::new( - Arc::new(vnode_bitmap), - gen_inner_key(5), - )], + vec![VnodeWatermark::new(vnode_bitmap, gen_inner_key(5))], )), switch_op_consistency_level: None, }, diff --git a/src/storage/hummock_test/src/local_state_store_test_utils.rs b/src/storage/hummock_test/src/local_state_store_test_utils.rs index d9f2a503c39e..fed253a0488d 100644 --- a/src/storage/hummock_test/src/local_state_store_test_utils.rs +++ b/src/storage/hummock_test/src/local_state_store_test_utils.rs @@ -13,20 +13,14 @@ // limitations under the License. use std::future::Future; -use std::sync::Arc; -use risingwave_common::buffer::Bitmap; -use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::EpochPair; use risingwave_storage::error::StorageResult; use risingwave_storage::store::{InitOptions, LocalStateStore}; pub trait LocalStateStoreTestExt: LocalStateStore { fn init_for_test(&mut self, epoch: u64) -> impl Future> + Send + '_ { - self.init(InitOptions::new( - EpochPair::new_test_epoch(epoch), - Arc::new(Bitmap::ones(VirtualNode::COUNT)), - )) + self.init(InitOptions::new(EpochPair::new_test_epoch(epoch))) } } impl LocalStateStoreTestExt for T {} diff --git a/src/storage/hummock_test/src/snapshot_tests.rs b/src/storage/hummock_test/src/snapshot_tests.rs index 2a6dd31dbfee..00688a978038 100644 --- a/src/storage/hummock_test/src/snapshot_tests.rs +++ b/src/storage/hummock_test/src/snapshot_tests.rs @@ -18,7 +18,7 @@ use bytes::Bytes; use futures::TryStreamExt; use risingwave_common::cache::CachePriority; use risingwave_common::hash::VirtualNode; -use risingwave_hummock_sdk::key::TableKey; +use risingwave_hummock_sdk::key::prefixed_range_with_vnode; use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_meta::hummock::MockHummockMetaClient; use risingwave_rpc_client::HummockMetaClient; @@ -35,16 +35,18 @@ use crate::test_utils::{ }; macro_rules! 
assert_count_range_scan { - ($storage:expr, $range:expr, $expect_count:expr, $epoch:expr) => {{ + ($storage:expr, $vnode:expr, $range:expr, $expect_count:expr, $epoch:expr) => {{ use std::ops::RangeBounds; let range = $range; - let bounds: (Bound>, Bound>) = ( - range.start_bound().map(|x: &TableKey| x.clone()), - range.end_bound().map(|x: &TableKey| x.clone()), + let bounds: (Bound, Bound) = ( + range.start_bound().map(|x: &Bytes| x.clone()), + range.end_bound().map(|x: &Bytes| x.clone()), ); + let vnode = $vnode; + let table_key_range = prefixed_range_with_vnode(bounds, vnode); let it = $storage .iter( - bounds, + table_key_range, $epoch, ReadOptions { prefetch_options: PrefetchOptions::prefetch_for_large_range_scan(), @@ -150,7 +152,7 @@ async fn test_snapshot_inner( .unwrap(); } } - assert_count_range_scan!(hummock_storage, .., 2, epoch1); + assert_count_range_scan!(hummock_storage, VirtualNode::ZERO, .., 2, epoch1); local .ingest_batch( @@ -194,8 +196,8 @@ async fn test_snapshot_inner( .unwrap(); } } - assert_count_range_scan!(hummock_storage, .., 3, epoch2); - assert_count_range_scan!(hummock_storage, .., 2, epoch1); + assert_count_range_scan!(hummock_storage, VirtualNode::ZERO, .., 3, epoch2); + assert_count_range_scan!(hummock_storage, VirtualNode::ZERO, .., 2, epoch1); local .ingest_batch( @@ -238,9 +240,9 @@ async fn test_snapshot_inner( .unwrap(); } } - assert_count_range_scan!(hummock_storage, .., 0, epoch3); - assert_count_range_scan!(hummock_storage, .., 3, epoch2); - assert_count_range_scan!(hummock_storage, .., 2, epoch1); + assert_count_range_scan!(hummock_storage, VirtualNode::ZERO, .., 0, epoch3); + assert_count_range_scan!(hummock_storage, VirtualNode::ZERO, .., 3, epoch2); + assert_count_range_scan!(hummock_storage, VirtualNode::ZERO, .., 2, epoch1); } async fn test_snapshot_range_scan_inner( @@ -302,16 +304,28 @@ async fn test_snapshot_range_scan_inner( } macro_rules! key { ($idx:expr) => { - gen_key_from_bytes(VirtualNode::ZERO, &Bytes::from(stringify!($idx))) + Bytes::from(stringify!($idx)) }; } - assert_count_range_scan!(hummock_storage, key!(2)..=key!(3), 2, epoch); - assert_count_range_scan!(hummock_storage, key!(2)..key!(3), 1, epoch); - assert_count_range_scan!(hummock_storage, key!(2).., 3, epoch); - assert_count_range_scan!(hummock_storage, ..=key!(3), 3, epoch); - assert_count_range_scan!(hummock_storage, ..key!(3), 2, epoch); - assert_count_range_scan!(hummock_storage, .., 4, epoch); + assert_count_range_scan!( + hummock_storage, + VirtualNode::ZERO, + key!(2)..=key!(3), + 2, + epoch + ); + assert_count_range_scan!( + hummock_storage, + VirtualNode::ZERO, + key!(2)..key!(3), + 1, + epoch + ); + assert_count_range_scan!(hummock_storage, VirtualNode::ZERO, key!(2).., 3, epoch); + assert_count_range_scan!(hummock_storage, VirtualNode::ZERO, ..=key!(3), 3, epoch); + assert_count_range_scan!(hummock_storage, VirtualNode::ZERO, ..key!(3), 2, epoch); + assert_count_range_scan!(hummock_storage, VirtualNode::ZERO, .., 4, epoch); } #[tokio::test] diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index d7f7531b8b65..f8372c484396 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -13,15 +13,16 @@ // limitations under the License. 
use std::ops::Bound; -use std::ops::Bound::Unbounded; use std::sync::Arc; use bytes::Bytes; use expect_test::expect; use futures::{pin_mut, StreamExt, TryStreamExt}; +use risingwave_common::buffer::Bitmap; use risingwave_common::cache::CachePriority; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::hash::VirtualNode; +use risingwave_hummock_sdk::key::prefixed_range_with_vnode; use risingwave_hummock_sdk::{ HummockEpoch, HummockReadEpoch, HummockSstableObjectId, LocalSstableInfo, }; @@ -58,7 +59,10 @@ async fn test_empty_read_v2() { .is_none()); let stream = hummock_storage .iter( - (Unbounded, Unbounded), + prefixed_range_with_vnode( + (Bound::::Unbounded, Bound::::Unbounded), + VirtualNode::ZERO, + ), u64::MAX, ReadOptions { table_id: TableId { table_id: 2333 }, @@ -124,7 +128,9 @@ async fn test_basic_inner( // Make sure the batch is sorted. batch3.sort_by(|(k1, _), (k2, _)| k1.cmp(k2)); - let mut local = hummock_storage.new_local(Default::default()).await; + let mut local = hummock_storage + .new_local(NewLocalOptions::for_test(TableId::default())) + .await; // epoch 0 is reserved by storage service let epoch1: u64 = 1; @@ -277,9 +283,12 @@ async fn test_basic_inner( // Write aa bb let iter = hummock_storage .iter( - ( - Bound::Unbounded, - Bound::Included(gen_key_from_str(VirtualNode::ZERO, "ee")), + prefixed_range_with_vnode( + ( + Bound::::Unbounded, + Bound::Included(Bytes::from("ee")), + ), + VirtualNode::ZERO, ), epoch1, ReadOptions { @@ -324,9 +333,12 @@ async fn test_basic_inner( // Update aa, write cc let iter = hummock_storage .iter( - ( - Bound::Unbounded, - Bound::Included(gen_key_from_str(VirtualNode::ZERO, "ee")), + prefixed_range_with_vnode( + ( + Bound::::Unbounded, + Bound::Included(Bytes::from("ee")), + ), + VirtualNode::ZERO, ), epoch2, ReadOptions { @@ -343,9 +355,12 @@ async fn test_basic_inner( // Delete aa, write dd,ee let iter = hummock_storage .iter( - ( - Bound::Unbounded, - Bound::Included(gen_key_from_str(VirtualNode::ZERO, "ee")), + prefixed_range_with_vnode( + ( + Bound::::Unbounded, + Bound::Included(Bytes::from("ee")), + ), + VirtualNode::ZERO, ), epoch3, ReadOptions { @@ -638,9 +653,12 @@ async fn test_reload_storage() { // Write aa bb let iter = hummock_storage .iter( - ( - Bound::Unbounded, - Bound::Included(gen_key_from_str(VirtualNode::ZERO, "ee")), + prefixed_range_with_vnode( + ( + Bound::::Unbounded, + Bound::Included(Bytes::from("ee")), + ), + VirtualNode::ZERO, ), epoch1, ReadOptions { @@ -686,9 +704,12 @@ async fn test_reload_storage() { // Update aa, write cc let iter = hummock_storage .iter( - ( - Bound::Unbounded, - Bound::Included(gen_key_from_str(VirtualNode::ZERO, "ee")), + prefixed_range_with_vnode( + ( + Bound::::Unbounded, + Bound::Included(Bytes::from("ee")), + ), + VirtualNode::ZERO, ), epoch2, ReadOptions { @@ -1025,7 +1046,9 @@ async fn test_delete_get_inner( StorageValue::new_put("222"), ), ]; - let mut local = hummock_storage.new_local(NewLocalOptions::default()).await; + let mut local = hummock_storage + .new_local(NewLocalOptions::for_test(Default::default())) + .await; local.init_for_test(epoch1).await.unwrap(); local .ingest_batch( @@ -1108,7 +1131,9 @@ async fn test_multiple_epoch_sync_inner( ), ]; - let mut local = hummock_storage.new_local(NewLocalOptions::default()).await; + let mut local = hummock_storage + .new_local(NewLocalOptions::for_test(TableId::default())) + .await; local.init_for_test(epoch1).await.unwrap(); local .ingest_batch( @@ -1364,6 +1389,7 @@ async fn 
test_replicated_local_hummock_storage() { TableOption { retention_seconds: None, }, + Arc::new(Bitmap::ones(VirtualNode::COUNT)), )) .await; @@ -1406,7 +1432,10 @@ async fn test_replicated_local_hummock_storage() { let actual = risingwave_storage::store::LocalStateStore::iter( &local_hummock_storage, - (Unbounded, Unbounded), + prefixed_range_with_vnode( + (Bound::::Unbounded, Bound::::Unbounded), + VirtualNode::ZERO, + ), read_options.clone(), ) .await @@ -1468,7 +1497,14 @@ async fn test_replicated_local_hummock_storage() { // Test Global State Store iter, epoch2 { let actual = hummock_storage - .iter((Unbounded, Unbounded), epoch2, read_options.clone()) + .iter( + prefixed_range_with_vnode( + (Bound::::Unbounded, Bound::::Unbounded), + VirtualNode::ZERO, + ), + epoch2, + read_options.clone(), + ) .await .unwrap() .collect::>() @@ -1496,7 +1532,14 @@ async fn test_replicated_local_hummock_storage() { // Test Global State Store iter, epoch1 { let actual = hummock_storage - .iter((Unbounded, Unbounded), epoch1, read_options) + .iter( + prefixed_range_with_vnode( + (Bound::::Unbounded, Bound::::Unbounded), + VirtualNode::ZERO, + ), + epoch1, + read_options, + ) .await .unwrap() .collect::>() diff --git a/src/storage/hummock_trace/src/opts.rs b/src/storage/hummock_trace/src/opts.rs index f5b749c83fb5..966fd62d34dd 100644 --- a/src/storage/hummock_trace/src/opts.rs +++ b/src/storage/hummock_trace/src/opts.rs @@ -149,11 +149,14 @@ pub struct TracedNewLocalOptions { pub op_consistency_level: TracedOpConsistencyLevel, pub table_option: TracedTableOption, pub is_replicated: bool, + pub vnodes: TracedBitmap, } #[cfg(test)] impl TracedNewLocalOptions { pub(crate) fn for_test(table_id: u32) -> Self { + use risingwave_common::hash::VirtualNode; + Self { table_id: TracedTableId { table_id }, op_consistency_level: TracedOpConsistencyLevel::Inconsistent, @@ -161,6 +164,7 @@ impl TracedNewLocalOptions { retention_seconds: None, }, is_replicated: false, + vnodes: TracedBitmap::from(Bitmap::ones(VirtualNode::COUNT)), } } } @@ -224,7 +228,6 @@ impl From for EpochPair { #[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)] pub struct TracedInitOptions { pub epoch: TracedEpochPair, - pub vnodes: TracedBitmap, } #[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)] diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index f1209775e154..3cd6cdf9f18c 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -741,6 +741,7 @@ impl HummockEventHandler { table_id, new_read_version_sender, is_replicated, + vnodes, } => { let pinned_version = self.pinned_version.load(); let basic_read_version = Arc::new(RwLock::new( @@ -748,6 +749,7 @@ impl HummockEventHandler { table_id, (**pinned_version).clone(), is_replicated, + vnodes, ), )); @@ -875,7 +877,9 @@ mod tests { use bytes::Bytes; use futures::FutureExt; use itertools::Itertools; + use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::TableId; + use risingwave_common::hash::VirtualNode; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_hummock_sdk::key::TableKey; use risingwave_hummock_sdk::version::HummockVersion; @@ -955,6 +959,7 @@ mod tests { table_id, new_read_version_sender: read_version_tx, is_replicated: false, + vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT)), }) .unwrap(); let (read_version, guard) = read_version_rx.await.unwrap(); diff --git 
a/src/storage/src/hummock/event_handler/mod.rs b/src/storage/src/hummock/event_handler/mod.rs index 5c2e081c19f7..9bcc54851420 100644 --- a/src/storage/src/hummock/event_handler/mod.rs +++ b/src/storage/src/hummock/event_handler/mod.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use std::sync::Arc; use parking_lot::{RwLock, RwLockReadGuard}; +use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::HummockEpoch; use thiserror_ext::AsReport; @@ -88,6 +89,7 @@ pub enum HummockEvent { table_id: TableId, new_read_version_sender: oneshot::Sender<(HummockReadVersionRef, LocalInstanceGuard)>, is_replicated: bool, + vnodes: Arc, }, DestroyReadVersion { @@ -138,6 +140,7 @@ impl HummockEvent { table_id, new_read_version_sender: _, is_replicated, + vnodes: _, } => format!( "RegisterReadVersion table_id {:?}, is_replicated: {:?}", table_id, is_replicated diff --git a/src/storage/src/hummock/iterator/test_utils.rs b/src/storage/src/hummock/iterator/test_utils.rs index 62602074c601..c4a2e250d53f 100644 --- a/src/storage/src/hummock/iterator/test_utils.rs +++ b/src/storage/src/hummock/iterator/test_utils.rs @@ -19,7 +19,8 @@ use bytes::Bytes; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::config::{MetricLevel, ObjectStoreConfig}; -use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey}; +use risingwave_common::hash::VirtualNode; +use risingwave_hummock_sdk::key::{prefix_slice_with_vnode, FullKey, TableKey, UserKey}; use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, HummockSstableObjectId}; use risingwave_object_store::object::{ InMemObjectStore, ObjectStore, ObjectStoreImpl, ObjectStoreRef, @@ -78,8 +79,9 @@ pub fn mock_sstable_store_with_object_store(store: ObjectStoreRef) -> SstableSto })) } +// Generate test table key with vnode 0 pub fn iterator_test_table_key_of(idx: usize) -> Vec { - format!("key_test_{:05}", idx).as_bytes().to_vec() + prefix_slice_with_vnode(VirtualNode::ZERO, format!("key_test_{:05}", idx).as_bytes()).to_vec() } pub fn iterator_test_user_key_of(idx: usize) -> UserKey> { diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index d2064ffd09de..88c123ca5d5b 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -24,8 +24,9 @@ use more_asserts::assert_gt; use risingwave_common::catalog::TableId; use risingwave_common::util::epoch::is_max_epoch; use risingwave_common_service::observer_manager::{NotificationClient, ObserverManager}; -use risingwave_hummock_sdk::key::{is_empty_key_range, TableKey, TableKeyRange}; -use risingwave_hummock_sdk::table_watermark::ReadTableWatermark; +use risingwave_hummock_sdk::key::{ + is_empty_key_range, vnode, vnode_range, TableKey, TableKeyRange, +}; use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_pb::hummock::SstableInfo; use risingwave_rpc_client::HummockMetaClient; @@ -34,7 +35,7 @@ use tokio::sync::oneshot; use tracing::error; use super::local_hummock_storage::LocalHummockStorage; -use super::version::{CommittedVersion, HummockVersionReader}; +use super::version::{read_filter_for_version, CommittedVersion, HummockVersionReader}; use crate::error::StorageResult; use crate::filter_key_extractor::{FilterKeyExtractorManager, RpcFilterKeyExtractorManager}; use crate::hummock::backup_reader::{BackupReader, BackupReaderRef}; @@ -45,7 +46,6 @@ use crate::hummock::event_handler::{ }; use 
crate::hummock::local_version::pinned_version::{start_pinned_version_worker, PinnedVersion}; use crate::hummock::observer_manager::HummockObserverNode; -use crate::hummock::store::version::read_filter_for_batch; use crate::hummock::utils::{validate_safe_epoch, wait_for_epoch}; use crate::hummock::write_limiter::{WriteLimiter, WriteLimiterRef}; use crate::hummock::{ @@ -110,12 +110,7 @@ pub struct HummockStorage { write_limiter: WriteLimiterRef, } -pub type ReadVersionTuple = ( - Vec, - Vec, - CommittedVersion, - Option, -); +pub type ReadVersionTuple = (Vec, Vec, CommittedVersion); pub fn get_committed_read_version_tuple( version: PinnedVersion, @@ -123,11 +118,10 @@ pub fn get_committed_read_version_tuple( mut key_range: TableKeyRange, epoch: HummockEpoch, ) -> (TableKeyRange, ReadVersionTuple) { - let watermark = version - .table_watermark_index() - .get(&table_id) - .and_then(|index| index.range_watermarks(epoch, &mut key_range)); - (key_range, (vec![], vec![], version, watermark)) + if let Some(index) = version.table_watermark_index().get(&table_id) { + index.rewrite_range_with_table_watermark(epoch, &mut key_range) + } + (key_range, (vec![], vec![], version)) } impl HummockStorage { @@ -318,13 +312,28 @@ impl HummockStorage { // read committed_version directly without build snapshot get_committed_read_version_tuple((**pinned_version).clone(), table_id, key_range, epoch) } else { + let vnode = vnode(&key_range); + let mut matched_replicated_read_version_cnt = 0; let read_version_vec = { let read_guard = self.read_version_mapping.read(); read_guard .get(&table_id) .map(|v| { v.values() - .filter(|v| !v.read_arc().is_replicated()) + .filter(|v| { + let read_version = v.read(); + if read_version.contains(vnode) { + if read_version.is_replicated() { + matched_replicated_read_version_cnt += 1; + false + } else { + // Only non-replicated read version with matched vnode is considered + true + } + } else { + false + } + }) .cloned() .collect_vec() }) @@ -334,6 +343,24 @@ impl HummockStorage { // When the system has just started and no state has been created, the memory state // may be empty if read_version_vec.is_empty() { + if matched_replicated_read_version_cnt > 0 { + tracing::warn!( + "Read(table_id={} vnode={} epoch={}) is not allowed on replicated read version ({} found). Fall back to committed version (epoch={})", + table_id, + vnode.to_index(), + epoch, + matched_replicated_read_version_cnt, + pinned_version.max_committed_epoch() + ); + } else { + tracing::debug!( + "No read version found for read(table_id={} vnode={} epoch={}). Fall back to committed version (epoch={})", + table_id, + vnode.to_index(), + epoch, + pinned_version.max_committed_epoch() + ); + } get_committed_read_version_tuple( (**pinned_version).clone(), table_id, @@ -341,7 +368,22 @@ impl HummockStorage { epoch, ) } else { - read_filter_for_batch(epoch, table_id, key_range, read_version_vec)? + if read_version_vec.len() != 1 { + let read_version_vnodes = read_version_vec + .into_iter() + .map(|v| { + let v = v.read(); + v.vnodes().iter_ones().collect_vec() + }) + .collect_vec(); + panic!("There are {} read version associated with vnode {}. read_version_vnodes={:?}", read_version_vnodes.len(), vnode.to_index(), read_version_vnodes); + } + read_filter_for_version( + epoch, + table_id, + key_range, + read_version_vec.first().unwrap(), + )? 
} }; @@ -355,6 +397,7 @@ impl HummockStorage { table_id: option.table_id, new_read_version_sender: tx, is_replicated: option.is_replicated, + vnodes: option.vnodes.clone(), }) .unwrap(); @@ -416,6 +459,14 @@ impl StateStoreRead for HummockStorage { epoch: u64, read_options: ReadOptions, ) -> impl Future> + '_ { + let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range); + assert_eq!( + r_vnode_exclusive - l_vnode_inclusive, + 1, + "read range {:?} for table {} iter contains more than one vnode", + key_range, + read_options.table_id + ); self.iter_inner(key_range, epoch, read_options) } } diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index b4e4f041b02f..604a242c6675 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -21,7 +21,7 @@ use bytes::Bytes; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::util::epoch::MAX_SPILL_TIMES; -use risingwave_hummock_sdk::key::{is_empty_key_range, TableKey, TableKeyRange}; +use risingwave_hummock_sdk::key::{is_empty_key_range, vnode_range, TableKey, TableKeyRange}; use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch}; use tokio::sync::mpsc; use tracing::{warn, Instrument}; @@ -30,13 +30,12 @@ use super::version::{StagingData, VersionUpdate}; use crate::error::StorageResult; use crate::hummock::event_handler::{HummockEvent, HummockReadVersionRef, LocalInstanceGuard}; use crate::hummock::iterator::{ - ConcatIteratorInner, Forward, HummockIteratorUnion, MergeIterator, SkipWatermarkIterator, - UserIterator, + ConcatIteratorInner, Forward, HummockIteratorUnion, MergeIterator, UserIterator, }; use crate::hummock::shared_buffer::shared_buffer_batch::{ SharedBufferBatch, SharedBufferBatchIterator, }; -use crate::hummock::store::version::{read_filter_for_local, HummockVersionReader}; +use crate::hummock::store::version::{read_filter_for_version, HummockVersionReader}; use crate::hummock::utils::{ do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, wait_for_epoch, ENABLE_SANITY_CHECK, @@ -111,7 +110,7 @@ impl LocalHummockStorage { Bound::Included(table_key.clone()), ); - let (table_key_range, read_snapshot) = read_filter_for_local( + let (table_key_range, read_snapshot) = read_filter_for_version( epoch, read_options.table_id, table_key_range, @@ -137,7 +136,7 @@ impl LocalHummockStorage { epoch: u64, read_options: ReadOptions, ) -> StorageResult> { - let (table_key_range, read_snapshot) = read_filter_for_local( + let (table_key_range, read_snapshot) = read_filter_for_version( epoch, read_options.table_id, table_key_range, @@ -165,7 +164,7 @@ impl LocalHummockStorage { epoch: u64, read_options: ReadOptions, ) -> StorageResult>> { - let (table_key_range, read_snapshot) = read_filter_for_local( + let (table_key_range, read_snapshot) = read_filter_for_version( epoch, read_options.table_id, table_key_range, @@ -192,7 +191,7 @@ impl LocalHummockStorage { return Ok(true); } - let (key_range, read_snapshot) = read_filter_for_local( + let (key_range, read_snapshot) = read_filter_for_version( HummockEpoch::MAX, // Use MAX epoch to make sure we read from latest read_options.table_id, key_range, @@ -260,6 +259,14 @@ impl LocalStateStore for LocalHummockStorage { key_range: TableKeyRange, read_options: ReadOptions, ) -> StorageResult> { + let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range); + assert_eq!( 
+ r_vnode_exclusive - l_vnode_inclusive, + 1, + "read range {:?} for table {} iter contains more than one vnode", + key_range, + read_options.table_id + ); self.iter_all(key_range.clone(), self.epoch(), read_options) .await } @@ -387,15 +394,6 @@ impl LocalStateStore for LocalHummockStorage { "local state store of table id {:?} is init for more than once", self.table_id ); - let prev_vnodes = self - .read_version - .write() - .update_vnode_bitmap(options.vnodes); - assert!( - prev_vnodes.is_none(), - "Vnode bitmap should be empty during init" - ); - Ok(()) } @@ -442,8 +440,7 @@ impl LocalStateStore for LocalHummockStorage { assert!(read_version.staging().is_empty(), "There is uncommitted staging data in read version table_id {:?} instance_id {:?} on vnode bitmap update", self.table_id(), self.instance_id() ); - let prev_vnodes = read_version.update_vnode_bitmap(vnodes); - prev_vnodes.expect("Previous vnode bitmap should not be none") + read_version.update_vnode_bitmap(vnodes) } } @@ -581,15 +578,13 @@ impl LocalHummockStorage { pub type StagingDataIterator = MergeIterator< HummockIteratorUnion, SstableIterator>, >; -pub type HummockStorageIteratorPayloadInner<'a> = SkipWatermarkIterator< - MergeIterator< - HummockIteratorUnion< - Forward, - StagingDataIterator, - SstableIterator, - ConcatIteratorInner, - MemTableHummockIterator<'a>, - >, +pub type HummockStorageIteratorPayloadInner<'a> = MergeIterator< + HummockIteratorUnion< + Forward, + StagingDataIterator, + SstableIterator, + ConcatIteratorInner, + MemTableHummockIterator<'a>, >, >; diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 513451b84852..1be6893144a2 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -25,13 +25,14 @@ use itertools::Itertools; use parking_lot::RwLock; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::TableId; +use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::MAX_SPILL_TIMES; use risingwave_hummock_sdk::key::{ bound_table_key_range, is_empty_key_range, FullKey, TableKey, TableKeyRange, UserKey, }; use risingwave_hummock_sdk::key_range::KeyRangeCommon; use risingwave_hummock_sdk::table_watermark::{ - ReadTableWatermark, TableWatermarksIndex, VnodeWatermark, WatermarkDirection, + TableWatermarksIndex, VnodeWatermark, WatermarkDirection, }; use risingwave_hummock_sdk::version::HummockVersionDelta; use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo}; @@ -41,10 +42,8 @@ use tracing::Instrument; use super::StagingDataIterator; use crate::error::StorageResult; -use crate::hummock::event_handler::HummockReadVersionRef; use crate::hummock::iterator::{ - ConcatIterator, ForwardMergeRangeIterator, HummockIteratorUnion, MergeIterator, - SkipWatermarkIterator, UserIterator, + ConcatIterator, ForwardMergeRangeIterator, HummockIteratorUnion, MergeIterator, UserIterator, }; use crate::hummock::local_version::pinned_version::PinnedVersion; use crate::hummock::sstable::SstableIteratorReadOptions; @@ -220,7 +219,7 @@ pub struct HummockReadVersion { // Vnode bitmap corresponding to the read version // It will be initialized after local state store init - vnodes: Option>, + vnodes: Arc, } impl HummockReadVersion { @@ -228,6 +227,7 @@ impl HummockReadVersion { table_id: TableId, committed_version: CommittedVersion, is_replicated: bool, + vnodes: Arc, ) -> Self { // before build `HummockReadVersion`, we need to get the a initial version which obtained // from 
meta. want this initialization after version is initialized (now with @@ -247,12 +247,16 @@ impl HummockReadVersion { committed: committed_version, is_replicated, - vnodes: None, + vnodes, } } - pub fn new(table_id: TableId, committed_version: CommittedVersion) -> Self { - Self::new_with_replication_option(table_id, committed_version, false) + pub fn new( + table_id: TableId, + committed_version: CommittedVersion, + vnodes: Arc, + ) -> Self { + Self::new_with_replication_option(table_id, committed_version, false, vnodes) } pub fn table_id(&self) -> TableId { @@ -510,97 +514,20 @@ impl HummockReadVersion { self.is_replicated } - pub fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Option> { - self.vnodes.replace(vnodes) + pub fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { + std::mem::replace(&mut self.vnodes, vnodes) } -} - -pub fn read_filter_for_batch( - epoch: HummockEpoch, // for check - table_id: TableId, - mut key_range: TableKeyRange, - read_version_vec: Vec, -) -> StorageResult<(TableKeyRange, ReadVersionTuple)> { - assert!(!read_version_vec.is_empty()); - let mut staging_vec = Vec::with_capacity(read_version_vec.len()); - let mut max_mce_version: Option = None; - let mut table_watermarks: Vec = Vec::new(); - for read_version in &read_version_vec { - let read_version_guard = read_version.read(); - - let read_watermark = read_version_guard - .table_watermarks - .as_ref() - .and_then(|watermarks| watermarks.range_watermarks(epoch, &mut key_range)); - - if let Some(read_watermark) = read_watermark { - table_watermarks.push(read_watermark); - } - - let (imms, ssts) = { - let (imm_iter, sst_iter) = read_version_guard - .staging() - .prune_overlap(epoch, table_id, &key_range); - - ( - imm_iter.cloned().collect_vec(), - sst_iter.cloned().collect_vec(), - ) - }; - staging_vec.push((imms, ssts)); - if let Some(version) = &max_mce_version { - if read_version_guard.committed().max_committed_epoch() > version.max_committed_epoch() - { - max_mce_version = Some(read_version_guard.committed.clone()); - } - } else { - max_mce_version = Some(read_version_guard.committed.clone()); - } + pub fn contains(&self, vnode: VirtualNode) -> bool { + self.vnodes.is_set(vnode.to_index()) } - let max_mce_version = max_mce_version.expect("should exist for once"); - - let mut imm_vec = Vec::default(); - let mut sst_vec = Vec::default(); - let mut seen_imm_ids = HashSet::new(); - let mut seen_sst_ids = HashSet::new(); - - // only filter the staging data that epoch greater than max_mce to avoid data duplication - let (min_epoch, max_epoch) = (max_mce_version.max_committed_epoch(), epoch); - // prune imm and sst with max_mce - for (staging_imms, staging_ssts) in staging_vec { - imm_vec.extend(staging_imms.into_iter().filter(|imm| { - // There shouldn't be duplicated IMMs because merge imm only operates on a single shard. - assert!(seen_imm_ids.insert(imm.batch_id())); - imm.min_epoch() > min_epoch && imm.min_epoch() <= max_epoch - })); - - sst_vec.extend(staging_ssts.into_iter().filter(|staging_sst| { - assert!( - staging_sst.get_max_epoch() <= min_epoch || staging_sst.get_min_epoch() > min_epoch - ); - // Dedup staging SSTs in different shard. 
Duplicates can happen in the following case: - // - Table 1 Shard 1 produces IMM 1 - // - Table 1 Shard 2 produces IMM 2 - // - IMM 1 and IMM 2 are compacted into SST 1 as a Staging SST - // - SST 1 is added to both Shard 1's and Shard 2's read version - staging_sst.min_epoch > min_epoch && seen_sst_ids.insert(staging_sst.object_id) - })); + pub fn vnodes(&self) -> Arc { + self.vnodes.clone() } - - Ok(( - key_range, - ( - imm_vec, - sst_vec, - max_mce_version, - ReadTableWatermark::merge_multiple(table_watermarks), - ), - )) } -pub fn read_filter_for_local( +pub fn read_filter_for_version( epoch: HummockEpoch, table_id: TableId, mut table_key_range: TableKeyRange, @@ -610,10 +537,9 @@ pub fn read_filter_for_local( let committed_version = read_version_guard.committed().clone(); - let table_watermark = read_version_guard - .table_watermarks - .as_ref() - .and_then(|watermark| watermark.range_watermarks(epoch, &mut table_key_range)); + if let Some(watermark) = read_version_guard.table_watermarks.as_ref() { + watermark.rewrite_range_with_table_watermark(epoch, &mut table_key_range) + } let (imm_iter, sst_iter) = read_version_guard @@ -623,10 +549,7 @@ pub fn read_filter_for_local( let imms = imm_iter.cloned().collect(); let ssts = sst_iter.cloned().collect(); - Ok(( - table_key_range, - (imms, ssts, committed_version, table_watermark), - )) + Ok((table_key_range, (imms, ssts, committed_version))) } #[derive(Clone)] @@ -668,21 +591,8 @@ impl HummockVersionReader { read_options: ReadOptions, read_version_tuple: ReadVersionTuple, ) -> StorageResult> { - let (imms, uncommitted_ssts, committed_version, watermark) = read_version_tuple; - let key_vnode = table_key.vnode_part(); - if let Some(read_watermark) = watermark { - for (vnode, watermark) in read_watermark.vnode_watermarks { - if vnode == key_vnode { - let inner_key = table_key.key_part(); - if read_watermark - .direction - .filter_by_watermark(inner_key, watermark) - { - return Ok(None); - } - } - } - } + let (imms, uncommitted_ssts, committed_version) = read_version_tuple; + let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref()); let mut stats_guard = GetLocalMetricsGuard::new(self.state_store_metrics.clone(), read_options.table_id); @@ -846,12 +756,7 @@ impl HummockVersionReader { table_key_range: TableKeyRange, epoch: u64, read_options: ReadOptions, - read_version_tuple: ( - Vec, - Vec, - CommittedVersion, - Option, - ), + read_version_tuple: (Vec, Vec, CommittedVersion), memtable_iter: MemTableHummockIterator<'a>, ) -> StorageResult>> { self.iter_inner( @@ -874,7 +779,7 @@ impl HummockVersionReader { ) -> StorageResult>> { let table_id_string = read_options.table_id.to_string(); let table_id_label = table_id_string.as_str(); - let (imms, uncommitted_ssts, committed, watermark) = read_version_tuple; + let (imms, uncommitted_ssts, committed) = read_version_tuple; let mut local_stats = StoreLocalStatistic::default(); let mut staging_iters = Vec::with_capacity(imms.len() + uncommitted_ssts.len()); @@ -1086,13 +991,6 @@ impl HummockVersionReader { .chain(mem_table.into_iter().map(HummockIteratorUnion::Fourth)), ); - let watermark = watermark - .into_iter() - .map(|watermark| (read_options.table_id, watermark)) - .collect(); - - let skip_watermark_iter = SkipWatermarkIterator::new(merge_iter, watermark); - let user_key_range = ( user_key_range.0.map(|key| key.cloned()), user_key_range.1.map(|key| key.cloned()), @@ -1101,7 +999,7 @@ impl HummockVersionReader { // the epoch_range left bound for iterator read let min_epoch = 
gen_min_epoch(epoch, read_options.retention_seconds.as_ref()); let mut user_iter = UserIterator::new( - skip_watermark_iter, + merge_iter, user_key_range, epoch, min_epoch, @@ -1139,7 +1037,7 @@ impl HummockVersionReader { } let table_id = read_options.table_id; - let (imms, uncommitted_ssts, committed_version, _) = read_version_tuple; + let (imms, uncommitted_ssts, committed_version) = read_version_tuple; let mut stats_guard = MayExistLocalMetricsGuard::new(self.state_store_metrics.clone(), table_id); diff --git a/src/storage/src/mem_table.rs b/src/storage/src/mem_table.rs index 0e94071d492a..60d794a40819 100644 --- a/src/storage/src/mem_table.rs +++ b/src/storage/src/mem_table.rs @@ -436,7 +436,7 @@ pub struct MemtableLocalStateStore { table_id: TableId, op_consistency_level: OpConsistencyLevel, table_option: TableOption, - vnodes: Option>, + vnodes: Arc, } impl MemtableLocalStateStore { @@ -448,7 +448,7 @@ impl MemtableLocalStateStore { table_id: option.table_id, op_consistency_level: option.op_consistency_level, table_option: option.table_option, - vnodes: None, + vnodes: option.vnodes, } } @@ -601,11 +601,6 @@ impl LocalStateStore for MemtableLocalState "epoch in local state store of table id {:?} is init for more than once", self.table_id ); - assert!( - self.vnodes.replace(options.vnodes).is_none(), - "vnodes in local state store of table id {:?} is init for more than once", - self.table_id - ); Ok(()) } @@ -665,7 +660,7 @@ impl LocalStateStore for MemtableLocalState } fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { - self.vnodes.replace(vnodes).unwrap() + std::mem::replace(&mut self.vnodes, vnodes) } } diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index e5b096de4719..96838e1ef25d 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -25,6 +25,7 @@ use futures_async_stream::try_stream; use prost::Message; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{TableId, TableOption}; +use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::{Epoch, EpochPair}; use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange}; use risingwave_hummock_sdk::table_watermark::{ @@ -32,8 +33,8 @@ use risingwave_hummock_sdk::table_watermark::{ }; use risingwave_hummock_sdk::{HummockReadEpoch, LocalSstableInfo}; use risingwave_hummock_trace::{ - TracedBitmap, TracedInitOptions, TracedNewLocalOptions, TracedOpConsistencyLevel, - TracedPrefetchOptions, TracedReadOptions, TracedSealCurrentEpochOptions, TracedWriteOptions, + TracedInitOptions, TracedNewLocalOptions, TracedOpConsistencyLevel, TracedPrefetchOptions, + TracedReadOptions, TracedSealCurrentEpochOptions, TracedWriteOptions, }; use crate::error::{StorageError, StorageResult}; @@ -447,7 +448,7 @@ impl OpConsistencyLevel { } } -#[derive(Clone, Default)] +#[derive(Clone)] pub struct NewLocalOptions { pub table_id: TableId, /// Whether the operation is consistent. The term `consistent` requires the following: @@ -463,6 +464,9 @@ pub struct NewLocalOptions { /// Indicate if this is replicated. If it is, we should not /// upload its ReadVersions. 
pub is_replicated: bool, + + /// The vnode bitmap for the local state store instance + pub vnodes: Arc, } impl From for NewLocalOptions { @@ -477,6 +481,7 @@ impl From for NewLocalOptions { }, table_option: value.table_option.into(), is_replicated: value.is_replicated, + vnodes: Arc::new(value.vnodes.into()), } } } @@ -493,6 +498,7 @@ impl From for TracedNewLocalOptions { }, table_option: value.table_option.into(), is_replicated: value.is_replicated, + vnodes: value.vnodes.as_ref().clone().into(), } } } @@ -502,12 +508,14 @@ impl NewLocalOptions { table_id: TableId, op_consistency_level: OpConsistencyLevel, table_option: TableOption, + vnodes: Arc, ) -> Self { NewLocalOptions { table_id, op_consistency_level, table_option, is_replicated: false, + vnodes, } } @@ -515,12 +523,14 @@ impl NewLocalOptions { table_id: TableId, op_consistency_level: OpConsistencyLevel, table_option: TableOption, + vnodes: Arc, ) -> Self { NewLocalOptions { table_id, op_consistency_level, table_option, is_replicated: true, + vnodes, } } @@ -532,6 +542,7 @@ impl NewLocalOptions { retention_seconds: None, }, is_replicated: false, + vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT)), } } } @@ -539,14 +550,11 @@ impl NewLocalOptions { #[derive(Clone)] pub struct InitOptions { pub epoch: EpochPair, - - /// The vnode bitmap for the local state store instance - pub vnodes: Arc, } impl InitOptions { - pub fn new(epoch: EpochPair, vnodes: Arc) -> Self { - Self { epoch, vnodes } + pub fn new(epoch: EpochPair) -> Self { + Self { epoch } } } @@ -554,7 +562,6 @@ impl From for TracedInitOptions { fn from(value: InitOptions) -> Self { TracedInitOptions { epoch: value.epoch.into(), - vnodes: TracedBitmap::from(value.vnodes.as_ref().clone()), } } } @@ -563,7 +570,6 @@ impl From for InitOptions { fn from(value: TracedInitOptions) -> Self { InitOptions { epoch: value.epoch.into(), - vnodes: Arc::new(Bitmap::from(value.vnodes)), } } } diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index 9b77a9a7bf09..a6a69ae00350 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -365,6 +365,7 @@ impl LogStoreFactory for KvLogStoreFactory { retention_seconds: None, }, is_replicated: false, + vnodes: serde.vnodes().clone(), }) .await; @@ -1375,8 +1376,10 @@ mod tests { let chunk_ids = check_reader(&mut reader, [(epoch3, None)].iter()).await; assert_eq!(0, chunk_ids.len()); - // Recovery happens. Test rewind while consuming persisted log. No new data written - + // Recovery happens. Test rewind while consuming persisted log. No new data written. + // Writer must be dropped first to ensure vnode assignment is exclusive. + drop(reader); + drop(writer); let factory = KvLogStoreFactory::new( test_env.storage.clone(), table.clone(), @@ -1432,7 +1435,9 @@ mod tests { assert_eq!(1, chunk_ids.len()); // Recovery happens again. Test rewind with some new data written and flushed. - + // Writer must be dropped first to ensure vnode assignment is exclusive. 
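+        // The old reader and writer were built over the same vnode bitmap
+        // (`serde.vnodes()`), so they are dropped before the new factory below
+        // claims those vnodes for its own local state store.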
+ drop(reader); + drop(writer); let factory = KvLogStoreFactory::new( test_env.storage.clone(), table.clone(), diff --git a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs index b4f54d17ed1d..731d8e42126d 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs @@ -83,9 +83,7 @@ impl LogWriter for KvLogStoreWriter { epoch: EpochPair, pause_read_on_bootstrap: bool, ) -> LogStoreResult<()> { - self.state_store - .init(InitOptions::new(epoch, self.serde.vnodes().clone())) - .await?; + self.state_store.init(InitOptions::new(epoch)).await?; if pause_read_on_bootstrap { self.pause()?; info!("KvLogStore of {} paused on bootstrap", self.identity); diff --git a/src/stream/src/common/log_store_impl/subscription_log_store.rs b/src/stream/src/common/log_store_impl/subscription_log_store.rs index 999400771a30..39ada926826d 100644 --- a/src/stream/src/common/log_store_impl/subscription_log_store.rs +++ b/src/stream/src/common/log_store_impl/subscription_log_store.rs @@ -60,9 +60,7 @@ impl SubscriptionLogStoreWriter { epoch: risingwave_common::util::epoch::EpochPair, _pause_read_on_bootstrap: bool, ) -> LogStoreResult<()> { - self.state_store - .init(InitOptions::new(epoch, self.serde.vnodes().clone())) - .await?; + self.state_store.init(InitOptions::new(epoch)).await?; self.seq_id = FIRST_SEQ_ID; Ok(()) } diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 87f61d487407..dba45d52f1e3 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -177,9 +177,7 @@ where /// async interface only used for replicated state table, /// as it needs to wait for prev epoch to be committed. pub async fn init_epoch(&mut self, epoch: EpochPair) -> StorageResult<()> { - self.local_store - .init(InitOptions::new(epoch, self.vnodes().clone())) - .await + self.local_store.init(InitOptions::new(epoch)).await } } @@ -195,7 +193,7 @@ where /// No need to `wait_for_epoch`, so it should complete immediately. 
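    /// Vnodes are not needed here: the local state store already receives its
    /// vnode bitmap through `NewLocalOptions::vnodes` at creation time, so
    /// `InitOptions` only carries the epoch pair.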
pub fn init_epoch(&mut self, epoch: EpochPair) { self.local_store - .init(InitOptions::new(epoch, self.vnodes().clone())) + .init(InitOptions::new(epoch)) .now_or_never() .expect("non-replicated state store should start immediately.") .expect("non-replicated state store should not wait_for_epoch, and fail because of it.") @@ -359,9 +357,19 @@ where let table_option = TableOption::new(table_catalog.retention_seconds); let new_local_options = if IS_REPLICATED { - NewLocalOptions::new_replicated(table_id, op_consistency_level, table_option) + NewLocalOptions::new_replicated( + table_id, + op_consistency_level, + table_option, + distribution.vnodes().clone(), + ) } else { - NewLocalOptions::new(table_id, op_consistency_level, table_option) + NewLocalOptions::new( + table_id, + op_consistency_level, + table_option, + distribution.vnodes().clone(), + ) }; let local_state_store = store.new_local(new_local_options).await; @@ -573,6 +581,7 @@ where table_id, op_consistency_level, TableOption::default(), + distribution.vnodes().clone(), )) .await; let row_serde = make_row_serde(); diff --git a/src/stream/src/from_proto/subscription.rs b/src/stream/src/from_proto/subscription.rs index abc30a196743..a7e8b79b16c1 100644 --- a/src/stream/src/from_proto/subscription.rs +++ b/src/stream/src/from_proto/subscription.rs @@ -35,6 +35,17 @@ impl ExecutorBuilder for SubscriptionExecutorBuilder { ) -> StreamResult { let [input]: [_; 1] = params.input.try_into().unwrap(); let table_id = TableId::new(node.log_store_table.as_ref().unwrap().id); + let vnodes = std::sync::Arc::new( + params + .vnode_bitmap + .expect("vnodes not set for subscription"), + ); + let serde = LogStoreRowSerde::new( + node.log_store_table.as_ref().unwrap(), + Some(vnodes), + &KV_LOG_STORE_V2_INFO, + ); + let local_state_store = state_store .new_local(NewLocalOptions { table_id: TableId { @@ -45,19 +56,10 @@ impl ExecutorBuilder for SubscriptionExecutorBuilder { retention_seconds: None, }, is_replicated: false, + vnodes: serde.vnodes().clone(), }) .await; - let vnodes = std::sync::Arc::new( - params - .vnode_bitmap - .expect("vnodes not set for subscription"), - ); - let serde = LogStoreRowSerde::new( - node.log_store_table.as_ref().unwrap(), - Some(vnodes.clone()), - &KV_LOG_STORE_V2_INFO, - ); let log_store_identity = format!( "subscription[{}]-executor[{}]", node.subscription_catalog.as_ref().unwrap().id,