diff --git a/diesel/Cargo.toml b/diesel/Cargo.toml index e754f195a8ae..06e99b2bbda6 100644 --- a/diesel/Cargo.toml +++ b/diesel/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "diesel" -version = "2.1.1" +version = "2.1.4" license = "MIT OR Apache-2.0" description = "A safe, extensible ORM and Query Builder for PostgreSQL, SQLite, and MySQL" readme = "README.md" diff --git a/diesel/src/connection/instrumentation.rs b/diesel/src/connection/instrumentation.rs new file mode 100644 index 000000000000..a7444425022b --- /dev/null +++ b/diesel/src/connection/instrumentation.rs @@ -0,0 +1,304 @@ +use std::fmt::Debug; +use std::fmt::Display; +use std::num::NonZeroU32; +use std::ops::DerefMut; + +static GLOBAL_INSTRUMENTATION: std::sync::RwLock Option>> = + std::sync::RwLock::new(default_instrumentation); + +pub trait DebugQuery: Debug + Display {} + +impl DebugQuery for crate::query_builder::DebugQuery<'_, T, DB> where Self: Debug + Display {} + +/// A helper type that allows printing out str slices +/// +/// This type is necessary because it's not possible +/// to cast from a reference of a unsized type like `&str` +/// to a reference of a trait object even if that +/// type implements all necessary traits +#[diesel_derives::__diesel_public_if( + feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes" +)] +pub(crate) struct StrQueryHelper<'a> { + s: &'a str, +} + +impl<'a> StrQueryHelper<'a> { + /// Construct a new `StrQueryHelper` + #[diesel_derives::__diesel_public_if( + feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes" + )] + pub(crate) fn new(s: &'a str) -> Self { + Self { s } + } +} + +impl Debug for StrQueryHelper<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Debug::fmt(self.s, f) + } +} + +impl Display for StrQueryHelper<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Display::fmt(&self.s, f) + } +} + +impl DebugQuery for StrQueryHelper<'_> {} + +/// This enum describes possible connection events +/// that can be handled by an [`Instrumentation`] implementation +/// +/// Some fields might contain sensitive information, like login +/// details for the database. +/// +/// Diesel does not guarantee that future versions will +/// emit the same events in the same order or timing. +/// In addition the output of the [`Debug`] and [`Display`] +/// implementation of the enum itself and any of its fields +/// is not guarantee to be stable. +// +// This types is carefully designed +// to avoid any potential overhead by +// taking references for all things +// and by not performing any additional +// work until required. +#[derive(Debug)] +#[non_exhaustive] +pub enum InstrumentationEvent<'a> { + /// An event emitted by before starting + /// establishing a new connection + #[non_exhaustive] + StartEstablishConnection { + /// The database url the connection + /// tries to connect to + /// + /// This might contain sensitive information + /// like the database password + url: &'a str, + }, + /// An event emitted after establishing a + /// new connection + #[non_exhaustive] + FinishEstablishConnection { + /// The database url the connection + /// tries is connected to + /// + /// This might contain sensitive information + /// like the database password + url: &'a str, + /// An optional error if the connection failed + error: Option<&'a crate::result::ConnectionError>, + }, + /// An event that is emitted before executing + /// a query + #[non_exhaustive] + StartQuery { + /// A opaque representation of the query + /// + /// This type implements [`Debug`] and [`Display`], + /// but should be considered otherwise as opaque. + /// + /// The exact output of the [`Debug`] and [`Display`] + /// implementation is not considered as part of the + /// stable API. + query: &'a dyn DebugQuery, + }, + /// An event that is emitted when a query + /// is cached in the connection internal + /// prepared statement cache + #[non_exhaustive] + CacheQuery { + /// SQL string of the cached query + sql: &'a str, + }, + /// An event that is emitted after executing + /// a query + #[non_exhaustive] + FinishQuery { + /// A opaque representation of the query + /// + /// This type implements [`Debug`] and [`Display`], + /// but should be considered otherwise as opaque. + /// + /// The exact output of the [`Debug`] and [`Display`] + /// implementation is not considered as part of the + /// stable API. + query: &'a dyn DebugQuery, + /// An optional error if the connection failed + error: Option<&'a crate::result::Error>, + }, + /// An event that is emitted while + /// starting a new transaction + #[non_exhaustive] + BeginTransaction { + /// Transaction level of the newly started + /// transaction + depth: NonZeroU32, + }, + /// An event that is emitted while + /// committing a transaction + #[non_exhaustive] + CommitTransaction { + /// Transaction level of the to be committed + /// transaction + depth: NonZeroU32, + }, + /// An event that is emitted while + /// rolling back a transaction + #[non_exhaustive] + RollbackTransaction { + /// Transaction level of the to be rolled + /// back transaction + depth: NonZeroU32, + }, +} + +// these constructors exist to +// keep `#[non_exhaustive]` on all the variants +// and to gate the constructors on the unstable feature +impl<'a> InstrumentationEvent<'a> { + /// Create a new `InstrumentationEvent::StartEstablishConnection` event + #[cfg(feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes")] + pub fn start_establish_connection(url: &'a str) -> Self { + Self::StartEstablishConnection { url } + } + + /// Create a new `InstrumentationEvent::FinishEstablishConnection` event + #[cfg(feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes")] + pub fn finish_establish_connection( + url: &'a str, + error: Option<&'a crate::result::ConnectionError>, + ) -> Self { + Self::FinishEstablishConnection { url, error } + } + + /// Create a new `InstrumentationEvent::StartQuery` event + #[cfg(feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes")] + pub fn start_query(query: &'a dyn DebugQuery) -> Self { + Self::StartQuery { query } + } + + /// Create a new `InstrumentationEvent::CacheQuery` event + #[cfg(feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes")] + pub fn cache_query(sql: &'a str) -> Self { + Self::CacheQuery { sql } + } + + /// Create a new `InstrumentationEvent::FinishQuery` event + #[cfg(feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes")] + pub fn finish_query( + query: &'a dyn DebugQuery, + error: Option<&'a crate::result::Error>, + ) -> Self { + Self::FinishQuery { query, error } + } + + /// Create a new `InstrumentationEvent::BeginTransaction` event + #[cfg(feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes")] + pub fn begin_transaction(depth: NonZeroU32) -> Self { + Self::BeginTransaction { depth } + } + + /// Create a new `InstrumentationEvent::RollbackTransaction` event + #[cfg(feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes")] + pub fn rollback_transaction(depth: NonZeroU32) -> Self { + Self::RollbackTransaction { depth } + } + + /// Create a new `InstrumentationEvent::CommitTransaction` event + #[cfg(feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes")] + pub fn commit_transaction(depth: NonZeroU32) -> Self { + Self::CommitTransaction { depth } + } +} + +/// A type that provides an connection `Instrumentation` +/// +/// This trait is the basic building block for logging or +/// otherwise instrumenting diesel connection types. It +/// acts as callback that receives information about certain +/// important connection states +/// +/// For simple usages this trait is implemented for closures +/// accepting a [`InstrumentationEvent`] as argument. +/// +/// More complex usages and integrations with frameworks like +/// `tracing` and `log` are supposed to be part of their own +/// crates. +pub trait Instrumentation: Send + 'static { + /// The function that is invoced for each event + fn on_connection_event(&mut self, event: InstrumentationEvent<'_>); +} + +fn default_instrumentation() -> Option> { + None +} + +/// Get an instance of the default [`Instrumentation`] +/// +/// This function is mostly useful for crates implementing +/// their own connection types +pub fn get_default_instrumentation() -> Option> { + match GLOBAL_INSTRUMENTATION.read() { + Ok(f) => (*f)(), + Err(_) => None, + } +} + +/// Set a custom constructor for the default [`Instrumentation`] +/// used by new connections +/// +/// ```rust +/// use diesel::connection::{set_default_instrumentation, Instrumentation, InstrumentationEvent}; +/// +/// // a simple logger that prints all events to stdout +/// fn simple_logger() -> Option> { +/// // we need the explicit argument type there due +/// // to bugs in rustc +/// Some(Box::new(|event: InstrumentationEvent<'_>| println!("{event:?}"))) +/// } +/// +/// set_default_instrumentation(simple_logger); +/// ``` +pub fn set_default_instrumentation( + default: fn() -> Option>, +) -> crate::QueryResult<()> { + match GLOBAL_INSTRUMENTATION.write() { + Ok(mut l) => { + *l = default; + Ok(()) + } + Err(e) => Err(crate::result::Error::DatabaseError( + crate::result::DatabaseErrorKind::Unknown, + Box::new(e.to_string()), + )), + } +} + +impl Instrumentation for F +where + F: FnMut(InstrumentationEvent<'_>) + Send + 'static, +{ + fn on_connection_event(&mut self, event: InstrumentationEvent<'_>) { + (self)(event) + } +} + +impl Instrumentation for Box { + fn on_connection_event(&mut self, event: InstrumentationEvent<'_>) { + self.deref_mut().on_connection_event(event) + } +} + +impl Instrumentation for Option +where + T: Instrumentation, +{ + fn on_connection_event(&mut self, event: InstrumentationEvent<'_>) { + if let Some(i) = self { + i.on_connection_event(event) + } + } +} diff --git a/diesel/src/connection/mod.rs b/diesel/src/connection/mod.rs index 788014c0f927..af819cad8c9a 100644 --- a/diesel/src/connection/mod.rs +++ b/diesel/src/connection/mod.rs @@ -1,5 +1,6 @@ //! Types related to database connections +pub(crate) mod instrumentation; #[cfg(all( not(feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"), any(feature = "sqlite", feature = "postgres", feature = "mysql") @@ -15,6 +16,11 @@ use crate::query_builder::{Query, QueryFragment, QueryId}; use crate::result::*; use std::fmt::Debug; +#[doc(inline)] +pub use self::instrumentation::{ + get_default_instrumentation, set_default_instrumentation, Instrumentation, InstrumentationEvent, +}; +#[doc(inline)] pub use self::transaction_manager::{ AnsiTransactionManager, InTransactionStatus, TransactionDepthChange, TransactionManager, TransactionManagerStatus, ValidTransactionManagerStatus, @@ -28,6 +34,9 @@ pub(crate) use self::private::ConnectionSealed; #[cfg(feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes")] pub use self::private::MultiConnectionHelper; +#[cfg(feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes")] +pub use self::instrumentation::StrQueryHelper; + #[cfg(all( not(feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"), any(feature = "sqlite", feature = "postgres", feature = "mysql") @@ -381,6 +390,14 @@ where fn transaction_state( &mut self, ) -> &mut >::TransactionStateData; + + #[diesel_derives::__diesel_public_if( + feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes" + )] + fn instrumentation(&mut self) -> &mut dyn Instrumentation; + + /// Set a specific [`Instrumentation`] implementation for this connection + fn set_instrumentation(&mut self, instrumentation: impl Instrumentation); } /// The specific part of a [`Connection`] which actually loads data from the database diff --git a/diesel/src/connection/statement_cache.rs b/diesel/src/connection/statement_cache.rs index caf38bb9936b..2d4cd7ef4a18 100644 --- a/diesel/src/connection/statement_cache.rs +++ b/diesel/src/connection/statement_cache.rs @@ -99,9 +99,12 @@ use std::hash::Hash; use std::ops::{Deref, DerefMut}; use crate::backend::Backend; +use crate::connection::InstrumentationEvent; use crate::query_builder::*; use crate::result::QueryResult; +use super::Instrumentation; + /// A prepared statement cache #[allow(missing_debug_implementations, unreachable_pub)] #[cfg_attr( @@ -184,6 +187,7 @@ where backend: &DB, bind_types: &[DB::TypeMetadata], mut prepare_fn: F, + instrumentation: &mut dyn Instrumentation, ) -> QueryResult> where T: QueryFragment + QueryId, @@ -195,6 +199,7 @@ where backend, bind_types, &mut prepare_fn, + instrumentation, ) } @@ -206,6 +211,7 @@ where backend: &DB, bind_types: &[DB::TypeMetadata], prepare_fn: &mut dyn FnMut(&str, PrepareForCache) -> QueryResult, + instrumentation: &mut dyn Instrumentation, ) -> QueryResult> { use std::collections::hash_map::Entry::{Occupied, Vacant}; @@ -221,6 +227,8 @@ where Vacant(entry) => { let statement = { let sql = entry.key().sql(source, backend)?; + instrumentation + .on_connection_event(InstrumentationEvent::CacheQuery { sql: &sql }); prepare_fn(&sql, PrepareForCache::Yes) }; diff --git a/diesel/src/connection/transaction_manager.rs b/diesel/src/connection/transaction_manager.rs index de9ba5e7cca4..703d51098991 100644 --- a/diesel/src/connection/transaction_manager.rs +++ b/diesel/src/connection/transaction_manager.rs @@ -336,12 +336,19 @@ where fn begin_transaction(conn: &mut Conn) -> QueryResult<()> { let transaction_state = Self::get_transaction_state(conn)?; - let start_transaction_sql = match transaction_state.transaction_depth() { + let transaction_depth = transaction_state.transaction_depth(); + let start_transaction_sql = match transaction_depth { None => Cow::from("BEGIN"), Some(transaction_depth) => { Cow::from(format!("SAVEPOINT diesel_savepoint_{transaction_depth}")) } }; + let depth = transaction_depth + .and_then(|d| d.checked_add(1)) + .unwrap_or(NonZeroU32::new(1).expect("It's not 0")); + conn.instrumentation().on_connection_event( + super::instrumentation::InstrumentationEvent::BeginTransaction { depth }, + ); conn.batch_execute(&start_transaction_sql)?; Self::get_transaction_state(conn)? .change_transaction_depth(TransactionDepthChange::IncreaseDepth)?; @@ -371,6 +378,12 @@ where ), None => return Err(Error::NotInTransaction), }; + let depth = transaction_state + .transaction_depth() + .expect("We know that we are in a transaction here"); + conn.instrumentation().on_connection_event( + super::instrumentation::InstrumentationEvent::RollbackTransaction { depth }, + ); match conn.batch_execute(&rollback_sql) { Ok(()) => { @@ -449,6 +462,12 @@ where false, ), }; + let depth = transaction_state + .transaction_depth() + .expect("We know that we are in a transaction here"); + conn.instrumentation().on_connection_event( + super::instrumentation::InstrumentationEvent::CommitTransaction { depth }, + ); match conn.batch_execute(&commit_sql) { Ok(()) => { match Self::get_transaction_state(conn)? @@ -500,6 +519,7 @@ mod test { // Mock connection. mod mock { use crate::connection::transaction_manager::AnsiTransactionManager; + use crate::connection::Instrumentation; use crate::connection::{ Connection, ConnectionSealed, SimpleConnection, TransactionManager, }; @@ -512,6 +532,7 @@ mod test { pub(crate) next_batch_execute_results: VecDeque>, pub(crate) top_level_requires_rollback_after_next_batch_execute: bool, transaction_state: AnsiTransactionManager, + instrumentation: Option>, } impl SimpleConnection for MockConnection { @@ -542,6 +563,7 @@ mod test { next_batch_execute_results: VecDeque::new(), top_level_requires_rollback_after_next_batch_execute: false, transaction_state: AnsiTransactionManager::default(), + instrumentation: None, }) } @@ -559,6 +581,17 @@ mod test { { &mut self.transaction_state } + + fn instrumentation(&mut self) -> &mut dyn crate::connection::Instrumentation { + &mut self.instrumentation + } + + fn set_instrumentation( + &mut self, + instrumentation: impl crate::connection::Instrumentation, + ) { + self.instrumentation = Some(Box::new(instrumentation)); + } } } diff --git a/diesel/src/mysql/connection/mod.rs b/diesel/src/mysql/connection/mod.rs index f61770e005af..011558bdaed9 100644 --- a/diesel/src/mysql/connection/mod.rs +++ b/diesel/src/mysql/connection/mod.rs @@ -8,6 +8,9 @@ use self::stmt::iterator::StatementIterator; use self::stmt::Statement; use self::url::ConnectionOptions; use super::backend::Mysql; +use crate::connection::instrumentation::DebugQuery; +use crate::connection::instrumentation::InstrumentationEvent; +use crate::connection::instrumentation::StrQueryHelper; use crate::connection::statement_cache::{MaybeCached, StatementCache}; use crate::connection::*; use crate::expression::QueryMetadata; @@ -109,6 +112,7 @@ pub struct MysqlConnection { raw_connection: RawConnection, transaction_state: AnsiTransactionManager, statement_cache: StatementCache, + instrumentation: Option>, } // mysql connection can be shared between threads according to libmysqlclients documentation @@ -117,8 +121,19 @@ unsafe impl Send for MysqlConnection {} impl SimpleConnection for MysqlConnection { fn batch_execute(&mut self, query: &str) -> QueryResult<()> { - self.raw_connection - .enable_multi_statements(|| self.raw_connection.execute(query)) + self.instrumentation + .on_connection_event(InstrumentationEvent::StartQuery { + query: &StrQueryHelper::new(query), + }); + let r = self + .raw_connection + .enable_multi_statements(|| self.raw_connection.execute(query)); + self.instrumentation + .on_connection_event(InstrumentationEvent::FinishQuery { + query: &StrQueryHelper::new(query), + error: r.as_ref().err(), + }); + r } } @@ -142,18 +157,18 @@ impl Connection for MysqlConnection { /// * `ssl_mode` expects a value defined for MySQL client command option `--ssl-mode` /// See fn establish(database_url: &str) -> ConnectionResult { - use crate::result::ConnectionError::CouldntSetupConfiguration; + let mut instrumentation = crate::connection::instrumentation::get_default_instrumentation(); + instrumentation.on_connection_event(InstrumentationEvent::StartEstablishConnection { + url: database_url, + }); - let raw_connection = RawConnection::new(); - let connection_options = ConnectionOptions::parse(database_url)?; - raw_connection.connect(&connection_options)?; - let mut conn = MysqlConnection { - raw_connection, - transaction_state: AnsiTransactionManager::default(), - statement_cache: StatementCache::new(), - }; - conn.set_config_options() - .map_err(CouldntSetupConfiguration)?; + let establish_result = Self::establish_inner(database_url); + instrumentation.on_connection_event(InstrumentationEvent::FinishEstablishConnection { + url: database_url, + error: establish_result.as_ref().err(), + }); + let mut conn = establish_result?; + conn.instrumentation = instrumentation; Ok(conn) } @@ -163,33 +178,53 @@ impl Connection for MysqlConnection { { #[allow(unsafe_code)] // call to unsafe function update_transaction_manager_status( - prepared_query(&source, &mut self.statement_cache, &mut self.raw_connection).and_then( - |stmt| { - // we have not called result yet, so calling `execute` is - // fine - let stmt_use = unsafe { stmt.execute() }?; - Ok(stmt_use.affected_rows()) - }, - ), + prepared_query( + &source, + &mut self.statement_cache, + &mut self.raw_connection, + &mut self.instrumentation, + ) + .and_then(|stmt| { + // we have not called result yet, so calling `execute` is + // fine + let stmt_use = unsafe { stmt.execute() }?; + Ok(stmt_use.affected_rows()) + }), &mut self.transaction_state, + &mut self.instrumentation, + &crate::debug_query(source), ) } fn transaction_state(&mut self) -> &mut AnsiTransactionManager { &mut self.transaction_state } + + fn instrumentation(&mut self) -> &mut dyn Instrumentation { + &mut self.instrumentation + } + + fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) { + self.instrumentation = Some(Box::new(instrumentation)); + } } #[inline(always)] fn update_transaction_manager_status( query_result: QueryResult, transaction_manager: &mut AnsiTransactionManager, + instrumentation: &mut Option>, + query: &dyn DebugQuery, ) -> QueryResult { if let Err(Error::DatabaseError(DatabaseErrorKind::SerializationFailure, _)) = query_result { transaction_manager .status .set_requires_rollback_maybe_up_to_top_level(true) } + instrumentation.on_connection_event(InstrumentationEvent::FinishQuery { + query, + error: query_result.as_ref().err(), + }); query_result } @@ -206,14 +241,20 @@ impl LoadConnection for MysqlConnection { Self::Backend: QueryMetadata, { update_transaction_manager_status( - prepared_query(&source, &mut self.statement_cache, &mut self.raw_connection).and_then( - |stmt| { - let mut metadata = Vec::new(); - Mysql::row_metadata(&mut (), &mut metadata); - StatementIterator::from_stmt(stmt, &metadata) - }, - ), + prepared_query( + &source, + &mut self.statement_cache, + &mut self.raw_connection, + &mut self.instrumentation, + ) + .and_then(|stmt| { + let mut metadata = Vec::new(); + Mysql::row_metadata(&mut (), &mut metadata); + StatementIterator::from_stmt(stmt, &metadata) + }), &mut self.transaction_state, + &mut self.instrumentation, + &crate::debug_query(&source), ) } } @@ -247,9 +288,19 @@ fn prepared_query<'a, T: QueryFragment + QueryId>( source: &'_ T, statement_cache: &'a mut StatementCache, raw_connection: &'a mut RawConnection, + instrumentation: &mut dyn Instrumentation, ) -> QueryResult> { - let mut stmt = statement_cache - .cached_statement(source, &Mysql, &[], |sql, _| raw_connection.prepare(sql))?; + instrumentation.on_connection_event(InstrumentationEvent::StartQuery { + query: &crate::debug_query(source), + }); + let mut stmt = statement_cache.cached_statement( + source, + &Mysql, + &[], + |sql, _| raw_connection.prepare(sql), + instrumentation, + )?; + let mut bind_collector = RawBytesBindCollector::new(); source.collect_binds(&mut bind_collector, &mut (), &Mysql)?; let binds = bind_collector @@ -268,6 +319,23 @@ impl MysqlConnection { crate::sql_query("SET character_set_results = 'utf8mb4'").execute(self)?; Ok(()) } + + fn establish_inner(database_url: &str) -> Result { + use crate::ConnectionError::CouldntSetupConfiguration; + + let raw_connection = RawConnection::new(); + let connection_options = ConnectionOptions::parse(database_url)?; + raw_connection.connect(&connection_options)?; + let mut conn = MysqlConnection { + raw_connection, + transaction_state: AnsiTransactionManager::default(), + statement_cache: StatementCache::new(), + instrumentation: None, + }; + conn.set_config_options() + .map_err(CouldntSetupConfiguration)?; + Ok(conn) + } } #[cfg(test)] diff --git a/diesel/src/pg/connection/cursor.rs b/diesel/src/pg/connection/cursor.rs index 4e77dffdec84..678c8812b26c 100644 --- a/diesel/src/pg/connection/cursor.rs +++ b/diesel/src/pg/connection/cursor.rs @@ -1,3 +1,5 @@ +use crate::connection::instrumentation::StrQueryHelper; + use super::raw::RawConnection; use super::result::PgResult; use super::row::PgRow; @@ -87,6 +89,9 @@ impl Iterator for RowByRowCursor<'_> { let get_next_result = super::update_transaction_manager_status( self.conn.raw_connection.get_next_result(), self.conn, + // todo + &StrQueryHelper::new(""), + false, ); match get_next_result { Ok(Some(res)) => { @@ -120,6 +125,9 @@ impl Drop for RowByRowCursor<'_> { let res = super::update_transaction_manager_status( self.conn.raw_connection.get_next_result(), self.conn, + // todo + &StrQueryHelper::new(""), + false, ); if matches!(res, Err(_) | Ok(None)) { break; diff --git a/diesel/src/pg/connection/mod.rs b/diesel/src/pg/connection/mod.rs index bcf9b18f1cfe..40773d08b8c9 100644 --- a/diesel/src/pg/connection/mod.rs +++ b/diesel/src/pg/connection/mod.rs @@ -9,6 +9,9 @@ use self::private::ConnectionAndTransactionManager; use self::raw::{PgTransactionStatus, RawConnection}; use self::result::PgResult; use self::stmt::Statement; +use crate::connection::instrumentation::DebugQuery; +use crate::connection::instrumentation::StrQueryHelper; +use crate::connection::instrumentation::{Instrumentation, InstrumentationEvent}; use crate::connection::statement_cache::{MaybeCached, StatementCache}; use crate::connection::*; use crate::expression::QueryMetadata; @@ -20,6 +23,7 @@ use crate::result::ConnectionError::CouldntSetupConfiguration; use crate::result::*; use crate::RunQueryDsl; use std::ffi::CString; +use std::fmt::Debug; use std::os::raw as libc; /// The connection string expected by `PgConnection::establish` @@ -126,11 +130,16 @@ unsafe impl Send for PgConnection {} impl SimpleConnection for PgConnection { #[allow(unsafe_code)] // use of unsafe function fn batch_execute(&mut self, query: &str) -> QueryResult<()> { - let query = CString::new(query)?; + self.connection_and_transaction_manager + .instrumentation + .on_connection_event(InstrumentationEvent::StartQuery { + query: &StrQueryHelper::new(query), + }); + let c_query = CString::new(query)?; let inner_result = unsafe { self.connection_and_transaction_manager .raw_connection - .exec(query.as_ptr()) + .exec(c_query.as_ptr()) }; update_transaction_manager_status( inner_result.and_then(|raw_result| { @@ -140,6 +149,8 @@ impl SimpleConnection for PgConnection { ) }), &mut self.connection_and_transaction_manager, + &StrQueryHelper::new(query), + true, )?; Ok(()) } @@ -158,11 +169,16 @@ impl Connection for PgConnection { type TransactionManager = AnsiTransactionManager; fn establish(database_url: &str) -> ConnectionResult { - RawConnection::establish(database_url).and_then(|raw_conn| { + let mut instrumentation = crate::connection::instrumentation::get_default_instrumentation(); + instrumentation.on_connection_event(InstrumentationEvent::StartEstablishConnection { + url: database_url, + }); + let r = RawConnection::establish(database_url).and_then(|raw_conn| { let mut conn = PgConnection { connection_and_transaction_manager: ConnectionAndTransactionManager { raw_connection: raw_conn, transaction_state: AnsiTransactionManager::default(), + instrumentation: None, }, statement_cache: StatementCache::new(), metadata_cache: PgMetadataCache::new(), @@ -170,14 +186,22 @@ impl Connection for PgConnection { conn.set_config_options() .map_err(CouldntSetupConfiguration)?; Ok(conn) - }) + }); + instrumentation.on_connection_event(InstrumentationEvent::FinishEstablishConnection { + url: database_url, + error: r.as_ref().err(), + }); + let mut conn = r?; + conn.connection_and_transaction_manager.instrumentation = instrumentation; + Ok(conn) } + fn execute_returning_count(&mut self, source: &T) -> QueryResult where T: QueryFragment + QueryId, { update_transaction_manager_status( - self.with_prepared_query(source, |query, params, conn| { + self.with_prepared_query(source, true, |query, params, conn| { let res = query .execute(&mut conn.raw_connection, ¶ms, false) .map(|r| r.rows_affected()); @@ -187,6 +211,8 @@ impl Connection for PgConnection { res }), &mut self.connection_and_transaction_manager, + &crate::debug_query(source), + true, ) } @@ -196,6 +222,14 @@ impl Connection for PgConnection { { &mut self.connection_and_transaction_manager.transaction_state } + + fn instrumentation(&mut self) -> &mut dyn Instrumentation { + &mut self.connection_and_transaction_manager.instrumentation + } + + fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) { + self.connection_and_transaction_manager.instrumentation = Some(Box::new(instrumentation)); + } } impl LoadConnection for PgConnection @@ -213,11 +247,16 @@ where T: Query + QueryFragment + QueryId + 'query, Self::Backend: QueryMetadata, { - self.with_prepared_query(&source, |stmt, params, conn| { + self.with_prepared_query(&source, false, |stmt, params, conn| { use self::private::PgLoadingMode; let result = stmt.execute(&mut conn.raw_connection, ¶ms, Self::USE_ROW_BY_ROW_MODE); - let result = update_transaction_manager_status(result, conn)?; - Self::get_cursor(conn, result) + let result = update_transaction_manager_status( + result, + conn, + &crate::debug_query(&source), + false, + )?; + Self::get_cursor(conn, result, &source) }) } } @@ -232,6 +271,8 @@ impl GetPgMetadataCache for PgConnection { fn update_transaction_manager_status( query_result: QueryResult, conn: &mut ConnectionAndTransactionManager, + source: &dyn DebugQuery, + final_call: bool, ) -> QueryResult { /// avoid monomorphizing for every result type - this part will not be inlined fn non_generic_inner(conn: &mut ConnectionAndTransactionManager, is_err: bool) { @@ -281,6 +322,19 @@ fn update_transaction_manager_status( } } non_generic_inner(conn, query_result.is_err()); + if let Err(ref e) = query_result { + conn.instrumentation + .on_connection_event(InstrumentationEvent::FinishQuery { + query: source, + error: Some(e), + }); + } else if final_call { + conn.instrumentation + .on_connection_event(InstrumentationEvent::FinishQuery { + query: source, + error: None, + }); + } query_result } @@ -342,12 +396,18 @@ impl PgConnection { fn with_prepared_query<'conn, T: QueryFragment + QueryId, R>( &'conn mut self, source: &'_ T, + execute_returning_count: bool, f: impl FnOnce( MaybeCached<'_, Statement>, Vec>>, &'conn mut ConnectionAndTransactionManager, ) -> QueryResult, ) -> QueryResult { + self.connection_and_transaction_manager + .instrumentation + .on_connection_event(InstrumentationEvent::StartQuery { + query: &crate::debug_query(source), + }); let mut bind_collector = RawBytesBindCollector::::new(); source.collect_binds(&mut bind_collector, self, &Pg)?; let binds = bind_collector.binds; @@ -356,14 +416,30 @@ impl PgConnection { let cache_len = self.statement_cache.len(); let cache = &mut self.statement_cache; let conn = &mut self.connection_and_transaction_manager.raw_connection; - let query = cache.cached_statement(source, &Pg, &metadata, |sql, _| { - let query_name = if source.is_safe_to_cache_prepared(&Pg)? { - Some(format!("__diesel_stmt_{cache_len}")) - } else { - None - }; - Statement::prepare(conn, sql, query_name.as_deref(), &metadata) - }); + let query = cache.cached_statement( + source, + &Pg, + &metadata, + |sql, _| { + let query_name = if source.is_safe_to_cache_prepared(&Pg)? { + Some(format!("__diesel_stmt_{cache_len}")) + } else { + None + }; + Statement::prepare(conn, sql, query_name.as_deref(), &metadata) + }, + &mut self.connection_and_transaction_manager.instrumentation, + ); + if !execute_returning_count { + if let Err(ref e) = query { + self.connection_and_transaction_manager + .instrumentation + .on_connection_event(InstrumentationEvent::FinishQuery { + query: &crate::debug_query(&source), + error: Some(e), + }); + } + } f(query?, binds, &mut self.connection_and_transaction_manager) } @@ -387,6 +463,7 @@ mod private { pub struct ConnectionAndTransactionManager { pub(super) raw_connection: RawConnection, pub(super) transaction_state: AnsiTransactionManager, + pub(super) instrumentation: Option>, } pub trait PgLoadingMode { @@ -394,10 +471,11 @@ mod private { type Cursor<'conn, 'query>: Iterator>>; type Row<'conn, 'query>: crate::row::Row<'conn, Pg>; - fn get_cursor<'query>( - raw_connection: &mut ConnectionAndTransactionManager, + fn get_cursor<'conn, 'query>( + raw_connection: &'conn mut ConnectionAndTransactionManager, result: PgResult, - ) -> QueryResult>; + source: &dyn QueryFragment, + ) -> QueryResult>; } impl PgLoadingMode for PgConnection { @@ -405,11 +483,17 @@ mod private { type Cursor<'conn, 'query> = Cursor; type Row<'conn, 'query> = self::row::PgRow; - fn get_cursor<'query>( - conn: &mut ConnectionAndTransactionManager, + fn get_cursor<'conn, 'query>( + conn: &'conn mut ConnectionAndTransactionManager, result: PgResult, - ) -> QueryResult> { - update_transaction_manager_status(Cursor::new(result, &mut conn.raw_connection), conn) + source: &dyn QueryFragment, + ) -> QueryResult> { + update_transaction_manager_status( + Cursor::new(result, &mut conn.raw_connection), + conn, + &crate::debug_query(&source), + true, + ) } } @@ -418,10 +502,11 @@ mod private { type Cursor<'conn, 'query> = RowByRowCursor<'conn>; type Row<'conn, 'query> = self::row::PgRow; - fn get_cursor<'query>( - raw_connection: &mut ConnectionAndTransactionManager, + fn get_cursor<'conn, 'query>( + raw_connection: &'conn mut ConnectionAndTransactionManager, result: PgResult, - ) -> QueryResult> { + _source: &dyn QueryFragment, + ) -> QueryResult> { Ok(RowByRowCursor::new(result, raw_connection)) } } diff --git a/diesel/src/r2d2.rs b/diesel/src/r2d2.rs index 523baf80bbe1..79b8d0431e58 100644 --- a/diesel/src/r2d2.rs +++ b/diesel/src/r2d2.rs @@ -248,6 +248,14 @@ where ) -> &mut >::TransactionStateData { (**self).transaction_state() } + + fn instrumentation(&mut self) -> &mut dyn crate::connection::Instrumentation { + (**self).instrumentation() + } + + fn set_instrumentation(&mut self, instrumentation: impl crate::connection::Instrumentation) { + (**self).set_instrumentation(instrumentation) + } } impl LoadConnection for PooledConnection diff --git a/diesel/src/sqlite/connection/mod.rs b/diesel/src/sqlite/connection/mod.rs index e21234086b86..2549897d35ec 100644 --- a/diesel/src/sqlite/connection/mod.rs +++ b/diesel/src/sqlite/connection/mod.rs @@ -20,6 +20,8 @@ use self::raw::RawConnection; use self::statement_iterator::*; use self::stmt::{Statement, StatementUse}; use super::SqliteAggregateFunction; +use crate::connection::instrumentation::InstrumentationEvent; +use crate::connection::instrumentation::StrQueryHelper; use crate::connection::statement_cache::StatementCache; use crate::connection::*; use crate::deserialize::{FromSqlRow, StaticallySizedRow}; @@ -122,6 +124,7 @@ pub struct SqliteConnection { statement_cache: StatementCache, raw_connection: RawConnection, transaction_state: AnsiTransactionManager, + instrumentation: Option>, } // This relies on the invariant that RawConnection or Statement are never @@ -132,7 +135,17 @@ unsafe impl Send for SqliteConnection {} impl SimpleConnection for SqliteConnection { fn batch_execute(&mut self, query: &str) -> QueryResult<()> { - self.raw_connection.exec(query) + self.instrumentation + .on_connection_event(InstrumentationEvent::StartQuery { + query: &StrQueryHelper::new(query), + }); + let resp = self.raw_connection.exec(query); + self.instrumentation + .on_connection_event(InstrumentationEvent::FinishQuery { + query: &StrQueryHelper::new(query), + error: resp.as_ref().err(), + }); + resp } } @@ -149,16 +162,18 @@ impl Connection for SqliteConnection { /// If the database does not exist, this method will try to /// create a new database and then establish a connection to it. fn establish(database_url: &str) -> ConnectionResult { - use crate::result::ConnectionError::CouldntSetupConfiguration; - - let raw_connection = RawConnection::establish(database_url)?; - let conn = Self { - statement_cache: StatementCache::new(), - raw_connection, - transaction_state: AnsiTransactionManager::default(), - }; - conn.register_diesel_sql_functions() - .map_err(CouldntSetupConfiguration)?; + let mut instrumentation = crate::connection::instrumentation::get_default_instrumentation(); + instrumentation.on_connection_event(InstrumentationEvent::StartEstablishConnection { + url: database_url, + }); + + let establish_result = Self::establish_inner(database_url); + instrumentation.on_connection_event(InstrumentationEvent::FinishEstablishConnection { + url: database_url, + error: establish_result.as_ref().err(), + }); + let mut conn = establish_result?; + conn.instrumentation = instrumentation; Ok(conn) } @@ -167,9 +182,9 @@ impl Connection for SqliteConnection { T: QueryFragment + QueryId, { let statement_use = self.prepared_query(source)?; - statement_use.run()?; - - Ok(self.raw_connection.rows_affected_by_last_query()) + statement_use + .run() + .map(|_| self.raw_connection.rows_affected_by_last_query()) } fn transaction_state(&mut self) -> &mut AnsiTransactionManager @@ -178,6 +193,14 @@ impl Connection for SqliteConnection { { &mut self.transaction_state } + + fn instrumentation(&mut self) -> &mut dyn Instrumentation { + &mut self.instrumentation + } + + fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) { + self.instrumentation = Some(Box::new(instrumentation)); + } } impl LoadConnection for SqliteConnection { @@ -192,9 +215,9 @@ impl LoadConnection for SqliteConnection { T: Query + QueryFragment + QueryId + 'query, Self::Backend: QueryMetadata, { - let statement_use = self.prepared_query(source)?; + let statement = self.prepared_query(source)?; - Ok(StatementIterator::new(statement_use)) + Ok(StatementIterator::new(statement)) } } @@ -302,17 +325,39 @@ impl SqliteConnection { } } - fn prepared_query<'a, 'b, T>(&'a mut self, source: T) -> QueryResult> + fn prepared_query<'conn, 'query, T>( + &'conn mut self, + source: T, + ) -> QueryResult> where - T: QueryFragment + QueryId + 'b, + T: QueryFragment + QueryId + 'query, { + self.instrumentation + .on_connection_event(InstrumentationEvent::StartQuery { + query: &crate::debug_query(&source), + }); let raw_connection = &self.raw_connection; let cache = &mut self.statement_cache; - let statement = cache.cached_statement(&source, &Sqlite, &[], |sql, is_cached| { - Statement::prepare(raw_connection, sql, is_cached) - })?; + let statement = match cache.cached_statement( + &source, + &Sqlite, + &[], + |sql, is_cached| Statement::prepare(raw_connection, sql, is_cached), + &mut self.instrumentation, + ) { + Ok(statement) => statement, + Err(e) => { + self.instrumentation + .on_connection_event(InstrumentationEvent::FinishQuery { + query: &crate::debug_query(&source), + error: Some(&e), + }); + + return Err(e); + } + }; - StatementUse::bind(statement, source) + StatementUse::bind(statement, source, &mut self.instrumentation) } #[doc(hidden)] @@ -477,6 +522,20 @@ impl SqliteConnection { }, ) } + + fn establish_inner(database_url: &str) -> Result { + use crate::result::ConnectionError::CouldntSetupConfiguration; + let raw_connection = RawConnection::establish(database_url)?; + let conn = Self { + statement_cache: StatementCache::new(), + raw_connection, + transaction_state: AnsiTransactionManager::default(), + instrumentation: None, + }; + conn.register_diesel_sql_functions() + .map_err(CouldntSetupConfiguration)?; + Ok(conn) + } } fn error_message(err_code: libc::c_int) -> &'static str { diff --git a/diesel/src/sqlite/connection/stmt.rs b/diesel/src/sqlite/connection/stmt.rs index a5ba556690c1..07aee05bce60 100644 --- a/diesel/src/sqlite/connection/stmt.rs +++ b/diesel/src/sqlite/connection/stmt.rs @@ -3,6 +3,7 @@ use super::bind_collector::{InternalSqliteBindValue, SqliteBindCollector}; use super::raw::RawConnection; use super::sqlite_value::OwnedSqliteValue; use crate::connection::statement_cache::{MaybeCached, PrepareForCache}; +use crate::connection::Instrumentation; use crate::query_builder::{QueryFragment, QueryId}; use crate::result::Error::DatabaseError; use crate::result::*; @@ -235,12 +236,15 @@ struct BoundStatement<'stmt, 'query> { // contained in the query itself. We use NonNull to // communicate that this is a shared buffer binds_to_free: Vec<(i32, Option>)>, + instrumentation: &'stmt mut dyn Instrumentation, + has_error: bool, } impl<'stmt, 'query> BoundStatement<'stmt, 'query> { fn bind( statement: MaybeCached<'stmt, Statement>, query: T, + instrumentation: &'stmt mut dyn Instrumentation, ) -> QueryResult> where T: QueryFragment + QueryId + 'query, @@ -259,6 +263,8 @@ impl<'stmt, 'query> BoundStatement<'stmt, 'query> { statement, query: None, binds_to_free: Vec::new(), + instrumentation, + has_error: false, }; ret.bind_buffers(binds)?; @@ -322,6 +328,20 @@ impl<'stmt, 'query> BoundStatement<'stmt, 'query> { } Ok(()) } + + fn finish_query_with_error(mut self, e: &Error) { + self.has_error = true; + if let Some(q) = self.query { + // it's safe to get a reference from this ptr as it's guaranteed to not be null + let q = unsafe { q.as_ref() }; + self.instrumentation.on_connection_event( + crate::connection::InstrumentationEvent::FinishQuery { + query: &crate::debug_query(&q), + error: Some(e), + }, + ); + } + } } impl<'stmt, 'query> Drop for BoundStatement<'stmt, 'query> { @@ -353,11 +373,20 @@ impl<'stmt, 'query> Drop for BoundStatement<'stmt, 'query> { } if let Some(query) = self.query { - unsafe { + let query = unsafe { // Constructing the `Box` here is safe as we // got the pointer from a box + it is guaranteed to be not null. - std::mem::drop(Box::from_raw(query.as_ptr())); + Box::from_raw(query.as_ptr()) + }; + if !self.has_error { + self.instrumentation.on_connection_event( + crate::connection::InstrumentationEvent::FinishQuery { + query: &crate::debug_query(&query), + error: None, + }, + ); } + std::mem::drop(query); self.query = None; } } @@ -373,23 +402,28 @@ impl<'stmt, 'query> StatementUse<'stmt, 'query> { pub(super) fn bind( statement: MaybeCached<'stmt, Statement>, query: T, + instrumentation: &'stmt mut dyn Instrumentation, ) -> QueryResult> where T: QueryFragment + QueryId + 'query, { Ok(Self { - statement: BoundStatement::bind(statement, query)?, + statement: BoundStatement::bind(statement, query, instrumentation)?, column_names: OnceCell::new(), }) } pub(super) fn run(mut self) -> QueryResult<()> { - unsafe { + let r = unsafe { // This is safe as we pass `first_step = true` // and we consume the statement so nobody could // access the columns later on anyway. self.step(true).map(|_| ()) + }; + if let Err(ref e) = r { + self.statement.finish_query_with_error(e); } + r } // This function is marked as unsafe incorrectly passing `false` to `first_step` diff --git a/diesel_derives/src/multiconnection.rs b/diesel_derives/src/multiconnection.rs index d32c142c53c2..b4ba74214bda 100644 --- a/diesel_derives/src/multiconnection.rs +++ b/diesel_derives/src/multiconnection.rs @@ -109,6 +109,24 @@ fn generate_connection_impl( } }); + let instrumentation_impl = connection_types.iter().map(|c| { + let variant_ident = c.name; + quote::quote! { + #ident::#variant_ident(conn) => { + diesel::connection::Connection::set_instrumentation(conn, instrumentation); + } + } + }); + + let get_instrumentation_impl = connection_types.iter().map(|c| { + let variant_ident = c.name; + quote::quote! { + #ident::#variant_ident(conn) => { + diesel::connection::Connection::instrumentation(conn) + } + } + }); + let establish_impls = connection_types.iter().map(|c| { let ident = c.name; let ty = c.ty; @@ -326,6 +344,18 @@ fn generate_connection_impl( ) -> &mut >::TransactionStateData { self } + + fn instrumentation(&mut self) -> &mut dyn diesel::connection::Instrumentation { + match self { + #(#get_instrumentation_impl,)* + } + } + + fn set_instrumentation(&mut self, instrumentation: impl diesel::connection::Instrumentation) { + match self { + #(#instrumentation_impl,)* + } + } } impl LoadConnection for MultiConnection diff --git a/diesel_derives/tests/multiconnection.rs b/diesel_derives/tests/multiconnection.rs index 51c93836d116..96474a6d9606 100644 --- a/diesel_derives/tests/multiconnection.rs +++ b/diesel_derives/tests/multiconnection.rs @@ -1,4 +1,5 @@ use crate::schema::users; +use diesel::connection::Instrumentation; use diesel::prelude::*; #[derive(diesel::MultiConnection)] @@ -21,6 +22,10 @@ pub struct User { fn check_queries_work() { let mut conn = establish_connection(); + // checks that this trait is implemented + conn.set_instrumentation(None::>); + let _ = conn.instrumentation(); + diesel::sql_query( "CREATE TEMPORARY TABLE users(\ id INTEGER NOT NULL PRIMARY KEY, \ diff --git a/diesel_derives/tests/selectable.rs b/diesel_derives/tests/selectable.rs index 4b151f7fc2af..240fdccaa482 100644 --- a/diesel_derives/tests/selectable.rs +++ b/diesel_derives/tests/selectable.rs @@ -1,6 +1,6 @@ use std::marker::PhantomData; -use diesel::deserialize::FromSql; +use diesel::deserialize::{FromSql, FromSqlRow}; use diesel::sql_types::Text; use diesel::*; diff --git a/diesel_tests/tests/instrumentation.rs b/diesel_tests/tests/instrumentation.rs new file mode 100644 index 000000000000..73077dd9fcda --- /dev/null +++ b/diesel_tests/tests/instrumentation.rs @@ -0,0 +1,231 @@ +use crate::schema::users; +use crate::schema::TestConnection; +use diesel::connection::DefaultLoadingMode; +use diesel::connection::InstrumentationEvent; +use diesel::connection::LoadConnection; +use diesel::connection::SimpleConnection; +use diesel::query_builder::AsQuery; +use diesel::Connection; +use diesel::QueryResult; +use std::num::NonZeroU32; +use std::sync::Arc; +use std::sync::Mutex; + +use crate::schema::connection_with_sean_and_tess_in_users_table; + +#[derive(Debug, PartialEq)] +enum Event { + StartQuery { query: String }, + CacheQuery { sql: String }, + FinishQuery { query: String, error: Option<()> }, + BeginTransaction { depth: NonZeroU32 }, + CommitTransaction { depth: NonZeroU32 }, + RollbackTransaction { depth: NonZeroU32 }, +} + +impl From> for Event { + fn from(value: InstrumentationEvent<'_>) -> Self { + match value { + InstrumentationEvent::StartEstablishConnection { .. } => unreachable!(), + InstrumentationEvent::FinishEstablishConnection { .. } => unreachable!(), + InstrumentationEvent::StartQuery { query, .. } => Event::StartQuery { + query: query.to_string(), + }, + InstrumentationEvent::CacheQuery { sql, .. } => Event::CacheQuery { + sql: sql.to_owned(), + }, + InstrumentationEvent::FinishQuery { query, error, .. } => Event::FinishQuery { + query: query.to_string(), + error: error.map(|_| ()), + }, + InstrumentationEvent::BeginTransaction { depth, .. } => { + Event::BeginTransaction { depth } + } + InstrumentationEvent::CommitTransaction { depth, .. } => { + Event::CommitTransaction { depth } + } + InstrumentationEvent::RollbackTransaction { depth, .. } => { + Event::RollbackTransaction { depth } + } + _ => unreachable!(), + } + } +} + +fn setup_test_case() -> (Arc>>, TestConnection) { + let events = Arc::new(Mutex::new(Vec::::new())); + let events_to_check = events.clone(); + let mut conn = connection_with_sean_and_tess_in_users_table(); + conn.set_instrumentation(move |event: InstrumentationEvent<'_>| { + events.lock().unwrap().push(event.into()); + }); + assert_eq!(events_to_check.lock().unwrap().len(), 0); + (events_to_check, conn) +} + +#[test] +fn check_events_are_emitted_for_batch_execute() { + let (events_to_check, mut conn) = setup_test_case(); + conn.batch_execute("select 1").unwrap(); + + let events = events_to_check.lock().unwrap(); + assert_eq!(events.len(), 2); + assert_eq!( + events[0], + Event::StartQuery { + query: String::from("select 1") + } + ); + assert_eq!( + events[1], + Event::FinishQuery { + query: String::from("select 1"), + error: None, + } + ); +} + +#[test] +fn check_events_are_emitted_for_execute_returning_count() { + let (events_to_check, mut conn) = setup_test_case(); + conn.execute_returning_count(&users::table.as_query()) + .unwrap(); + let events = events_to_check.lock().unwrap(); + assert_eq!(events.len(), 3, "{:?}", events); + assert_matches!(events[0], Event::StartQuery { .. }); + assert_matches!(events[1], Event::CacheQuery { .. }); + assert_matches!(events[2], Event::FinishQuery { .. }); +} + +#[test] +fn check_events_are_emitted_for_load() { + let (events_to_check, mut conn) = setup_test_case(); + LoadConnection::::load(&mut conn, users::table.as_query()).unwrap(); + let events = events_to_check.lock().unwrap(); + assert_eq!(events.len(), 3, "{:?}", events); + assert_matches!(events[0], Event::StartQuery { .. }); + assert_matches!(events[1], Event::CacheQuery { .. }); + assert_matches!(events[2], Event::FinishQuery { .. }); +} + +#[test] +fn check_events_are_emitted_for_execute_returning_count_does_not_contain_cache_for_uncached_queries( +) { + let (events_to_check, mut conn) = setup_test_case(); + conn.execute_returning_count(&diesel::sql_query("select 1")) + .unwrap(); + let events = events_to_check.lock().unwrap(); + assert_eq!(events.len(), 2, "{:?}", events); + assert_matches!(events[0], Event::StartQuery { .. }); + assert_matches!(events[1], Event::FinishQuery { .. }); +} + +#[test] +fn check_events_are_emitted_for_load_does_not_contain_cache_for_uncached_queries() { + let (events_to_check, mut conn) = setup_test_case(); + LoadConnection::::load(&mut conn, diesel::sql_query("select 1")).unwrap(); + let events = events_to_check.lock().unwrap(); + assert_eq!(events.len(), 2, "{:?}", events); + assert_matches!(events[0], Event::StartQuery { .. }); + assert_matches!(events[1], Event::FinishQuery { .. }); +} + +#[test] +fn check_events_are_emitted_for_execute_returning_count_does_contain_error_for_failures() { + let (events_to_check, mut conn) = setup_test_case(); + let _ = conn.execute_returning_count(&diesel::sql_query("invalid")); + let events = events_to_check.lock().unwrap(); + assert_eq!(events.len(), 2, "{:?}", events); + assert_matches!(events[0], Event::StartQuery { .. }); + assert_matches!(events[1], Event::FinishQuery { error: Some(_), .. }); +} + +#[test] +fn check_events_are_emitted_for_load_does_contain_error_for_failures() { + let (events_to_check, mut conn) = setup_test_case(); + let _ = LoadConnection::::load(&mut conn, diesel::sql_query("invalid")); + let events = events_to_check.lock().unwrap(); + assert_eq!(events.len(), 2, "{:?}", events); + assert_matches!(events[0], Event::StartQuery { .. }); + assert_matches!(events[1], Event::FinishQuery { error: Some(_), .. }); +} + +#[test] +fn check_events_are_emitted_for_execute_returning_count_repeat_does_not_repeat_cache() { + let (events_to_check, mut conn) = setup_test_case(); + conn.execute_returning_count(&users::table.as_query()) + .unwrap(); + conn.execute_returning_count(&users::table.as_query()) + .unwrap(); + let events = events_to_check.lock().unwrap(); + assert_eq!(events.len(), 5, "{:?}", events); + assert_matches!(events[0], Event::StartQuery { .. }); + assert_matches!(events[1], Event::CacheQuery { .. }); + assert_matches!(events[2], Event::FinishQuery { .. }); + assert_matches!(events[3], Event::StartQuery { .. }); + assert_matches!(events[4], Event::FinishQuery { .. }); +} + +#[test] +fn check_events_are_emitted_for_load_repeat_does_not_repeat_cache() { + let (events_to_check, mut conn) = setup_test_case(); + LoadConnection::::load(&mut conn, users::table.as_query()).unwrap(); + LoadConnection::::load(&mut conn, users::table.as_query()).unwrap(); + let events = events_to_check.lock().unwrap(); + assert_eq!(events.len(), 5, "{:?}", events); + assert_matches!(events[0], Event::StartQuery { .. }); + assert_matches!(events[1], Event::CacheQuery { .. }); + assert_matches!(events[2], Event::FinishQuery { .. }); + assert_matches!(events[3], Event::StartQuery { .. }); + assert_matches!(events[4], Event::FinishQuery { .. }); +} + +#[test] +fn check_events_transaction() { + let (events_to_check, mut conn) = setup_test_case(); + conn.transaction(|_conn| QueryResult::Ok(())).unwrap(); + let events = events_to_check.lock().unwrap(); + assert_eq!(events.len(), 6, "{:?}", events); + assert_matches!(events[0], Event::BeginTransaction { .. }); + assert_matches!(events[1], Event::StartQuery { .. }); + assert_matches!(events[2], Event::FinishQuery { .. }); + assert_matches!(events[3], Event::CommitTransaction { .. }); + assert_matches!(events[4], Event::StartQuery { .. }); + assert_matches!(events[5], Event::FinishQuery { .. }); +} + +#[test] +fn check_events_transaction_error() { + let (events_to_check, mut conn) = setup_test_case(); + let _ = conn + .transaction(|_conn| QueryResult::<()>::Err(diesel::result::Error::RollbackTransaction)); + let events = events_to_check.lock().unwrap(); + assert_eq!(events.len(), 6, "{:?}", events); + assert_matches!(events[0], Event::BeginTransaction { .. }); + assert_matches!(events[1], Event::StartQuery { .. }); + assert_matches!(events[2], Event::FinishQuery { .. }); + assert_matches!(events[3], Event::RollbackTransaction { .. }); + assert_matches!(events[4], Event::StartQuery { .. }); + assert_matches!(events[5], Event::FinishQuery { .. }); +} + +#[test] +fn check_events_transaction_nested() { + let (events_to_check, mut conn) = setup_test_case(); + conn.transaction(|conn| conn.transaction(|_conn| QueryResult::Ok(()))) + .unwrap(); + let events = events_to_check.lock().unwrap(); + assert_eq!(events.len(), 12, "{:?}", events); + assert_matches!(events[0], Event::BeginTransaction { .. }); + assert_matches!(events[1], Event::StartQuery { .. }); + assert_matches!(events[2], Event::FinishQuery { .. }); + assert_matches!(events[3], Event::BeginTransaction { .. }); + assert_matches!(events[4], Event::StartQuery { .. }); + assert_matches!(events[5], Event::FinishQuery { .. }); + assert_matches!(events[6], Event::CommitTransaction { .. }); + assert_matches!(events[7], Event::StartQuery { .. }); + assert_matches!(events[8], Event::FinishQuery { .. }); + assert_matches!(events[9], Event::CommitTransaction { .. }); + assert_matches!(events[10], Event::StartQuery { .. }); + assert_matches!(events[11], Event::FinishQuery { .. }); +} diff --git a/diesel_tests/tests/lib.rs b/diesel_tests/tests/lib.rs index e4963a855de3..81c78f73186c 100644 --- a/diesel_tests/tests/lib.rs +++ b/diesel_tests/tests/lib.rs @@ -28,6 +28,7 @@ mod group_by; mod having; mod insert; mod insert_from_select; +mod instrumentation; mod internal_details; mod joins; mod limit_offset;