Skip to content

Commit

Permalink
compactor mode be an enum
Browse files Browse the repository at this point in the history
  • Loading branch information
wcy-fdu committed Sep 19, 2023
1 parent 1f5cafb commit 32ae944
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 12 deletions.
10 changes: 10 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,16 @@ impl AsyncStackTraceOption {
}
}

#[derive(Debug, Default, Clone, Copy, ValueEnum)]
pub enum CompactorMode {
#[default]
#[clap(alias = "dedicated")]
Dedicated,

#[clap(alias = "shared")]
Shared,
}

#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct HeapProfilingConfig {
/// Enable to auto dump heap profile when memory usage is high
Expand Down
20 changes: 8 additions & 12 deletions src/storage/compactor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ pub mod server;
mod telemetry;

use clap::Parser;
use risingwave_common::config::{AsyncStackTraceOption, MetricLevel, OverrideConfig};
use risingwave_common::config::{
AsyncStackTraceOption, CompactorMode, MetricLevel, OverrideConfig,
};

use crate::server::{compactor_serve, shared_compactor_serve};

Expand Down Expand Up @@ -88,8 +90,8 @@ pub struct CompactorOpts {
#[override_opts(path = storage.object_store_read_timeout_ms)]
pub object_store_read_timeout_ms: Option<u64>,

#[clap(long, env = "RW_COMPACTOR_MODE", default_value = "dedicated")]
pub compactor_mode: String,
#[clap(long, env = "RW_COMPACTOR_MODE", value_enum)]
pub compactor_mode: CompactorMode,

#[clap(long, env = "RW_PROXY_RPC_ENDPOINT", default_value = "")]
pub proxy_rpc_endpoint: String,
Expand All @@ -101,8 +103,8 @@ use std::pin::Pin;
pub fn start(opts: CompactorOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
// WARNING: don't change the function signature. Making it `async fn` will cause
// slow compile in release mode.
match opts.compactor_mode.as_str() {
"shared" => Box::pin(async move {
match opts.compactor_mode {
CompactorMode::Shared => Box::pin(async move {
tracing::info!("Shared compactor pod options: {:?}", opts);
tracing::info!("Proxy rpc endpoint: {}", opts.proxy_rpc_endpoint.clone());

Expand All @@ -113,7 +115,7 @@ pub fn start(opts: CompactorOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {

join_handle.await.unwrap();
}),
"dedicated" => Box::pin(async move {
CompactorMode::Dedicated => Box::pin(async move {
tracing::info!("Compactor node options: {:?}", opts);
tracing::info!("meta address: {}", opts.meta_address.clone());

Expand All @@ -136,11 +138,5 @@ pub fn start(opts: CompactorOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
join_handle.await.unwrap();
observer_join_handle.abort();
}),
_ => {
panic!(
"Compactor mode {} is configured incorrectly",
opts.compactor_mode
);
}
}
}

0 comments on commit 32ae944

Please sign in to comment.