From 9f18c51e6b3e989f5c460c582735daa7917f94c9 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Mon, 15 Jan 2024 15:29:33 +0800 Subject: [PATCH 1/7] support idle in transaction session timeout --- Cargo.lock | 1 + .../idle_in_transaction_session_timeout.slt | 14 +++++++++++ src/common/src/session_config/mod.rs | 4 ++++ src/frontend/src/session.rs | 23 +++++++++++++++++++ src/utils/pgwire/Cargo.toml | 1 + src/utils/pgwire/src/pg_protocol.rs | 13 +++++++++++ src/utils/pgwire/src/pg_server.rs | 16 +++++++++++++ 7 files changed, 72 insertions(+) create mode 100644 e2e_test/batch/transaction/idle_in_transaction_session_timeout.slt diff --git a/Cargo.lock b/Cargo.lock index 8950c51de77c1..519b76c68813b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7262,6 +7262,7 @@ dependencies = [ "madsim-tokio", "openssl", "panic-message", + "parking_lot 0.12.1", "risingwave_common", "risingwave_sqlparser", "tempfile", diff --git a/e2e_test/batch/transaction/idle_in_transaction_session_timeout.slt b/e2e_test/batch/transaction/idle_in_transaction_session_timeout.slt new file mode 100644 index 0000000000000..617de3e0cc27c --- /dev/null +++ b/e2e_test/batch/transaction/idle_in_transaction_session_timeout.slt @@ -0,0 +1,14 @@ +statement ok +set idle_in_transaction_session_timeout = 1000 + +statement ok +begin read only; + +statement ok +select 1; + +# wait enough time to trigger idle_in_transaction_session_timeout +sleep 2s + +statement error +select 1; diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index daef5faf1e240..a2d5381147f72 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -207,6 +207,10 @@ pub struct ConfigMap { #[parameter(default = 0u32)] statement_timeout: u32, + /// Terminate any session that has been idle (that is, waiting for a client query) within an open transaction for longer than the specified amount of time in milliseconds. + #[parameter(default = 60000u32)] + idle_in_transaction_session_timeout: u32, + /// See /// Unused in RisingWave, support for compatibility. #[parameter(default = 0)] diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index b5ef22590875d..8cb096a4c382e 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -514,6 +514,9 @@ pub struct SessionImpl { /// execution context represents the lifetime of a running SQL in the current session exec_context: Mutex>>, + + /// Last idle instant + last_idle_instant: Arc>>, } #[derive(Error, Debug)] @@ -552,6 +555,7 @@ impl SessionImpl { current_query_cancel_flag: Mutex::new(None), notices: Default::default(), exec_context: Mutex::new(None), + last_idle_instant: Default::default(), } } @@ -577,6 +581,7 @@ impl SessionImpl { 8080, )) .into(), + last_idle_instant: Default::default(), } } @@ -658,6 +663,13 @@ impl SessionImpl { .map(|context| context.last_instant.elapsed().as_millis()) } + pub fn elapse_since_last_idle_instant(&self) -> Option { + self.last_idle_instant + .lock() + .as_ref() + .map(|x| x.elapsed().as_millis()) + } + pub fn check_relation_name_duplicated( &self, name: ObjectName, @@ -1129,10 +1141,21 @@ impl Session for SessionImpl { let exec_context = Arc::new(ExecContext { running_sql: sql, last_instant: Instant::now(), + last_idle_instant: self.last_idle_instant.clone(), }); *self.exec_context.lock() = Some(Arc::downgrade(&exec_context)); ExecContextGuard::new(exec_context) } + + fn check_idle_in_transaction_timeout(&self) -> bool { + if matches!(self.transaction_status(), TransactionStatus::InTransaction) { + if let Some(elapse_since_last_idle_instant) = self.elapse_since_last_idle_instant() { + return elapse_since_last_idle_instant + > self.config().idle_in_transaction_session_timeout() as u128; + } + } + false + } } /// Returns row description of the statement diff --git a/src/utils/pgwire/Cargo.toml b/src/utils/pgwire/Cargo.toml index 51588721fb7de..0e5b4e98faefd 100644 --- a/src/utils/pgwire/Cargo.toml +++ b/src/utils/pgwire/Cargo.toml @@ -23,6 +23,7 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] } itertools = "0.12" openssl = "0.10.60" panic-message = "0.3" +parking_lot = "0.12" risingwave_common = { workspace = true } risingwave_sqlparser = { workspace = true } thiserror = "1" diff --git a/src/utils/pgwire/src/pg_protocol.rs b/src/utils/pgwire/src/pg_protocol.rs index 2f7c3572ee80a..7095c8bc44a9c 100644 --- a/src/utils/pgwire/src/pg_protocol.rs +++ b/src/utils/pgwire/src/pg_protocol.rs @@ -550,6 +550,12 @@ where record_sql_in_span(&sql); let session = self.session.clone().unwrap(); + if session.check_idle_in_transaction_timeout() { + self.process_terminate(); + return Err(PsqlError::SimpleQueryError( + "terminating connection due to idle-in-transaction timeout".into(), + )); + } let _exec_context_guard = session.init_exec_context(sql.clone()); self.inner_process_query_msg(sql.clone(), session.clone()) .await @@ -585,6 +591,7 @@ where session: Arc, ) -> PsqlResult<()> { let session = session.clone(); + // execute query let res = session .clone() @@ -792,6 +799,12 @@ where let sql: Arc = Arc::from(format!("{}", portal)); record_sql_in_span(&sql); + if session.check_idle_in_transaction_timeout() { + self.process_terminate(); + return Err(PsqlError::ExtendedPrepareError( + "terminating connection due to idle-in-transaction timeout".into(), + ))?; + } let _exec_context_guard = session.init_exec_context(sql.clone()); let result = session.clone().execute(portal).await; diff --git a/src/utils/pgwire/src/pg_server.rs b/src/utils/pgwire/src/pg_server.rs index 0a8d1dccddd05..2af0be6cca6b8 100644 --- a/src/utils/pgwire/src/pg_server.rs +++ b/src/utils/pgwire/src/pg_server.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use std::time::Instant; use bytes::Bytes; +use parking_lot::Mutex; use risingwave_common::types::DataType; use risingwave_sqlparser::ast::Statement; use thiserror_ext::AsReport; @@ -112,6 +113,8 @@ pub trait Session: Send + Sync { fn transaction_status(&self) -> TransactionStatus; fn init_exec_context(&self, sql: Arc) -> ExecContextGuard; + + fn check_idle_in_transaction_timeout(&self) -> bool; } /// Each session could run different SQLs multiple times. @@ -120,6 +123,8 @@ pub struct ExecContext { pub running_sql: Arc, /// The instant of the running sql pub last_instant: Instant, + /// A reference used to update when `ExecContext` is dropped + pub last_idle_instant: Arc>>, } /// `ExecContextGuard` holds a `Arc` pointer. Once `ExecContextGuard` is dropped, @@ -132,6 +137,12 @@ impl ExecContextGuard { } } +impl Drop for ExecContext { + fn drop(&mut self) { + *self.last_idle_instant.lock() = Some(Instant::now()); + } +} + #[derive(Debug, Clone)] pub enum UserAuthenticator { // No need to authenticate. @@ -362,9 +373,14 @@ mod tests { let exec_context = Arc::new(ExecContext { running_sql: sql, last_instant: Instant::now(), + last_idle_instant: Default::default(), }); ExecContextGuard::new(exec_context) } + + fn check_idle_in_transaction_timeout(&self) -> bool { + false + } } async fn do_test_query(bind_addr: impl Into, pg_config: impl Into) { From 19f4b9f43a5babead3e8682e6cb8b4f42010055d Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Mon, 15 Jan 2024 18:42:53 +0800 Subject: [PATCH 2/7] refine --- src/frontend/src/session.rs | 12 ++++++++---- src/utils/pgwire/src/error.rs | 3 +++ src/utils/pgwire/src/pg_protocol.rs | 19 +++++-------------- src/utils/pgwire/src/pg_server.rs | 8 +++++--- 4 files changed, 21 insertions(+), 21 deletions(-) diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 8cb096a4c382e..aea16ca9471b0 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -23,6 +23,7 @@ use std::time::{Duration, Instant}; use bytes::Bytes; use either::Either; use parking_lot::{Mutex, RwLock, RwLockReadGuard}; +use pgwire::error::{PsqlError, PsqlResult}; use pgwire::net::{Address, AddressRef}; use pgwire::pg_field_descriptor::PgFieldDescriptor; use pgwire::pg_message::TransactionStatus; @@ -1147,14 +1148,17 @@ impl Session for SessionImpl { ExecContextGuard::new(exec_context) } - fn check_idle_in_transaction_timeout(&self) -> bool { + fn check_idle_in_transaction_timeout(&self) -> PsqlResult<()> { if matches!(self.transaction_status(), TransactionStatus::InTransaction) { if let Some(elapse_since_last_idle_instant) = self.elapse_since_last_idle_instant() { - return elapse_since_last_idle_instant - > self.config().idle_in_transaction_session_timeout() as u128; + if elapse_since_last_idle_instant + > self.config().idle_in_transaction_session_timeout() as u128 + { + return Err(PsqlError::IdleInTxnTimeout); + } } } - false + Ok(()) } } diff --git a/src/utils/pgwire/src/error.rs b/src/utils/pgwire/src/error.rs index a78ecc7ef1eae..9352c913ef59a 100644 --- a/src/utils/pgwire/src/error.rs +++ b/src/utils/pgwire/src/error.rs @@ -51,6 +51,9 @@ This is a bug. We would appreciate a bug report at: #[error("Unable to setup an SSL connection")] SslError(#[from] openssl::ssl::Error), + + #[error("terminating connection due to idle-in-transaction timeout")] + IdleInTxnTimeout, } impl PsqlError { diff --git a/src/utils/pgwire/src/pg_protocol.rs b/src/utils/pgwire/src/pg_protocol.rs index 7095c8bc44a9c..87eed8e2241f0 100644 --- a/src/utils/pgwire/src/pg_protocol.rs +++ b/src/utils/pgwire/src/pg_protocol.rs @@ -347,15 +347,16 @@ where self.ready_for_query().ok()?; } - PsqlError::Panic(_) => { + PsqlError::IdleInTxnTimeout | PsqlError::Panic(_) => { self.stream .write_no_flush(&BeMessage::ErrorResponse(Box::new(e))) .ok()?; let _ = self.stream.flush().await; - // Catching the panic during message processing may leave the session in an + // 1. Catching the panic during message processing may leave the session in an // inconsistent state. We forcefully close the connection (then end the // session) here for safety. + // 2. Idle in transaction timeout should also close the connection. return None; } @@ -550,12 +551,7 @@ where record_sql_in_span(&sql); let session = self.session.clone().unwrap(); - if session.check_idle_in_transaction_timeout() { - self.process_terminate(); - return Err(PsqlError::SimpleQueryError( - "terminating connection due to idle-in-transaction timeout".into(), - )); - } + session.check_idle_in_transaction_timeout()?; let _exec_context_guard = session.init_exec_context(sql.clone()); self.inner_process_query_msg(sql.clone(), session.clone()) .await @@ -799,12 +795,7 @@ where let sql: Arc = Arc::from(format!("{}", portal)); record_sql_in_span(&sql); - if session.check_idle_in_transaction_timeout() { - self.process_terminate(); - return Err(PsqlError::ExtendedPrepareError( - "terminating connection due to idle-in-transaction timeout".into(), - ))?; - } + session.check_idle_in_transaction_timeout()?; let _exec_context_guard = session.init_exec_context(sql.clone()); let result = session.clone().execute(portal).await; diff --git a/src/utils/pgwire/src/pg_server.rs b/src/utils/pgwire/src/pg_server.rs index 2af0be6cca6b8..9425781f54005 100644 --- a/src/utils/pgwire/src/pg_server.rs +++ b/src/utils/pgwire/src/pg_server.rs @@ -25,6 +25,7 @@ use risingwave_sqlparser::ast::Statement; use thiserror_ext::AsReport; use tokio::io::{AsyncRead, AsyncWrite}; +use crate::error::PsqlResult; use crate::net::{AddressRef, Listener}; use crate::pg_field_descriptor::PgFieldDescriptor; use crate::pg_message::TransactionStatus; @@ -114,7 +115,7 @@ pub trait Session: Send + Sync { fn init_exec_context(&self, sql: Arc) -> ExecContextGuard; - fn check_idle_in_transaction_timeout(&self) -> bool; + fn check_idle_in_transaction_timeout(&self) -> PsqlResult<()>; } /// Each session could run different SQLs multiple times. @@ -236,6 +237,7 @@ mod tests { use risingwave_sqlparser::ast::Statement; use tokio_postgres::NoTls; + use crate::error::PsqlResult; use crate::pg_field_descriptor::PgFieldDescriptor; use crate::pg_message::TransactionStatus; use crate::pg_response::{PgResponse, RowSetResult, StatementType}; @@ -378,8 +380,8 @@ mod tests { ExecContextGuard::new(exec_context) } - fn check_idle_in_transaction_timeout(&self) -> bool { - false + fn check_idle_in_transaction_timeout(&self) -> PsqlResult<()> { + Ok(()) } } From 8a74663b864d6160c01c80d09f6ad216bc6baa70 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Mon, 15 Jan 2024 21:13:45 +0800 Subject: [PATCH 3/7] add a background monitor --- src/frontend/src/session.rs | 23 ++++++++++++++++++++++- src/frontend/src/session/transaction.rs | 5 +++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index aea16ca9471b0..383ddcff697fb 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -326,6 +326,24 @@ impl FrontendEnv { let creating_streaming_job_tracker = Arc::new(StreamingJobTracker::new(frontend_meta_client.clone())); + let sessions_map: SessionMapRef = Arc::new(RwLock::new(HashMap::new())); + let sessions = sessions_map.clone(); + + // Idle transaction background monitor + let join_handle = tokio::spawn(async move { + let mut check_idle_txn_interval = + tokio::time::interval(core::time::Duration::from_secs(5)); + check_idle_txn_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + check_idle_txn_interval.reset(); + loop { + check_idle_txn_interval.tick().await; + sessions.read().values().for_each(|session| { + let _ = session.check_idle_in_transaction_timeout(); + }) + } + }); + join_handles.push(join_handle); + Ok(( Self { catalog_reader, @@ -339,7 +357,7 @@ impl FrontendEnv { server_addr: frontend_address, client_pool, frontend_metrics, - sessions_map: Arc::new(RwLock::new(HashMap::new())), + sessions_map, batch_config, meta_config, source_metrics, @@ -1148,12 +1166,15 @@ impl Session for SessionImpl { ExecContextGuard::new(exec_context) } + /// Check whether idle transaction timeout. + /// If yes, unpin snapshot and return an `IdleInTxnTimeout` error. fn check_idle_in_transaction_timeout(&self) -> PsqlResult<()> { if matches!(self.transaction_status(), TransactionStatus::InTransaction) { if let Some(elapse_since_last_idle_instant) = self.elapse_since_last_idle_instant() { if elapse_since_last_idle_instant > self.config().idle_in_transaction_session_timeout() as u128 { + self.unpin_snapshot(); return Err(PsqlError::IdleInTxnTimeout); } } diff --git a/src/frontend/src/session/transaction.rs b/src/frontend/src/session/transaction.rs index 2acf602f485d4..3ec6ffa1346b7 100644 --- a/src/frontend/src/session/transaction.rs +++ b/src/frontend/src/session/transaction.rs @@ -195,6 +195,11 @@ impl SessionImpl { }) } + /// Unpin snapshot by replacing the snapshot with None. + pub fn unpin_snapshot(&self) { + self.txn_ctx().snapshot = None; + } + /// Acquires and pins a snapshot for the current transaction. /// /// If a snapshot is already acquired, returns it directly. From 0d39532b4aa6c7641e41ee852e78e9194b3f84c2 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Tue, 16 Jan 2024 12:30:06 +0800 Subject: [PATCH 4/7] remove the test which could cause other test cases failed --- .../idle_in_transaction_session_timeout.slt | 14 -------------- 1 file changed, 14 deletions(-) delete mode 100644 e2e_test/batch/transaction/idle_in_transaction_session_timeout.slt diff --git a/e2e_test/batch/transaction/idle_in_transaction_session_timeout.slt b/e2e_test/batch/transaction/idle_in_transaction_session_timeout.slt deleted file mode 100644 index 617de3e0cc27c..0000000000000 --- a/e2e_test/batch/transaction/idle_in_transaction_session_timeout.slt +++ /dev/null @@ -1,14 +0,0 @@ -statement ok -set idle_in_transaction_session_timeout = 1000 - -statement ok -begin read only; - -statement ok -select 1; - -# wait enough time to trigger idle_in_transaction_session_timeout -sleep 2s - -statement error -select 1; From 8a283910a31a92d1765d15d378c6d1a0c03a0916 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Wed, 17 Jan 2024 12:13:23 +0800 Subject: [PATCH 5/7] ensure no running sql when we dicide to unpin a snapshot in check_idle_in_transaction_timeout --- src/frontend/src/session.rs | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 383ddcff697fb..36d1f4b915995 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -1163,19 +1163,29 @@ impl Session for SessionImpl { last_idle_instant: self.last_idle_instant.clone(), }); *self.exec_context.lock() = Some(Arc::downgrade(&exec_context)); + // unset idle state, since there is a sql running + *self.last_idle_instant.lock() = None; ExecContextGuard::new(exec_context) } /// Check whether idle transaction timeout. /// If yes, unpin snapshot and return an `IdleInTxnTimeout` error. fn check_idle_in_transaction_timeout(&self) -> PsqlResult<()> { + // In transaction. if matches!(self.transaction_status(), TransactionStatus::InTransaction) { - if let Some(elapse_since_last_idle_instant) = self.elapse_since_last_idle_instant() { - if elapse_since_last_idle_instant - > self.config().idle_in_transaction_session_timeout() as u128 + // Hold the `exec_context` lock to ensure no new sql coming when unpin_snapshot. + let guard = self.exec_context.lock(); + // No running sql i.e. idle + if guard.is_none() { + // Idle timeout. + if let Some(elapse_since_last_idle_instant) = self.elapse_since_last_idle_instant() { - self.unpin_snapshot(); - return Err(PsqlError::IdleInTxnTimeout); + if elapse_since_last_idle_instant + > self.config().idle_in_transaction_session_timeout() as u128 + { + self.unpin_snapshot(); + return Err(PsqlError::IdleInTxnTimeout); + } } } } From 4f75d1cd424f5a106f4ec8f9a4ae7d66d9f3d57a Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Wed, 17 Jan 2024 12:30:57 +0800 Subject: [PATCH 6/7] fix --- src/frontend/src/session.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 36d1f4b915995..069a2b37c187c 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -1176,7 +1176,7 @@ impl Session for SessionImpl { // Hold the `exec_context` lock to ensure no new sql coming when unpin_snapshot. let guard = self.exec_context.lock(); // No running sql i.e. idle - if guard.is_none() { + if guard.as_ref().and_then(|weak| weak.upgrade()).is_none() { // Idle timeout. if let Some(elapse_since_last_idle_instant) = self.elapse_since_last_idle_instant() { From dcacd31850e808f8da7485fbe2b568e9a9c07502 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Wed, 17 Jan 2024 12:37:47 +0800 Subject: [PATCH 7/7] fix when idle_in_transaction_session_timeout = 0 --- src/frontend/src/session.rs | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 069a2b37c187c..5befe3e882b53 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -332,7 +332,7 @@ impl FrontendEnv { // Idle transaction background monitor let join_handle = tokio::spawn(async move { let mut check_idle_txn_interval = - tokio::time::interval(core::time::Duration::from_secs(5)); + tokio::time::interval(core::time::Duration::from_secs(10)); check_idle_txn_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); check_idle_txn_interval.reset(); loop { @@ -1173,18 +1173,22 @@ impl Session for SessionImpl { fn check_idle_in_transaction_timeout(&self) -> PsqlResult<()> { // In transaction. if matches!(self.transaction_status(), TransactionStatus::InTransaction) { - // Hold the `exec_context` lock to ensure no new sql coming when unpin_snapshot. - let guard = self.exec_context.lock(); - // No running sql i.e. idle - if guard.as_ref().and_then(|weak| weak.upgrade()).is_none() { - // Idle timeout. - if let Some(elapse_since_last_idle_instant) = self.elapse_since_last_idle_instant() - { - if elapse_since_last_idle_instant - > self.config().idle_in_transaction_session_timeout() as u128 + let idle_in_transaction_session_timeout = + self.config().idle_in_transaction_session_timeout() as u128; + // Idle transaction timeout has been enabled. + if idle_in_transaction_session_timeout != 0 { + // Hold the `exec_context` lock to ensure no new sql coming when unpin_snapshot. + let guard = self.exec_context.lock(); + // No running sql i.e. idle + if guard.as_ref().and_then(|weak| weak.upgrade()).is_none() { + // Idle timeout. + if let Some(elapse_since_last_idle_instant) = + self.elapse_since_last_idle_instant() { - self.unpin_snapshot(); - return Err(PsqlError::IdleInTxnTimeout); + if elapse_since_last_idle_instant > idle_in_transaction_session_timeout { + self.unpin_snapshot(); + return Err(PsqlError::IdleInTxnTimeout); + } } } }