From 0ccb0ec34655617782b797c30fa984cf8dcd2502 Mon Sep 17 00:00:00 2001 From: yufansong Date: Mon, 15 Jan 2024 19:35:07 -0800 Subject: [PATCH 1/6] add config --- src/common/src/config.rs | 16 ++++++++++++++++ src/frontend/src/session.rs | 31 ++++++++++++++++++------------- 2 files changed, 34 insertions(+), 13 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 49b0453334c17..ab682906f9246 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -478,6 +478,14 @@ pub struct StreamingConfig { #[serde(default, flatten)] pub unrecognized: Unrecognized, + + #[serde(default = "default::streaming::compute_runtime_worker_threads")] + /// compute runtime worker threads + pub compute_runtime_worker_threads: usize, + + #[serde(default = "default::streaming::compute_runtime_worker_name")] + /// compute runtime worker threads + pub compute_runtime_worker_name: String, } #[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)] @@ -1250,6 +1258,14 @@ pub mod default { pub fn unique_user_stream_errors() -> usize { 10 } + + pub fn compute_runtime_worker_threads() -> usize { + 4 + } + + pub fn compute_runtime_worker_name() -> String { + "rw-batch-local".to_string() + } } pub mod file_cache { diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index b5ef22590875d..0bb173bd5a71f 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -184,7 +184,14 @@ impl FrontendEnv { meta_config: MetaConfig::default(), source_metrics: Arc::new(SourceMetrics::default()), creating_streaming_job_tracker: Arc::new(creating_streaming_tracker), - compute_runtime: Self::create_compute_runtime(), + compute_runtime: Arc::new(BackgroundShutdownRuntime::from( + Builder::new_multi_thread() + .worker_threads(4) + .thread_name("rw-batch-local") + .enable_all() + .build() + .unwrap(), + )), } } @@ -325,6 +332,15 @@ impl FrontendEnv { let creating_streaming_job_tracker = Arc::new(StreamingJobTracker::new(frontend_meta_client.clone())); + let compute_runtime = Arc::new(BackgroundShutdownRuntime::from( + Builder::new_multi_thread() + .worker_threads(config.streaming.compute_runtime_worker_threads) + .thread_name(config.streaming.compute_runtime_worker_name) + .enable_all() + .build() + .unwrap(), + )); + Ok(( Self { catalog_reader, @@ -343,7 +359,7 @@ impl FrontendEnv { meta_config, source_metrics, creating_streaming_job_tracker, - compute_runtime: Self::create_compute_runtime(), + compute_runtime, }, join_handles, shutdown_senders, @@ -432,17 +448,6 @@ impl FrontendEnv { self.compute_runtime.clone() } - fn create_compute_runtime() -> Arc { - Arc::new(BackgroundShutdownRuntime::from( - Builder::new_multi_thread() - .worker_threads(4) - .thread_name("rw-batch-local") - .enable_all() - .build() - .unwrap(), - )) - } - /// Cancel queries (i.e. batch queries) in session. /// If the session exists return true, otherwise, return false. pub fn cancel_queries_in_session(&self, session_id: SessionId) -> bool { From 3adf3fd0fc359d1895ce083342d8202e0f110c86 Mon Sep 17 00:00:00 2001 From: yufansong Date: Tue, 16 Jan 2024 01:13:05 -0800 Subject: [PATCH 2/6] fix typo --- src/common/src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index ab682906f9246..60a8fbada21c6 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -484,7 +484,7 @@ pub struct StreamingConfig { pub compute_runtime_worker_threads: usize, #[serde(default = "default::streaming::compute_runtime_worker_name")] - /// compute runtime worker threads + /// compute runtime worker name pub compute_runtime_worker_name: String, } From aa91db084c843c78792cb5a00b45599375eed13a Mon Sep 17 00:00:00 2001 From: yufansong Date: Tue, 16 Jan 2024 02:24:25 -0800 Subject: [PATCH 3/6] fix unit test --- src/config/example.toml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/config/example.toml b/src/config/example.toml index 21d13f81fbdcd..aec3fdd9ef2db 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -83,6 +83,8 @@ batch_chunk_size = 1024 in_flight_barrier_nums = 10000 async_stack_trace = "ReleaseVerbose" unique_user_stream_errors = 10 +compute_runtime_worker_threads = 4 +compute_runtime_worker_name = "rw-batch-local" [streaming.developer] stream_enable_executor_row_count = false From fb6f6a524d6233708a2af15022318c6446f7cfb7 Mon Sep 17 00:00:00 2001 From: yufansong Date: Tue, 16 Jan 2024 21:22:40 -0800 Subject: [PATCH 4/6] apply suggestion --- src/common/src/config.rs | 24 ++++++++---------------- src/config/example.toml | 3 +-- src/frontend/src/session.rs | 21 +++++++++++---------- 3 files changed, 20 insertions(+), 28 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 60a8fbada21c6..7b845b20d32c0 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -451,6 +451,10 @@ pub struct BatchConfig { #[serde(default, flatten)] pub unrecognized: Unrecognized, + + #[serde(default = "default::batch::compute_runtime_worker_threads")] + /// compute runtime worker threads + pub compute_runtime_worker_threads: usize, } /// The section `[streaming]` in `risingwave.toml`. @@ -478,14 +482,6 @@ pub struct StreamingConfig { #[serde(default, flatten)] pub unrecognized: Unrecognized, - - #[serde(default = "default::streaming::compute_runtime_worker_threads")] - /// compute runtime worker threads - pub compute_runtime_worker_threads: usize, - - #[serde(default = "default::streaming::compute_runtime_worker_name")] - /// compute runtime worker name - pub compute_runtime_worker_name: String, } #[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)] @@ -1258,14 +1254,6 @@ pub mod default { pub fn unique_user_stream_errors() -> usize { 10 } - - pub fn compute_runtime_worker_threads() -> usize { - 4 - } - - pub fn compute_runtime_worker_name() -> String { - "rw-batch-local".to_string() - } } pub mod file_cache { @@ -1442,6 +1430,10 @@ pub mod default { // 1 hour 60 * 60 } + + pub fn compute_runtime_worker_threads() -> usize { + 4 + } } pub mod compaction_config { diff --git a/src/config/example.toml b/src/config/example.toml index aec3fdd9ef2db..a8eb28e011f6d 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -73,6 +73,7 @@ meta_cached_traces_memory_limit_bytes = 134217728 [batch] enable_barrier_read = false statement_timeout_in_sec = 3600 +compute_runtime_worker_threads = 4 [batch.developer] batch_connector_message_buffer_size = 16 @@ -83,8 +84,6 @@ batch_chunk_size = 1024 in_flight_barrier_nums = 10000 async_stack_trace = "ReleaseVerbose" unique_user_stream_errors = 10 -compute_runtime_worker_threads = 4 -compute_runtime_worker_name = "rw-batch-local" [streaming.developer] stream_enable_executor_row_count = false diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 0bb173bd5a71f..f565b6f60ceee 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -167,6 +167,14 @@ impl FrontendEnv { let server_addr = HostAddr::try_from("127.0.0.1:4565").unwrap(); let client_pool = Arc::new(ComputeClientPool::default()); let creating_streaming_tracker = StreamingJobTracker::new(meta_client.clone()); + let compute_runtime = Arc::new(BackgroundShutdownRuntime::from( + Builder::new_multi_thread() + .worker_threads(load_config("", FrontendOpts::default()).batch.compute_runtime_worker_threads) + .thread_name("rw-batch-local") + .enable_all() + .build() + .unwrap(), + )); Self { meta_client, catalog_writer, @@ -184,14 +192,7 @@ impl FrontendEnv { meta_config: MetaConfig::default(), source_metrics: Arc::new(SourceMetrics::default()), creating_streaming_job_tracker: Arc::new(creating_streaming_tracker), - compute_runtime: Arc::new(BackgroundShutdownRuntime::from( - Builder::new_multi_thread() - .worker_threads(4) - .thread_name("rw-batch-local") - .enable_all() - .build() - .unwrap(), - )), + compute_runtime, } } @@ -334,8 +335,8 @@ impl FrontendEnv { let compute_runtime = Arc::new(BackgroundShutdownRuntime::from( Builder::new_multi_thread() - .worker_threads(config.streaming.compute_runtime_worker_threads) - .thread_name(config.streaming.compute_runtime_worker_name) + .worker_threads(batch_config.compute_runtime_worker_threads) + .thread_name("rw-batch-local") .enable_all() .build() .unwrap(), From e9dfd4d6ce8287a90b190d278f70a8d818043265 Mon Sep 17 00:00:00 2001 From: yufansong Date: Tue, 16 Jan 2024 21:28:46 -0800 Subject: [PATCH 5/6] format --- src/frontend/src/session.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index f565b6f60ceee..2f44284ab4014 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -169,7 +169,11 @@ impl FrontendEnv { let creating_streaming_tracker = StreamingJobTracker::new(meta_client.clone()); let compute_runtime = Arc::new(BackgroundShutdownRuntime::from( Builder::new_multi_thread() - .worker_threads(load_config("", FrontendOpts::default()).batch.compute_runtime_worker_threads) + .worker_threads( + load_config("", FrontendOpts::default()) + .batch + .compute_runtime_worker_threads, + ) .thread_name("rw-batch-local") .enable_all() .build() From 38260ab81d057918d2cfd92398d0ac6a72a9adfa Mon Sep 17 00:00:00 2001 From: yufansong Date: Tue, 16 Jan 2024 22:22:10 -0800 Subject: [PATCH 6/6] rename --- src/common/src/config.rs | 8 ++++---- src/config/example.toml | 2 +- src/frontend/src/session.rs | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 7b845b20d32c0..01e5e79f1c287 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -452,9 +452,9 @@ pub struct BatchConfig { #[serde(default, flatten)] pub unrecognized: Unrecognized, - #[serde(default = "default::batch::compute_runtime_worker_threads")] - /// compute runtime worker threads - pub compute_runtime_worker_threads: usize, + #[serde(default = "default::batch::frontend_compute_runtime_worker_threads")] + /// frontend compute runtime worker threads + pub frontend_compute_runtime_worker_threads: usize, } /// The section `[streaming]` in `risingwave.toml`. @@ -1431,7 +1431,7 @@ pub mod default { 60 * 60 } - pub fn compute_runtime_worker_threads() -> usize { + pub fn frontend_compute_runtime_worker_threads() -> usize { 4 } } diff --git a/src/config/example.toml b/src/config/example.toml index a8eb28e011f6d..126482a20ddf4 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -73,7 +73,7 @@ meta_cached_traces_memory_limit_bytes = 134217728 [batch] enable_barrier_read = false statement_timeout_in_sec = 3600 -compute_runtime_worker_threads = 4 +frontend_compute_runtime_worker_threads = 4 [batch.developer] batch_connector_message_buffer_size = 16 diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 2f44284ab4014..10b4fe8f32954 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -172,7 +172,7 @@ impl FrontendEnv { .worker_threads( load_config("", FrontendOpts::default()) .batch - .compute_runtime_worker_threads, + .frontend_compute_runtime_worker_threads, ) .thread_name("rw-batch-local") .enable_all() @@ -339,7 +339,7 @@ impl FrontendEnv { let compute_runtime = Arc::new(BackgroundShutdownRuntime::from( Builder::new_multi_thread() - .worker_threads(batch_config.compute_runtime_worker_threads) + .worker_threads(batch_config.frontend_compute_runtime_worker_threads) .thread_name("rw-batch-local") .enable_all() .build()