Replace event channel with broadcast channel #5478

Merged
merged 2 commits into from
Apr 22, 2024

link2xt
Collaborator

@link2xt link2xt commented Apr 19, 2024

The purpose of this PR is to make emitter events immediately available to the TestContext "event tracker". Before this PR, the "event tracker" relied on the following task, which copied events from the EventEmitter to the "event tracker" and LogSink:

task::spawn(async move {
    while let Some(event) = events.recv().await {
        for sender in senders.read().await.iter() {
            // Don't block because someone wanted to use a oneshot receiver, use
            // an unbounded channel if you want all events.
            sender.try_send(event.clone()).ok();
        }
        log_sender.try_send(LogEvent::Event(event.clone())).ok();
    }
});

The problem with the current approach is that events are not available to the event tracker immediately after they are emitted, because this task may not be woken up right away. This makes clear_events unreliable, as it only consumes events that have already been moved from the event emitter to the event tracker.

This PR replaces the event channel with a broadcast channel, so that both the primary EventEmitter and the "event tracker" receive events immediately. LogSink now also gets its own broadcast receiver for each account and runs a task that moves events from the receivers to the log channel. This may reorder events in the logs, but that is less critical than the "event tracker" not receiving events immediately, because reordered logs do not break tests.
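To illustrate why a broadcast channel makes clear_events reliable, here is a minimal std-only sketch. It is not the async-broadcast implementation and not the code from this PR; `ToyBroadcast`, `ToyReceiver`, and `Event(u32)` are hypothetical names used only for illustration. The point it shows is that `emit` places a copy of the event into every receiver's queue before returning, so no forwarding task has to be woken up first.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the real event type.
#[derive(Debug, Clone, PartialEq)]
pub struct Event(pub u32);

type Queue = Arc<Mutex<VecDeque<Event>>>;

/// Toy broadcast sender (an assumption for this sketch):
/// it keeps one queue per receiver.
#[derive(Default)]
pub struct ToyBroadcast {
    queues: Mutex<Vec<Queue>>,
}

/// Toy receiver that owns a handle to its own queue.
pub struct ToyReceiver {
    queue: Queue,
}

impl ToyBroadcast {
    /// Registers a new receiver; it only sees events emitted afterwards.
    pub fn new_receiver(&self) -> ToyReceiver {
        let queue = Queue::default();
        self.queues.lock().unwrap().push(Arc::clone(&queue));
        ToyReceiver { queue }
    }

    /// Copies the event into every receiver's queue before returning.
    pub fn emit(&self, event: Event) {
        for queue in self.queues.lock().unwrap().iter() {
            queue.lock().unwrap().push_back(event.clone());
        }
    }
}

impl ToyReceiver {
    /// Returns the next pending event, if any, without waiting.
    pub fn try_recv(&self) -> Option<Event> {
        self.queue.lock().unwrap().pop_front()
    }

    /// Drops every event emitted so far and returns how many were dropped.
    pub fn clear(&self) -> usize {
        let mut queue = self.queue.lock().unwrap();
        let dropped = queue.len();
        queue.clear();
        dropped
    }
}
```

In this sketch, clearing one receiver (the "event tracker" role) does not affect another receiver (the primary emitter role), because each receiver gets its own copy of every event.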

Related comment:
#5471 (comment)

There is another PR that solves the problem of clearing events in the Python event tracker by using special "checkpoint" events: #5477
In Python, the approach from this PR is not possible because events are polled via the get_next_event call, which is not guaranteed to return a result as soon as an event is available.

@link2xt link2xt force-pushed the link2xt/broadcast-event-channel branch 11 times, most recently from 6d0725b to c86528c Compare April 20, 2024 07:40
@link2xt link2xt changed the title WIP: Replace event channel with broadcast channel Replace event channel with broadcast channel Apr 20, 2024
@link2xt link2xt force-pushed the link2xt/broadcast-event-channel branch 4 times, most recently from 2277ddc to 3853696 Compare April 20, 2024 10:07
@link2xt link2xt marked this pull request as ready for review April 20, 2024 10:08
@link2xt link2xt force-pushed the link2xt/broadcast-event-channel branch 3 times, most recently from 6836aab to 4928dc2 Compare April 20, 2024 10:25
@link2xt link2xt requested review from iequidoo and Simon-Laux April 20, 2024 10:25
@@ -11,8 +10,11 @@ pub use self::payload::EventType;
/// Event channel.
#[derive(Debug, Clone)]
pub struct Events {
    receiver: Receiver<Event>,
    sender: Sender<Event>,
    /// Unused receiver to prevent the channel from closing.
Collaborator Author

This is ugly, but removing it actually makes a lot of tests fail.

Collaborator Author

I have deactivated the receiver. This is a documented use of InactiveReceiver, so I guess this is how this is supposed to be done:
https://docs.rs/async-broadcast/0.7.0/async_broadcast/struct.InactiveReceiver.html

#[pin_project]
pub struct EventEmitter(#[pin] Receiver<Event>);
#[derive(Debug)]
pub struct EventEmitter(Mutex<async_broadcast::Receiver<Event>>);
Collaborator Author

@link2xt link2xt Apr 20, 2024

This Mutex is needed because recv on a broadcast channel takes &mut self instead of &self as with a normal channel. This is the same for the async-broadcast crate I used and for tokio broadcast channels. I think the idea is that no synchronization is needed if a receiver is only used by a single thread, since in broadcast channels each receiver reads from its own chunk of memory. In our case, however, we expose the event emitter to FFI and otherwise allow using it from multiple threads, so we want thread safety and have to add our own synchronization.
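The pattern described above can be sketched with the standard library alone. This is a simplified illustration, not the PR's code: `RawReceiver` is a hypothetical receiver whose method needs `&mut self`, and `SharedEmitter` wraps it in a `std::sync::Mutex` (the PR itself uses an async mutex) so that callers holding only `&self` can use it from several threads.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Hypothetical receiver whose methods need exclusive access,
/// mirroring the `&mut self` receivers discussed above.
pub struct RawReceiver {
    pending: VecDeque<u32>,
}

impl RawReceiver {
    /// Takes `&mut self`, so it cannot be called through a shared reference.
    pub fn try_recv(&mut self) -> Option<u32> {
        self.pending.pop_front()
    }
}

/// Wrapper that adds its own synchronization and exposes a `&self` API.
pub struct SharedEmitter(Mutex<RawReceiver>);

impl SharedEmitter {
    /// Builds an emitter pre-filled with events (for illustration only).
    pub fn new(events: impl IntoIterator<Item = u32>) -> Self {
        Self(Mutex::new(RawReceiver {
            pending: events.into_iter().collect(),
        }))
    }

    /// Locks the inner receiver for the duration of one call.
    pub fn try_recv(&self) -> Option<u32> {
        self.0.lock().unwrap().try_recv()
    }
}
```

With the wrapper, several threads can drain the same emitter through a shared reference; each event is handed to exactly one of them.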

src/events.rs Outdated
///
/// Returns `None` if no events are available for reception.
pub async fn try_recv(&self) -> Option<Event> {
    let mut lock = self.0.lock().await;
Collaborator

So, EventEmitter::recv() can be called in parallel by multiple tasks, and so can try_recv(), but the two can't be called in parallel with each other, as try_recv() would wait for recv(). How critical is it? AFAIU not at all, as each task should use its own emitter?

Collaborator Author

You mean that if we call recv() and it starts waiting, an attempt to call try_recv() will block until recv() returns? The other way round we cannot easily deadlock, as try_recv() returns immediately.

This is actually a bug. If there is no easy way to solve it, it at least needs a comment above recv saying that it may lock out try_recv.

Collaborator Author

@link2xt link2xt Apr 21, 2024

https://docs.rs/async-broadcast/0.7.0/async_broadcast/index.html describes the broadcast channel as "MPMC". But both https://docs.rs/async-broadcast/0.7.0/async_broadcast/struct.Receiver.html#method.recv and https://docs.rs/async-broadcast/0.7.0/async_broadcast/struct.Receiver.html#method.try_recv take &mut self.

So it is not MPMC in the same sense as an ordinary channel is: multiple "consumers" cannot read from the same receiver at the same time. I filed an upstream issue: smol-rs/async-broadcast#57

Collaborator

@iequidoo iequidoo Apr 21, 2024

> So it is not MPMC in the same sense as ordinary channel is, multiple "consumers" cannot read from the same receiver at the same time. Maybe this should be filed as a bug to async-broadcast?

I think they meant that multiple consumers should use multiple receivers. But then yes, it's not MPMC, because those receivers don't "steal" messages from each other but each receive their own copy of every message.

> This is actually a bug, at least needs a comment above recv that it may lock out try_recv if there is no easy way to solve it.

It seems that adding a flag like is_empty: RwLock<bool> (set by recv()) would solve this, but then try_recv() could miss events if the flag hasn't been reset yet while new events have already arrived.

EDIT: But it's probably OK to miss events if there are other consumers working in parallel.

Collaborator

@iequidoo iequidoo Apr 21, 2024

Even a bool isn't needed, just RwLock<()>. recv() should first do try_recv(), which takes a read lock (i.e. calls try_read()), and if try_recv() doesn't succeed, take a write lock.

Collaborator Author

I have made try_recv() use try_lock on the mutex. So try_recv may return an error if there are concurrent calls to recv or try_recv, but we don't use it from concurrent threads, so this is fine. It is now documented that it may return an error, and the good thing is that try_recv is non-async again.
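A rough std-only sketch of this try_lock approach follows. It is an illustration under stated assumptions, not the merged code: the PR uses an async mutex around an async_broadcast receiver, while here `Emitter` wraps a plain `VecDeque<u32>` in `std::sync::Mutex`. The `TryRecvError` enum and the `hold_like_recv` helper, which simulates a recv() parked while holding the lock, are hypothetical names added for this example.

```rust
use std::collections::VecDeque;
use std::sync::{Mutex, MutexGuard, TryLockError};

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
pub enum TryRecvError {
    /// No events are available for reception.
    Empty,
    /// The receiver is locked by a concurrent `recv` or `try_recv`.
    Busy,
}

/// Toy emitter: a queue of events behind a mutex.
pub struct Emitter(Mutex<VecDeque<u32>>);

impl Emitter {
    /// Builds an emitter pre-filled with events (for illustration only).
    pub fn new(events: impl IntoIterator<Item = u32>) -> Self {
        Self(Mutex::new(events.into_iter().collect()))
    }

    /// Simulates a `recv()` that is waiting while it holds the lock.
    pub fn hold_like_recv(&self) -> MutexGuard<'_, VecDeque<u32>> {
        self.0.lock().unwrap()
    }

    /// Never waits: reports `Busy` instead of blocking on the lock.
    pub fn try_recv(&self) -> Result<u32, TryRecvError> {
        let mut queue = match self.0.try_lock() {
            Ok(guard) => guard,
            Err(TryLockError::WouldBlock) => return Err(TryRecvError::Busy),
            // A poisoned lock still holds a usable queue in this sketch.
            Err(TryLockError::Poisoned(poisoned)) => poisoned.into_inner(),
        };
        queue.pop_front().ok_or(TryRecvError::Empty)
    }
}
```

The trade-off is the one described in the comment above: try_recv can fail spuriously while another call holds the lock, but it never waits and does not need to be async.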

src/test_utils.rs Outdated (resolved)
@link2xt link2xt force-pushed the link2xt/broadcast-event-channel branch from 163959d to 2b6c35b Compare April 21, 2024 02:30
@link2xt link2xt force-pushed the link2xt/broadcast-event-channel branch from 2b6c35b to d7a16c4 Compare April 21, 2024 02:33
src/events.rs Outdated
/// Tries to receive an event without blocking.
///
/// Returns error if no events are available for reception
/// or if receiver is blocked by a concurrent call to [`recv`].
Collaborator

Or by a concurrent call to try_recv(), I guess, if it's done from another OS thread.

This makes `EventTracker` receive events immediately
instead of having them moved from the event emitter to the event tracker
by a task spawned from `TestContext::new_internal`.

This makes `EventTracker.clear_events` reliable
as it is guaranteed to remove all events emitted
by the time it is called rather than only events
that have been moved already.
@link2xt link2xt force-pushed the link2xt/broadcast-event-channel branch from 2c0592a to 5241ed6 Compare April 22, 2024 06:24
@link2xt link2xt merged commit 34f4ec0 into main Apr 22, 2024
38 checks passed
@link2xt link2xt deleted the link2xt/broadcast-event-channel branch April 22, 2024 07:44

2 participants