Skip to content

Commit

Permalink
Minor improvements
Browse files Browse the repository at this point in the history
* Move the example as documentation example to the function to make it
  more discoverable
* Use `std::iter::from_fn` instead of a custom iterator type
* Move the `PgNotification` type to the backend module, as it is
  independend from the connection implementation (It might be reused
from diesel-async or similar crates)
* Improve error handling in the ffi wrapping function
   + Throw errors on null ptrs and non-UTF-8 strings
   + Make sure to always call `pqfree` on the notification, even if we
     return an error
  • Loading branch information
weiznich committed Jan 10, 2025
1 parent 3db6c6c commit edc81cc
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 92 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ members = [
"examples/postgres/custom_types",
"examples/postgres/composite_types",
"examples/postgres/relations",
"examples/postgres/notifications",
"examples/sqlite/all_about_inserts",
"examples/sqlite/getting_started_step_1",
"examples/sqlite/getting_started_step_2",
Expand Down
17 changes: 17 additions & 0 deletions diesel/src/pg/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,20 @@ impl sql_dialect::on_conflict_clause::PgLikeOnConflictClause for PgOnConflictCla
pub struct PgStyleArrayComparison;

impl LikeIsAllowedForType<crate::sql_types::Binary> for Pg {}

// Using the same field names as tokio-postgres
/// See Postgres documentation for SQL Commands NOTIFY and LISTEN
#[derive(Clone, Debug)]
pub struct PgNotification {
/// process ID of notifying server process
pub process_id: i32,
/// Name of the notification channel
pub channel: String,
/// optional data that was submitted with the notification,
///
/// This is set to an empty string if no data was submitted
///
/// (Postgres unfortunally does not provide a way to differentiate between
/// not set and empty here)
pub payload: String,
}
57 changes: 38 additions & 19 deletions diesel/src/pg/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::connection::instrumentation::{
use crate::connection::statement_cache::{MaybeCached, StatementCache};
use crate::connection::*;
use crate::expression::QueryMetadata;
use crate::pg::backend::PgNotification;
use crate::pg::metadata_lookup::{GetPgMetadataCache, PgMetadataCache};
use crate::pg::query_builder::copy::InternalCopyFromQuery;
use crate::pg::{Pg, TransactionBuilder};
Expand Down Expand Up @@ -551,28 +552,46 @@ impl PgConnection {
/// See Postgres documentation for SQL commands [NOTIFY][] and [LISTEN][]
///
/// The returned iterator can yield items even after a None value when new notifications have been received.
/// The iterator can be polled again after a `None` value was received as new notifications might have
/// been send in the mean time.
///
/// [NOTIFY]: https://www.postgresql.org/docs/current/sql-notify.html
/// [LISTEN]: https://www.postgresql.org/docs/current/sql-listen.html
pub fn notifications_iter(&self) -> NotificationsIterator<'_> {
NotificationsIterator {
conn: &self.connection_and_transaction_manager.raw_connection,
}
}
}

#[allow(missing_debug_implementations)]
pub struct NotificationsIterator<'a> {
conn: &'a RawConnection,
}

pub use raw::PgNotification;

impl Iterator for NotificationsIterator<'_> {
type Item = Result<PgNotification, Error>;

fn next(&mut self) -> Option<Self::Item> {
self.conn.pqnotifies().transpose()
///
/// ## Example
///
/// ```
/// # include!("../../doctest_setup.rs");
/// #
/// # fn main() {
/// # run_test().unwrap();
/// # }
/// #
/// # fn run_test() -> QueryResult<()> {
/// # let connection = &mut establish_connection();
///
/// // register the notifications channel we want to receive notifications for
/// diesel::sql_query("LISTEN example_channel").execute(connection)?;
/// // send some notification
/// // this is usually done from a different connection/thread/application
/// diesel::sql_query("NOTIFY example_channel, 'additional data'").execute(connection)?;
///
/// for result in connection.notifications_iter() {
/// let notification = result.unwrap();
/// assert_eq!(notification.channel, "example_channel");
/// assert_eq!(notification.payload, "additional data");
///
/// println!(
/// "Notification received from server process with id {}.",
/// notification.process_id
/// );
/// }
/// # Ok(())
/// # }
/// ```
pub fn notifications_iter(&mut self) -> impl Iterator<Item = QueryResult<PgNotification>> + '_ {
let conn = &self.connection_and_transaction_manager.raw_connection;
std::iter::from_fn(move || conn.pq_notifies().transpose())
}
}

