Skip to content

Commit

Permalink
Merge pull request #9 from foresterre/arc_handler
Browse files Browse the repository at this point in the history
  • Loading branch information
foresterre authored Jun 17, 2022
2 parents f2aef16 + 1d42a4e commit 7916b63
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 14 deletions.
19 changes: 17 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,22 @@

## [Unreleased]

[Unreleased]: https://github.com/foresterre/storyteller/compare/v0.5.0...HEAD
[Unreleased]: https://github.com/foresterre/storyteller/compare/v0.6.0...HEAD

## [0.6.0] - 2022-06-17

### Changed

*`EventListener::run_handler` now takes an `Arc` of the handler instead of moving the handler into the method

[0.6.0]: https://github.com/foresterre/bisector/compare/v0.5.0...v0.6.0


## [0.5.0] - 2022-06-16

### Changed

* Remove Disconnect Channel in `ChannelReporter`
* Remove Disconnect Channel in `ChannelReporter`
* Removed all disconnect related types, such as: `Disconnect`, `DisconnectSender`, `DisconnectReceiver`, `disconnect_channel()`
* Split process of disconnecting channel and waiting for unfinished events to be processed. The former can be done via `Reporter::disconnect()`, the latter via the new `FinishProcessing::finish_processing()`. As a result, if `FinishProcessing::finish_processing()` is not called after `Reporter::disconnect()`, events may go unprocessed.
* Caution: if `FinishProcessing::finish_processing()` is called before **`ChannelReporter::disconnect()`** (in case of the included `ChannelReporter` and `ChannelListener` implementations), the program will hang since the event handling thread will never be finish via the disconnect mechanism.
Expand All @@ -23,3 +32,9 @@
* Let the reporter take anything which can be converted into an Event via `impl Into<Reporter::Event>` instead of raw `Reporter::Event` instances.

[0.4.0]: https://github.com/foresterre/bisector/compare/v0.3.2...v0.4.0

# Legend

