Commit f796432

feat(frontend): add max_batch_queries_per_frontend_node config (#15574)

kwannoel authored Mar 8, 2024
1 parent 7181e58
Showing 7 changed files with 65 additions and 4 deletions.
12 changes: 12 additions & 0 deletions risedev.yml
@@ -925,6 +925,18 @@ profile:
- use: prometheus
- use: grafana

full-with-batch-query-limit:
config-path: src/config/full-with-batch-query-limit.toml
steps:
- use: minio
- use: etcd
- use: meta-node
- use: compute-node
- use: frontend
- use: compactor
- use: prometheus
- use: grafana

compose:
risingwave: "ghcr.io/risingwavelabs/risingwave:latest"
prometheus: "prom/prometheus:latest"
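Assuming the usual risedev workflow, the new profile can be launched locally with `./risedev d full-with-batch-query-limit`, which brings up the listed components with the batch-query-limit config file below applied.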
5 changes: 5 additions & 0 deletions src/common/src/config.rs
@@ -478,9 +478,14 @@ pub struct BatchConfig {
#[config_doc(omitted)]
pub developer: BatchDeveloperConfig,

/// This is the max number of queries per sql session.
#[serde(default)]
pub distributed_query_limit: Option<u64>,

/// This is the max number of batch queries per frontend node.
#[serde(default)]
pub max_batch_queries_per_frontend_node: Option<u64>,

#[serde(default = "default::batch::enable_barrier_read")]
pub enable_barrier_read: bool,

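Both fields are optional: when a key is absent from the TOML config, `#[serde(default)]` leaves it as `None`, meaning no limit is enforced. A minimal, self-contained sketch of that behaviour, using a hypothetical `BatchConfigSketch` stand-in (assumes the `serde` derive feature and the `toml` crate; it is not the actual RisingWave struct):

// Hypothetical stand-in for BatchConfig, used only to illustrate the serde defaults.
#[derive(serde::Deserialize, Debug, Default)]
struct BatchConfigSketch {
    #[serde(default)]
    distributed_query_limit: Option<u64>,
    #[serde(default)]
    max_batch_queries_per_frontend_node: Option<u64>,
}

fn main() {
    // Only one key is set; the other falls back to None, i.e. "unlimited".
    let cfg: BatchConfigSketch =
        toml::from_str("max_batch_queries_per_frontend_node = 2").unwrap();
    assert_eq!(cfg.max_batch_queries_per_frontend_node, Some(2));
    assert_eq!(cfg.distributed_query_limit, None);
}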
3 changes: 2 additions & 1 deletion src/config/docs.md
@@ -6,9 +6,10 @@ This page is automatically generated by `./risedev generate-example-config`

| Config | Description | Default |
|--------|-------------|---------|
| distributed_query_limit | | |
| distributed_query_limit | This is the max number of queries per sql session. | |
| enable_barrier_read | | false |
| frontend_compute_runtime_worker_threads | frontend compute runtime worker threads | 4 |
| max_batch_queries_per_frontend_node | This is the max number of batch queries per frontend node. | |
| statement_timeout_in_sec | Timeout for a batch query in seconds. | 3600 |
| worker_threads_num | The thread number of the batch task runtime in the compute node. The default value is decided by `tokio`. | |

2 changes: 2 additions & 0 deletions src/config/full-with-batch-query-limit.toml
@@ -0,0 +1,2 @@
[batch]
max_batch_queries_per_frontend_node = 2
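With this config, a frontend node admits at most two concurrent distributed batch queries; as the `get_permit` logic further down shows, a third query waits for a permit rather than failing immediately (an error is returned only if the semaphore has been closed).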
11 changes: 9 additions & 2 deletions src/frontend/src/scheduler/distributed/query.rs
@@ -73,6 +73,8 @@ pub struct QueryExecution {
shutdown_tx: Sender<QueryMessage>,
/// Identified by process_id, secret_key. Query in the same session should have same key.
pub session_id: SessionId,
/// Permit to execute the query. Once query finishes execution, this is dropped.
pub permit: Option<tokio::sync::OwnedSemaphorePermit>,
}

struct QueryRunner {
@@ -94,7 +96,11 @@ struct QueryRunner {

impl QueryExecution {
#[allow(clippy::too_many_arguments)]
pub fn new(query: Query, session_id: SessionId) -> Self {
pub fn new(
query: Query,
session_id: SessionId,
permit: Option<tokio::sync::OwnedSemaphorePermit>,
) -> Self {
let query = Arc::new(query);
let (sender, receiver) = channel(100);
let state = QueryState::Pending {
Expand All @@ -106,6 +112,7 @@ impl QueryExecution {
state: RwLock::new(state),
shutdown_tx: sender,
session_id,
permit,
}
}

@@ -507,7 +514,7 @@ pub(crate) mod tests {
let query = create_query().await;
let query_id = query.query_id().clone();
let pinned_snapshot = hummock_snapshot_manager.acquire();
let query_execution = Arc::new(QueryExecution::new(query, (0, 0)));
let query_execution = Arc::new(QueryExecution::new(query, (0, 0), None));
let query_execution_info = Arc::new(RwLock::new(QueryExecutionInfo::new_from_map(
HashMap::from([(query_id, query_execution.clone())]),
)));
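The new `permit` field relies on tokio's RAII semantics: an `OwnedSemaphorePermit` returns its slot to the semaphore when it is dropped, so the limit is released automatically once the `QueryExecution` finishes and is torn down. A small standalone sketch of that behaviour (toy example, not RisingWave code):

use std::sync::Arc;

use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // One permit total, i.e. at most one concurrent query in this toy setup.
    let semaphore = Arc::new(Semaphore::new(1));

    let permit = semaphore.clone().acquire_owned().await.unwrap();
    assert_eq!(semaphore.available_permits(), 0);

    // Dropping the permit models the QueryExecution (and its `permit` field)
    // being dropped after the query finishes: the slot is released automatically.
    drop(permit);
    assert_eq!(semaphore.available_permits(), 1);
}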
34 changes: 33 additions & 1 deletion src/frontend/src/scheduler/distributed/query_manager.rs
@@ -25,6 +25,7 @@ use risingwave_common::session_config::QueryMode;
use risingwave_pb::batch_plan::TaskOutputId;
use risingwave_pb::common::HostAddress;
use risingwave_rpc_client::ComputeClientPoolRef;
use tokio::sync::OwnedSemaphorePermit;

use super::stats::DistributedQueryMetrics;
use super::QueryExecution;
@@ -131,7 +132,12 @@ pub struct QueryManager {
catalog_reader: CatalogReader,
query_execution_info: QueryExecutionInfoRef,
pub query_metrics: Arc<DistributedQueryMetrics>,
/// Limit per session.
disrtibuted_query_limit: Option<u64>,
/// Limits the number of concurrent distributed queries.
distributed_query_semaphore: Option<Arc<tokio::sync::Semaphore>>,
/// Total permitted distributed query number.
pub total_distributed_query_limit: Option<u64>,
}

impl QueryManager {
@@ -141,14 +147,39 @@ impl QueryManager {
catalog_reader: CatalogReader,
query_metrics: Arc<DistributedQueryMetrics>,
disrtibuted_query_limit: Option<u64>,
total_distributed_query_limit: Option<u64>,
) -> Self {
let distributed_query_semaphore = total_distributed_query_limit
.map(|limit| Arc::new(tokio::sync::Semaphore::new(limit as usize)));
Self {
worker_node_manager,
compute_client_pool,
catalog_reader,
query_execution_info: Arc::new(RwLock::new(QueryExecutionInfo::default())),
query_metrics,
disrtibuted_query_limit,
distributed_query_semaphore,
total_distributed_query_limit,
}
}

async fn get_permit(&self) -> SchedulerResult<Option<OwnedSemaphorePermit>> {
match self.distributed_query_semaphore {
Some(ref semaphore) => {
let permit = semaphore.clone().acquire_owned().await;
match permit {
Ok(permit) => Ok(Some(permit)),
Err(_) => {
self.query_metrics.rejected_query_counter.inc();
Err(crate::scheduler::SchedulerError::QueryReachLimit(
QueryMode::Distributed,
self.total_distributed_query_limit
.expect("should have distributed query limit"),
))
}
}
}
None => Ok(None),
}
}

@@ -167,7 +198,8 @@ impl QueryManager {
));
}
let query_id = query.query_id.clone();
let query_execution = Arc::new(QueryExecution::new(query, context.session().id()));
let permit = self.get_permit().await?;
let query_execution = Arc::new(QueryExecution::new(query, context.session().id(), permit));

// Add queries status when begin.
context
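`get_permit` acquires an owned permit before the query is scheduled, so a query arriving beyond the configured limit simply waits until an earlier query drops its permit; only a closed semaphore yields an error, which is surfaced as `QueryReachLimit` and counted in `rejected_query_counter`. A self-contained sketch of the resulting queuing behaviour (limit of 2, three queries; the names below are illustrative, not RisingWave APIs):

use std::sync::Arc;
use std::time::Duration;

use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // Plays the role of `max_batch_queries_per_frontend_node = 2`.
    let semaphore = Arc::new(Semaphore::new(2));

    let mut handles = Vec::new();
    for id in 0..3 {
        let semaphore = semaphore.clone();
        handles.push(tokio::spawn(async move {
            // Mirrors get_permit: wait here until a slot frees up.
            let _permit = semaphore.acquire_owned().await.expect("semaphore closed");
            println!("query {id} running");
            tokio::time::sleep(Duration::from_millis(50)).await;
            // _permit is dropped when the task ends, releasing the slot.
        }));
    }
    for handle in handles {
        handle.await.unwrap();
    }
}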
2 changes: 2 additions & 0 deletions src/frontend/src/session.rs
@@ -173,6 +173,7 @@ impl FrontendEnv {
catalog_reader.clone(),
Arc::new(DistributedQueryMetrics::for_test()),
None,
None,
);
let server_addr = HostAddr::try_from("127.0.0.1:4565").unwrap();
let client_pool = Arc::new(ComputeClientPool::default());
@@ -277,6 +278,7 @@ impl FrontendEnv {
catalog_reader.clone(),
Arc::new(GLOBAL_DISTRIBUTED_QUERY_METRICS.clone()),
batch_config.distributed_query_limit,
batch_config.max_batch_queries_per_frontend_node,
);

let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
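Both `QueryManager::new` call sites are updated accordingly: the test environment passes `None` for the new limit (no cap), while the production path threads `batch_config.max_batch_queries_per_frontend_node` through alongside the existing per-session `distributed_query_limit`.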
