From e9693f99e2e84c1eb682c2da145cd9976b5ab19c Mon Sep 17 00:00:00 2001 From: Kevin Reid Date: Mon, 2 Dec 2024 19:28:04 -0800 Subject: [PATCH] listen: Move `Notifier` into a module. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit There’s enough related things, now — in particular, `NotifierForwarder` and `Buffer` — that it makes sense to gather them together. --- all-is-cubes/src/listen.rs | 232 ++-------------------- all-is-cubes/src/listen/listeners.rs | 12 ++ all-is-cubes/src/listen/notifier.rs | 280 +++++++++++++++++++++++++++ all-is-cubes/src/listen/util.rs | 35 +--- 4 files changed, 305 insertions(+), 254 deletions(-) create mode 100644 all-is-cubes/src/listen/notifier.rs diff --git a/all-is-cubes/src/listen.rs b/all-is-cubes/src/listen.rs index 2c2344822..671aae9ec 100644 --- a/all-is-cubes/src/listen.rs +++ b/all-is-cubes/src/listen.rs @@ -10,12 +10,10 @@ //! `Weak>` or similar multiply-owned mutable structure to aggregate incoming //! messages, which will then be read and cleared by a later task. -use alloc::sync::{Arc, Weak}; -use alloc::vec::Vec; +use alloc::sync::Arc; use core::fmt; -use core::sync::atomic::{AtomicBool, Ordering::Relaxed}; -use crate::util::maybe_sync::{RwLock, SendSyncIfStd}; +use crate::util::maybe_sync::SendSyncIfStd; mod cell; pub use cell::*; @@ -23,9 +21,14 @@ pub use cell::*; mod listeners; pub use listeners::*; +mod notifier; +pub use notifier::*; + mod util; pub use util::*; +// ------------------------------------------------------------------------------------------------- + /// Ability to subscribe to a source of messages, causing a [`Listener`] to receive them /// as long as it wishes to. pub trait Listen { @@ -57,204 +60,7 @@ impl Listen for Arc { } } -/// Message broadcaster, usually used for change notifications. -/// -/// A `Notifier` delivers messages of type `M` to a dynamic set of [`Listener`]s. -/// -/// The `Notifier` is usually owned by some entity which emits messages when it changes, -/// such as a [`ListenableCell`]. -/// Each `Listener` usually holds a weak reference to allow it to be removed when the -/// actual recipient is gone or uninterested. -/// -/// [`Listener`]s may be added using the [`Listen`] implementation, and are removed when -/// they report themselves as dead. -pub struct Notifier { - listeners: RwLock>>, -} - -struct NotifierEntry { - listener: DynListener, - /// True iff every call to `listener.receive()` has returned true. - was_alive: AtomicBool, -} - -impl Notifier { - /// Constructs a new empty [`Notifier`]. - pub fn new() -> Self { - Self { - listeners: Default::default(), - } - } - - /// Returns a [`Listener`] which forwards messages to the listeners registered with - /// this `Notifier`, provided that it is owned by an [`Arc`]. - /// - /// This may be used together with [`Listener::filter()`] to forward notifications - /// of changes in dependencies. Using this operation means that the dependent does not - /// need to fan out listener registrations to all of its current dependencies. - /// - /// ``` - /// use std::sync::Arc; - /// use all_is_cubes::listen::{Listen, Notifier, Sink}; - /// - /// let notifier_1 = Notifier::new(); - /// let notifier_2 = Arc::new(Notifier::new()); - /// let mut sink = Sink::new(); - /// notifier_1.listen(Notifier::forwarder(Arc::downgrade(¬ifier_2))); - /// notifier_2.listen(sink.listener()); - /// # assert_eq!(notifier_1.count(), 1); - /// # assert_eq!(notifier_2.count(), 1); - /// - /// notifier_1.notify("a"); - /// assert_eq!(sink.drain(), vec!["a"]); - /// drop(notifier_2); - /// notifier_1.notify("a"); - /// assert!(sink.drain().is_empty()); - /// - /// # assert_eq!(notifier_1.count(), 0); - /// ``` - pub fn forwarder(this: Weak) -> NotifierForwarder { - NotifierForwarder(this) - } - - /// Deliver a message to all [`Listener`]s. - pub fn notify(&self, message: M) { - self.notify_many(&[message]) - } - - /// Deliver multiple messages to all [`Listener`]s. - pub fn notify_many(&self, messages: &[M]) { - for NotifierEntry { - listener, - was_alive, - } in self.listeners.read().unwrap().iter() - { - // Don't load was_alive before sending, because we assume the common case is that - // a listener implements receive() cheaply when it is dead. - let alive = listener.receive(messages); - - was_alive.fetch_and(alive, Relaxed); - } - } - - /// Creates a [`Buffer`] which batches messages sent through it. - /// This may be used as a more convenient interface to [`Notifier::notify_many()`], - /// at the cost of delaying messages until the buffer is dropped. - /// - /// The buffer does not use any heap allocations and will collect up to `CAPACITY` messages - /// per batch. - pub fn buffer(&self) -> Buffer<'_, M, CAPACITY> { - Buffer::new(self) - } - - /// Computes the exact count of listeners, including asking all current listeners - /// if they are alive. - /// - /// This operation is intended for testing and diagnostic purposes. - pub fn count(&self) -> usize { - let mut listeners = self.listeners.write().unwrap(); - Self::cleanup(&mut listeners); - listeners.len() - } - - /// Discard all dead weak pointers in `listeners`. - fn cleanup(listeners: &mut Vec>) { - let mut i = 0; - while i < listeners.len() { - let entry = &listeners[i]; - // We must ask the listener, not just consult was_alive, in order to avoid - // leaking memory if listen() is called repeatedly without any notify(). - // TODO: But we can skip it if the last operation was notify(). - if entry.was_alive.load(Relaxed) && entry.listener.receive(&[]) { - i += 1; - } else { - listeners.swap_remove(i); - } - } - } -} - -impl Listen for Notifier { - type Msg = M; - - fn listen + 'static>(&self, listener: L) { - if !listener.receive(&[]) { - // skip adding it if it's already dead - return; - } - let mut listeners = self.listeners.write().unwrap(); - // TODO: consider amortization by not doing cleanup every time - Self::cleanup(&mut listeners); - listeners.push(NotifierEntry { - listener: listener.erased(), - was_alive: AtomicBool::new(true), - }); - } -} - -impl Default for Notifier { - fn default() -> Self { - Self::new() - } -} - -impl fmt::Debug for Notifier { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - // not using fmt.debug_tuple() so this is never printed on multiple lines - if let Ok(listeners) = self.listeners.try_read() { - write!(fmt, "Notifier({})", listeners.len()) - } else { - write!(fmt, "Notifier(?)") - } - } -} - -/// A batch of messages of type `M` to be sent through a [`Notifier`]. -/// -/// Messages may be added to the buffer, and when the buffer is full or when it is dropped, -/// they are sent through the notifier. Creating such a batch is intended to increase performance -/// by not executing dynamic dispatch to every notifier for every message. -#[derive(Debug)] -pub struct Buffer<'notifier, M, const CAPACITY: usize> { - buffer: arrayvec::ArrayVec, - notifier: &'notifier Notifier, -} - -impl<'notifier, M, const CAPACITY: usize> Buffer<'notifier, M, CAPACITY> { - fn new(notifier: &'notifier Notifier) -> Self { - Self { - buffer: arrayvec::ArrayVec::new(), - notifier, - } - } - - /// Store a message in this buffer, to be delivered later as if by [`Notifier::notify()`]. - pub fn push(&mut self, message: M) { - // We don't need to check for fullness before pushing, because we always flush immediately - // if full. - self.buffer.push(message); - if self.buffer.is_full() { - self.flush(); - } - } - - #[cold] - fn flush(&mut self) { - self.notifier.notify_many(&self.buffer); - self.buffer.clear(); - } -} - -impl Drop for Buffer<'_, M, CAPACITY> { - fn drop(&mut self) { - // TODO: Should we discard messages if panicking? - // Currently leaning no, because we've specified that listeners should not panic even under - // error conditions such as poisoned mutexes. - if !self.buffer.is_empty() { - self.flush(); - } - } -} +// ------------------------------------------------------------------------------------------------- /// A receiver of messages (typically from something implementing [`Listen`]) which can /// indicate when it is no longer interested in them (typically because the associated @@ -382,28 +188,12 @@ impl Listener for DynListener { } } +// ------------------------------------------------------------------------------------------------- + #[cfg(test)] mod tests { use super::*; - #[test] - fn notifier_basics_and_debug() { - let cn: Notifier = Notifier::new(); - assert_eq!(format!("{cn:?}"), "Notifier(0)"); - cn.notify(0); - assert_eq!(format!("{cn:?}"), "Notifier(0)"); - let sink = Sink::new(); - cn.listen(sink.listener()); - assert_eq!(format!("{cn:?}"), "Notifier(1)"); - // type annotation to prevent spurious inference failures in the presence - // of other compiler errors - assert_eq!(sink.drain(), Vec::::new()); - cn.notify(1); - cn.notify(2); - assert_eq!(sink.drain(), vec![1, 2]); - assert_eq!(format!("{cn:?}"), "Notifier(1)"); - } - #[test] fn erased_listener() { let sink = Sink::new(); @@ -430,7 +220,7 @@ mod tests { /// Demonstrate that [`DynListener`] implements [`fmt::Debug`]. #[test] - fn erased_debug() { + fn dyn_listener_debug() { let sink: Sink<&str> = Sink::new(); let listener: DynListener<&str> = Arc::new(sink.listener()); diff --git a/all-is-cubes/src/listen/listeners.rs b/all-is-cubes/src/listen/listeners.rs index 98fa9eea5..d104a5629 100644 --- a/all-is-cubes/src/listen/listeners.rs +++ b/all-is-cubes/src/listen/listeners.rs @@ -10,6 +10,8 @@ use manyfmt::Refmt as _; use crate::listen::{Listen, Listener}; use crate::util::maybe_sync::{RwLock, SendSyncIfStd}; +// ------------------------------------------------------------------------------------------------- + /// A [`Listener`] which discards all messages. /// /// Use this when a [`Listener`] is demanded, but there is nothing it should do. @@ -23,6 +25,8 @@ impl Listener for NullListener { } } +// ------------------------------------------------------------------------------------------------- + /// Tuples of listeners may be used to distribute messages. impl Listener for (L1, L2) where @@ -35,6 +39,8 @@ where } } +// ------------------------------------------------------------------------------------------------- + /// A [`Listener`] which delivers messages by calling a function on a [`Weak`] reference's /// referent, and stops when the weak reference breaks. #[derive(Clone)] @@ -82,6 +88,8 @@ where } } +// ------------------------------------------------------------------------------------------------- + /// A [`Listener`] which stores all the messages it receives. /// /// This is only intended for testing. @@ -163,6 +171,8 @@ impl Default for Sink { } } +// ------------------------------------------------------------------------------------------------- + /// A [`Listener`] destination which only stores a single flag indicating if any messages /// were received. pub struct DirtyFlag { @@ -234,6 +244,8 @@ impl Listener for DirtyFlagListener { } } +// ------------------------------------------------------------------------------------------------- + #[cfg(test)] mod tests { use super::*; diff --git a/all-is-cubes/src/listen/notifier.rs b/all-is-cubes/src/listen/notifier.rs new file mode 100644 index 000000000..ff9320d88 --- /dev/null +++ b/all-is-cubes/src/listen/notifier.rs @@ -0,0 +1,280 @@ +#![allow( + clippy::module_name_repetitions, + reason = "false positive; TODO: remove after Rust 1.84 is released" +)] + +use alloc::sync::Weak; +use alloc::vec::Vec; +use core::fmt; +use core::sync::atomic::{AtomicBool, Ordering::Relaxed}; + +#[cfg(doc)] +use alloc::sync::Arc; + +use crate::listen::{DynListener, Listen, Listener}; +use crate::util::maybe_sync::RwLock; + +#[cfg(doc)] +use crate::listen::ListenableCell; + +// ------------------------------------------------------------------------------------------------- + +/// Message broadcaster, usually used for change notifications. +/// +/// A `Notifier` delivers messages of type `M` to a dynamic set of [`Listener`]s. +/// +/// The `Notifier` is usually owned by some entity which emits messages when it changes, +/// such as a [`ListenableCell`]. +/// Each `Listener` usually holds a weak reference to allow it to be removed when the +/// actual recipient is gone or uninterested. +/// +/// [`Listener`]s may be added using the [`Listen`] implementation, and are removed when +/// they report themselves as dead. +pub struct Notifier { + pub(crate) listeners: RwLock>>, +} + +pub(crate) struct NotifierEntry { + pub(crate) listener: DynListener, + /// True iff every call to `listener.receive()` has returned true. + pub(crate) was_alive: AtomicBool, +} + +impl Notifier { + /// Constructs a new empty [`Notifier`]. + pub fn new() -> Self { + Self { + listeners: Default::default(), + } + } + + /// Returns a [`Listener`] which forwards messages to the listeners registered with + /// this `Notifier`, provided that it is owned by an [`Arc`]. + /// + /// This may be used together with [`Listener::filter()`] to forward notifications + /// of changes in dependencies. Using this operation means that the dependent does not + /// need to fan out listener registrations to all of its current dependencies. + /// + /// ``` + /// use std::sync::Arc; + /// use all_is_cubes::listen::{Listen, Notifier, Sink}; + /// + /// let notifier_1 = Notifier::new(); + /// let notifier_2 = Arc::new(Notifier::new()); + /// let mut sink = Sink::new(); + /// notifier_1.listen(Notifier::forwarder(Arc::downgrade(¬ifier_2))); + /// notifier_2.listen(sink.listener()); + /// # assert_eq!(notifier_1.count(), 1); + /// # assert_eq!(notifier_2.count(), 1); + /// + /// notifier_1.notify("a"); + /// assert_eq!(sink.drain(), vec!["a"]); + /// drop(notifier_2); + /// notifier_1.notify("a"); + /// assert!(sink.drain().is_empty()); + /// + /// # assert_eq!(notifier_1.count(), 0); + /// ``` + pub fn forwarder(this: Weak) -> NotifierForwarder { + NotifierForwarder(this) + } + + /// Deliver a message to all [`Listener`]s. + pub fn notify(&self, message: M) { + self.notify_many(&[message]) + } + + /// Deliver multiple messages to all [`Listener`]s. + pub fn notify_many(&self, messages: &[M]) { + for NotifierEntry { + listener, + was_alive, + } in self.listeners.read().unwrap().iter() + { + // Don't load was_alive before sending, because we assume the common case is that + // a listener implements receive() cheaply when it is dead. + let alive = listener.receive(messages); + + was_alive.fetch_and(alive, Relaxed); + } + } + + /// Creates a [`Buffer`] which batches messages sent through it. + /// This may be used as a more convenient interface to [`Notifier::notify_many()`], + /// at the cost of delaying messages until the buffer is dropped. + /// + /// The buffer does not use any heap allocations and will collect up to `CAPACITY` messages + /// per batch. + pub fn buffer(&self) -> Buffer<'_, M, CAPACITY> { + Buffer::new(self) + } + + /// Computes the exact count of listeners, including asking all current listeners + /// if they are alive. + /// + /// This operation is intended for testing and diagnostic purposes. + pub fn count(&self) -> usize { + let mut listeners = self.listeners.write().unwrap(); + Self::cleanup(&mut listeners); + listeners.len() + } + + /// Discard all dead weak pointers in `listeners`. + pub(crate) fn cleanup(listeners: &mut Vec>) { + let mut i = 0; + while i < listeners.len() { + let entry = &listeners[i]; + // We must ask the listener, not just consult was_alive, in order to avoid + // leaking memory if listen() is called repeatedly without any notify(). + // TODO: But we can skip it if the last operation was notify(). + if entry.was_alive.load(Relaxed) && entry.listener.receive(&[]) { + i += 1; + } else { + listeners.swap_remove(i); + } + } + } +} + +impl Listen for Notifier { + type Msg = M; + + fn listen + 'static>(&self, listener: L) { + if !listener.receive(&[]) { + // skip adding it if it's already dead + return; + } + let mut listeners = self.listeners.write().unwrap(); + // TODO: consider amortization by not doing cleanup every time + Self::cleanup(&mut listeners); + listeners.push(NotifierEntry { + listener: listener.erased(), + was_alive: AtomicBool::new(true), + }); + } +} + +impl Default for Notifier { + fn default() -> Self { + Self::new() + } +} + +impl fmt::Debug for Notifier { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + // not using fmt.debug_tuple() so this is never printed on multiple lines + if let Ok(listeners) = self.listeners.try_read() { + write!(fmt, "Notifier({})", listeners.len()) + } else { + write!(fmt, "Notifier(?)") + } + } +} + +// ------------------------------------------------------------------------------------------------- + +/// A batch of messages of type `M` to be sent through a [`Notifier`]. +/// +/// Messages may be added to the buffer, and when the buffer is full or when it is dropped, +/// they are sent through the notifier. Creating such a batch is intended to increase performance +/// by not executing dynamic dispatch to every notifier for every message. +#[derive(Debug)] +pub struct Buffer<'notifier, M, const CAPACITY: usize> { + pub(crate) buffer: arrayvec::ArrayVec, + pub(crate) notifier: &'notifier Notifier, +} + +impl<'notifier, M, const CAPACITY: usize> Buffer<'notifier, M, CAPACITY> { + pub(crate) fn new(notifier: &'notifier Notifier) -> Self { + Self { + buffer: arrayvec::ArrayVec::new(), + notifier, + } + } + + /// Store a message in this buffer, to be delivered later as if by [`Notifier::notify()`]. + pub fn push(&mut self, message: M) { + // We don't need to check for fullness before pushing, because we always flush immediately + // if full. + self.buffer.push(message); + if self.buffer.is_full() { + self.flush(); + } + } + + #[cold] + pub(crate) fn flush(&mut self) { + self.notifier.notify_many(&self.buffer); + self.buffer.clear(); + } +} + +impl Drop for Buffer<'_, M, CAPACITY> { + fn drop(&mut self) { + // TODO: Should we discard messages if panicking? + // Currently leaning no, because we've specified that listeners should not panic even under + // error conditions such as poisoned mutexes. + if !self.buffer.is_empty() { + self.flush(); + } + } +} + +// ------------------------------------------------------------------------------------------------- + +/// A [`Listener`] which forwards messages through a [`Notifier`] to its listeners. +/// Constructed by [`Notifier::forwarder()`]. +pub struct NotifierForwarder(pub(super) Weak>); + +impl fmt::Debug for NotifierForwarder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("NotifierForwarder") + .field("alive(shallow)", &(self.0.strong_count() > 0)) + .finish_non_exhaustive() + } +} + +impl Listener for NotifierForwarder { + fn receive(&self, messages: &[M]) -> bool { + if let Some(notifier) = self.0.upgrade() { + notifier.notify_many(messages); + true + } else { + false + } + } +} + +impl Clone for NotifierForwarder { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +// ------------------------------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + use crate::listen::Sink; + + #[test] + fn notifier_basics_and_debug() { + let cn: Notifier = Notifier::new(); + assert_eq!(format!("{cn:?}"), "Notifier(0)"); + cn.notify(0); + assert_eq!(format!("{cn:?}"), "Notifier(0)"); + let sink = Sink::new(); + cn.listen(sink.listener()); + assert_eq!(format!("{cn:?}"), "Notifier(1)"); + // type annotation to prevent spurious inference failures in the presence + // of other compiler errors + assert_eq!(sink.drain(), Vec::::new()); + cn.notify(1); + cn.notify(2); + assert_eq!(sink.drain(), vec![1, 2]); + assert_eq!(format!("{cn:?}"), "Notifier(1)"); + } + + // Test for NotifierForwarder exists as a doc-test. +} diff --git a/all-is-cubes/src/listen/util.rs b/all-is-cubes/src/listen/util.rs index 9cd6f2401..61d525d95 100644 --- a/all-is-cubes/src/listen/util.rs +++ b/all-is-cubes/src/listen/util.rs @@ -4,7 +4,7 @@ use core::fmt; use manyfmt::formats::Unquote; use manyfmt::Refmt as _; -use crate::listen::{Listener, Notifier}; +use crate::listen::Listener; /// A [`Listener`] which transforms or discards messages before passing them on. /// Construct this using [`Listener::filter`]. @@ -150,39 +150,10 @@ where } } -/// A [`Listener`] which forwards messages through a [`Notifier`] to its listeners. -/// Constructed by [`Notifier::forwarder()`]. -pub struct NotifierForwarder(pub(super) Weak>); - -impl fmt::Debug for NotifierForwarder { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("NotifierForwarder") - .field("alive(shallow)", &(self.0.strong_count() > 0)) - .finish_non_exhaustive() - } -} - -impl Listener for NotifierForwarder { - fn receive(&self, messages: &[M]) -> bool { - if let Some(notifier) = self.0.upgrade() { - notifier.notify_many(messages); - true - } else { - false - } - } -} - -impl Clone for NotifierForwarder { - fn clone(&self) -> Self { - Self(self.0.clone()) - } -} - #[cfg(test)] mod tests { use super::*; - use crate::listen::{Listen as _, Sink}; + use crate::listen::{Listen as _, Notifier, Sink}; use alloc::vec::Vec; /// Breaks the listener rules for testing by recording batch boundaries. @@ -302,6 +273,4 @@ mod tests { assert_eq!(notifier.count(), 0); } - - // Test for NotifierForwarder exists as a doc-test. }