diff --git a/Cargo.lock b/Cargo.lock index 009012b95998f..663875ba23db9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7261,6 +7261,7 @@ dependencies = [ "madsim-tokio", "openssl", "panic-message", + "parking_lot 0.12.1", "risingwave_common", "risingwave_sqlparser", "tempfile", diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index daef5faf1e240..a2d5381147f72 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -207,6 +207,10 @@ pub struct ConfigMap { #[parameter(default = 0u32)] statement_timeout: u32, + /// Terminate any session that has been idle (that is, waiting for a client query) within an open transaction for longer than the specified amount of time in milliseconds. + #[parameter(default = 60000u32)] + idle_in_transaction_session_timeout: u32, + /// See /// Unused in RisingWave, support for compatibility. #[parameter(default = 0)] diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 10b4fe8f32954..ee7dc4e0965a3 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -23,6 +23,7 @@ use std::time::{Duration, Instant}; use bytes::Bytes; use either::Either; use parking_lot::{Mutex, RwLock, RwLockReadGuard}; +use pgwire::error::{PsqlError, PsqlResult}; use pgwire::net::{Address, AddressRef}; use pgwire::pg_field_descriptor::PgFieldDescriptor; use pgwire::pg_message::TransactionStatus; @@ -346,6 +347,24 @@ impl FrontendEnv { .unwrap(), )); + let sessions_map: SessionMapRef = Arc::new(RwLock::new(HashMap::new())); + let sessions = sessions_map.clone(); + + // Idle transaction background monitor + let join_handle = tokio::spawn(async move { + let mut check_idle_txn_interval = + tokio::time::interval(core::time::Duration::from_secs(10)); + check_idle_txn_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + check_idle_txn_interval.reset(); + loop { + check_idle_txn_interval.tick().await; + sessions.read().values().for_each(|session| { + let _ = session.check_idle_in_transaction_timeout(); + }) + } + }); + join_handles.push(join_handle); + Ok(( Self { catalog_reader, @@ -359,7 +378,7 @@ impl FrontendEnv { server_addr: frontend_address, client_pool, frontend_metrics, - sessions_map: Arc::new(RwLock::new(HashMap::new())), + sessions_map, batch_config, meta_config, source_metrics, @@ -524,6 +543,9 @@ pub struct SessionImpl { /// execution context represents the lifetime of a running SQL in the current session exec_context: Mutex>>, + + /// Last idle instant + last_idle_instant: Arc>>, } #[derive(Error, Debug)] @@ -562,6 +584,7 @@ impl SessionImpl { current_query_cancel_flag: Mutex::new(None), notices: Default::default(), exec_context: Mutex::new(None), + last_idle_instant: Default::default(), } } @@ -587,6 +610,7 @@ impl SessionImpl { 8080, )) .into(), + last_idle_instant: Default::default(), } } @@ -668,6 +692,13 @@ impl SessionImpl { .map(|context| context.last_instant.elapsed().as_millis()) } + pub fn elapse_since_last_idle_instant(&self) -> Option { + self.last_idle_instant + .lock() + .as_ref() + .map(|x| x.elapsed().as_millis()) + } + pub fn check_relation_name_duplicated( &self, name: ObjectName, @@ -1139,10 +1170,41 @@ impl Session for SessionImpl { let exec_context = Arc::new(ExecContext { running_sql: sql, last_instant: Instant::now(), + last_idle_instant: self.last_idle_instant.clone(), }); *self.exec_context.lock() = Some(Arc::downgrade(&exec_context)); + // unset idle state, since there is a sql running + *self.last_idle_instant.lock() = None; ExecContextGuard::new(exec_context) } + + /// Check whether idle transaction timeout. + /// If yes, unpin snapshot and return an `IdleInTxnTimeout` error. + fn check_idle_in_transaction_timeout(&self) -> PsqlResult<()> { + // In transaction. + if matches!(self.transaction_status(), TransactionStatus::InTransaction) { + let idle_in_transaction_session_timeout = + self.config().idle_in_transaction_session_timeout() as u128; + // Idle transaction timeout has been enabled. + if idle_in_transaction_session_timeout != 0 { + // Hold the `exec_context` lock to ensure no new sql coming when unpin_snapshot. + let guard = self.exec_context.lock(); + // No running sql i.e. idle + if guard.as_ref().and_then(|weak| weak.upgrade()).is_none() { + // Idle timeout. + if let Some(elapse_since_last_idle_instant) = + self.elapse_since_last_idle_instant() + { + if elapse_since_last_idle_instant > idle_in_transaction_session_timeout { + self.unpin_snapshot(); + return Err(PsqlError::IdleInTxnTimeout); + } + } + } + } + } + Ok(()) + } } /// Returns row description of the statement diff --git a/src/frontend/src/session/transaction.rs b/src/frontend/src/session/transaction.rs index 2acf602f485d4..3ec6ffa1346b7 100644 --- a/src/frontend/src/session/transaction.rs +++ b/src/frontend/src/session/transaction.rs @@ -195,6 +195,11 @@ impl SessionImpl { }) } + /// Unpin snapshot by replacing the snapshot with None. + pub fn unpin_snapshot(&self) { + self.txn_ctx().snapshot = None; + } + /// Acquires and pins a snapshot for the current transaction. /// /// If a snapshot is already acquired, returns it directly. diff --git a/src/utils/pgwire/Cargo.toml b/src/utils/pgwire/Cargo.toml index 51588721fb7de..0e5b4e98faefd 100644 --- a/src/utils/pgwire/Cargo.toml +++ b/src/utils/pgwire/Cargo.toml @@ -23,6 +23,7 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] } itertools = "0.12" openssl = "0.10.60" panic-message = "0.3" +parking_lot = "0.12" risingwave_common = { workspace = true } risingwave_sqlparser = { workspace = true } thiserror = "1" diff --git a/src/utils/pgwire/src/error.rs b/src/utils/pgwire/src/error.rs index a78ecc7ef1eae..9352c913ef59a 100644 --- a/src/utils/pgwire/src/error.rs +++ b/src/utils/pgwire/src/error.rs @@ -51,6 +51,9 @@ This is a bug. We would appreciate a bug report at: #[error("Unable to setup an SSL connection")] SslError(#[from] openssl::ssl::Error), + + #[error("terminating connection due to idle-in-transaction timeout")] + IdleInTxnTimeout, } impl PsqlError { diff --git a/src/utils/pgwire/src/pg_protocol.rs b/src/utils/pgwire/src/pg_protocol.rs index 2f7c3572ee80a..87eed8e2241f0 100644 --- a/src/utils/pgwire/src/pg_protocol.rs +++ b/src/utils/pgwire/src/pg_protocol.rs @@ -347,15 +347,16 @@ where self.ready_for_query().ok()?; } - PsqlError::Panic(_) => { + PsqlError::IdleInTxnTimeout | PsqlError::Panic(_) => { self.stream .write_no_flush(&BeMessage::ErrorResponse(Box::new(e))) .ok()?; let _ = self.stream.flush().await; - // Catching the panic during message processing may leave the session in an + // 1. Catching the panic during message processing may leave the session in an // inconsistent state. We forcefully close the connection (then end the // session) here for safety. + // 2. Idle in transaction timeout should also close the connection. return None; } @@ -550,6 +551,7 @@ where record_sql_in_span(&sql); let session = self.session.clone().unwrap(); + session.check_idle_in_transaction_timeout()?; let _exec_context_guard = session.init_exec_context(sql.clone()); self.inner_process_query_msg(sql.clone(), session.clone()) .await @@ -585,6 +587,7 @@ where session: Arc, ) -> PsqlResult<()> { let session = session.clone(); + // execute query let res = session .clone() @@ -792,6 +795,7 @@ where let sql: Arc = Arc::from(format!("{}", portal)); record_sql_in_span(&sql); + session.check_idle_in_transaction_timeout()?; let _exec_context_guard = session.init_exec_context(sql.clone()); let result = session.clone().execute(portal).await; diff --git a/src/utils/pgwire/src/pg_server.rs b/src/utils/pgwire/src/pg_server.rs index 0a8d1dccddd05..9425781f54005 100644 --- a/src/utils/pgwire/src/pg_server.rs +++ b/src/utils/pgwire/src/pg_server.rs @@ -19,11 +19,13 @@ use std::sync::Arc; use std::time::Instant; use bytes::Bytes; +use parking_lot::Mutex; use risingwave_common::types::DataType; use risingwave_sqlparser::ast::Statement; use thiserror_ext::AsReport; use tokio::io::{AsyncRead, AsyncWrite}; +use crate::error::PsqlResult; use crate::net::{AddressRef, Listener}; use crate::pg_field_descriptor::PgFieldDescriptor; use crate::pg_message::TransactionStatus; @@ -112,6 +114,8 @@ pub trait Session: Send + Sync { fn transaction_status(&self) -> TransactionStatus; fn init_exec_context(&self, sql: Arc) -> ExecContextGuard; + + fn check_idle_in_transaction_timeout(&self) -> PsqlResult<()>; } /// Each session could run different SQLs multiple times. @@ -120,6 +124,8 @@ pub struct ExecContext { pub running_sql: Arc, /// The instant of the running sql pub last_instant: Instant, + /// A reference used to update when `ExecContext` is dropped + pub last_idle_instant: Arc>>, } /// `ExecContextGuard` holds a `Arc` pointer. Once `ExecContextGuard` is dropped, @@ -132,6 +138,12 @@ impl ExecContextGuard { } } +impl Drop for ExecContext { + fn drop(&mut self) { + *self.last_idle_instant.lock() = Some(Instant::now()); + } +} + #[derive(Debug, Clone)] pub enum UserAuthenticator { // No need to authenticate. @@ -225,6 +237,7 @@ mod tests { use risingwave_sqlparser::ast::Statement; use tokio_postgres::NoTls; + use crate::error::PsqlResult; use crate::pg_field_descriptor::PgFieldDescriptor; use crate::pg_message::TransactionStatus; use crate::pg_response::{PgResponse, RowSetResult, StatementType}; @@ -362,9 +375,14 @@ mod tests { let exec_context = Arc::new(ExecContext { running_sql: sql, last_instant: Instant::now(), + last_idle_instant: Default::default(), }); ExecContextGuard::new(exec_context) } + + fn check_idle_in_transaction_timeout(&self) -> PsqlResult<()> { + Ok(()) + } } async fn do_test_query(bind_addr: impl Into, pg_config: impl Into) {