From 3e9b32bbbb73b8c33ff037fef35c0ee9783d0ada Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Fri, 1 Sep 2023 15:17:17 +0800 Subject: [PATCH] remove meta client from context --- src/compute/src/server.rs | 6 ++--- src/storage/compactor/src/server.rs | 6 ++--- .../hummock_test/src/compactor_tests.rs | 26 +++++-------------- .../hummock_test/src/sync_point_tests.rs | 8 ++---- .../src/hummock/compactor/compaction_utils.rs | 4 +-- .../src/hummock/compactor/compactor_runner.rs | 8 +++--- src/storage/src/hummock/compactor/context.rs | 6 ----- src/storage/src/hummock/compactor/mod.rs | 9 ++++--- .../compactor/shared_buffer_compact.rs | 6 ++--- .../event_handler/hummock_event_handler.rs | 4 +-- src/storage/src/hummock/mod.rs | 7 +++-- .../src/delete_range_runner.rs | 7 +++-- 12 files changed, 37 insertions(+), 60 deletions(-) diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index ad3817df74507..c96dde2baf159 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -216,9 +216,8 @@ pub async fn compute_node_serve( let memory_limiter = Arc::new(MemoryLimiter::new( storage_opts.compactor_memory_limit_mb as u64 * 1024 * 1024 / 2, )); - let compactor_context = Arc::new(CompactorContext { + let compactor_context = CompactorContext { storage_opts, - hummock_meta_client: hummock_meta_client.clone(), sstable_store: storage.sstable_store(), compactor_metrics: compactor_metrics.clone(), is_share_buffer_compact: false, @@ -229,10 +228,11 @@ pub async fn compute_node_serve( task_progress_manager: Default::default(), await_tree_reg: None, running_task_count: Arc::new(AtomicU32::new(0)), - }); + }; let (handle, shutdown_sender) = start_compactor( compactor_context, + hummock_meta_client.clone(), storage.sstable_object_id_manager().clone(), ); sub_tasks.push((handle, shutdown_sender)); diff --git a/src/storage/compactor/src/server.rs b/src/storage/compactor/src/server.rs index 566ad8fd5e8ee..be4f15ca2aa5a 100644 --- a/src/storage/compactor/src/server.rs +++ b/src/storage/compactor/src/server.rs @@ -194,9 +194,8 @@ pub async fn compactor_serve( }; let await_tree_reg = await_tree_config.map(|c| Arc::new(RwLock::new(await_tree::Registry::new(c)))); - let compactor_context = Arc::new(CompactorContext { + let compactor_context = CompactorContext { storage_opts, - hummock_meta_client: hummock_meta_client.clone(), sstable_store: sstable_store.clone(), compactor_metrics, is_share_buffer_compact: false, @@ -211,7 +210,7 @@ pub async fn compactor_serve( task_progress_manager: Default::default(), await_tree_reg: await_tree_reg.clone(), running_task_count: Arc::new(AtomicU32::new(0)), - }); + }; let mut sub_tasks = vec![ MetaClient::start_heartbeat_loop( meta_client.clone(), @@ -220,6 +219,7 @@ pub async fn compactor_serve( ), risingwave_storage::hummock::compactor::start_compactor( compactor_context.clone(), + hummock_meta_client.clone(), sstable_object_id_manager.clone(), ), ]; diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 0be24a4653e17..3eb28a5efa78c 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -166,13 +166,11 @@ pub(crate) mod tests { fn get_compactor_context_with_filter_key_extractor_manager( storage: &HummockStorage, - hummock_meta_client: &Arc, filter_key_extractor_manager: FilterKeyExtractorManagerRef, ) -> CompactorContext { get_compactor_context_with_filter_key_extractor_manager_impl( storage.storage_opts().clone(), storage.sstable_store(), - hummock_meta_client, filter_key_extractor_manager, ) } @@ -180,13 +178,11 @@ pub(crate) mod tests { fn get_compactor_context_with_filter_key_extractor_manager_impl( options: Arc, sstable_store: SstableStoreRef, - hummock_meta_client: &Arc, filter_key_extractor_manager: FilterKeyExtractorManagerRef, ) -> CompactorContext { CompactorContext { storage_opts: options, sstable_store, - hummock_meta_client: hummock_meta_client.clone(), compactor_metrics: Arc::new(CompactorMetrics::unused()), is_share_buffer_compact: false, compaction_executor: Arc::new(CompactionExecutor::new(Some(1))), @@ -235,7 +231,6 @@ pub(crate) mod tests { }; let compact_ctx = get_compactor_context_with_filter_key_extractor_manager( &storage, - &hummock_meta_client, rpc_filter_key_extractor_manager, ); let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new( @@ -291,7 +286,7 @@ pub(crate) mod tests { let (_tx, rx) = tokio::sync::oneshot::channel(); let (mut result_task, task_stats) = compact( - Arc::new(compact_ctx.clone()), + compact_ctx.clone(), compact_task.clone(), rx, Box::new(sstable_object_id_manager.clone()), @@ -399,7 +394,6 @@ pub(crate) mod tests { }; let compact_ctx = get_compactor_context_with_filter_key_extractor_manager( &storage, - &hummock_meta_client, rpc_filter_key_extractor_manager, ); let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new( @@ -454,7 +448,7 @@ pub(crate) mod tests { // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); let (mut result_task, task_stats) = compact( - Arc::new(compact_ctx), + compact_ctx, compact_task.clone(), rx, Box::new(sstable_object_id_manager.clone()), @@ -569,7 +563,6 @@ pub(crate) mod tests { pub(crate) fn prepare_compactor_and_filter( storage: &HummockStorage, - hummock_meta_client: &Arc, existing_table_id: u32, ) -> CompactorContext { let rpc_filter_key_extractor_manager = match storage.filter_key_extractor_manager().clone() @@ -586,7 +579,6 @@ pub(crate) mod tests { get_compactor_context_with_filter_key_extractor_manager( storage, - hummock_meta_client, rpc_filter_key_extractor_manager, ) } @@ -700,7 +692,6 @@ pub(crate) mod tests { let compact_ctx = get_compactor_context_with_filter_key_extractor_manager_impl( global_storage.storage_opts().clone(), global_storage.sstable_store(), - &hummock_meta_client, rpc_filter_key_extractor_manager, ); let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new( @@ -788,7 +779,7 @@ pub(crate) mod tests { // 4. compact let (_tx, rx) = tokio::sync::oneshot::channel(); let (mut result_task, task_stats) = compact( - Arc::new(compact_ctx), + compact_ctx, compact_task.clone(), rx, Box::new(sstable_object_id_manager.clone()), @@ -887,7 +878,6 @@ pub(crate) mod tests { let compact_ctx = get_compactor_context_with_filter_key_extractor_manager( &storage, - &hummock_meta_client, rpc_filter_key_extractor_manager.clone(), ); let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new( @@ -977,7 +967,7 @@ pub(crate) mod tests { // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); let (mut result_task, task_stats) = compact( - Arc::new(compact_ctx), + compact_ctx, compact_task.clone(), rx, Box::new(sstable_object_id_manager.clone()), @@ -1087,7 +1077,6 @@ pub(crate) mod tests { ); let compact_ctx = get_compactor_context_with_filter_key_extractor_manager( &storage, - &hummock_meta_client, rpc_filter_key_extractor_manager, ); let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new( @@ -1163,7 +1152,7 @@ pub(crate) mod tests { // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); let (mut result_task, task_stats) = compact( - Arc::new(compact_ctx), + compact_ctx, compact_task.clone(), rx, Box::new(sstable_object_id_manager.clone()), @@ -1264,8 +1253,7 @@ pub(crate) mod tests { TableId::from(existing_table_id), ) .await; - let compact_ctx = - prepare_compactor_and_filter(&storage, &hummock_meta_client, existing_table_id); + let compact_ctx = prepare_compactor_and_filter(&storage, existing_table_id); let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new( hummock_meta_client.clone(), storage @@ -1322,7 +1310,7 @@ pub(crate) mod tests { // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); let (mut result_task, task_stats) = compact( - Arc::new(compact_ctx), + compact_ctx, compact_task.clone(), rx, Box::new(sstable_object_id_manager.clone()), diff --git a/src/storage/hummock_test/src/sync_point_tests.rs b/src/storage/hummock_test/src/sync_point_tests.rs index b0895ec64c1e9..02fa93053b3ce 100644 --- a/src/storage/hummock_test/src/sync_point_tests.rs +++ b/src/storage/hummock_test/src/sync_point_tests.rs @@ -229,7 +229,7 @@ async fn test_syncpoints_test_local_notification_receiver() { pub async fn compact_once( hummock_manager_ref: HummockManagerRef, - compact_ctx: Arc, + compact_ctx: CompactorContext, sstable_object_id_manager: Arc, ) { // 2. get compact task @@ -289,11 +289,7 @@ async fn test_syncpoints_get_in_delete_range_boundary() { TableId::from(existing_table_id), ) .await; - let compact_ctx = Arc::new(prepare_compactor_and_filter( - &storage, - &hummock_meta_client, - existing_table_id, - )); + let compact_ctx = prepare_compactor_and_filter(&storage, existing_table_id); let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new( hummock_meta_client.clone(), diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs index f04121ff76179..b1fd6b5411643 100644 --- a/src/storage/src/hummock/compactor/compaction_utils.rs +++ b/src/storage/src/hummock/compactor/compaction_utils.rs @@ -161,7 +161,7 @@ pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompact pub async fn generate_splits( sstable_infos: &Vec, compaction_size: u64, - context: Arc, + context: CompactorContext, ) -> HummockResult> { let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20; if compaction_size > parallel_compact_size { @@ -231,7 +231,7 @@ pub async fn generate_splits( Ok(vec![]) } -pub fn estimate_task_output_capacity(context: Arc, task: &CompactTask) -> usize { +pub fn estimate_task_output_capacity(context: CompactorContext, task: &CompactTask) -> usize { let max_target_file_size = context.storage_opts.sstable_size_mb as usize * (1 << 20); let total_input_uncompressed_file_size = task .input_ssts diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index cc22048489c20..6ce3d0b829bbc 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -60,7 +60,7 @@ pub struct CompactorRunner { impl CompactorRunner { pub fn new( split_index: usize, - context: Arc, + context: CompactorContext, task: CompactTask, object_id_getter: Box, ) -> Self { @@ -109,7 +109,7 @@ impl CompactorRunner { Self { compactor, compact_task: task, - sstable_store: context.sstable_store.clone(), + sstable_store: context.sstable_store, key_range, split_index, } @@ -235,7 +235,7 @@ impl CompactorRunner { /// Handles a compaction task and reports its status to hummock manager. /// Always return `Ok` and let hummock manager handle errors. pub async fn compact( - compactor_context: Arc, + compactor_context: CompactorContext, mut compact_task: CompactTask, mut shutdown_rx: Receiver<()>, object_id_getter: Box, @@ -543,7 +543,7 @@ pub async fn compact( /// Fills in the compact task and tries to report the task result to meta node. fn compact_done( mut compact_task: CompactTask, - context: Arc, + context: CompactorContext, output_ssts: Vec, task_status: TaskStatus, ) -> (CompactTask, HashMap) { diff --git a/src/storage/src/hummock/compactor/context.rs b/src/storage/src/hummock/compactor/context.rs index b5563ba391ccd..ad3d5ffcc2dd6 100644 --- a/src/storage/src/hummock/compactor/context.rs +++ b/src/storage/src/hummock/compactor/context.rs @@ -16,7 +16,6 @@ use std::sync::atomic::AtomicU32; use std::sync::Arc; use parking_lot::RwLock; -use risingwave_rpc_client::HummockMetaClient; use super::task_progress::TaskProgressManagerRef; use crate::filter_key_extractor::FilterKeyExtractorManager; @@ -32,9 +31,6 @@ pub struct CompactorContext { /// Storage options. pub storage_opts: Arc, - /// The meta client. - pub hummock_meta_client: Arc, - /// Sstable store that manages the sstables. pub sstable_store: SstableStoreRef, @@ -61,7 +57,6 @@ impl CompactorContext { pub fn new_local_compact_context( storage_opts: Arc, sstable_store: SstableStoreRef, - hummock_meta_client: Arc, compactor_metrics: Arc, filter_key_extractor_manager: FilterKeyExtractorManager, ) -> Self { @@ -77,7 +72,6 @@ impl CompactorContext { // not limit memory for local compact Self { storage_opts, - hummock_meta_client, sstable_store, compactor_metrics, is_share_buffer_compact: true, diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index c5d8cb05b96a4..17e0b31decc73 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -50,6 +50,7 @@ use risingwave_pb::hummock::{ CompactTaskProgress, CompactorWorkload, SubscribeCompactionEventRequest, SubscribeCompactionEventResponse, }; +use risingwave_rpc_client::HummockMetaClient; pub use shared_buffer_compact::{compact, merge_imms_in_memory}; use sysinfo::{CpuRefreshKind, ProcessExt, ProcessRefreshKind, RefreshKind, System, SystemExt}; use tokio::sync::oneshot::Sender; @@ -76,7 +77,7 @@ use crate::hummock::{ /// Implementation of Hummock compaction. pub struct Compactor { /// The context of the compactor. - context: Arc, + context: CompactorContext, object_id_getter: Box, task_config: TaskConfig, options: SstableBuilderOptions, @@ -88,7 +89,7 @@ pub type CompactOutput = (usize, Vec, CompactionStatistics); impl Compactor { /// Create a new compactor. pub fn new( - context: Arc, + context: CompactorContext, options: SstableBuilderOptions, task_config: TaskConfig, object_id_getter: Box, @@ -310,10 +311,10 @@ impl Compactor { /// manager and runs compaction tasks. #[cfg_attr(coverage, no_coverage)] pub fn start_compactor( - compactor_context: Arc, + compactor_context: CompactorContext, + hummock_meta_client: Arc, sstable_object_id_manager: Arc, ) -> (JoinHandle<()>, Sender<()>) { - let hummock_meta_client = compactor_context.hummock_meta_client.clone(); type CompactionShutdownMap = Arc>>>; let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel(); let stream_retry_interval = Duration::from_secs(30); diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index 994dc3bab7799..6e01be793abf2 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -55,7 +55,7 @@ const GC_WATERMARK_FOR_FLUSH: u64 = 0; /// Flush shared buffer to level0. Resulted SSTs are grouped by compaction group. pub async fn compact( - context: Arc, + context: CompactorContext, sstable_object_id_manager: SstableObjectIdManagerRef, payload: UploadTaskPayload, compaction_group_index: Arc>, @@ -110,7 +110,7 @@ pub async fn compact( /// For compaction from shared buffer to level 0, this is the only function gets called. async fn compact_shared_buffer( - context: Arc, + context: CompactorContext, sstable_object_id_manager: SstableObjectIdManagerRef, mut payload: UploadTaskPayload, ) -> HummockResult> { @@ -453,7 +453,7 @@ impl SharedBufferCompactRunner { pub fn new( split_index: usize, key_range: KeyRange, - context: Arc, + context: CompactorContext, sub_compaction_sstable_size: usize, split_weight_by_vnode: u32, use_block_based_filter: bool, diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 6bf8f463c8189..56fa2d9365dc2 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -133,7 +133,7 @@ pub struct HummockEventHandler { async fn flush_imms( payload: UploadTaskPayload, task_info: UploadTaskInfo, - compactor_context: Arc, + compactor_context: CompactorContext, sstable_object_id_manager: Arc, ) -> HummockResult> { for epoch in &task_info.epochs { @@ -159,7 +159,7 @@ impl HummockEventHandler { hummock_event_tx: mpsc::UnboundedSender, hummock_event_rx: mpsc::UnboundedReceiver, pinned_version: PinnedVersion, - compactor_context: Arc, + compactor_context: CompactorContext, sstable_object_id_manager: Arc, state_store_metrics: Arc, refill_data_file_cache_levels: HashSet, diff --git a/src/storage/src/hummock/mod.rs b/src/storage/src/hummock/mod.rs index fea6600f63b0f..9c72ea902cd56 100644 --- a/src/storage/src/hummock/mod.rs +++ b/src/storage/src/hummock/mod.rs @@ -105,7 +105,7 @@ impl Drop for HummockStorageShutdownGuard { pub struct HummockStorage { hummock_event_sender: UnboundedSender, - context: Arc, + context: CompactorContext, sstable_object_id_manager: SstableObjectIdManagerRef, @@ -180,15 +180,14 @@ impl HummockStorage { hummock_meta_client.clone(), )); - let compactor_context = Arc::new(CompactorContext::new_local_compact_context( + let compactor_context = CompactorContext::new_local_compact_context( options.clone(), sstable_store.clone(), - hummock_meta_client.clone(), compactor_metrics.clone(), FilterKeyExtractorManager::RpcFilterKeyExtractorManager( filter_key_extractor_manager.clone(), ), - )); + ); let seal_epoch = Arc::new(AtomicU64::new(pinned_version.max_committed_epoch())); let min_current_epoch = Arc::new(AtomicU64::new(pinned_version.max_committed_epoch())); diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index 0b42262b09c83..fa42f4a484f69 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -574,9 +574,8 @@ fn run_compactor_thread( tokio::task::JoinHandle<()>, tokio::sync::oneshot::Sender<()>, ) { - let compactor_context = Arc::new(CompactorContext { + let compactor_context = CompactorContext { storage_opts, - hummock_meta_client: meta_client, sstable_store, compactor_metrics, is_share_buffer_compact: false, @@ -588,8 +587,8 @@ fn run_compactor_thread( task_progress_manager: Default::default(), await_tree_reg: None, running_task_count: Arc::new(AtomicU32::new(0)), - }); - start_compactor(compactor_context, sstable_object_id_manager) + }; + start_compactor(compactor_context, meta_client, sstable_object_id_manager) } #[cfg(test)]