From 9fe649f9d3b2fa198cf16e7a09216ab14835db4d Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Mon, 11 Dec 2023 19:23:44 +0800 Subject: [PATCH 1/2] feat: Add query statement timeout --- src/common/src/config.rs | 9 +++++ src/common/src/session_config/mod.rs | 6 +-- src/config/example.toml | 1 + src/frontend/src/handler/query.rs | 29 +++++++++++--- .../src/scheduler/distributed/query.rs | 39 +++++++++++++++---- .../scheduler/distributed/query_manager.rs | 2 +- src/frontend/src/scheduler/error.rs | 6 +-- src/frontend/src/scheduler/local.rs | 36 +++++++++++++++-- src/frontend/src/scheduler/mod.rs | 10 ++++- src/frontend/src/session.rs | 8 ++++ 10 files changed, 121 insertions(+), 25 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index d7c4c6f260971..753378df9fcee 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -417,6 +417,10 @@ pub struct BatchConfig { #[serde(default = "default::batch::enable_barrier_read")] pub enable_barrier_read: bool, + /// Timeout for a batch query in seconds. + #[serde(default = "default::batch::statement_timeout_in_sec")] + pub statement_timeout_in_sec: u32, + #[serde(default, flatten)] pub unrecognized: Unrecognized, } @@ -1358,6 +1362,11 @@ pub mod default { pub fn enable_barrier_read() -> bool { false } + + pub fn statement_timeout_in_sec() -> u32 { + // 1 hour + 60 * 60 + } } pub mod compaction_config { diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index 82c5bd0a6f120..cf2a0c6256622 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -195,12 +195,12 @@ pub struct ConfigMap { #[parameter(default = false)] synchronize_seqscans: bool, - /// Abort any statement that takes more than the specified amount of time. If + /// Abort query statement that takes more than the specified amount of time in sec. If /// log_min_error_statement is set to ERROR or lower, the statement that timed out will also be /// logged. If this value is specified without units, it is taken as milliseconds. A value of /// zero (the default) disables the timeout. - #[parameter(default = 0)] - statement_timeout: i32, + #[parameter(default = 0u32)] + statement_timeout: u32, /// See /// Unused in RisingWave, support for compatibility. diff --git a/src/config/example.toml b/src/config/example.toml index eda01aa5f8905..16ca17c3d6413 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -67,6 +67,7 @@ enable_emergency_picker = true [batch] enable_barrier_read = false +statement_timeout_in_sec = 3600 [batch.developer] batch_connector_message_buffer_size = 16 diff --git a/src/frontend/src/handler/query.rs b/src/frontend/src/handler/query.rs index 4e533cd98cf0d..211beefdf71eb 100644 --- a/src/frontend/src/handler/query.rs +++ b/src/frontend/src/handler/query.rs @@ -311,6 +311,7 @@ async fn execute( .. } = plan_fragmenter_result; + let mut can_timeout_cancel = true; // Acquire the write guard for DML statements. 
match stmt_type { StatementType::INSERT @@ -320,6 +321,7 @@ async fn execute( | StatementType::UPDATE | StatementType::UPDATE_RETURNING => { session.txn_write_guard()?; + can_timeout_cancel = false; } _ => {} } @@ -341,7 +343,7 @@ async fn execute( let mut row_stream = match query_mode { QueryMode::Auto => unreachable!(), QueryMode::Local => PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new( - local_execute(session.clone(), query).await?, + local_execute(session.clone(), query, can_timeout_cancel).await?, column_types, formats, session.clone(), @@ -349,7 +351,7 @@ async fn execute( // Local mode do not support cancel tasks. QueryMode::Distributed => { PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new( - distribute_execute(session.clone(), query).await?, + distribute_execute(session.clone(), query, can_timeout_cancel).await?, column_types, formats, session.clone(), @@ -451,8 +453,15 @@ async fn execute( async fn distribute_execute( session: Arc, query: Query, + can_timeout_cancel: bool, ) -> Result { - let execution_context: ExecutionContextRef = ExecutionContext::new(session.clone()).into(); + let timeout = if can_timeout_cancel { + Some(session.statement_timeout()) + } else { + None + }; + let execution_context: ExecutionContextRef = + ExecutionContext::new(session.clone(), timeout).into(); let query_manager = session.env().query_manager().clone(); query_manager @@ -462,14 +471,24 @@ async fn distribute_execute( } #[expect(clippy::unused_async)] -async fn local_execute(session: Arc, query: Query) -> Result { +async fn local_execute( + session: Arc, + query: Query, + can_timeout_cancel: bool, +) -> Result { + let timeout = if can_timeout_cancel { + Some(session.statement_timeout()) + } else { + None + }; let front_env = session.env(); // TODO: if there's no table scan, we don't need to acquire snapshot. let snapshot = session.pinned_snapshot(); // TODO: Passing sql here - let execution = LocalQueryExecution::new(query, front_env.clone(), "", snapshot, session); + let execution = + LocalQueryExecution::new(query, front_env.clone(), "", snapshot, session, timeout); Ok(execution.stream_rows()) } diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index 003c19d2ec9ac..5f0162e80e985 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -30,6 +30,7 @@ use risingwave_rpc_client::ComputeClientPoolRef; use thiserror_ext::AsReport; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::{oneshot, RwLock}; +use tokio::task::JoinHandle; use tracing::{debug, error, info, warn, Instrument}; use super::{DistributedQueryMetrics, QueryExecutionInfoRef, QueryResultFetcher, StageEvent}; @@ -47,7 +48,8 @@ use crate::scheduler::{ExecutionContextRef, ReadSnapshot, SchedulerError, Schedu pub enum QueryMessage { /// Events passed running execution. Stage(StageEvent), - CancelQuery, + /// Cancelled by some reason + CancelQuery(String), } enum QueryState { @@ -87,6 +89,7 @@ struct QueryRunner { query_execution_info: QueryExecutionInfoRef, query_metrics: Arc, + timeout_abort_task_handle: Option>, } impl QueryExecution { @@ -112,7 +115,7 @@ impl QueryExecution { /// cancel request (from ctrl-c, cli, ui etc). 
#[allow(clippy::too_many_arguments)] pub async fn start( - &self, + self: Arc, context: ExecutionContextRef, worker_node_manager: WorkerNodeSelector, pinned_snapshot: ReadSnapshot, @@ -129,7 +132,7 @@ impl QueryExecution { // can control when to release the snapshot. let stage_executions = self.gen_stage_executions( &pinned_snapshot, - context, + context.clone(), worker_node_manager, compute_client_pool.clone(), catalog_reader, @@ -139,6 +142,22 @@ impl QueryExecution { QueryState::Pending { msg_receiver } => { *state = QueryState::Running; + // Start a timer to cancel the query + let mut timeout_abort_task_handle: Option> = None; + if let Some(timeout) = context.timeout() { + let this = self.clone(); + timeout_abort_task_handle = Some(tokio::spawn(async move { + tokio::time::sleep(timeout).await; + warn!( + "Query {:?} timeout after {} seconds, sending cancel message.", + this.query.query_id, + timeout.as_secs(), + ); + this.abort(format!("timeout after {} seconds", timeout.as_secs())) + .await; + })); + } + // Create a oneshot channel for QueryResultFetcher to get failed event. let (root_stage_sender, root_stage_receiver) = oneshot::channel::>(); @@ -151,6 +170,7 @@ impl QueryExecution { scheduled_stages_count: 0, query_execution_info, query_metrics, + timeout_abort_task_handle, }; let span = tracing::info_span!( @@ -184,10 +204,10 @@ impl QueryExecution { } /// Cancel execution of this query. - pub async fn abort(self: Arc) { + pub async fn abort(self: Arc, reason: String) { if self .shutdown_tx - .send(QueryMessage::CancelQuery) + .send(QueryMessage::CancelQuery(reason)) .await .is_err() { @@ -236,6 +256,9 @@ impl QueryExecution { impl Drop for QueryRunner { fn drop(&mut self) { self.query_metrics.running_query_num.dec(); + self.timeout_abort_task_handle + .as_ref() + .inspect(|h| h.abort()); } } @@ -346,8 +369,8 @@ impl QueryRunner { break; } } - QueryMessage::CancelQuery => { - self.clean_all_stages(Some(SchedulerError::QueryCancelled)) + QueryMessage::CancelQuery(reason) => { + self.clean_all_stages(Some(SchedulerError::QueryCancelled(reason))) .await; // One stage failed, not necessary to execute schedule stages. break; @@ -489,7 +512,7 @@ pub(crate) mod tests { assert!(query_execution .start( - ExecutionContext::new(SessionImpl::mock().into()).into(), + ExecutionContext::new(SessionImpl::mock().into(), None).into(), worker_node_selector, ReadSnapshot::FrontendPinned { snapshot: pinned_snapshot, diff --git a/src/frontend/src/scheduler/distributed/query_manager.rs b/src/frontend/src/scheduler/distributed/query_manager.rs index 53fb6d3364294..53cdadfcb6fb5 100644 --- a/src/frontend/src/scheduler/distributed/query_manager.rs +++ b/src/frontend/src/scheduler/distributed/query_manager.rs @@ -117,7 +117,7 @@ impl QueryExecutionInfo { if query.session_id == session_id { let query = query.clone(); // Spawn a task to abort. Avoid await point in this function. - tokio::spawn(async move { query.abort().await }); + tokio::spawn(async move { query.abort("cancelled by user".to_string()).await }); } } } diff --git a/src/frontend/src/scheduler/error.rs b/src/frontend/src/scheduler/error.rs index cddd9f3e27c72..04276671916c3 100644 --- a/src/frontend/src/scheduler/error.rs +++ b/src/frontend/src/scheduler/error.rs @@ -49,9 +49,9 @@ pub enum SchedulerError { #[error("Task got killed because compute node running out of memory")] TaskRunningOutOfMemory, - /// Used when receive cancel request (ctrl-c) from user. 
- #[error("Cancelled by user")] - QueryCancelled, + /// Used when receive cancel request for some reason, such as user cancel or timeout. + #[error("Query cancelled: {0}")] + QueryCancelled(String), #[error("Reject query: the {0} query number reaches the limit: {1}")] QueryReachLimit(QueryMode, u64), diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs index 37c603f833691..879cbd72dce9d 100644 --- a/src/frontend/src/scheduler/local.rs +++ b/src/frontend/src/scheduler/local.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; +use std::time::Duration; use anyhow::anyhow; use futures::stream::BoxStream; @@ -66,6 +67,7 @@ pub struct LocalQueryExecution { snapshot: ReadSnapshot, session: Arc, worker_node_manager: WorkerNodeSelector, + timeout: Option, } impl LocalQueryExecution { @@ -75,6 +77,7 @@ impl LocalQueryExecution { sql: S, snapshot: ReadSnapshot, session: Arc, + timeout: Option, ) -> Self { let sql = sql.into(); let worker_node_manager = WorkerNodeSelector::new( @@ -89,6 +92,7 @@ impl LocalQueryExecution { snapshot, session, worker_node_manager, + timeout, } } @@ -149,15 +153,19 @@ impl LocalQueryExecution { let db_name = self.session.database().to_string(); let search_path = self.session.config().search_path(); let time_zone = self.session.config().timezone(); + let timeout = self.timeout; + let sender1 = sender.clone(); let exec = async move { let mut data_stream = self.run().map(|r| r.map_err(|e| Box::new(e) as BoxedError)); while let Some(mut r) = data_stream.next().await { // append a query cancelled error if the query is cancelled. if r.is_err() && shutdown_rx.is_cancelled() { - r = Err(Box::new(SchedulerError::QueryCancelled) as BoxedError); + r = Err(Box::new(SchedulerError::QueryCancelled( + "Cancelled by user".to_string(), + )) as BoxedError); } - if sender.send(r).await.is_err() { + if sender1.send(r).await.is_err() { tracing::info!("Receiver closed."); return; } @@ -176,7 +184,29 @@ impl LocalQueryExecution { let exec = async move { AUTH_CONTEXT::scope(auth_context, exec).await }; let exec = async move { TIME_ZONE::scope(time_zone, exec).await }; - compute_runtime.spawn(exec); + if let Some(timeout) = timeout { + let exec = async move { + if let Err(_e) = tokio::time::timeout(timeout, exec).await { + tracing::error!( + "Local query execution timeout after {} seconds", + timeout.as_secs() + ); + if sender + .send(Err(Box::new(SchedulerError::QueryCancelled(format!( + "timeout after {} seconds", + timeout.as_secs(), + ))) as BoxedError)) + .await + .is_err() + { + tracing::info!("Receiver closed."); + } + } + }; + compute_runtime.spawn(exec); + } else { + compute_runtime.spawn(exec); + } ReceiverStream::new(receiver) } diff --git a/src/frontend/src/scheduler/mod.rs b/src/frontend/src/scheduler/mod.rs index d5361f83ff56f..cf197b82403ef 100644 --- a/src/frontend/src/scheduler/mod.rs +++ b/src/frontend/src/scheduler/mod.rs @@ -15,6 +15,7 @@ //! Fragment and schedule batch queries. use std::sync::Arc; +use std::time::Duration; use futures::Stream; use risingwave_common::array::DataChunk; @@ -46,19 +47,24 @@ pub trait DataChunkStream = Stream>; /// Context for mpp query execution. 
pub struct ExecutionContext { session: Arc, + timeout: Option, } pub type ExecutionContextRef = Arc; impl ExecutionContext { - pub fn new(session: Arc) -> Self { - Self { session } + pub fn new(session: Arc, timeout: Option) -> Self { + Self { session, timeout } } pub fn session(&self) -> &SessionImpl { &self.session } + pub fn timeout(&self) -> Option { + self.timeout + } + pub fn to_batch_task_context(&self) -> FrontendBatchTaskContext { FrontendBatchTaskContext::new(self.session.env().clone(), self.session.auth_context()) } diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 9d221c8023ece..9bf6a17f59725 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -862,6 +862,14 @@ impl SessionImpl { VisibilityMode::Checkpoint => false, } } + + pub fn statement_timeout(&self) -> Duration { + if self.config().statement_timeout() == 0 { + Duration::from_secs(self.env.batch_config.statement_timeout_in_sec as u64) + } else { + Duration::from_secs(self.config().statement_timeout() as u64) + } + } } pub struct SessionManagerImpl { From cdee192ce9bbb52ab6873931fa6d0e143806fc0b Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Tue, 12 Dec 2023 15:44:05 +0800 Subject: [PATCH 2/2] Disable madsim --- src/frontend/src/handler/query.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/frontend/src/handler/query.rs b/src/frontend/src/handler/query.rs index 211beefdf71eb..599d4f16e79ef 100644 --- a/src/frontend/src/handler/query.rs +++ b/src/frontend/src/handler/query.rs @@ -455,7 +455,9 @@ async fn distribute_execute( query: Query, can_timeout_cancel: bool, ) -> Result { - let timeout = if can_timeout_cancel { + let timeout = if cfg!(madsim) { + None + } else if can_timeout_cancel { Some(session.statement_timeout()) } else { None @@ -476,7 +478,9 @@ async fn local_execute( query: Query, can_timeout_cancel: bool, ) -> Result { - let timeout = if can_timeout_cancel { + let timeout = if cfg!(madsim) { + None + } else if can_timeout_cancel { Some(session.statement_timeout()) } else { None
     };
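
For reference, the timeout selection that this patch wires through `distribute_execute`, `local_execute`, and `SessionImpl::statement_timeout` reduces to a small decision: DML statements (which hold the transaction write guard) are never timeout-cancelled; a session-level `statement_timeout` of zero falls back to the `batch.statement_timeout_in_sec` config default (3600 in example.toml); and any non-zero session value wins. Note that both knobs are in seconds here — the session value goes straight into `Duration::from_secs` — unlike PostgreSQL's `statement_timeout`, which counts milliseconds. Below is a minimal standalone sketch of that logic; the function and parameter names are illustrative, not the real accessors.

use std::time::Duration;

/// Resolve the effective statement timeout. Parameter names are
/// illustrative: `session_timeout_secs` plays the role of the session's
/// `statement_timeout` variable and `batch_default_secs` the role of
/// `batch.statement_timeout_in_sec`.
fn effective_timeout(
    session_timeout_secs: u32,
    batch_default_secs: u32,
    can_timeout_cancel: bool,
) -> Option<Duration> {
    // DML holds the txn write guard, so it is never timeout-cancelled.
    if !can_timeout_cancel {
        return None;
    }
    // Zero means "unset": fall back to the batch config default.
    let secs = if session_timeout_secs == 0 {
        batch_default_secs
    } else {
        session_timeout_secs
    };
    Some(Duration::from_secs(u64::from(secs)))
}

fn main() {
    // Session default (0) falls back to the one-hour batch default.
    assert_eq!(effective_timeout(0, 3600, true), Some(Duration::from_secs(3600)));
    // An explicit session setting wins.
    assert_eq!(effective_timeout(30, 3600, true), Some(Duration::from_secs(30)));
    // INSERT/UPDATE/DELETE and friends are never timeout-cancelled.
    assert_eq!(effective_timeout(30, 3600, false), None);
}

The `cfg!(madsim)` guard added in the second commit short-circuits all of this to `None`, so deterministic simulation tests are not affected by wall-clock timers.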
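
In the distributed path, `QueryExecution::start` enforces the timeout with a detached watchdog task: sleep for the timeout, then send `QueryMessage::CancelQuery(reason)` through the query's shutdown channel; `QueryRunner::drop` aborts the watchdog so a query that finishes early does not keep the task alive. A self-contained sketch of that pattern follows (assuming tokio with the rt, sync, time, and macros features; the message enum is a stand-in for the real one).

use std::time::Duration;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

/// Stand-in for the real `QueryMessage::CancelQuery(String)`.
#[derive(Debug)]
enum QueryMessage {
    CancelQuery(String),
}

/// Sleep for `timeout`, then ask the query runner to cancel itself.
/// The caller must abort the returned handle once the query finishes,
/// as the patch does in `QueryRunner::drop`.
fn spawn_timeout_watchdog(tx: mpsc::Sender<QueryMessage>, timeout: Duration) -> JoinHandle<()> {
    tokio::spawn(async move {
        tokio::time::sleep(timeout).await;
        let _ = tx
            .send(QueryMessage::CancelQuery(format!(
                "timeout after {} seconds",
                timeout.as_secs()
            )))
            .await;
    })
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(1);
    let watchdog = spawn_timeout_watchdog(tx, Duration::from_millis(10));
    // The "query" outlives the timeout, so the cancel message arrives.
    if let Some(QueryMessage::CancelQuery(reason)) = rx.recv().await {
        println!("query cancelled: {reason}");
    }
    // Harmless here; essential when the query finishes before the timer.
    watchdog.abort();
}

Routing the timeout through the same channel as user cancellation is what lets `SchedulerError::QueryCancelled` carry a reason string: "cancelled by user" and "timeout after N seconds" flow through one code path.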
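
Local mode has no per-stage scheduler to deliver a cancel message to, so the patch instead wraps the whole spawned execution future in `tokio::time::timeout` and, on expiry, pushes a `QueryCancelled` error into the result channel so the client sees why the stream ended. A minimal sketch under the same tokio assumptions; `run_query` stands in for the real spawned `exec` future.

use std::time::Duration;
use tokio::sync::mpsc;

/// Wrap a result-streaming future in a timeout; on expiry, surface the
/// cancellation as an error on the same channel the rows travel on.
async fn run_with_timeout(sender: mpsc::Sender<Result<String, String>>, timeout: Duration) {
    let inner = sender.clone();
    // Stand-in for the real spawned `exec` future.
    let run_query = async move {
        tokio::time::sleep(Duration::from_secs(3600)).await; // a "slow" query
        let _ = inner.send(Ok("row".into())).await;
    };
    if tokio::time::timeout(timeout, run_query).await.is_err() {
        let _ = sender
            .send(Err(format!("timeout after {} seconds", timeout.as_secs())))
            .await;
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(1);
    tokio::spawn(run_with_timeout(tx, Duration::from_secs(1)));
    // Prints Err("timeout after 1 seconds") once the timer fires.
    while let Some(r) = rx.recv().await {
        println!("{r:?}");
    }
}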