From 32ae94404712a9015bb2837ee12189acd7786add Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Tue, 19 Sep 2023 14:35:02 +0800 Subject: [PATCH] compactor mode be an enum --- src/common/src/config.rs | 10 ++++++++++ src/storage/compactor/src/lib.rs | 20 ++++++++------------ 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 2924b5dcdbf6b..a0ca5e2e6f1dd 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -656,6 +656,16 @@ impl AsyncStackTraceOption { } } +#[derive(Debug, Default, Clone, Copy, ValueEnum)] +pub enum CompactorMode { + #[default] + #[clap(alias = "dedicated")] + Dedicated, + + #[clap(alias = "shared")] + Shared, +} + #[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)] pub struct HeapProfilingConfig { /// Enable to auto dump heap profile when memory usage is high diff --git a/src/storage/compactor/src/lib.rs b/src/storage/compactor/src/lib.rs index 6d1dd048dd3f1..ed7fdb4d39cca 100644 --- a/src/storage/compactor/src/lib.rs +++ b/src/storage/compactor/src/lib.rs @@ -18,7 +18,9 @@ pub mod server; mod telemetry; use clap::Parser; -use risingwave_common::config::{AsyncStackTraceOption, MetricLevel, OverrideConfig}; +use risingwave_common::config::{ + AsyncStackTraceOption, CompactorMode, MetricLevel, OverrideConfig, +}; use crate::server::{compactor_serve, shared_compactor_serve}; @@ -88,8 +90,8 @@ pub struct CompactorOpts { #[override_opts(path = storage.object_store_read_timeout_ms)] pub object_store_read_timeout_ms: Option, - #[clap(long, env = "RW_COMPACTOR_MODE", default_value = "dedicated")] - pub compactor_mode: String, + #[clap(long, env = "RW_COMPACTOR_MODE", value_enum)] + pub compactor_mode: CompactorMode, #[clap(long, env = "RW_PROXY_RPC_ENDPOINT", default_value = "")] pub proxy_rpc_endpoint: String, @@ -101,8 +103,8 @@ use std::pin::Pin; pub fn start(opts: CompactorOpts) -> Pin + Send>> { // WARNING: don't change the function signature. Making it `async fn` will cause // slow compile in release mode. - match opts.compactor_mode.as_str() { - "shared" => Box::pin(async move { + match opts.compactor_mode { + CompactorMode::Shared => Box::pin(async move { tracing::info!("Shared compactor pod options: {:?}", opts); tracing::info!("Proxy rpc endpoint: {}", opts.proxy_rpc_endpoint.clone()); @@ -113,7 +115,7 @@ pub fn start(opts: CompactorOpts) -> Pin + Send>> { join_handle.await.unwrap(); }), - "dedicated" => Box::pin(async move { + CompactorMode::Dedicated => Box::pin(async move { tracing::info!("Compactor node options: {:?}", opts); tracing::info!("meta address: {}", opts.meta_address.clone()); @@ -136,11 +138,5 @@ pub fn start(opts: CompactorOpts) -> Pin + Send>> { join_handle.await.unwrap(); observer_join_handle.abort(); }), - _ => { - panic!( - "Compactor mode {} is configured incorrectly", - opts.compactor_mode - ); - } } }