From fa10eef29e88046ac158f048798a9eec8f3a99c8 Mon Sep 17 00:00:00 2001 From: Martijn Gribnau Date: Fri, 17 Jun 2022 16:54:53 +0200 Subject: [PATCH 1/2] EventListener::run_handler now takes an Arc to the handler instead of moving it This is convenient when the handler stores, or refers to outside state. By taking an atomic reference, we can simply clone the reference counting ptr and keep it around to inspect the state of the handler (assuming the handler allows inspection of its contents). The cost compared to just moving the handler is negligible compared to the expected cost of the running lifetime of the handler. Unfortunately, since thread::spawn takes an F: 'static, we can't just take the handler by reference. We may be able to use scoped threads and put the scope in the EventListener::FinishProcessingHandle, but doing so would require doing acrobatics with lifetimes beyond the added value, over the little added cost of introducing the Arc. A bigger disadvantage of taking an Arc is that the Arc will be present in the EventListener::run_handler API. Why Arc over Rc? It is expected that since the handler must run in some place where it doesn't block the main loop, some form of concurrency will be exhibited. As such, we would require the atomicity of the Arc. --- examples/json_lines.rs | 12 +-- src/channel_reporter/listener.rs | 3 +- src/listener.rs | 3 +- src/tests.rs | 4 +- ...{test_handler.rs => collecting_handler.rs} | 5 +- tests/registering_handler.rs | 94 +++++++++++++++++++ 6 files changed, 109 insertions(+), 12 deletions(-) rename tests/{test_handler.rs => collecting_handler.rs} (95%) create mode 100644 tests/registering_handler.rs diff --git a/examples/json_lines.rs b/examples/json_lines.rs index 793c24e..ec5f210 100644 --- a/examples/json_lines.rs +++ b/examples/json_lines.rs @@ -27,28 +27,28 @@ fn main() { // // If we don't run the handler, we'll end up in an infinite loop, because our `reporter.disconnect()` // below will block until it receives a Disconnect message. - let fin = listener.run_handler(handler); + let fin = listener.run_handler(Arc::new(handler)); #[allow(unused_must_use)] // sending events can fail, but we'll assume they won't for this example { - reporter.report_event(ExampleEvent::text("[status]\t\tOne")); + reporter.report_event(ExampleEvent::text("[status] One")); reporter.report_event(ExampleEvent::event(MyEvent::Increment)); reporter.report_event(ExampleEvent::event(MyEvent::Increment)); - reporter.report_event(ExampleEvent::text("[status::before]\tTwo before reset")); + reporter.report_event(ExampleEvent::text("[status::before] Two before reset")); reporter.report_event(ExampleEvent::event(MyEvent::Reset)); - reporter.report_event(ExampleEvent::text("[status::after]\t\tTwo after reset")); + reporter.report_event(ExampleEvent::text("[status::after] Two after reset")); reporter.report_event(ExampleEvent::event(MyEvent::Increment)); reporter.report_event(ExampleEvent::event(MyEvent::Increment)); reporter.report_event(ExampleEvent::event(MyEvent::Increment)); reporter.report_event(ExampleEvent::event(MyEvent::Increment)); reporter.report_event(ExampleEvent::event(MyEvent::Increment)); reporter.report_event(ExampleEvent::event(MyEvent::Increment)); - reporter.report_event(ExampleEvent::text("[status]\t\tThree")); + reporter.report_event(ExampleEvent::text("[status] Three")); reporter.report_event(ExampleEvent::event(MyEvent::Increment)); reporter.report_event(ExampleEvent::event(MyEvent::Increment)); reporter.report_event(ExampleEvent::event(MyEvent::Increment)); - reporter.report_event(ExampleEvent::text("[status]\t\tFour")); + reporter.report_event(ExampleEvent::text("[status] Four")); } // Within the ChannelReporter, the sender is dropped, thereby disconnecting the channel diff --git a/src/channel_reporter/listener.rs b/src/channel_reporter/listener.rs index 736b5d3..74b7c34 100644 --- a/src/channel_reporter/listener.rs +++ b/src/channel_reporter/listener.rs @@ -1,5 +1,6 @@ use crate::listener::FinishProcessing; use crate::{EventHandler, EventListener, EventReceiver}; +use std::sync::Arc; use std::thread; use std::thread::JoinHandle; @@ -38,7 +39,7 @@ where type Event = Event; type FinishProcessingHandle = ChannelFinalizeHandler; - fn run_handler(&self, handler: H) -> Self::FinishProcessingHandle + fn run_handler(&self, handler: Arc) -> Self::FinishProcessingHandle where H: EventHandler + 'static, { diff --git a/src/listener.rs b/src/listener.rs index c56f8a8..a8a6325 100644 --- a/src/listener.rs +++ b/src/listener.rs @@ -1,4 +1,5 @@ use crate::EventHandler; +use std::sync::Arc; /// A listener, which listens to events from a [`Reporter`], /// and can act upon these events by using an [`EventHandler`]. @@ -22,7 +23,7 @@ pub trait EventListener { /// Can be used to stop running the event handler. type FinishProcessingHandle: FinishProcessing; - fn run_handler(&self, handler: H) -> Self::FinishProcessingHandle + fn run_handler(&self, handler: Arc) -> Self::FinishProcessingHandle where H: EventHandler + 'static; } diff --git a/src/tests.rs b/src/tests.rs index d6584e5..ba903b5 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -119,7 +119,7 @@ fn bar() { let reporter = ChannelReporter::new(sender); let listener = ChannelEventListener::new(receiver); - let finalize_handle = listener.run_handler(handler); + let finalize_handle = listener.run_handler(Arc::new(handler)); reporter.report_event(ExampleEvent::text("[status]\t\tOne")); reporter.report_event(ExampleEvent::event(MyEvent::Increment)); @@ -151,7 +151,7 @@ fn json() { let reporter = ChannelReporter::new(sender); let listener = ChannelEventListener::new(receiver); - let finalize_handle = listener.run_handler(handler); + let finalize_handle = listener.run_handler(Arc::new(handler)); reporter.report_event(ExampleEvent::text("[status]\t\tOne")); reporter.report_event(ExampleEvent::event(MyEvent::Increment)); diff --git a/tests/test_handler.rs b/tests/collecting_handler.rs similarity index 95% rename from tests/test_handler.rs rename to tests/collecting_handler.rs index 9b03e7a..9c64a18 100644 --- a/tests/test_handler.rs +++ b/tests/collecting_handler.rs @@ -3,6 +3,7 @@ extern crate core; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; use storyteller::{ event_channel, ChannelEventListener, ChannelReporter, EventHandler, EventListener, FinishProcessing, Reporter, @@ -73,7 +74,7 @@ fn test() { MyEvent(4), ]); - let fin = listener.run_handler(handler); + let fin = listener.run_handler(Arc::new(handler)); for i in 0..5 { reporter.report_event(MyEvent(i)).unwrap(); @@ -97,7 +98,7 @@ fn expect_failure(expected_events: Vec) { let handler = CollectingHandler::new(expected_events); - let fin = listener.run_handler(handler); + let fin = listener.run_handler(Arc::new(handler)); for i in 0..5 { reporter.report_event(MyEvent(i)).unwrap(); diff --git a/tests/registering_handler.rs b/tests/registering_handler.rs new file mode 100644 index 0000000..db229a2 --- /dev/null +++ b/tests/registering_handler.rs @@ -0,0 +1,94 @@ +// A sample implementation which collects the events it receives +#![cfg(feature = "channel_reporter")] +extern crate core; + +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use storyteller::{ + event_channel, ChannelEventListener, ChannelReporter, EventHandler, EventListener, + FinishProcessing, Reporter, +}; + +#[derive(Clone, Debug, Eq, PartialEq)] +struct MyEvent(usize); + +// Caution: does only check whether `received` events match expected events +// Must also use `FinalizeHandler::finish_processing` to ensure panic's are caught. +struct RegisteringHandler { + registered_events: Arc>>, +} + +impl RegisteringHandler { + fn new() -> Self { + Self { + registered_events: Arc::new(Mutex::new(Vec::new())), + } + } + + fn events(&self) -> Vec { + let guard = self.registered_events.lock().unwrap(); + guard.clone() + } +} + +impl EventHandler for RegisteringHandler { + type Event = MyEvent; + + fn handle(&self, event: Self::Event) { + let mut guard = self.registered_events.lock().unwrap(); + guard.push(event); + + dbg!(&guard); + } +} + +#[test] +fn test() { + let (event_sender, event_receiver) = event_channel::(); + + let reporter = ChannelReporter::new(event_sender); + let listener = ChannelEventListener::new(event_receiver); + + let handler = Arc::new(RegisteringHandler::new()); + let fin = listener.run_handler(handler.clone()); + + for i in 0..5 { + reporter.report_event(MyEvent(i)).unwrap(); + } + + reporter.disconnect().unwrap(); + fin.finish_processing().unwrap(); + + // NB: Order is important, must be placed after finish_processing() to ensure all expected + // events have been processed + let expected = vec![MyEvent(0), MyEvent(1), MyEvent(2), MyEvent(3), MyEvent(4)]; + assert_eq!(handler.events(), expected); +} + +#[yare::parameterized( + to_few = { vec![ MyEvent(0), MyEvent(1), MyEvent(2), MyEvent(3), MyEvent(4), MyEvent(5)] }, + to_many = { vec![ MyEvent(0), MyEvent(1), MyEvent(2), MyEvent(3) ] }, + incorrect = { vec![ MyEvent(0), MyEvent(1), MyEvent(2), MyEvent(3), MyEvent(5), ] }, +)] +#[should_panic] +fn expect_failure(expected_events: Vec) { + let (event_sender, event_receiver) = event_channel::(); + + let reporter = ChannelReporter::new(event_sender); + let listener = ChannelEventListener::new(event_receiver); + + let handler = Arc::new(RegisteringHandler::new()); + + let fin = listener.run_handler(handler.clone()); + + for i in 0..5 { + reporter.report_event(MyEvent(i)).unwrap(); + } + + reporter.disconnect().unwrap(); + fin.finish_processing().unwrap(); + + // NB: Order is important, must be placed after finish_processing() to ensure all expected + // events have been processed + assert_eq!(handler.events(), expected_events); +} From 1d42a4e09ebaf120cdc5b0159eb398783b1e4221 Mon Sep 17 00:00:00 2001 From: Martijn Gribnau Date: Fri, 17 Jun 2022 16:59:41 +0200 Subject: [PATCH 2/2] Update changelog --- CHANGELOG.md | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4e9990b..2522250 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,13 +2,22 @@ ## [Unreleased] -[Unreleased]: https://github.com/foresterre/storyteller/compare/v0.5.0...HEAD +[Unreleased]: https://github.com/foresterre/storyteller/compare/v0.6.0...HEAD + +## [0.6.0] - 2022-06-17 + +### Changed + +* ⚠ `EventListener::run_handler` now takes an `Arc` of the handler instead of moving the handler into the method + +[0.6.0]: https://github.com/foresterre/bisector/compare/v0.5.0...v0.6.0 + ## [0.5.0] - 2022-06-16 ### Changed -* Remove Disconnect Channel in `ChannelReporter` +* ⚠ Remove Disconnect Channel in `ChannelReporter` * Removed all disconnect related types, such as: `Disconnect`, `DisconnectSender`, `DisconnectReceiver`, `disconnect_channel()` * Split process of disconnecting channel and waiting for unfinished events to be processed. The former can be done via `Reporter::disconnect()`, the latter via the new `FinishProcessing::finish_processing()`. As a result, if `FinishProcessing::finish_processing()` is not called after `Reporter::disconnect()`, events may go unprocessed. * Caution: if `FinishProcessing::finish_processing()` is called before **`ChannelReporter::disconnect()`** (in case of the included `ChannelReporter` and `ChannelListener` implementations), the program will hang since the event handling thread will never be finish via the disconnect mechanism. @@ -23,3 +32,9 @@ * Let the reporter take anything which can be converted into an Event via `impl Into` instead of raw `Reporter::Event` instances. [0.4.0]: https://github.com/foresterre/bisector/compare/v0.3.2...v0.4.0 + +# Legend + +| Pictogram | Meaning | +|-----------|-----------------| +| ⚠ | Breaking change |