From 1e7f08956b1b61512ab416873e56d1a50999856d Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Fri, 22 Sep 2023 15:23:47 +0800 Subject: [PATCH] can merge --- src/common/src/config.rs | 10 ++++++++++ src/rpc_client/src/compactor_client.rs | 10 +++++----- src/storage/compactor/src/lib.rs | 20 ++++++++------------ src/storage/compactor/src/server.rs | 2 +- src/storage/src/hummock/compactor/mod.rs | 13 ++++++------- 5 files changed, 30 insertions(+), 25 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 3b7bc027be870..c1a81e9d5fe71 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -658,6 +658,16 @@ impl AsyncStackTraceOption { } } +#[derive(Debug, Default, Clone, Copy, ValueEnum)] +pub enum CompactorMode { + #[default] + #[clap(alias = "dedicated")] + Dedicated, + + #[clap(alias = "shared")] + Shared, +} + #[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)] pub struct HeapProfilingConfig { /// Enable to auto dump heap profile when memory usage is high diff --git a/src/rpc_client/src/compactor_client.rs b/src/rpc_client/src/compactor_client.rs index 5679237af3dbe..8574e76695740 100644 --- a/src/rpc_client/src/compactor_client.rs +++ b/src/rpc_client/src/compactor_client.rs @@ -95,7 +95,7 @@ impl GrpcCompactorProxyClient { } pub async fn get_new_sst_ids( - &mut self, + &self, request: GetNewSstIdsRequest, ) -> std::result::Result, tonic::Status> { let mut hummock_client = self.core.read().await.hummock_client.clone(); @@ -103,7 +103,7 @@ impl GrpcCompactorProxyClient { } pub async fn report_compaction_task( - &mut self, + &self, request: ReportCompactionTaskRequest, ) -> std::result::Result, tonic::Status> { let mut hummock_client = self.core.read().await.hummock_client.clone(); @@ -111,7 +111,7 @@ impl GrpcCompactorProxyClient { } pub async fn report_full_scan_task( - &mut self, + &self, request: ReportFullScanTaskRequest, ) -> std::result::Result, tonic::Status> { let mut hummock_client = self.core.read().await.hummock_client.clone(); @@ -119,7 +119,7 @@ impl GrpcCompactorProxyClient { } pub async fn report_vacuum_task( - &mut self, + &self, request: ReportVacuumTaskRequest, ) -> std::result::Result, tonic::Status> { let mut hummock_client = self.core.read().await.hummock_client.clone(); @@ -127,7 +127,7 @@ impl GrpcCompactorProxyClient { } pub async fn get_system_params( - &mut self, + &self, ) -> std::result::Result, tonic::Status> { let mut system_params_client = self.core.read().await.system_params_client.clone(); system_params_client diff --git a/src/storage/compactor/src/lib.rs b/src/storage/compactor/src/lib.rs index 6d1dd048dd3f1..6cca88f6bfc89 100644 --- a/src/storage/compactor/src/lib.rs +++ b/src/storage/compactor/src/lib.rs @@ -18,7 +18,9 @@ pub mod server; mod telemetry; use clap::Parser; -use risingwave_common::config::{AsyncStackTraceOption, MetricLevel, OverrideConfig}; +use risingwave_common::config::{ + AsyncStackTraceOption, CompactorMode, MetricLevel, OverrideConfig, +}; use crate::server::{compactor_serve, shared_compactor_serve}; @@ -88,8 +90,8 @@ pub struct CompactorOpts { #[override_opts(path = storage.object_store_read_timeout_ms)] pub object_store_read_timeout_ms: Option, - #[clap(long, env = "RW_COMPACTOR_MODE", default_value = "dedicated")] - pub compactor_mode: String, + #[clap(long, env = "RW_COMPACTOR_MODE", value_enum)] + pub compactor_mode: Option, #[clap(long, env = "RW_PROXY_RPC_ENDPOINT", default_value = "")] pub proxy_rpc_endpoint: String, @@ -101,8 +103,8 @@ use std::pin::Pin; pub fn start(opts: CompactorOpts) -> Pin + Send>> { // WARNING: don't change the function signature. Making it `async fn` will cause // slow compile in release mode. - match opts.compactor_mode.as_str() { - "shared" => Box::pin(async move { + match opts.compactor_mode { + Some(CompactorMode::Shared) => Box::pin(async move { tracing::info!("Shared compactor pod options: {:?}", opts); tracing::info!("Proxy rpc endpoint: {}", opts.proxy_rpc_endpoint.clone()); @@ -113,7 +115,7 @@ pub fn start(opts: CompactorOpts) -> Pin + Send>> { join_handle.await.unwrap(); }), - "dedicated" => Box::pin(async move { + None | Some(CompactorMode::Dedicated) => Box::pin(async move { tracing::info!("Compactor node options: {:?}", opts); tracing::info!("meta address: {}", opts.meta_address.clone()); @@ -136,11 +138,5 @@ pub fn start(opts: CompactorOpts) -> Pin + Send>> { join_handle.await.unwrap(); observer_join_handle.abort(); }), - _ => { - panic!( - "Compactor mode {} is configured incorrectly", - opts.compactor_mode - ); - } } } diff --git a/src/storage/compactor/src/server.rs b/src/storage/compactor/src/server.rs index 7e4a1046949ca..e099ed16ffcbd 100644 --- a/src/storage/compactor/src/server.rs +++ b/src/storage/compactor/src/server.rs @@ -328,7 +328,7 @@ pub async fn shared_compactor_serve( .connect() .await .expect("Failed to create channel via proxy rpc endpoint."); - let mut grpc_proxy_client = GrpcCompactorProxyClient::new(channel); + let grpc_proxy_client = GrpcCompactorProxyClient::new(channel); let system_params_response = grpc_proxy_client .get_system_params() .await diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index b776568c8fbd1..7012cf952317c 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -633,7 +633,7 @@ pub fn start_shared_compactor( let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval); let executor = context.compaction_executor.clone(); - let mut report_heartbeat_client = grpc_proxy_client.clone(); + let report_heartbeat_client = grpc_proxy_client.clone(); 'consume_stream: loop { let request: Option> = tokio::select! { _ = periodic_event_interval.tick() => { @@ -667,8 +667,7 @@ pub fn start_shared_compactor( let context = context.clone(); let shutdown = shutdown_map.clone(); - let mut report_task_client = grpc_proxy_client.clone(); - let get_sst_id_client = grpc_proxy_client.clone(); + let cloned_grpc_proxy_client = grpc_proxy_client.clone(); executor.spawn(async move { let DispatchCompactionTaskRequest { tables, @@ -689,7 +688,7 @@ pub fn start_shared_compactor( let mut output_object_ids_deque: VecDeque<_> = VecDeque::new(); output_object_ids_deque.extend(output_object_ids); let shared_compactor_object_id_manager = - SharedComapctorObjectIdManager::new(output_object_ids_deque, get_sst_id_client, context.storage_opts.sstable_id_remote_fetch_number); + SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number); match dispatch_task.unwrap() { dispatch_compaction_task_request::Task::CompactTask(compact_task) => { context.running_task_count.fetch_add(1, Ordering::SeqCst); @@ -714,7 +713,7 @@ pub fn start_shared_compactor( })), }; - match report_task_client + match cloned_grpc_proxy_client .report_compaction_task(report_compaction_task_request) .await { @@ -733,7 +732,7 @@ pub fn start_shared_compactor( let report_vacuum_task_request = ReportVacuumTaskRequest { vacuum_task: Some(vacuum_task), }; - match report_task_client.report_vacuum_task(report_vacuum_task_request).await { + match cloned_grpc_proxy_client.report_vacuum_task(report_vacuum_task_request).await { Ok(_) => tracing::info!("Finished vacuuming SSTs"), Err(e) => tracing::warn!("Failed to report vacuum task: {:#?}", e), } @@ -753,7 +752,7 @@ pub fn start_shared_compactor( total_object_count, total_object_size, }; - match report_task_client + match cloned_grpc_proxy_client .report_full_scan_task(report_full_scan_task_request) .await {