From 2ead8939a4f6b24074cc99495d21fb92d3c90825 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Wed, 6 Mar 2024 16:39:47 +0800 Subject: [PATCH] resolve comments --- .../src/local_version_manager_tests.rs | 544 ------------------ src/storage/src/mem_table.rs | 6 +- 2 files changed, 4 insertions(+), 546 deletions(-) delete mode 100644 src/storage/hummock_test/src/local_version_manager_tests.rs diff --git a/src/storage/hummock_test/src/local_version_manager_tests.rs b/src/storage/hummock_test/src/local_version_manager_tests.rs deleted file mode 100644 index c56a9442a237d..0000000000000 --- a/src/storage/hummock_test/src/local_version_manager_tests.rs +++ /dev/null @@ -1,544 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; -use std::sync::Arc; - -use bytes::Bytes; -use risingwave_common::catalog::TableId; -use risingwave_hummock_sdk::HummockSstableObjectId; -use risingwave_meta::hummock::test_utils::{ - setup_compute_env, update_filter_key_extractor_for_table_ids, -}; -use risingwave_meta::hummock::{HummockManagerRef, MockHummockMetaClient}; -use risingwave_meta::manager::MetaSrvEnv; -use risingwave_meta::storage::MemStore; -use risingwave_pb::common::WorkerNode; -use risingwave_pb::hummock::version_update_payload::Payload; -use risingwave_pb::hummock::HummockVersion; -use risingwave_storage::filter_key_extractor::FilterKeyExtractorManager; -use risingwave_storage::hummock::compactor::CompactorContext; -use risingwave_storage::hummock::event_handler::hummock_event_handler::BufferTracker; -use risingwave_storage::hummock::iterator::test_utils::mock_sstable_store; -use risingwave_storage::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatch; -use risingwave_storage::hummock::shared_buffer::UncommittedData; -use risingwave_storage::hummock::test_utils::{ - default_opts_for_test, gen_dummy_batch, gen_dummy_batch_several_keys, gen_dummy_sst_info, -}; -use risingwave_storage::hummock::SstableObjectIdManager; -use risingwave_storage::monitor::CompactorMetrics; -use risingwave_storage::opts::StorageOpts; -use risingwave_storage::storage_value::StorageValue; - -use crate::test_utils::prepare_first_valid_version; - -pub async fn prepare_local_version_manager( - opt: Arc, - env: MetaSrvEnv, - hummock_manager_ref: HummockManagerRef, - worker_node: WorkerNode, -) -> LocalVersionManagerRef { - let (pinned_version, _, _) = - prepare_first_valid_version(env, hummock_manager_ref.clone(), worker_node.clone()).await; - - let sstable_store = mock_sstable_store(); - - let hummock_meta_client = Arc::new(MockHummockMetaClient::new( - hummock_manager_ref.clone(), - worker_node.id, - )); - - let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new( - hummock_meta_client.clone(), - opt.sstable_id_remote_fetch_number, - )); - - let filter_key_extractor_manager = Arc::new(FilterKeyExtractorManager::default()); - update_filter_key_extractor_for_table_ids(&filter_key_extractor_manager, &[0]); - - let buffer_tracker = BufferTracker::from_storage_opts(&opt); - let compactor_context = Arc::new(CompactorContext::new_local_compact_context( - opt, - sstable_store, - hummock_meta_client, - Arc::new(CompactorMetrics::unused()), - sstable_object_id_manager, - filter_key_extractor_manager, - )); - - LocalVersionManager::new(pinned_version, compactor_context, buffer_tracker) -} - -#[tokio::test] -async fn test_update_pinned_version() { - let opt = Arc::new(default_opts_for_test()); - let (env, hummock_manager_ref, _, worker_node) = setup_compute_env(8080).await; - let local_version_manager = - prepare_local_version_manager(opt, env, hummock_manager_ref, worker_node).await; - - let pinned_version = local_version_manager.get_pinned_version(); - let initial_version_id = pinned_version.id(); - let initial_max_commit_epoch = pinned_version.max_committed_epoch(); - - let epochs: Vec = vec![ - test_epoch(initial_max_commit_epoch+1), - test_epoch(initial_max_commit_epoch+2), - test_epoch(initial_max_commit_epoch+3), - test_epoch(initial_max_commit_epoch+4) - ]; - let batches: Vec> = - epochs.iter().map(|e| gen_dummy_batch(*e)).collect(); - - // Fill shared buffer with a dummy empty batch in epochs[0] and epochs[1] - for i in 0..2 { - local_version_manager - .write_shared_buffer(epochs[i].as_u64(), batches[i].clone(), vec![], Default::default()) - .await - .unwrap(); - let local_version = local_version_manager.get_local_version(); - assert_eq!( - local_version.get_shared_buffer(epochs[i].as_u64()).unwrap().size(), - SharedBufferBatch::measure_batch_size( - &SharedBufferBatch::build_shared_buffer_item_batches(batches[i].clone()) - ) - ); - } - - local_version_manager - .write_shared_buffer(epochs[2].as_u64(), batches[2].clone(), vec![], Default::default()) - .await - .unwrap(); - let local_version = local_version_manager.get_local_version(); - assert!(local_version.get_shared_buffer(epochs[2]).is_some(),); - - let build_batch = |pairs, epoch| { - SharedBufferBatch::for_test( - SharedBufferBatch::build_shared_buffer_item_batches(pairs), - epoch, - TableId::from(0), - ) - }; - - let read_version = - local_version_manager.read_filter::<_, &[u8]>(epochs[0], TableId::default(), &(..)); - assert_eq!( - read_version.shared_buffer_data, - vec![vec![vec![UncommittedData::Batch(build_batch( - batches[0].clone(), - epochs[0] - ))]]] - ); - - let read_version = - local_version_manager.read_filter::<_, &[u8]>(epochs[1], TableId::default(), &(..)); - assert_eq!( - read_version.shared_buffer_data, - vec![ - vec![vec![UncommittedData::Batch(build_batch( - batches[1].clone(), - epochs[1].as_u64() - ))]], - vec![vec![UncommittedData::Batch(build_batch( - batches[0].clone(), - epochs[0].as_u64() - ))]] - ] - ); - - let read_version = - local_version_manager.read_filter::<_, &[u8]>(epochs[2], TableId::default(), &(..)); - assert_eq!( - read_version.shared_buffer_data, - vec![ - vec![vec![UncommittedData::Batch(build_batch( - batches[2].clone(), - epochs[2].as_u64() - ))]], - vec![vec![UncommittedData::Batch(build_batch( - batches[1].clone(), - epochs[1].as_u64() - ))]], - vec![vec![UncommittedData::Batch(build_batch( - batches[0].clone(), - epochs[0].as_u64() - ))]] - ] - ); - - let _ = local_version_manager - .sync_shared_buffer(epochs[0]) - .await - .unwrap(); - - // Update version for epochs[0] - let version = HummockVersion { - id: initial_version_id + 1, - max_committed_epoch: epochs[0].as_u64(), - ..Default::default() - }; - local_version_manager.try_update_pinned_version(Payload::PinnedVersion(version)); - let local_version = local_version_manager.get_local_version(); - assert!(local_version.get_shared_buffer(epochs[0]).is_none()); - assert_eq!( - local_version.get_shared_buffer(epochs[1]).unwrap().size(), - SharedBufferBatch::measure_batch_size( - &SharedBufferBatch::build_shared_buffer_item_batches(batches[1].clone()) - ) - ); - - let _ = local_version_manager - .sync_shared_buffer(epochs[1]) - .await - .unwrap(); - - // Update version for epochs[1] - let version = HummockVersion { - id: initial_version_id + 2, - max_committed_epoch: epochs[1].as_u64(), - ..Default::default() - }; - local_version_manager.try_update_pinned_version(Payload::PinnedVersion(version)); - let local_version = local_version_manager.get_local_version(); - assert!(local_version.get_shared_buffer(epochs[0]).is_none()); - assert!(local_version.get_shared_buffer(epochs[1]).is_none()); - - let _ = local_version_manager - .sync_shared_buffer(epochs[2].as_u64()) - .await - .unwrap(); - // Update version for epochs[2] - let version = HummockVersion { - id: initial_version_id + 3, - max_committed_epoch: epochs[2].as_u64(), - ..Default::default() - }; - - local_version_manager.try_update_pinned_version(Payload::PinnedVersion(version)); - assert!(local_version.get_shared_buffer(epochs[0].as_u64()).is_none()); - assert!(local_version.get_shared_buffer(epochs[1].as_u64()).is_none()); -} - -#[tokio::test] -async fn test_update_uncommitted_ssts() { - let mut opt = default_opts_for_test(); - opt.share_buffers_sync_parallelism = 2; - opt.sstable_size_mb = 1; - let opt = Arc::new(opt); - let (env, hummock_manager_ref, _, worker_node) = setup_compute_env(8080).await; - let local_version_manager = - prepare_local_version_manager(opt, env, hummock_manager_ref, worker_node).await; - - let pinned_version = local_version_manager.get_pinned_version(); - let max_commit_epoch = pinned_version.max_committed_epoch(); - let initial_id = pinned_version.id(); - let version = pinned_version.version(); - - let epochs: Vec = vec![max_commit_epoch + 1, max_commit_epoch + 2]; - let kvs: Vec> = epochs - .iter() - .map(|_| gen_dummy_batch_several_keys(2000)) - .collect(); - let mut batches = Vec::with_capacity(kvs.len()); - - // Fill shared buffer with dummy batches - for i in 0..2 { - local_version_manager - .write_shared_buffer(epochs[i], kvs[i].clone(), vec![], Default::default()) - .await - .unwrap(); - let local_version = local_version_manager.get_local_version(); - let batch = SharedBufferBatch::for_test( - SharedBufferBatch::build_shared_buffer_item_batches(kvs[i].clone()), - epochs[i], - Default::default(), - ); - assert_eq!( - local_version.get_shared_buffer(epochs[i]).unwrap().size(), - batch.size(), - ); - batches.push(batch); - } - - // Update uncommitted sst for epochs[0] - let sst1 = gen_dummy_sst_info(1, vec![batches[0].clone()], TableId::default(), epochs[0]); - { - let (payload, task_size) = { - let mut local_version_guard = local_version_manager.local_version().write(); - local_version_guard.advance_max_sync_epoch(epochs[0]); - let (payload, task_size) = local_version_guard.start_syncing(epochs[0]); - { - assert_eq!(1, payload.len()); - assert_eq!(1, payload[0].len()); - assert_eq!(payload[0][0], UncommittedData::Batch(batches[0].clone())); - assert_eq!(task_size, batches[0].size()); - } - (payload, task_size) - }; - // Check uncommitted ssts - local_version_manager - .run_sync_upload_task(payload, Arc::new(HashMap::new()), task_size, epochs[0]) - .await - .unwrap(); - let epoch_uncommitted_ssts = local_version_manager - .get_local_version() - .get_synced_ssts(epochs[0]) - .clone(); - assert_eq!(epoch_uncommitted_ssts.len(), 2); - assert_eq!( - epoch_uncommitted_ssts - .first() - .unwrap() - .sst_info - .key_range - .as_ref() - .unwrap() - .left, - sst1.key_range.as_ref().unwrap().left, - ); - assert_eq!( - epoch_uncommitted_ssts - .last() - .unwrap() - .sst_info - .key_range - .as_ref() - .unwrap() - .right, - sst1.key_range.as_ref().unwrap().right, - ); - } - - let local_version = local_version_manager.get_local_version(); - // Check shared buffer - assert!(local_version.get_shared_buffer(epochs[0]).is_none()); - assert_eq!( - local_version.get_shared_buffer(epochs[1]).unwrap().size(), - batches[1].size(), - ); - - // Check pinned version - assert_eq!(local_version.pinned_version().version(), version); - assert_eq!(local_version.iter_shared_buffer().count(), 1); - - // Update uncommitted sst for epochs[1] - let sst2 = gen_dummy_sst_info(2, vec![batches[1].clone()], TableId::default(), epochs[1]); - { - let (payload, task_size) = { - let mut local_version_guard = local_version_manager.local_version().write(); - local_version_guard.advance_max_sync_epoch(epochs[1]); - let (payload, task_size) = local_version_guard.start_syncing(epochs[1]); - { - assert_eq!(1, payload.len()); - assert_eq!(1, payload[0].len()); - assert_eq!(payload[0][0], UncommittedData::Batch(batches[1].clone())); - assert_eq!(task_size, batches[1].size()); - } - (payload, task_size) - }; - - local_version_manager - .run_sync_upload_task(payload, Arc::new(HashMap::new()), task_size, epochs[1]) - .await - .unwrap(); - let epoch_uncommitted_ssts = local_version_manager - .get_local_version() - .get_synced_ssts(epochs[1]) - .clone(); - assert_eq!(epoch_uncommitted_ssts.len(), 2); - assert_eq!( - epoch_uncommitted_ssts - .first() - .unwrap() - .sst_info - .key_range - .as_ref() - .unwrap() - .left, - sst2.key_range.as_ref().unwrap().left, - ); - assert_eq!( - epoch_uncommitted_ssts - .last() - .unwrap() - .sst_info - .key_range - .as_ref() - .unwrap() - .right, - sst2.key_range.as_ref().unwrap().right, - ); - } - let local_version = local_version_manager.get_local_version(); - // Check shared buffer - for epoch in &epochs { - assert!(local_version.get_shared_buffer(*epoch).is_none()); - } - // Check pinned version - assert_eq!(local_version.pinned_version().version(), version); - - // Update version for epochs[0] - let version = HummockVersion { - id: initial_id + 1, - max_committed_epoch: epochs[0], - ..Default::default() - }; - assert!(local_version_manager - .try_update_pinned_version(Payload::PinnedVersion(version.clone())) - .is_some()); - let local_version = local_version_manager.get_local_version(); - // Check shared buffer - assert!(local_version.get_shared_buffer(epochs[0]).is_none()); - assert!(local_version.get_shared_buffer(epochs[1]).is_none()); - // Check pinned version - assert_eq!(local_version.pinned_version().version(), version); - assert!(local_version.get_shared_buffer(epochs[0]).is_none()); - - // Update version for epochs[1] - let version = HummockVersion { - id: initial_id + 2, - max_committed_epoch: epochs[1], - ..Default::default() - }; - local_version_manager.try_update_pinned_version(Payload::PinnedVersion(version.clone())); - let local_version = local_version_manager.get_local_version(); - assert!(local_version.get_shared_buffer(epochs[0]).is_none()); - assert!(local_version.get_shared_buffer(epochs[1]).is_none()); - // Check pinned version - assert_eq!(local_version.pinned_version().version(), version); - // Check uncommitted ssts - assert!(local_version.get_shared_buffer(epochs[0]).is_none()); - assert!(local_version.get_shared_buffer(epochs[1]).is_none()); -} - -#[tokio::test] -async fn test_clear_shared_buffer() { - let opt = Arc::new(default_opts_for_test()); - let (env, hummock_manager_ref, _, worker_node) = setup_compute_env(8080).await; - let local_version_manager = - prepare_local_version_manager(opt, env, hummock_manager_ref, worker_node).await; - - let pinned_version = local_version_manager.get_pinned_version(); - let initial_max_commit_epoch = pinned_version.max_committed_epoch(); - - let epochs: Vec = vec![initial_max_commit_epoch + 1, initial_max_commit_epoch + 2]; - let batches: Vec> = - epochs.iter().map(|e| gen_dummy_batch(*e)).collect(); - - // Fill shared buffer with a dummy empty batch in epochs[0] and epochs[1] - for i in 0..2 { - local_version_manager - .write_shared_buffer(epochs[i], batches[i].clone(), vec![], Default::default()) - .await - .unwrap(); - let local_version = local_version_manager.get_local_version(); - assert_eq!( - local_version.get_shared_buffer(epochs[i]).unwrap().size(), - SharedBufferBatch::measure_batch_size( - &SharedBufferBatch::build_shared_buffer_item_batches(batches[i].clone()) - ) - ); - } - - // Clear shared buffer and check - local_version_manager.clear_shared_buffer(); - let local_version = local_version_manager.get_local_version(); - assert_eq!(local_version.iter_shared_buffer().count(), 0); - - assert_eq!( - local_version_manager - .sstable_object_id_manager() - .global_watermark_object_id(), - HummockSstableObjectId::MAX - ); -} - -#[tokio::test] -async fn test_sst_gc_watermark() { - let opt = Arc::new(default_opts_for_test()); - let (env, hummock_manager_ref, _, worker_node) = setup_compute_env(8080).await; - let local_version_manager = - prepare_local_version_manager(opt, env, hummock_manager_ref, worker_node).await; - - let pinned_version = local_version_manager.get_pinned_version(); - let initial_version_id = pinned_version.id(); - let initial_max_commit_epoch = pinned_version.max_committed_epoch(); - - let epochs: Vec = vec![initial_max_commit_epoch + 1, initial_max_commit_epoch + 2]; - let batches: Vec> = - epochs.iter().map(|e| gen_dummy_batch(*e)).collect(); - - assert_eq!( - local_version_manager - .sstable_object_id_manager() - .global_watermark_object_id(), - HummockSstableObjectId::MAX - ); - - for i in 0..2 { - local_version_manager - .write_shared_buffer(epochs[i], batches[i].clone(), vec![], Default::default()) - .await - .unwrap(); - } - - assert_eq!( - local_version_manager - .sstable_object_id_manager() - .global_watermark_object_id(), - HummockSstableObjectId::MAX - ); - - for epoch in &epochs { - let _ = local_version_manager - .sync_shared_buffer(*epoch) - .await - .unwrap(); - - // Global watermark determined by epoch 0. - assert_eq!( - local_version_manager - .sstable_object_id_manager() - .global_watermark_object_id(), - 1 - ); - } - - let version = HummockVersion { - id: initial_version_id + 1, - max_committed_epoch: epochs[0], - ..Default::default() - }; - // Watermark held by epoch 0 is removed. - local_version_manager.try_update_pinned_version(Payload::PinnedVersion(version)); - // Global watermark determined by epoch 1. - assert_eq!( - local_version_manager - .sstable_object_id_manager() - .global_watermark_object_id(), - 2 - ); - - let version = HummockVersion { - id: initial_version_id + 2, - max_committed_epoch: epochs[1], - ..Default::default() - }; - local_version_manager.try_update_pinned_version(Payload::PinnedVersion(version)); - assert_eq!( - local_version_manager - .sstable_object_id_manager() - .global_watermark_object_id(), - HummockSstableObjectId::MAX - ); -} diff --git a/src/storage/src/mem_table.rs b/src/storage/src/mem_table.rs index 363035cd7d381..aed84ccf1e2ed 100644 --- a/src/storage/src/mem_table.rs +++ b/src/storage/src/mem_table.rs @@ -667,7 +667,9 @@ mod tests { use rand::{thread_rng, Rng}; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; - use risingwave_common::util::epoch::{EPOCH_AVAILABLE_BITS, EPOCH_INC_MIN_STEP_FOR_TEST}; + use risingwave_common::util::epoch::{ + EpochExt, EPOCH_AVAILABLE_BITS, EPOCH_INC_MIN_STEP_FOR_TEST, + }; use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey}; use risingwave_hummock_sdk::EpochWithGap; @@ -930,7 +932,7 @@ mod tests { check_data(&mut iter, &ordered_test_data).await; // Test seek with a later epoch, the first key is not skipped - let later_epoch = EpochWithGap::new_from_epoch(TEST_EPOCH + EPOCH_INC_MIN_STEP_FOR_TEST); + let later_epoch = EpochWithGap::new_from_epoch(TEST_EPOCH.next_epoch()); let seek_idx = 500; iter.seek(FullKey { user_key: UserKey {