feat: Add statement_timeout for query. #13933

Merged: 2 commits, Dec 13, 2023
9 changes: 9 additions & 0 deletions src/common/src/config.rs
@@ -417,6 +417,10 @@ pub struct BatchConfig {
#[serde(default = "default::batch::enable_barrier_read")]
pub enable_barrier_read: bool,

/// Timeout for a batch query in seconds.
#[serde(default = "default::batch::statement_timeout_in_sec")]
pub statement_timeout_in_sec: u32,

#[serde(default, flatten)]
pub unrecognized: Unrecognized<Self>,
}
@@ -1358,6 +1362,11 @@ pub mod default {
pub fn enable_barrier_read() -> bool {
false
}

pub fn statement_timeout_in_sec() -> u32 {
// 1 hour
60 * 60
}
}

pub mod compaction_config {
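The new knob is plain seconds in the config; a hedged sketch of the conversion a consumer would presumably perform (the wiring from config value to session default is outside this hunk, and the helper name is hypothetical):

use std::time::Duration;

// Hypothetical helper: turn the seconds-based batch config value into a
// `Duration` for use as a query deadline.
fn batch_statement_timeout(statement_timeout_in_sec: u32) -> Duration {
    Duration::from_secs(u64::from(statement_timeout_in_sec))
}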
6 changes: 3 additions & 3 deletions src/common/src/session_config/mod.rs
@@ -195,12 +195,12 @@ pub struct ConfigMap {
#[parameter(default = false)]
synchronize_seqscans: bool,

/// Abort any statement that takes more than the specified amount of time. If
/// Abort any query statement that takes more than the specified amount of time, in seconds. If
/// log_min_error_statement is set to ERROR or lower, the statement that timed out will also be
/// logged. If this value is specified without units, it is taken as milliseconds. A value of
/// zero (the default) disables the timeout.
#[parameter(default = 0)]
statement_timeout: i32,
#[parameter(default = 0u32)]
statement_timeout: u32,

/// See <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-LOCK-TIMEOUT>
/// Unused in RisingWave, support for compatibility.
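For context, this makes `statement_timeout` a PostgreSQL-style session parameter, so a client would presumably control it with `SET statement_timeout = 30;` to abort batch queries after 30 seconds (with this PR the value is taken in seconds, unlike PostgreSQL's milliseconds) and `SET statement_timeout = 0;` to restore the default of no timeout.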
1 change: 1 addition & 0 deletions src/config/example.toml
@@ -67,6 +67,7 @@ enable_emergency_picker = true

[batch]
enable_barrier_read = false
statement_timeout_in_sec = 3600

[batch.developer]
batch_connector_message_buffer_size = 16
33 changes: 28 additions & 5 deletions src/frontend/src/handler/query.rs
@@ -311,6 +311,7 @@ async fn execute(
..
} = plan_fragmenter_result;

let mut can_timeout_cancel = true;
// Acquire the write guard for DML statements.
match stmt_type {
StatementType::INSERT
@@ -320,6 +321,7 @@
| StatementType::UPDATE
| StatementType::UPDATE_RETURNING => {
session.txn_write_guard()?;
can_timeout_cancel = false;
}
_ => {}
}
@@ -341,15 +343,15 @@
let mut row_stream = match query_mode {
QueryMode::Auto => unreachable!(),
QueryMode::Local => PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new(
local_execute(session.clone(), query).await?,
local_execute(session.clone(), query, can_timeout_cancel).await?,
column_types,
formats,
session.clone(),
)),
// Local mode does not support cancelling tasks.
QueryMode::Distributed => {
PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new(
distribute_execute(session.clone(), query).await?,
distribute_execute(session.clone(), query, can_timeout_cancel).await?,
column_types,
formats,
session.clone(),
@@ -451,8 +453,17 @@ async fn execute(
async fn distribute_execute(
session: Arc<SessionImpl>,
query: Query,
can_timeout_cancel: bool,
) -> Result<DistributedQueryStream> {
let execution_context: ExecutionContextRef = ExecutionContext::new(session.clone()).into();
let timeout = if cfg!(madsim) {
None
} else if can_timeout_cancel {
Some(session.statement_timeout())
} else {
None
};
let execution_context: ExecutionContextRef =
ExecutionContext::new(session.clone(), timeout).into();
let query_manager = session.env().query_manager().clone();

query_manager
@@ -462,14 +473,26 @@ async fn distribute_execute(
}

#[expect(clippy::unused_async)]
async fn local_execute(session: Arc<SessionImpl>, query: Query) -> Result<LocalQueryStream> {
async fn local_execute(
session: Arc<SessionImpl>,
query: Query,
can_timeout_cancel: bool,
) -> Result<LocalQueryStream> {
let timeout = if cfg!(madsim) {
// PR review note (author): There is a bug in madsim which may lead to
// invalid memory access. We skip madsim using this approach for now.
None
} else if can_timeout_cancel {
Some(session.statement_timeout())
} else {
None
};
let front_env = session.env();

// TODO: if there's no table scan, we don't need to acquire snapshot.
let snapshot = session.pinned_snapshot();

// TODO: Passing sql here
let execution = LocalQueryExecution::new(query, front_env.clone(), "", snapshot, session);
let execution =
LocalQueryExecution::new(query, front_env.clone(), "", snapshot, session, timeout);

Ok(execution.stream_rows())
}
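A condensed sketch of the timeout-selection logic shared by both execution paths above (a hypothetical free function; the PR inlines it): DML statements hold the transaction write guard and are never timeout-cancelled, and madsim simulation builds skip the timer entirely because of the bug noted in the review comment above.

use std::time::Duration;

// Hypothetical helper mirroring the logic inlined in
// `distribute_execute` / `local_execute`; `SessionImpl` is the frontend
// session type from this crate.
fn query_timeout(session: &SessionImpl, can_timeout_cancel: bool) -> Option<Duration> {
    if cfg!(madsim) || !can_timeout_cancel {
        None
    } else {
        Some(session.statement_timeout())
    }
}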
39 changes: 31 additions & 8 deletions src/frontend/src/scheduler/distributed/query.rs
@@ -30,6 +30,7 @@ use risingwave_rpc_client::ComputeClientPoolRef;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::{oneshot, RwLock};
use tokio::task::JoinHandle;
use tracing::{debug, error, info, warn, Instrument};

use super::{DistributedQueryMetrics, QueryExecutionInfoRef, QueryResultFetcher, StageEvent};
@@ -47,7 +48,8 @@ use crate::scheduler::{ExecutionContextRef, ReadSnapshot, SchedulerError, Schedu
pub enum QueryMessage {
/// Events passed running execution.
Stage(StageEvent),
CancelQuery,
/// Cancelled for some reason
CancelQuery(String),
}

enum QueryState {
@@ -87,6 +89,7 @@ struct QueryRunner {
query_execution_info: QueryExecutionInfoRef,

query_metrics: Arc<DistributedQueryMetrics>,
timeout_abort_task_handle: Option<JoinHandle<()>>,
}

impl QueryExecution {
@@ -112,7 +115,7 @@
/// cancel request (from ctrl-c, cli, ui etc).
#[allow(clippy::too_many_arguments)]
pub async fn start(
&self,
self: Arc<Self>,
context: ExecutionContextRef,
worker_node_manager: WorkerNodeSelector,
pinned_snapshot: ReadSnapshot,
@@ -129,7 +132,7 @@
// can control when to release the snapshot.
let stage_executions = self.gen_stage_executions(
&pinned_snapshot,
context,
context.clone(),
worker_node_manager,
compute_client_pool.clone(),
catalog_reader,
@@ -139,6 +142,22 @@
QueryState::Pending { msg_receiver } => {
*state = QueryState::Running;

// Start a timer to cancel the query
let mut timeout_abort_task_handle: Option<JoinHandle<()>> = None;
if let Some(timeout) = context.timeout() {
let this = self.clone();
timeout_abort_task_handle = Some(tokio::spawn(async move {
tokio::time::sleep(timeout).await;
warn!(
"Query {:?} timeout after {} seconds, sending cancel message.",
this.query.query_id,
timeout.as_secs(),
);
this.abort(format!("timeout after {} seconds", timeout.as_secs()))
.await;
}));
}

// Create a oneshot channel for QueryResultFetcher to get failed event.
let (root_stage_sender, root_stage_receiver) =
oneshot::channel::<SchedulerResult<QueryResultFetcher>>();
@@ -151,6 +170,7 @@
scheduled_stages_count: 0,
query_execution_info,
query_metrics,
timeout_abort_task_handle,
};

let span = tracing::info_span!(
@@ -184,10 +204,10 @@
}

/// Cancel execution of this query.
pub async fn abort(self: Arc<Self>) {
pub async fn abort(self: Arc<Self>, reason: String) {
if self
.shutdown_tx
.send(QueryMessage::CancelQuery)
.send(QueryMessage::CancelQuery(reason))
.await
.is_err()
{
@@ -236,6 +256,9 @@
impl Drop for QueryRunner {
fn drop(&mut self) {
self.query_metrics.running_query_num.dec();
self.timeout_abort_task_handle
.as_ref()
.inspect(|h| h.abort());
}
}

@@ -346,8 +369,8 @@
break;
}
}
QueryMessage::CancelQuery => {
self.clean_all_stages(Some(SchedulerError::QueryCancelled))
QueryMessage::CancelQuery(reason) => {
self.clean_all_stages(Some(SchedulerError::QueryCancelled(reason)))
.await;
// One stage failed, not necessary to execute schedule stages.
break;
@@ -489,7 +512,7 @@ pub(crate) mod tests {

assert!(query_execution
.start(
ExecutionContext::new(SessionImpl::mock().into()).into(),
ExecutionContext::new(SessionImpl::mock().into(), None).into(),
worker_node_selector,
ReadSnapshot::FrontendPinned {
snapshot: pinned_snapshot,
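The distributed path arms a one-shot timer task and aborts it when the runner drops, so a query that finishes early does not leak the timer. A self-contained sketch of that pattern on a plain tokio runtime (illustrative names, not the PR's types):

use std::time::Duration;
use tokio::task::JoinHandle;

/// Illustrative watchdog: runs `on_timeout` once after `timeout` elapses,
/// unless dropped first (mirroring `QueryRunner::drop`, which aborts
/// `timeout_abort_task_handle`).
struct Watchdog {
    handle: Option<JoinHandle<()>>,
}

impl Watchdog {
    fn arm(timeout: Duration, on_timeout: impl FnOnce() + Send + 'static) -> Self {
        let handle = tokio::spawn(async move {
            tokio::time::sleep(timeout).await;
            on_timeout(); // e.g. send `QueryMessage::CancelQuery(reason)`
        });
        Watchdog { handle: Some(handle) }
    }
}

impl Drop for Watchdog {
    fn drop(&mut self) {
        // Cancel the timer if the query completed before the deadline.
        if let Some(handle) = self.handle.take() {
            handle.abort();
        }
    }
}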
2 changes: 1 addition & 1 deletion src/frontend/src/scheduler/distributed/query_manager.rs
@@ -117,7 +117,7 @@ impl QueryExecutionInfo {
if query.session_id == session_id {
let query = query.clone();
// Spawn a task to abort. Avoid await point in this function.
tokio::spawn(async move { query.abort().await });
tokio::spawn(async move { query.abort("cancelled by user".to_string()).await });
}
}
}
6 changes: 3 additions & 3 deletions src/frontend/src/scheduler/error.rs
@@ -49,9 +49,9 @@ pub enum SchedulerError {
#[error("Task got killed because compute node running out of memory")]
TaskRunningOutOfMemory,

/// Used when receive cancel request (ctrl-c) from user.
#[error("Cancelled by user")]
QueryCancelled,
/// Used when receiving a cancel request for some reason, such as user cancellation or timeout.
#[error("Query cancelled: {0}")]
QueryCancelled(String),

#[error("Reject query: the {0} query number reaches the limit: {1}")]
QueryReachLimit(QueryMode, u64),
36 changes: 33 additions & 3 deletions src/frontend/src/scheduler/local.rs
@@ -16,6 +16,7 @@
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use futures::stream::BoxStream;
@@ -66,6 +67,7 @@ pub struct LocalQueryExecution {
snapshot: ReadSnapshot,
session: Arc<SessionImpl>,
worker_node_manager: WorkerNodeSelector,
timeout: Option<Duration>,
}

impl LocalQueryExecution {
@@ -75,6 +77,7 @@
sql: S,
snapshot: ReadSnapshot,
session: Arc<SessionImpl>,
timeout: Option<Duration>,
) -> Self {
let sql = sql.into();
let worker_node_manager = WorkerNodeSelector::new(
@@ -89,6 +92,7 @@
snapshot,
session,
worker_node_manager,
timeout,
}
}

@@ -149,15 +153,19 @@
let db_name = self.session.database().to_string();
let search_path = self.session.config().search_path();
let time_zone = self.session.config().timezone();
let timeout = self.timeout;

let sender1 = sender.clone();
let exec = async move {
let mut data_stream = self.run().map(|r| r.map_err(|e| Box::new(e) as BoxedError));
while let Some(mut r) = data_stream.next().await {
// append a query cancelled error if the query is cancelled.
if r.is_err() && shutdown_rx.is_cancelled() {
r = Err(Box::new(SchedulerError::QueryCancelled) as BoxedError);
r = Err(Box::new(SchedulerError::QueryCancelled(
"Cancelled by user".to_string(),
)) as BoxedError);
}
if sender.send(r).await.is_err() {
if sender1.send(r).await.is_err() {
tracing::info!("Receiver closed.");
return;
}
@@ -176,7 +184,29 @@
let exec = async move { AUTH_CONTEXT::scope(auth_context, exec).await };
let exec = async move { TIME_ZONE::scope(time_zone, exec).await };

compute_runtime.spawn(exec);
if let Some(timeout) = timeout {
let exec = async move {
if let Err(_e) = tokio::time::timeout(timeout, exec).await {
tracing::error!(
"Local query execution timeout after {} seconds",
timeout.as_secs()
);
if sender
.send(Err(Box::new(SchedulerError::QueryCancelled(format!(
"timeout after {} seconds",
timeout.as_secs(),
))) as BoxedError))
.await
.is_err()
{
tracing::info!("Receiver closed.");
}
}
};
compute_runtime.spawn(exec);
} else {
compute_runtime.spawn(exec);
}

ReceiverStream::new(receiver)
}
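Local mode cannot cancel individual remote tasks, so the PR instead races the whole execution future against `tokio::time::timeout` and reports expiry through the result channel. A simplified, hedged sketch of that shape (row payloads reduced to `()` for brevity):

use std::time::Duration;
use tokio::sync::mpsc::Sender;

type BoxedError = Box<dyn std::error::Error + Send + Sync>;

/// Illustrative wrapper: run `exec` under an optional deadline; on expiry the
/// future is dropped (cancelling the work) and a timeout error is emitted.
async fn run_with_timeout(
    exec: impl std::future::Future<Output = ()>,
    timeout: Option<Duration>,
    sender: Sender<Result<(), BoxedError>>,
) {
    match timeout {
        Some(deadline) => {
            if tokio::time::timeout(deadline, exec).await.is_err() {
                let msg = format!("timeout after {} seconds", deadline.as_secs());
                if sender.send(Err(msg.into())).await.is_err() {
                    tracing::info!("Receiver closed.");
                }
            }
        }
        None => exec.await,
    }
}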
10 changes: 8 additions & 2 deletions src/frontend/src/scheduler/mod.rs
@@ -15,6 +15,7 @@
//! Fragment and schedule batch queries.

use std::sync::Arc;
use std::time::Duration;

use futures::Stream;
use risingwave_common::array::DataChunk;
@@ -46,19 +47,24 @@
/// Context for mpp query execution.
pub struct ExecutionContext {
session: Arc<SessionImpl>,
timeout: Option<Duration>,
}

pub type ExecutionContextRef = Arc<ExecutionContext>;

impl ExecutionContext {
pub fn new(session: Arc<SessionImpl>) -> Self {
Self { session }
pub fn new(session: Arc<SessionImpl>, timeout: Option<Duration>) -> Self {
Self { session, timeout }
}

pub fn session(&self) -> &SessionImpl {
&self.session
}

pub fn timeout(&self) -> Option<Duration> {
self.timeout
}

pub fn to_batch_task_context(&self) -> FrontendBatchTaskContext {
FrontendBatchTaskContext::new(self.session.env().clone(), self.session.auth_context())
}