Skip to content

Commit

Permalink
add new config
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Mar 8, 2024
1 parent bffaa38 commit d5a77d3
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 3 deletions.
12 changes: 12 additions & 0 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,18 @@ profile:
- use: prometheus
- use: grafana

# Full cluster profile (all standard components) that additionally loads the
# batch-query-limit config file, which caps concurrent batch queries per
# frontend node. NOTE(review): rendered page stripped YAML indentation —
# conventional two-space nesting restored here; verify against repo file.
full-with-batch-query-limit:
  config-path: src/config/full-with-batch-query-limit.toml
  steps:
    - use: minio
    - use: etcd
    - use: meta-node
    - use: compute-node
    - use: frontend
    - use: compactor
    - use: prometheus
    - use: grafana

compose:
risingwave: "ghcr.io/risingwavelabs/risingwave:latest"
prometheus: "prom/prometheus:latest"
Expand Down
5 changes: 5 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,9 +478,14 @@ pub struct BatchConfig {
#[config_doc(omitted)]
pub developer: BatchDeveloperConfig,

/// Maximum number of distributed queries allowed per SQL session.
#[serde(default)]
pub distributed_query_limit: Option<u64>,

/// This is the max number of batch queries per frontend node.
#[serde(default)]
pub max_batch_queries_per_frontend_node: Option<u64>,

#[serde(default = "default::batch::enable_barrier_read")]
pub enable_barrier_read: bool,

Expand Down
2 changes: 2 additions & 0 deletions src/config/full-with-batch-query-limit.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[batch]
# Cap on batch queries per frontend node (maps to
# `BatchConfig::max_batch_queries_per_frontend_node`); unset means unlimited.
max_batch_queries_per_frontend_node = 2
11 changes: 9 additions & 2 deletions src/frontend/src/scheduler/distributed/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ pub struct QueryExecution {
shutdown_tx: Sender<QueryMessage>,
/// Identified by process_id, secret_key. Query in the same session should have same key.
pub session_id: SessionId,
/// Permit to execute the query. Once query finishes execution, this is dropped.
pub permit: Option<tokio::sync::OwnedSemaphorePermit>,
}

struct QueryRunner {
Expand All @@ -94,7 +96,11 @@ struct QueryRunner {

impl QueryExecution {
#[allow(clippy::too_many_arguments)]
pub fn new(query: Query, session_id: SessionId) -> Self {
pub fn new(
query: Query,
session_id: SessionId,
permit: Option<tokio::sync::OwnedSemaphorePermit>,
) -> Self {
let query = Arc::new(query);
let (sender, receiver) = channel(100);
let state = QueryState::Pending {
Expand All @@ -106,6 +112,7 @@ impl QueryExecution {
state: RwLock::new(state),
shutdown_tx: sender,
session_id,
permit,
}
}

Expand Down Expand Up @@ -507,7 +514,7 @@ pub(crate) mod tests {
let query = create_query().await;
let query_id = query.query_id().clone();
let pinned_snapshot = hummock_snapshot_manager.acquire();
let query_execution = Arc::new(QueryExecution::new(query, (0, 0)));
let query_execution = Arc::new(QueryExecution::new(query, (0, 0), None));
let query_execution_info = Arc::new(RwLock::new(QueryExecutionInfo::new_from_map(
HashMap::from([(query_id, query_execution.clone())]),
)));
Expand Down
34 changes: 33 additions & 1 deletion src/frontend/src/scheduler/distributed/query_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use risingwave_common::session_config::QueryMode;
use risingwave_pb::batch_plan::TaskOutputId;
use risingwave_pb::common::HostAddress;
use risingwave_rpc_client::ComputeClientPoolRef;
use tokio::sync::OwnedSemaphorePermit;

use super::stats::DistributedQueryMetrics;
use super::QueryExecution;
Expand Down Expand Up @@ -131,7 +132,12 @@ pub struct QueryManager {
catalog_reader: CatalogReader,
query_execution_info: QueryExecutionInfoRef,
pub query_metrics: Arc<DistributedQueryMetrics>,
/// Limit per session.
disrtibuted_query_limit: Option<u64>,
/// Limits the number of concurrent distributed queries.
distributed_query_semaphore: Option<Arc<tokio::sync::Semaphore>>,
/// Total permitted distributed query number.
pub total_distributed_query_limit: Option<u64>,
}

impl QueryManager {
Expand All @@ -141,14 +147,39 @@ impl QueryManager {
catalog_reader: CatalogReader,
query_metrics: Arc<DistributedQueryMetrics>,
disrtibuted_query_limit: Option<u64>,
total_distributed_query_limit: Option<u64>,
) -> Self {
let distributed_query_semaphore = total_distributed_query_limit
.map(|limit| Arc::new(tokio::sync::Semaphore::new(limit as usize)));
Self {
worker_node_manager,
compute_client_pool,
catalog_reader,
query_execution_info: Arc::new(RwLock::new(QueryExecutionInfo::default())),
query_metrics,
disrtibuted_query_limit,
distributed_query_semaphore,
total_distributed_query_limit,
}
}

fn get_permit(&self) -> SchedulerResult<Option<OwnedSemaphorePermit>> {
match self.distributed_query_semaphore {
Some(ref semaphore) => {
let permit = semaphore.clone().try_acquire_owned();
match permit {
Ok(permit) => Ok(Some(permit)),
Err(_) => {
self.query_metrics.rejected_query_counter.inc();
Err(crate::scheduler::SchedulerError::QueryReachLimit(
QueryMode::Distributed,
self.total_distributed_query_limit
.expect("should have distributed query limit"),
))
}
}
}
None => Ok(None),
}
}

Expand All @@ -167,7 +198,8 @@ impl QueryManager {
));
}
let query_id = query.query_id.clone();
let query_execution = Arc::new(QueryExecution::new(query, context.session().id()));
let permit = self.get_permit()?;
let query_execution = Arc::new(QueryExecution::new(query, context.session().id(), permit));

// Add queries status when begin.
context
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ impl FrontendEnv {
catalog_reader.clone(),
Arc::new(DistributedQueryMetrics::for_test()),
None,
None,
);
let server_addr = HostAddr::try_from("127.0.0.1:4565").unwrap();
let client_pool = Arc::new(ComputeClientPool::default());
Expand Down Expand Up @@ -277,6 +278,7 @@ impl FrontendEnv {
catalog_reader.clone(),
Arc::new(GLOBAL_DISTRIBUTED_QUERY_METRICS.clone()),
batch_config.distributed_query_limit,
batch_config.max_batch_queries_per_frontend_node,
);

let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
Expand Down

0 comments on commit d5a77d3

Please sign in to comment.