From f7b91c2654dced290793b3197a4edcc133ffa54e Mon Sep 17 00:00:00 2001 From: Yufan Song <33971064+yufansong@users.noreply.github.com> Date: Tue, 16 Jan 2024 23:01:35 -0800 Subject: [PATCH] refactor(frontend): make frontend compute runtime configurable (#14597) --- src/common/src/config.rs | 8 ++++++++ src/config/example.toml | 1 + src/frontend/src/session.rs | 36 +++++++++++++++++++++++------------- 3 files changed, 32 insertions(+), 13 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index a742c8616ed57..fd1f4cab7cf10 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -460,6 +460,10 @@ pub struct BatchConfig { #[serde(default, flatten)] pub unrecognized: Unrecognized, + + #[serde(default = "default::batch::frontend_compute_runtime_worker_threads")] + /// frontend compute runtime worker threads + pub frontend_compute_runtime_worker_threads: usize, } /// The section `[streaming]` in `risingwave.toml`. @@ -1395,6 +1399,10 @@ pub mod default { // 1 hour 60 * 60 } + + pub fn frontend_compute_runtime_worker_threads() -> usize { + 4 + } } pub mod compaction_config { diff --git a/src/config/example.toml b/src/config/example.toml index b6a2504a22d3a..77ef4e644c7b2 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -74,6 +74,7 @@ meta_cached_traces_memory_limit_bytes = 134217728 [batch] enable_barrier_read = false statement_timeout_in_sec = 3600 +frontend_compute_runtime_worker_threads = 4 [batch.developer] batch_connector_message_buffer_size = 16 diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index b5ef22590875d..10b4fe8f32954 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -167,6 +167,18 @@ impl FrontendEnv { let server_addr = HostAddr::try_from("127.0.0.1:4565").unwrap(); let client_pool = Arc::new(ComputeClientPool::default()); let creating_streaming_tracker = StreamingJobTracker::new(meta_client.clone()); + let compute_runtime = Arc::new(BackgroundShutdownRuntime::from( + Builder::new_multi_thread() + .worker_threads( + load_config("", FrontendOpts::default()) + .batch + .frontend_compute_runtime_worker_threads, + ) + .thread_name("rw-batch-local") + .enable_all() + .build() + .unwrap(), + )); Self { meta_client, catalog_writer, @@ -184,7 +196,7 @@ impl FrontendEnv { meta_config: MetaConfig::default(), source_metrics: Arc::new(SourceMetrics::default()), creating_streaming_job_tracker: Arc::new(creating_streaming_tracker), - compute_runtime: Self::create_compute_runtime(), + compute_runtime, } } @@ -325,6 +337,15 @@ impl FrontendEnv { let creating_streaming_job_tracker = Arc::new(StreamingJobTracker::new(frontend_meta_client.clone())); + let compute_runtime = Arc::new(BackgroundShutdownRuntime::from( + Builder::new_multi_thread() + .worker_threads(batch_config.frontend_compute_runtime_worker_threads) + .thread_name("rw-batch-local") + .enable_all() + .build() + .unwrap(), + )); + Ok(( Self { catalog_reader, @@ -343,7 +364,7 @@ impl FrontendEnv { meta_config, source_metrics, creating_streaming_job_tracker, - compute_runtime: Self::create_compute_runtime(), + compute_runtime, }, join_handles, shutdown_senders, @@ -432,17 +453,6 @@ impl FrontendEnv { self.compute_runtime.clone() } - fn create_compute_runtime() -> Arc { - Arc::new(BackgroundShutdownRuntime::from( - Builder::new_multi_thread() - .worker_threads(4) - .thread_name("rw-batch-local") - .enable_all() - .build() - .unwrap(), - )) - } - /// Cancel queries (i.e. batch queries) in session. /// If the session exists return true, otherwise, return false. pub fn cancel_queries_in_session(&self, session_id: SessionId) -> bool {