diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 4e5a70f369a72..f5ea06669e40c 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -218,9 +218,8 @@ pub async fn compute_node_serve( let memory_limiter = Arc::new(MemoryLimiter::new( storage_opts.compactor_memory_limit_mb as u64 * 1024 * 1024 / 2, )); - let compactor_context = Arc::new(CompactorContext { + let compactor_context = CompactorContext { storage_opts, - hummock_meta_client: hummock_meta_client.clone(), sstable_store: storage.sstable_store(), compactor_metrics: compactor_metrics.clone(), is_share_buffer_compact: false, @@ -231,10 +230,11 @@ pub async fn compute_node_serve( task_progress_manager: Default::default(), await_tree_reg: None, running_task_count: Arc::new(AtomicU32::new(0)), - }); + }; let (handle, shutdown_sender) = start_compactor( compactor_context, + hummock_meta_client.clone(), storage.sstable_object_id_manager().clone(), ); sub_tasks.push((handle, shutdown_sender)); diff --git a/src/storage/compactor/src/server.rs b/src/storage/compactor/src/server.rs index 566ad8fd5e8ee..be4f15ca2aa5a 100644 --- a/src/storage/compactor/src/server.rs +++ b/src/storage/compactor/src/server.rs @@ -194,9 +194,8 @@ pub async fn compactor_serve( }; let await_tree_reg = await_tree_config.map(|c| Arc::new(RwLock::new(await_tree::Registry::new(c)))); - let compactor_context = Arc::new(CompactorContext { + let compactor_context = CompactorContext { storage_opts, - hummock_meta_client: hummock_meta_client.clone(), sstable_store: sstable_store.clone(), compactor_metrics, is_share_buffer_compact: false, @@ -211,7 +210,7 @@ pub async fn compactor_serve( task_progress_manager: Default::default(), await_tree_reg: await_tree_reg.clone(), running_task_count: Arc::new(AtomicU32::new(0)), - }); + }; let mut sub_tasks = vec![ MetaClient::start_heartbeat_loop( meta_client.clone(), @@ -220,6 +219,7 @@ pub async fn compactor_serve( ), risingwave_storage::hummock::compactor::start_compactor( compactor_context.clone(), + hummock_meta_client.clone(), sstable_object_id_manager.clone(), ), ]; diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 5db6d01030899..5925798eaebdd 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -167,13 +167,11 @@ pub(crate) mod tests { fn get_compactor_context_with_filter_key_extractor_manager( storage: &HummockStorage, - hummock_meta_client: &Arc, filter_key_extractor_manager: FilterKeyExtractorManagerRef, ) -> CompactorContext { get_compactor_context_with_filter_key_extractor_manager_impl( storage.storage_opts().clone(), storage.sstable_store(), - hummock_meta_client, filter_key_extractor_manager, ) } @@ -181,13 +179,11 @@ pub(crate) mod tests { fn get_compactor_context_with_filter_key_extractor_manager_impl( options: Arc, sstable_store: SstableStoreRef, - hummock_meta_client: &Arc, filter_key_extractor_manager: FilterKeyExtractorManagerRef, ) -> CompactorContext { CompactorContext { storage_opts: options, sstable_store, - hummock_meta_client: hummock_meta_client.clone(), compactor_metrics: Arc::new(CompactorMetrics::unused()), is_share_buffer_compact: false, compaction_executor: Arc::new(CompactionExecutor::new(Some(1))), @@ -236,7 +232,6 @@ pub(crate) mod tests { }; let compact_ctx = get_compactor_context_with_filter_key_extractor_manager( &storage, - &hummock_meta_client, rpc_filter_key_extractor_manager, ); let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new( @@ -292,7 +287,7 @@ pub(crate) mod tests { let (_tx, rx) = tokio::sync::oneshot::channel(); let (mut result_task, task_stats) = compact( - Arc::new(compact_ctx.clone()), + compact_ctx.clone(), compact_task.clone(), rx, Box::new(sstable_object_id_manager.clone()), @@ -400,7 +395,6 @@ pub(crate) mod tests { }; let compact_ctx = get_compactor_context_with_filter_key_extractor_manager( &storage, - &hummock_meta_client, rpc_filter_key_extractor_manager, ); let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new( @@ -455,7 +449,7 @@ pub(crate) mod tests { // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); let (mut result_task, task_stats) = compact( - Arc::new(compact_ctx), + compact_ctx, compact_task.clone(), rx, Box::new(sstable_object_id_manager.clone()), @@ -570,7 +564,6 @@ pub(crate) mod tests { pub(crate) fn prepare_compactor_and_filter( storage: &HummockStorage, - hummock_meta_client: &Arc, existing_table_id: u32, ) -> CompactorContext { let rpc_filter_key_extractor_manager = match storage.filter_key_extractor_manager().clone() @@ -587,7 +580,6 @@ pub(crate) mod tests { get_compactor_context_with_filter_key_extractor_manager( storage, - hummock_meta_client, rpc_filter_key_extractor_manager, ) } @@ -701,7 +693,6 @@ pub(crate) mod tests { let compact_ctx = get_compactor_context_with_filter_key_extractor_manager_impl( global_storage.storage_opts().clone(), global_storage.sstable_store(), - &hummock_meta_client, rpc_filter_key_extractor_manager, ); let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new( @@ -789,7 +780,7 @@ pub(crate) mod tests { // 4. compact let (_tx, rx) = tokio::sync::oneshot::channel(); let (mut result_task, task_stats) = compact( - Arc::new(compact_ctx), + compact_ctx, compact_task.clone(), rx, Box::new(sstable_object_id_manager.clone()), @@ -888,7 +879,6 @@ pub(crate) mod tests { let compact_ctx = get_compactor_context_with_filter_key_extractor_manager( &storage, - &hummock_meta_client, rpc_filter_key_extractor_manager.clone(), ); let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new( @@ -978,7 +968,7 @@ pub(crate) mod tests { // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); let (mut result_task, task_stats) = compact( - Arc::new(compact_ctx), + compact_ctx, compact_task.clone(), rx, Box::new(sstable_object_id_manager.clone()), @@ -1088,7 +1078,6 @@ pub(crate) mod tests { ); let compact_ctx = get_compactor_context_with_filter_key_extractor_manager( &storage, - &hummock_meta_client, rpc_filter_key_extractor_manager, ); let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new( @@ -1164,7 +1153,7 @@ pub(crate) mod tests { // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); let (mut result_task, task_stats) = compact( - Arc::new(compact_ctx), + compact_ctx, compact_task.clone(), rx, Box::new(sstable_object_id_manager.clone()), @@ -1265,8 +1254,7 @@ pub(crate) mod tests { TableId::from(existing_table_id), ) .await; - let compact_ctx = - prepare_compactor_and_filter(&storage, &hummock_meta_client, existing_table_id); + let compact_ctx = prepare_compactor_and_filter(&storage, existing_table_id); let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new( hummock_meta_client.clone(), storage @@ -1323,7 +1311,7 @@ pub(crate) mod tests { // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); let (mut result_task, task_stats) = compact( - Arc::new(compact_ctx), + compact_ctx, compact_task.clone(), rx, Box::new(sstable_object_id_manager.clone()), diff --git a/src/storage/hummock_test/src/sync_point_tests.rs b/src/storage/hummock_test/src/sync_point_tests.rs index 6a0326ce55687..c8515f2620b65 100644 --- a/src/storage/hummock_test/src/sync_point_tests.rs +++ b/src/storage/hummock_test/src/sync_point_tests.rs @@ -230,7 +230,7 @@ async fn test_syncpoints_test_local_notification_receiver() { pub async fn compact_once( hummock_manager_ref: HummockManagerRef, - compact_ctx: Arc, + compact_ctx: CompactorContext, sstable_object_id_manager: Arc, ) { // 2. get compact task @@ -290,11 +290,7 @@ async fn test_syncpoints_get_in_delete_range_boundary() { TableId::from(existing_table_id), ) .await; - let compact_ctx = Arc::new(prepare_compactor_and_filter( - &storage, - &hummock_meta_client, - existing_table_id, - )); + let compact_ctx = prepare_compactor_and_filter(&storage, existing_table_id); let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new( hummock_meta_client.clone(), diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs index f04121ff76179..b1fd6b5411643 100644 --- a/src/storage/src/hummock/compactor/compaction_utils.rs +++ b/src/storage/src/hummock/compactor/compaction_utils.rs @@ -161,7 +161,7 @@ pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompact pub async fn generate_splits( sstable_infos: &Vec, compaction_size: u64, - context: Arc, + context: CompactorContext, ) -> HummockResult> { let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20; if compaction_size > parallel_compact_size { @@ -231,7 +231,7 @@ pub async fn generate_splits( Ok(vec![]) } -pub fn estimate_task_output_capacity(context: Arc, task: &CompactTask) -> usize { +pub fn estimate_task_output_capacity(context: CompactorContext, task: &CompactTask) -> usize { let max_target_file_size = context.storage_opts.sstable_size_mb as usize * (1 << 20); let total_input_uncompressed_file_size = task .input_ssts diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index cc22048489c20..6ce3d0b829bbc 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -60,7 +60,7 @@ pub struct CompactorRunner { impl CompactorRunner { pub fn new( split_index: usize, - context: Arc, + context: CompactorContext, task: CompactTask, object_id_getter: Box, ) -> Self { @@ -109,7 +109,7 @@ impl CompactorRunner { Self { compactor, compact_task: task, - sstable_store: context.sstable_store.clone(), + sstable_store: context.sstable_store, key_range, split_index, } @@ -235,7 +235,7 @@ impl CompactorRunner { /// Handles a compaction task and reports its status to hummock manager. /// Always return `Ok` and let hummock manager handle errors. pub async fn compact( - compactor_context: Arc, + compactor_context: CompactorContext, mut compact_task: CompactTask, mut shutdown_rx: Receiver<()>, object_id_getter: Box, @@ -543,7 +543,7 @@ pub async fn compact( /// Fills in the compact task and tries to report the task result to meta node. fn compact_done( mut compact_task: CompactTask, - context: Arc, + context: CompactorContext, output_ssts: Vec, task_status: TaskStatus, ) -> (CompactTask, HashMap) { diff --git a/src/storage/src/hummock/compactor/context.rs b/src/storage/src/hummock/compactor/context.rs index b5563ba391ccd..ad3d5ffcc2dd6 100644 --- a/src/storage/src/hummock/compactor/context.rs +++ b/src/storage/src/hummock/compactor/context.rs @@ -16,7 +16,6 @@ use std::sync::atomic::AtomicU32; use std::sync::Arc; use parking_lot::RwLock; -use risingwave_rpc_client::HummockMetaClient; use super::task_progress::TaskProgressManagerRef; use crate::filter_key_extractor::FilterKeyExtractorManager; @@ -32,9 +31,6 @@ pub struct CompactorContext { /// Storage options. pub storage_opts: Arc, - /// The meta client. - pub hummock_meta_client: Arc, - /// Sstable store that manages the sstables. pub sstable_store: SstableStoreRef, @@ -61,7 +57,6 @@ impl CompactorContext { pub fn new_local_compact_context( storage_opts: Arc, sstable_store: SstableStoreRef, - hummock_meta_client: Arc, compactor_metrics: Arc, filter_key_extractor_manager: FilterKeyExtractorManager, ) -> Self { @@ -77,7 +72,6 @@ impl CompactorContext { // not limit memory for local compact Self { storage_opts, - hummock_meta_client, sstable_store, compactor_metrics, is_share_buffer_compact: true, diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index c5d8cb05b96a4..17e0b31decc73 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -50,6 +50,7 @@ use risingwave_pb::hummock::{ CompactTaskProgress, CompactorWorkload, SubscribeCompactionEventRequest, SubscribeCompactionEventResponse, }; +use risingwave_rpc_client::HummockMetaClient; pub use shared_buffer_compact::{compact, merge_imms_in_memory}; use sysinfo::{CpuRefreshKind, ProcessExt, ProcessRefreshKind, RefreshKind, System, SystemExt}; use tokio::sync::oneshot::Sender; @@ -76,7 +77,7 @@ use crate::hummock::{ /// Implementation of Hummock compaction. pub struct Compactor { /// The context of the compactor. - context: Arc, + context: CompactorContext, object_id_getter: Box, task_config: TaskConfig, options: SstableBuilderOptions, @@ -88,7 +89,7 @@ pub type CompactOutput = (usize, Vec, CompactionStatistics); impl Compactor { /// Create a new compactor. pub fn new( - context: Arc, + context: CompactorContext, options: SstableBuilderOptions, task_config: TaskConfig, object_id_getter: Box, @@ -310,10 +311,10 @@ impl Compactor { /// manager and runs compaction tasks. #[cfg_attr(coverage, no_coverage)] pub fn start_compactor( - compactor_context: Arc, + compactor_context: CompactorContext, + hummock_meta_client: Arc, sstable_object_id_manager: Arc, ) -> (JoinHandle<()>, Sender<()>) { - let hummock_meta_client = compactor_context.hummock_meta_client.clone(); type CompactionShutdownMap = Arc>>>; let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel(); let stream_retry_interval = Duration::from_secs(30); diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index 994dc3bab7799..6e01be793abf2 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -55,7 +55,7 @@ const GC_WATERMARK_FOR_FLUSH: u64 = 0; /// Flush shared buffer to level0. Resulted SSTs are grouped by compaction group. pub async fn compact( - context: Arc, + context: CompactorContext, sstable_object_id_manager: SstableObjectIdManagerRef, payload: UploadTaskPayload, compaction_group_index: Arc>, @@ -110,7 +110,7 @@ pub async fn compact( /// For compaction from shared buffer to level 0, this is the only function gets called. async fn compact_shared_buffer( - context: Arc, + context: CompactorContext, sstable_object_id_manager: SstableObjectIdManagerRef, mut payload: UploadTaskPayload, ) -> HummockResult> { @@ -453,7 +453,7 @@ impl SharedBufferCompactRunner { pub fn new( split_index: usize, key_range: KeyRange, - context: Arc, + context: CompactorContext, sub_compaction_sstable_size: usize, split_weight_by_vnode: u32, use_block_based_filter: bool, diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 7a9c4e7796bd1..366f8af391428 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -132,7 +132,7 @@ pub struct HummockEventHandler { async fn flush_imms( payload: UploadTaskPayload, task_info: UploadTaskInfo, - compactor_context: Arc, + compactor_context: CompactorContext, sstable_object_id_manager: Arc, ) -> HummockResult> { for epoch in &task_info.epochs { @@ -158,7 +158,7 @@ impl HummockEventHandler { hummock_event_tx: mpsc::UnboundedSender, hummock_event_rx: mpsc::UnboundedReceiver, pinned_version: PinnedVersion, - compactor_context: Arc, + compactor_context: CompactorContext, sstable_object_id_manager: Arc, state_store_metrics: Arc, cache_refill_config: CacheRefillConfig, diff --git a/src/storage/src/hummock/mod.rs b/src/storage/src/hummock/mod.rs index 88a24c65ff255..0e93eeff782bc 100644 --- a/src/storage/src/hummock/mod.rs +++ b/src/storage/src/hummock/mod.rs @@ -107,7 +107,7 @@ impl Drop for HummockStorageShutdownGuard { pub struct HummockStorage { hummock_event_sender: UnboundedSender, - context: Arc, + context: CompactorContext, sstable_object_id_manager: SstableObjectIdManagerRef, @@ -182,15 +182,14 @@ impl HummockStorage { hummock_meta_client.clone(), )); - let compactor_context = Arc::new(CompactorContext::new_local_compact_context( + let compactor_context = CompactorContext::new_local_compact_context( options.clone(), sstable_store.clone(), - hummock_meta_client.clone(), compactor_metrics.clone(), FilterKeyExtractorManager::RpcFilterKeyExtractorManager( filter_key_extractor_manager.clone(), ), - )); + ); let seal_epoch = Arc::new(AtomicU64::new(pinned_version.max_committed_epoch())); let min_current_epoch = Arc::new(AtomicU64::new(pinned_version.max_committed_epoch())); diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index 484f657791973..683cff8fac45d 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -575,9 +575,8 @@ fn run_compactor_thread( tokio::task::JoinHandle<()>, tokio::sync::oneshot::Sender<()>, ) { - let compactor_context = Arc::new(CompactorContext { + let compactor_context = CompactorContext { storage_opts, - hummock_meta_client: meta_client, sstable_store, compactor_metrics, is_share_buffer_compact: false, @@ -589,8 +588,8 @@ fn run_compactor_thread( task_progress_manager: Default::default(), await_tree_reg: None, running_task_count: Arc::new(AtomicU32::new(0)), - }); - start_compactor(compactor_context, sstable_object_id_manager) + }; + start_compactor(compactor_context, meta_client, sstable_object_id_manager) } #[cfg(test)]