Skip to content

Commit

Permalink
feat(frontend): support idle in transaction session timeout (#14566)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored and Little-Wallace committed Jan 20, 2024
1 parent 377249a commit 2880ca6
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ pub struct ConfigMap {
#[parameter(default = 0u32)]
statement_timeout: u32,

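/// Terminate any session that has been idle (that is, waiting for a client query) within an open
/// transaction for longer than the specified amount of time in milliseconds.
/// A value of zero disables the timeout.
/// See <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-IDLE-IN-TRANSACTION-SESSION-TIMEOUT>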
#[parameter(default = 60000u32)]
idle_in_transaction_session_timeout: u32,

/// See <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-LOCK-TIMEOUT>
/// Unused in RisingWave, support for compatibility.
#[parameter(default = 0)]
Expand Down
64 changes: 63 additions & 1 deletion src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::time::{Duration, Instant};
use bytes::Bytes;
use either::Either;
use parking_lot::{Mutex, RwLock, RwLockReadGuard};
use pgwire::error::{PsqlError, PsqlResult};
use pgwire::net::{Address, AddressRef};
use pgwire::pg_field_descriptor::PgFieldDescriptor;
use pgwire::pg_message::TransactionStatus;
Expand Down Expand Up @@ -346,6 +347,24 @@ impl FrontendEnv {
.unwrap(),
));

let sessions_map: SessionMapRef = Arc::new(RwLock::new(HashMap::new()));
let sessions = sessions_map.clone();

// Idle transaction background monitor
let join_handle = tokio::spawn(async move {
let mut check_idle_txn_interval =
tokio::time::interval(core::time::Duration::from_secs(10));
check_idle_txn_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
check_idle_txn_interval.reset();
loop {
check_idle_txn_interval.tick().await;
sessions.read().values().for_each(|session| {
let _ = session.check_idle_in_transaction_timeout();
})
}
});
join_handles.push(join_handle);

Ok((
Self {
catalog_reader,
Expand All @@ -359,7 +378,7 @@ impl FrontendEnv {
server_addr: frontend_address,
client_pool,
frontend_metrics,
sessions_map: Arc::new(RwLock::new(HashMap::new())),
sessions_map,
batch_config,
meta_config,
source_metrics,
Expand Down Expand Up @@ -524,6 +543,9 @@ pub struct SessionImpl {

/// execution context represents the lifetime of a running SQL in the current session
exec_context: Mutex<Option<Weak<ExecContext>>>,

/// Last idle instant
last_idle_instant: Arc<Mutex<Option<Instant>>>,
}

#[derive(Error, Debug)]
Expand Down Expand Up @@ -562,6 +584,7 @@ impl SessionImpl {
current_query_cancel_flag: Mutex::new(None),
notices: Default::default(),
exec_context: Mutex::new(None),
last_idle_instant: Default::default(),
}
}

Expand All @@ -587,6 +610,7 @@ impl SessionImpl {
8080,
))
.into(),
last_idle_instant: Default::default(),
}
}

Expand Down Expand Up @@ -668,6 +692,13 @@ impl SessionImpl {
.map(|context| context.last_instant.elapsed().as_millis())
}

pub fn elapse_since_last_idle_instant(&self) -> Option<u128> {
self.last_idle_instant
.lock()
.as_ref()
.map(|x| x.elapsed().as_millis())
}

pub fn check_relation_name_duplicated(
&self,
name: ObjectName,
Expand Down Expand Up @@ -1139,10 +1170,41 @@ impl Session for SessionImpl {
let exec_context = Arc::new(ExecContext {
running_sql: sql,
last_instant: Instant::now(),
last_idle_instant: self.last_idle_instant.clone(),
});
*self.exec_context.lock() = Some(Arc::downgrade(&exec_context));
// unset idle state, since there is a sql running
*self.last_idle_instant.lock() = None;
ExecContextGuard::new(exec_context)
}

/// Checks whether this session has been idle inside an open transaction for longer than
/// the `idle_in_transaction_session_timeout` session parameter (in milliseconds).
///
/// On timeout the snapshot is unpinned and `PsqlError::IdleInTxnTimeout` is returned.
/// In every other case (not in a transaction, timeout set to zero, a sql still running,
/// no idle instant recorded, or not yet expired) the result is `Ok(())`.
fn check_idle_in_transaction_timeout(&self) -> PsqlResult<()> {
    // Only sessions that are currently inside a transaction are checked.
    if !matches!(self.transaction_status(), TransactionStatus::InTransaction) {
        return Ok(());
    }

    // A configured value of zero means the idle transaction timeout is not enabled.
    let timeout_ms = self.config().idle_in_transaction_session_timeout() as u128;
    if timeout_ms == 0 {
        return Ok(());
    }

    // Keep the `exec_context` lock until we return, to ensure no new sql comes in
    // while the snapshot is being unpinned.
    let exec_context = self.exec_context.lock();
    let has_running_sql = exec_context
        .as_ref()
        .and_then(|weak| weak.upgrade())
        .is_some();
    if has_running_sql {
        // A sql is running, i.e. the session is not idle.
        return Ok(());
    }

    match self.elapse_since_last_idle_instant() {
        Some(idle_ms) if idle_ms > timeout_ms => {
            self.unpin_snapshot();
            Err(PsqlError::IdleInTxnTimeout)
        }
        _ => Ok(()),
    }
}
}

/// Returns row description of the statement
Expand Down
5 changes: 5 additions & 0 deletions src/frontend/src/session/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,11 @@ impl SessionImpl {
})
}

/// Unpins the snapshot by resetting the transaction context's `snapshot` to `None`.
pub fn unpin_snapshot(&self) {
    let mut txn_ctx = self.txn_ctx();
    txn_ctx.snapshot = None;
}

/// Acquires and pins a snapshot for the current transaction.
///
/// If a snapshot is already acquired, returns it directly.
Expand Down
1 change: 1 addition & 0 deletions src/utils/pgwire/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] }
itertools = "0.12"
openssl = "0.10.60"
panic-message = "0.3"
parking_lot = "0.12"
risingwave_common = { workspace = true }
risingwave_sqlparser = { workspace = true }
thiserror = "1"
Expand Down
3 changes: 3 additions & 0 deletions src/utils/pgwire/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ This is a bug. We would appreciate a bug report at:

#[error("Unable to setup an SSL connection")]
SslError(#[from] openssl::ssl::Error),

#[error("terminating connection due to idle-in-transaction timeout")]
IdleInTxnTimeout,
}

impl PsqlError {
Expand Down
8 changes: 6 additions & 2 deletions src/utils/pgwire/src/pg_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,15 +347,16 @@ where
self.ready_for_query().ok()?;
}

PsqlError::Panic(_) => {
PsqlError::IdleInTxnTimeout | PsqlError::Panic(_) => {
self.stream
.write_no_flush(&BeMessage::ErrorResponse(Box::new(e)))
.ok()?;
let _ = self.stream.flush().await;

// Catching the panic during message processing may leave the session in an
// 1. Catching the panic during message processing may leave the session in an
// inconsistent state. We forcefully close the connection (then end the
// session) here for safety.
// 2. Idle in transaction timeout should also close the connection.
return None;
}

Expand Down Expand Up @@ -550,6 +551,7 @@ where
record_sql_in_span(&sql);
let session = self.session.clone().unwrap();

session.check_idle_in_transaction_timeout()?;
let _exec_context_guard = session.init_exec_context(sql.clone());
self.inner_process_query_msg(sql.clone(), session.clone())
.await
Expand Down Expand Up @@ -585,6 +587,7 @@ where
session: Arc<SM::Session>,
) -> PsqlResult<()> {
let session = session.clone();

// execute query
let res = session
.clone()
Expand Down Expand Up @@ -792,6 +795,7 @@ where
let sql: Arc<str> = Arc::from(format!("{}", portal));
record_sql_in_span(&sql);

session.check_idle_in_transaction_timeout()?;
let _exec_context_guard = session.init_exec_context(sql.clone());
let result = session.clone().execute(portal).await;

Expand Down
18 changes: 18 additions & 0 deletions src/utils/pgwire/src/pg_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use parking_lot::Mutex;
use risingwave_common::types::DataType;
use risingwave_sqlparser::ast::Statement;
use thiserror_ext::AsReport;
use tokio::io::{AsyncRead, AsyncWrite};

use crate::error::PsqlResult;
use crate::net::{AddressRef, Listener};
use crate::pg_field_descriptor::PgFieldDescriptor;
use crate::pg_message::TransactionStatus;
Expand Down Expand Up @@ -112,6 +114,8 @@ pub trait Session: Send + Sync {
fn transaction_status(&self) -> TransactionStatus;

fn init_exec_context(&self, sql: Arc<str>) -> ExecContextGuard;

fn check_idle_in_transaction_timeout(&self) -> PsqlResult<()>;
}

/// Each session could run different SQLs multiple times.
Expand All @@ -120,6 +124,8 @@ pub struct ExecContext {
pub running_sql: Arc<str>,
/// The instant of the running sql
pub last_instant: Instant,
/// A reference used to update when `ExecContext` is dropped
pub last_idle_instant: Arc<Mutex<Option<Instant>>>,
}

/// `ExecContextGuard` holds a `Arc` pointer. Once `ExecContextGuard` is dropped,
Expand All @@ -132,6 +138,12 @@ impl ExecContextGuard {
}
}

// When an `ExecContext` is dropped, the shared `last_idle_instant` slot is stamped
// with the current time.
impl Drop for ExecContext {
    fn drop(&mut self) {
        // Take the timestamp first, then lock the slot and store it.
        let now = Instant::now();
        let mut last_idle = self.last_idle_instant.lock();
        *last_idle = Some(now);
    }
}

#[derive(Debug, Clone)]
pub enum UserAuthenticator {
// No need to authenticate.
Expand Down Expand Up @@ -225,6 +237,7 @@ mod tests {
use risingwave_sqlparser::ast::Statement;
use tokio_postgres::NoTls;

use crate::error::PsqlResult;
use crate::pg_field_descriptor::PgFieldDescriptor;
use crate::pg_message::TransactionStatus;
use crate::pg_response::{PgResponse, RowSetResult, StatementType};
Expand Down Expand Up @@ -362,9 +375,14 @@ mod tests {
let exec_context = Arc::new(ExecContext {
running_sql: sql,
last_instant: Instant::now(),
last_idle_instant: Default::default(),
});
ExecContextGuard::new(exec_context)
}

// Test-module implementation: performs no check and always returns `Ok(())`.
fn check_idle_in_transaction_timeout(&self) -> PsqlResult<()> {
Ok(())
}
}

async fn do_test_query(bind_addr: impl Into<String>, pg_config: impl Into<String>) {
Expand Down

0 comments on commit 2880ca6

Please sign in to comment.