diff --git a/risedev.yml b/risedev.yml index cb352daab6cf9..9308fd9a63e03 100644 --- a/risedev.yml +++ b/risedev.yml @@ -925,6 +925,18 @@ profile: - use: prometheus - use: grafana + full-with-batch-query-limit: + config-path: src/config/full-with-batch-query-limit.toml + steps: + - use: minio + - use: etcd + - use: meta-node + - use: compute-node + - use: frontend + - use: compactor + - use: prometheus + - use: grafana + compose: risingwave: "ghcr.io/risingwavelabs/risingwave:latest" prometheus: "prom/prometheus:latest" diff --git a/src/common/src/config.rs b/src/common/src/config.rs index f2e00232a38c9..1051a02beb174 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -478,9 +478,14 @@ pub struct BatchConfig { #[config_doc(omitted)] pub developer: BatchDeveloperConfig, + /// This is the max number of queries per sql session. #[serde(default)] pub distributed_query_limit: Option, + /// This is the max number of batch queries per frontend node. + #[serde(default)] + pub max_batch_queries_per_frontend_node: Option, + #[serde(default = "default::batch::enable_barrier_read")] pub enable_barrier_read: bool, diff --git a/src/config/docs.md b/src/config/docs.md index 93cfa668b0d5f..452caa81e83fc 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -6,9 +6,10 @@ This page is automatically generated by `./risedev generate-example-config` | Config | Description | Default | |--------|-------------|---------| -| distributed_query_limit | | | +| distributed_query_limit | This is the max number of queries per sql session. | | | enable_barrier_read | | false | | frontend_compute_runtime_worker_threads | frontend compute runtime worker threads | 4 | +| max_batch_queries_per_frontend_node | This is the max number of batch queries per frontend node. | | | statement_timeout_in_sec | Timeout for a batch query in seconds. | 3600 | | worker_threads_num | The thread number of the batch task runtime in the compute node. The default value is decided by `tokio`. | | diff --git a/src/config/full-with-batch-query-limit.toml b/src/config/full-with-batch-query-limit.toml new file mode 100644 index 0000000000000..a000025cd0528 --- /dev/null +++ b/src/config/full-with-batch-query-limit.toml @@ -0,0 +1,2 @@ +[batch] +max_batch_queries_per_frontend_node = 2 \ No newline at end of file diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index 515a83d0923ef..153ae44d33850 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -73,6 +73,8 @@ pub struct QueryExecution { shutdown_tx: Sender, /// Identified by process_id, secret_key. Query in the same session should have same key. pub session_id: SessionId, + /// Permit to execute the query. Once query finishes execution, this is dropped. + pub permit: Option, } struct QueryRunner { @@ -94,7 +96,11 @@ struct QueryRunner { impl QueryExecution { #[allow(clippy::too_many_arguments)] - pub fn new(query: Query, session_id: SessionId) -> Self { + pub fn new( + query: Query, + session_id: SessionId, + permit: Option, + ) -> Self { let query = Arc::new(query); let (sender, receiver) = channel(100); let state = QueryState::Pending { @@ -106,6 +112,7 @@ impl QueryExecution { state: RwLock::new(state), shutdown_tx: sender, session_id, + permit, } } @@ -507,7 +514,7 @@ pub(crate) mod tests { let query = create_query().await; let query_id = query.query_id().clone(); let pinned_snapshot = hummock_snapshot_manager.acquire(); - let query_execution = Arc::new(QueryExecution::new(query, (0, 0))); + let query_execution = Arc::new(QueryExecution::new(query, (0, 0), None)); let query_execution_info = Arc::new(RwLock::new(QueryExecutionInfo::new_from_map( HashMap::from([(query_id, query_execution.clone())]), ))); diff --git a/src/frontend/src/scheduler/distributed/query_manager.rs b/src/frontend/src/scheduler/distributed/query_manager.rs index d86f12197f13b..abfc75b386ac0 100644 --- a/src/frontend/src/scheduler/distributed/query_manager.rs +++ b/src/frontend/src/scheduler/distributed/query_manager.rs @@ -25,6 +25,7 @@ use risingwave_common::session_config::QueryMode; use risingwave_pb::batch_plan::TaskOutputId; use risingwave_pb::common::HostAddress; use risingwave_rpc_client::ComputeClientPoolRef; +use tokio::sync::OwnedSemaphorePermit; use super::stats::DistributedQueryMetrics; use super::QueryExecution; @@ -131,7 +132,12 @@ pub struct QueryManager { catalog_reader: CatalogReader, query_execution_info: QueryExecutionInfoRef, pub query_metrics: Arc, + /// Limit per session. disrtibuted_query_limit: Option, + /// Limits the number of concurrent distributed queries. + distributed_query_semaphore: Option>, + /// Total permitted distributed query number. + pub total_distributed_query_limit: Option, } impl QueryManager { @@ -141,7 +147,10 @@ impl QueryManager { catalog_reader: CatalogReader, query_metrics: Arc, disrtibuted_query_limit: Option, + total_distributed_query_limit: Option, ) -> Self { + let distributed_query_semaphore = total_distributed_query_limit + .map(|limit| Arc::new(tokio::sync::Semaphore::new(limit as usize))); Self { worker_node_manager, compute_client_pool, @@ -149,6 +158,28 @@ impl QueryManager { query_execution_info: Arc::new(RwLock::new(QueryExecutionInfo::default())), query_metrics, disrtibuted_query_limit, + distributed_query_semaphore, + total_distributed_query_limit, + } + } + + async fn get_permit(&self) -> SchedulerResult> { + match self.distributed_query_semaphore { + Some(ref semaphore) => { + let permit = semaphore.clone().acquire_owned().await; + match permit { + Ok(permit) => Ok(Some(permit)), + Err(_) => { + self.query_metrics.rejected_query_counter.inc(); + Err(crate::scheduler::SchedulerError::QueryReachLimit( + QueryMode::Distributed, + self.total_distributed_query_limit + .expect("should have distributed query limit"), + )) + } + } + } + None => Ok(None), } } @@ -167,7 +198,8 @@ impl QueryManager { )); } let query_id = query.query_id.clone(); - let query_execution = Arc::new(QueryExecution::new(query, context.session().id())); + let permit = self.get_permit().await?; + let query_execution = Arc::new(QueryExecution::new(query, context.session().id(), permit)); // Add queries status when begin. context diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 30d1b02df7c03..371c698ca9a1d 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -173,6 +173,7 @@ impl FrontendEnv { catalog_reader.clone(), Arc::new(DistributedQueryMetrics::for_test()), None, + None, ); let server_addr = HostAddr::try_from("127.0.0.1:4565").unwrap(); let client_pool = Arc::new(ComputeClientPool::default()); @@ -277,6 +278,7 @@ impl FrontendEnv { catalog_reader.clone(), Arc::new(GLOBAL_DISTRIBUTED_QUERY_METRICS.clone()), batch_config.distributed_query_limit, + batch_config.max_batch_queries_per_frontend_node, ); let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));