Skip to content

Commit

Permalink
support idle in transaction session timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 committed Jan 15, 2024
1 parent 7c3edb1 commit 9f18c51
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions e2e_test/batch/transaction/idle_in_transaction_session_timeout.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
statement ok
set idle_in_transaction_session_timeout = 1000

statement ok
begin read only;

statement ok
select 1;

# wait enough time to trigger idle_in_transaction_session_timeout
sleep 2s

statement error
select 1;
4 changes: 4 additions & 0 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ pub struct ConfigMap {
#[parameter(default = 0u32)]
statement_timeout: u32,

/// Terminate any session that has been idle (that is, waiting for a client query) within an open transaction for longer than the specified amount of time in milliseconds.
#[parameter(default = 60000u32)]
idle_in_transaction_session_timeout: u32,

/// See <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-LOCK-TIMEOUT>
/// Unused in RisingWave, support for compatibility.
#[parameter(default = 0)]
Expand Down
23 changes: 23 additions & 0 deletions src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,9 @@ pub struct SessionImpl {

/// execution context represents the lifetime of a running SQL in the current session
exec_context: Mutex<Option<Weak<ExecContext>>>,

/// Last idle instant
last_idle_instant: Arc<Mutex<Option<Instant>>>,
}

#[derive(Error, Debug)]
Expand Down Expand Up @@ -552,6 +555,7 @@ impl SessionImpl {
current_query_cancel_flag: Mutex::new(None),
notices: Default::default(),
exec_context: Mutex::new(None),
last_idle_instant: Default::default(),
}
}

Expand All @@ -577,6 +581,7 @@ impl SessionImpl {
8080,
))
.into(),
last_idle_instant: Default::default(),
}
}

Expand Down Expand Up @@ -658,6 +663,13 @@ impl SessionImpl {
.map(|context| context.last_instant.elapsed().as_millis())
}

pub fn elapse_since_last_idle_instant(&self) -> Option<u128> {
self.last_idle_instant
.lock()
.as_ref()
.map(|x| x.elapsed().as_millis())
}

pub fn check_relation_name_duplicated(
&self,
name: ObjectName,
Expand Down Expand Up @@ -1129,10 +1141,21 @@ impl Session for SessionImpl {
let exec_context = Arc::new(ExecContext {
running_sql: sql,
last_instant: Instant::now(),
last_idle_instant: self.last_idle_instant.clone(),
});
*self.exec_context.lock() = Some(Arc::downgrade(&exec_context));
ExecContextGuard::new(exec_context)
}

fn check_idle_in_transaction_timeout(&self) -> bool {
if matches!(self.transaction_status(), TransactionStatus::InTransaction) {
if let Some(elapse_since_last_idle_instant) = self.elapse_since_last_idle_instant() {
return elapse_since_last_idle_instant
> self.config().idle_in_transaction_session_timeout() as u128;
}
}
false
}
}

/// Returns row description of the statement
Expand Down
1 change: 1 addition & 0 deletions src/utils/pgwire/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] }
itertools = "0.12"
openssl = "0.10.60"
panic-message = "0.3"
parking_lot = "0.12"
risingwave_common = { workspace = true }
risingwave_sqlparser = { workspace = true }
thiserror = "1"
Expand Down
13 changes: 13 additions & 0 deletions src/utils/pgwire/src/pg_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,12 @@ where
record_sql_in_span(&sql);
let session = self.session.clone().unwrap();

if session.check_idle_in_transaction_timeout() {
self.process_terminate();
return Err(PsqlError::SimpleQueryError(
"terminating connection due to idle-in-transaction timeout".into(),
));
}
let _exec_context_guard = session.init_exec_context(sql.clone());
self.inner_process_query_msg(sql.clone(), session.clone())
.await
Expand Down Expand Up @@ -585,6 +591,7 @@ where
session: Arc<SM::Session>,
) -> PsqlResult<()> {
let session = session.clone();

// execute query
let res = session
.clone()
Expand Down Expand Up @@ -792,6 +799,12 @@ where
let sql: Arc<str> = Arc::from(format!("{}", portal));
record_sql_in_span(&sql);

if session.check_idle_in_transaction_timeout() {
self.process_terminate();
return Err(PsqlError::ExtendedPrepareError(
"terminating connection due to idle-in-transaction timeout".into(),
))?;
}
let _exec_context_guard = session.init_exec_context(sql.clone());
let result = session.clone().execute(portal).await;

Expand Down
16 changes: 16 additions & 0 deletions src/utils/pgwire/src/pg_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use parking_lot::Mutex;
use risingwave_common::types::DataType;
use risingwave_sqlparser::ast::Statement;
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -112,6 +113,8 @@ pub trait Session: Send + Sync {
fn transaction_status(&self) -> TransactionStatus;

fn init_exec_context(&self, sql: Arc<str>) -> ExecContextGuard;

fn check_idle_in_transaction_timeout(&self) -> bool;
}

/// Each session could run different SQLs multiple times.
Expand All @@ -120,6 +123,8 @@ pub struct ExecContext {
pub running_sql: Arc<str>,
/// The instant of the running sql
pub last_instant: Instant,
/// A reference used to update when `ExecContext` is dropped
pub last_idle_instant: Arc<Mutex<Option<Instant>>>,
}

/// `ExecContextGuard` holds a `Arc` pointer. Once `ExecContextGuard` is dropped,
Expand All @@ -132,6 +137,12 @@ impl ExecContextGuard {
}
}

impl Drop for ExecContext {
fn drop(&mut self) {
*self.last_idle_instant.lock() = Some(Instant::now());
}
}

#[derive(Debug, Clone)]
pub enum UserAuthenticator {
// No need to authenticate.
Expand Down Expand Up @@ -362,9 +373,14 @@ mod tests {
let exec_context = Arc::new(ExecContext {
running_sql: sql,
last_instant: Instant::now(),
last_idle_instant: Default::default(),
});
ExecContextGuard::new(exec_context)
}

fn check_idle_in_transaction_timeout(&self) -> bool {
false
}
}

async fn do_test_query(bind_addr: impl Into<String>, pg_config: impl Into<String>) {
Expand Down

0 comments on commit 9f18c51

Please sign in to comment.