| Pictogram | Meaning |
|-----------|-----------------|
|| Breaking change |
12 changes: 6 additions & 6 deletions examples/json_lines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,28 +27,28 @@ fn main() {
//
// If we don't run the handler, we'll end up in an infinite loop, because our `reporter.disconnect()`
// below will block until it receives a Disconnect message.
let fin = listener.run_handler(handler);
let fin = listener.run_handler(Arc::new(handler));

#[allow(unused_must_use)]
// sending events can fail, but we'll assume they won't for this example
{
reporter.report_event(ExampleEvent::text("[status]\t\tOne"));
reporter.report_event(ExampleEvent::text("[status] One"));
reporter.report_event(ExampleEvent::event(MyEvent::Increment));
reporter.report_event(ExampleEvent::event(MyEvent::Increment));
reporter.report_event(ExampleEvent::text("[status::before]\tTwo before reset"));
reporter.report_event(ExampleEvent::text("[status::before] Two before reset"));
reporter.report_event(ExampleEvent::event(MyEvent::Reset));
reporter.report_event(ExampleEvent::text("[status::after]\t\tTwo after reset"));
reporter.report_event(ExampleEvent::text("[status::after] Two after reset"));
reporter.report_event(ExampleEvent::event(MyEvent::Increment));
reporter.report_event(ExampleEvent::event(MyEvent::Increment));
reporter.report_event(ExampleEvent::event(MyEvent::Increment));
reporter.report_event(ExampleEvent::event(MyEvent::Increment));
reporter.report_event(ExampleEvent::event(MyEvent::Increment));
reporter.report_event(ExampleEvent::event(MyEvent::Increment));
reporter.report_event(ExampleEvent::text("[status]\t\tThree"));
reporter.report_event(ExampleEvent::text("[status] Three"));
reporter.report_event(ExampleEvent::event(MyEvent::Increment));
reporter.report_event(ExampleEvent::event(MyEvent::Increment));
reporter.report_event(ExampleEvent::event(MyEvent::Increment));
reporter.report_event(ExampleEvent::text("[status]\t\tFour"));
reporter.report_event(ExampleEvent::text("[status] Four"));
}

// Within the ChannelReporter, the sender is dropped, thereby disconnecting the channel
Expand Down
3 changes: 2 additions & 1 deletion src/channel_reporter/listener.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::listener::FinishProcessing;
use crate::{EventHandler, EventListener, EventReceiver};
use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;

Expand Down Expand Up @@ -38,7 +39,7 @@ where
type Event = Event;
type FinishProcessingHandle = ChannelFinalizeHandler;

fn run_handler<H>(&self, handler: H) -> Self::FinishProcessingHandle
fn run_handler<H>(&self, handler: Arc<H>) -> Self::FinishProcessingHandle
where
H: EventHandler<Event = Self::Event> + 'static,
{
Expand Down
3 changes: 2 additions & 1 deletion src/listener.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::EventHandler;
use std::sync::Arc;

/// A listener, which listens to events from a [`Reporter`],
/// and can act upon these events by using an [`EventHandler`].
Expand All @@ -22,7 +23,7 @@ pub trait EventListener {
/// Can be used to stop running the event handler.
type FinishProcessingHandle: FinishProcessing;

fn run_handler<H>(&self, handler: H) -> Self::FinishProcessingHandle
fn run_handler<H>(&self, handler: Arc<H>) -> Self::FinishProcessingHandle
where
H: EventHandler<Event = Self::Event> + 'static;
}
Expand Down
4 changes: 2 additions & 2 deletions src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ fn bar() {
let reporter = ChannelReporter::new(sender);
let listener = ChannelEventListener::new(receiver);

let finalize_handle = listener.run_handler(handler);
let finalize_handle = listener.run_handler(Arc::new(handler));

reporter.report_event(ExampleEvent::text("[status]\t\tOne"));
reporter.report_event(ExampleEvent::event(MyEvent::Increment));
Expand Down Expand Up @@ -151,7 +151,7 @@ fn json() {
let reporter = ChannelReporter::new(sender);
let listener = ChannelEventListener::new(receiver);

let finalize_handle = listener.run_handler(handler);
let finalize_handle = listener.run_handler(Arc::new(handler));

reporter.report_event(ExampleEvent::text("[status]\t\tOne"));
reporter.report_event(ExampleEvent::event(MyEvent::Increment));
Expand Down
5 changes: 3 additions & 2 deletions tests/test_handler.rs → tests/collecting_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
extern crate core;

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use storyteller::{
event_channel, ChannelEventListener, ChannelReporter, EventHandler, EventListener,
FinishProcessing, Reporter,
Expand Down Expand Up @@ -73,7 +74,7 @@ fn test() {
MyEvent(4),
]);

let fin = listener.run_handler(handler);
let fin = listener.run_handler(Arc::new(handler));

for i in 0..5 {
reporter.report_event(MyEvent(i)).unwrap();
Expand All @@ -97,7 +98,7 @@ fn expect_failure(expected_events: Vec<MyEvent>) {

let handler = CollectingHandler::new(expected_events);

let fin = listener.run_handler(handler);
let fin = listener.run_handler(Arc::new(handler));

for i in 0..5 {
reporter.report_event(MyEvent(i)).unwrap();
Expand Down
94 changes: 94 additions & 0 deletions tests/registering_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// A sample implementation which collects the events it receives
#![cfg(feature = "channel_reporter")]
extern crate core;

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use storyteller::{
event_channel, ChannelEventListener, ChannelReporter, EventHandler, EventListener,
FinishProcessing, Reporter,
};

#[derive(Clone, Debug, Eq, PartialEq)]
struct MyEvent(usize);

// Caution: does only check whether `received` events match expected events
// Must also use `FinalizeHandler::finish_processing` to ensure panic's are caught.
struct RegisteringHandler {
registered_events: Arc<Mutex<Vec<MyEvent>>>,
}

impl RegisteringHandler {
fn new() -> Self {
Self {
registered_events: Arc::new(Mutex::new(Vec::new())),
}
}

fn events(&self) -> Vec<MyEvent> {
let guard = self.registered_events.lock().unwrap();
guard.clone()
}
}

impl EventHandler for RegisteringHandler {
type Event = MyEvent;

fn handle(&self, event: Self::Event) {
let mut guard = self.registered_events.lock().unwrap();
guard.push(event);

dbg!(&guard);
}
}

#[test]
fn test() {
let (event_sender, event_receiver) = event_channel::<MyEvent>();

let reporter = ChannelReporter::new(event_sender);
let listener = ChannelEventListener::new(event_receiver);

let handler = Arc::new(RegisteringHandler::new());
let fin = listener.run_handler(handler.clone());

for i in 0..5 {
reporter.report_event(MyEvent(i)).unwrap();
}

reporter.disconnect().unwrap();
fin.finish_processing().unwrap();

// NB: Order is important, must be placed after finish_processing() to ensure all expected
// events have been processed
let expected = vec![MyEvent(0), MyEvent(1), MyEvent(2), MyEvent(3), MyEvent(4)];
assert_eq!(handler.events(), expected);
}

#[yare::parameterized(
to_few = { vec![ MyEvent(0), MyEvent(1), MyEvent(2), MyEvent(3), MyEvent(4), MyEvent(5)] },
to_many = { vec![ MyEvent(0), MyEvent(1), MyEvent(2), MyEvent(3) ] },
incorrect = { vec![ MyEvent(0), MyEvent(1), MyEvent(2), MyEvent(3), MyEvent(5), ] },
)]
#[should_panic]
fn expect_failure(expected_events: Vec<MyEvent>) {
let (event_sender, event_receiver) = event_channel::<MyEvent>();

let reporter = ChannelReporter::new(event_sender);
let listener = ChannelEventListener::new(event_receiver);

let handler = Arc::new(RegisteringHandler::new());

let fin = listener.run_handler(handler.clone());

for i in 0..5 {
reporter.report_event(MyEvent(i)).unwrap();
}

reporter.disconnect().unwrap();
fin.finish_processing().unwrap();

// NB: Order is important, must be placed after finish_processing() to ensure all expected
// events have been processed
assert_eq!(handler.events(), expected_events);
}

0 comments on commit 7916b63

Please sign in to comment.