Expand Down
76 changes: 53 additions & 23 deletions diesel/src/pg/connection/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::{ptr, str};
use crate::result::*;

use super::result::PgResult;
use crate::pg::PgNotification;

#[allow(missing_debug_implementations, missing_copy_implementations)]
pub(super) struct RawConnection {
Expand Down Expand Up @@ -184,7 +185,7 @@ impl RawConnection {
}
}

pub(super) fn pqnotifies(&self) -> Result<Option<PgNotification>, Error> {
pub(super) fn pq_notifies(&self) -> Result<Option<PgNotification>, Error> {
let conn = self.internal_connection;
let ret = unsafe { PQconsumeInput(conn.as_ptr()) };
if ret == 0 {
Expand All @@ -198,35 +199,64 @@ impl RawConnection {
if pgnotify.is_null() {
Ok(None)
} else {
// we use a drop guard here to
// make sure that we always free
// the provided pointer, even if we
// somehow return an error below
struct Guard<'a> {
value: &'a mut pgNotify,
}

impl Drop for Guard<'_> {
fn drop(&mut self) {
unsafe {
// SAFETY: We know that this value is not null here
PQfreemem(self.value as *mut pgNotify as *mut std::ffi::c_void)
};
}
}

let pgnotify = unsafe {
// SAFETY: We checked for null values above
Guard {
value: &mut *pgnotify,
}
};
if pgnotify.value.relname.is_null() {
return Err(Error::DeserializationError(
"Received an unexpected null value for `relname` from the notification".into(),
));
}
if pgnotify.value.extra.is_null() {
return Err(Error::DeserializationError(
"Received an unexpected null value for `extra` from the notification".into(),
));
}

let channel = unsafe {
// SAFETY: We checked for null values above
CStr::from_ptr(pgnotify.value.relname)
}
.to_str()
.map_err(|e| Error::DeserializationError(e.into()))?
.to_string();
let payload = unsafe {
// SAFETY: We checked for null values above
CStr::from_ptr(pgnotify.value.extra)
}
.to_str()
.map_err(|e| Error::DeserializationError(e.into()))?
.to_string();
let ret = PgNotification {
process_id: unsafe { (*pgnotify).be_pid },
channel: unsafe { CStr::from_ptr((*pgnotify).relname) }
.to_str()
.expect("Channel name should be UTF-8")
.to_string(),
payload: unsafe { CStr::from_ptr((*pgnotify).extra) }
.to_str()
.expect("Could not parse payload to UTF-8")
.to_string(),
process_id: pgnotify.value.be_pid,
channel,
payload,
};
unsafe { PQfreemem(pgnotify as *mut std::ffi::c_void) };
Ok(Some(ret))
}
}
}

// Using the same field names as tokio-postgres
/// See Postgres documentation for SQL Commands NOTIFY and LISTEN
#[derive(Clone, Debug)]
pub struct PgNotification {
/// process ID of notifying server process
pub process_id: i32,
/// Name of the notification channel
pub channel: String,
/// optional data that was submitted with the notification, empty string if no data was submitted
pub payload: String,
}

/// Represents the current in-transaction status of the connection
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(super) enum PgTransactionStatus {
Expand Down
5 changes: 3 additions & 2 deletions diesel/src/pg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ pub(crate) mod serialize;
mod transaction;
mod value;

pub use self::backend::{Pg, PgTypeMetadata};
#[doc(inline)]
pub use self::backend::{Pg, PgNotification, PgTypeMetadata};
#[cfg(feature = "postgres")]
pub use self::connection::{PgConnection, PgNotification, PgRowByRowLoadingMode};
pub use self::connection::{PgConnection, PgRowByRowLoadingMode};
#[doc(inline)]
pub use self::metadata_lookup::PgMetadataLookup;
#[doc(inline)]
Expand Down
15 changes: 0 additions & 15 deletions examples/postgres/notifications/Cargo.toml

This file was deleted.

32 changes: 0 additions & 32 deletions examples/postgres/notifications/src/main.rs

This file was deleted.

0 comments on commit edc81cc

Please sign in to comment.