Skip to content

Commit

Permalink
refactor(frontend): make frontend compute runtime configurable (#14597)
Browse files Browse the repository at this point in the history
  • Loading branch information
yufansong authored and Little-Wallace committed Jan 20, 2024
1 parent c387da0 commit f7b91c2
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 13 deletions.
8 changes: 8 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,10 @@ pub struct BatchConfig {

#[serde(default, flatten)]
pub unrecognized: Unrecognized<Self>,

#[serde(default = "default::batch::frontend_compute_runtime_worker_threads")]
/// frontend compute runtime worker threads
pub frontend_compute_runtime_worker_threads: usize,
}

/// The section `[streaming]` in `risingwave.toml`.
Expand Down Expand Up @@ -1395,6 +1399,10 @@ pub mod default {
// 1 hour
60 * 60
}

pub fn frontend_compute_runtime_worker_threads() -> usize {
4
}
}

pub mod compaction_config {
Expand Down
1 change: 1 addition & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ meta_cached_traces_memory_limit_bytes = 134217728
[batch]
enable_barrier_read = false
statement_timeout_in_sec = 3600
frontend_compute_runtime_worker_threads = 4

[batch.developer]
batch_connector_message_buffer_size = 16
Expand Down
36 changes: 23 additions & 13 deletions src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,18 @@ impl FrontendEnv {
let server_addr = HostAddr::try_from("127.0.0.1:4565").unwrap();
let client_pool = Arc::new(ComputeClientPool::default());
let creating_streaming_tracker = StreamingJobTracker::new(meta_client.clone());
let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(
Builder::new_multi_thread()
.worker_threads(
load_config("", FrontendOpts::default())
.batch
.frontend_compute_runtime_worker_threads,
)
.thread_name("rw-batch-local")
.enable_all()
.build()
.unwrap(),
));
Self {
meta_client,
catalog_writer,
Expand All @@ -184,7 +196,7 @@ impl FrontendEnv {
meta_config: MetaConfig::default(),
source_metrics: Arc::new(SourceMetrics::default()),
creating_streaming_job_tracker: Arc::new(creating_streaming_tracker),
compute_runtime: Self::create_compute_runtime(),
compute_runtime,
}
}

Expand Down Expand Up @@ -325,6 +337,15 @@ impl FrontendEnv {
let creating_streaming_job_tracker =
Arc::new(StreamingJobTracker::new(frontend_meta_client.clone()));

let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(
Builder::new_multi_thread()
.worker_threads(batch_config.frontend_compute_runtime_worker_threads)
.thread_name("rw-batch-local")
.enable_all()
.build()
.unwrap(),
));

Ok((
Self {
catalog_reader,
Expand All @@ -343,7 +364,7 @@ impl FrontendEnv {
meta_config,
source_metrics,
creating_streaming_job_tracker,
compute_runtime: Self::create_compute_runtime(),
compute_runtime,
},
join_handles,
shutdown_senders,
Expand Down Expand Up @@ -432,17 +453,6 @@ impl FrontendEnv {
self.compute_runtime.clone()
}

fn create_compute_runtime() -> Arc<BackgroundShutdownRuntime> {
Arc::new(BackgroundShutdownRuntime::from(
Builder::new_multi_thread()
.worker_threads(4)
.thread_name("rw-batch-local")
.enable_all()
.build()
.unwrap(),
))
}

/// Cancel queries (i.e. batch queries) in session.
/// If the session exists return true, otherwise, return false.
pub fn cancel_queries_in_session(&self, session_id: SessionId) -> bool {
Expand Down

0 comments on commit f7b91c2

Please sign in to comment.