Replace event channel with broadcast channel #5478
Conversation
Force-pushed from 6d0725b to c86528c
Force-pushed from 2277ddc to 3853696
Force-pushed from 6836aab to 4928dc2
@@ -11,8 +10,11 @@ pub use self::payload::EventType;
 /// Event channel.
 #[derive(Debug, Clone)]
 pub struct Events {
-    receiver: Receiver<Event>,
-    sender: Sender<Event>,
+    /// Unused receiver to prevent the channel from closing.
This is ugly, but removing it actually makes a lot of tests fail.
I have deactivated the receiver. This is a documented use of `InactiveReceiver`, so I guess this is how it is supposed to be done: https://docs.rs/async-broadcast/0.7.0/async_broadcast/struct.InactiveReceiver.html
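For reference, a minimal standalone sketch of this pattern (not the PR's actual code; the `u32` payload and the capacity of 10 are placeholders):

```rust
use async_broadcast::broadcast;

fn main() {
    let (sender, receiver) = broadcast::<u32>(10);

    // Deactivate instead of dropping: the channel stays open, but no
    // messages are buffered for this handle.
    let inactive = receiver.deactivate();

    // Fresh active receivers can still be created from the inactive one.
    let mut active = inactive.activate_cloned();
    sender.try_broadcast(1).unwrap();
    assert_eq!(active.try_recv().unwrap(), 1);
}
```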
Force-pushed from 4928dc2 to aca11ab
-#[pin_project]
-pub struct EventEmitter(#[pin] Receiver<Event>);
+#[derive(Debug)]
+pub struct EventEmitter(Mutex<async_broadcast::Receiver<Event>>);
This `Mutex` is needed because `recv` of a broadcast channel takes `&mut self` instead of `&self` like a normal channel does. This is the same for the `async-broadcast` crate I used and for tokio broadcast channels. I think the idea is that no synchronization is needed if the receiver is only used by a single thread, since in broadcast channels each receiver reads from its own chunk of memory; but in our case we expose the event emitter to FFI and otherwise allow using it from multiple threads, so we want thread safety and have to add our own synchronization.
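A minimal sketch of that wrapping, condensed from the diff above (the `Event` placeholder and the use of `tokio::sync::Mutex` are assumptions here, not necessarily what the real code does):

```rust
use async_broadcast::Receiver;
use tokio::sync::Mutex;

#[derive(Debug, Clone)]
struct Event; // placeholder for the real event type

#[derive(Debug)]
pub struct EventEmitter(Mutex<Receiver<Event>>);

impl EventEmitter {
    /// Receives the next event; `None` means the channel is closed.
    ///
    /// Takes `&self` even though the inner `recv` needs `&mut self`;
    /// the mutex provides the exclusive access.
    pub async fn recv(&self) -> Option<Event> {
        let mut lock = self.0.lock().await;
        lock.recv().await.ok()
    }
}
```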
src/events.rs (Outdated)
    ///
    /// Returns `None` if no events are available for reception.
    pub async fn try_recv(&self) -> Option<Event> {
        let mut lock = self.0.lock().await;
So `EventEmitter::recv()` can be called in parallel by multiple tasks, and `try_recv()` as well, but the two can't be called in parallel, as `try_recv()` would wait for `recv()`. How critical is it? AFAIU not at all, as each task should use its own emitter?
You mean if we call `recv()` and it starts waiting, an attempt to call `try_recv()` will lock until `recv()` returns? The other way round we cannot easily deadlock, as `try_recv()` returns immediately.

This is actually a bug; at the very least `recv` needs a comment saying that it may lock out `try_recv`, if there is no easy way to solve it.
https://docs.rs/async-broadcast/0.7.0/async_broadcast/index.html describes the broadcast channel as "MPMC". But it accepts `&mut self` in both https://docs.rs/async-broadcast/0.7.0/async_broadcast/struct.Receiver.html#method.recv and https://docs.rs/async-broadcast/0.7.0/async_broadcast/struct.Receiver.html#method.try_recv.

So it is not MPMC in the same sense as an ordinary channel is; multiple "consumers" cannot read from the same receiver at the same time. I filed an upstream issue: smol-rs/async-broadcast#57
> So it is not MPMC in the same sense as an ordinary channel is; multiple "consumers" cannot read from the same receiver at the same time. Maybe this should be filed as a bug to async-broadcast?

I think they meant that multiple consumers should use multiple receivers. But then yes, it's not MPMC, because those receivers don't "steal" messages from each other; they receive copies of each message.

> This is actually a bug; at the very least `recv` needs a comment saying that it may lock out `try_recv`, if there is no easy way to solve it.

Seems that adding some flag like `is_empty: RwLock<bool>` (set by `recv()`) solves this, but then `try_recv()` can miss events if the flag isn't reset yet even though new events have already arrived.

EDIT: But probably it's ok to miss events if there are other consumers working in parallel.
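To illustrate the copy semantics (a hypothetical standalone example with a `&str` payload):

```rust
use async_broadcast::broadcast;

fn main() {
    let (sender, mut first) = broadcast::<&str>(10);
    // Cloning a receiver creates an independent cursor into the queue.
    let mut second = first.clone();

    sender.try_broadcast("IncomingMsg").unwrap();

    // Each receiver gets its own copy; neither "steals" the message
    // from the other, unlike consumers of an ordinary MPMC channel.
    assert_eq!(first.try_recv().unwrap(), "IncomingMsg");
    assert_eq!(second.try_recv().unwrap(), "IncomingMsg");
}
```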
Even a `bool` isn't needed, just `RwLock<()>`. `recv()` should do `try_recv()` first, which takes a read lock (i.e. calls `try_read()`), and if `try_recv()` didn't succeed, take a write lock.
I have made `try_recv()` use `try_lock` on the mutex. So `try_recv` may return an error if there are concurrent calls to `recv` or `try_recv`, but we don't use it from concurrent threads, so this is fine. It is now documented that it may return an error, and the good thing is that `try_recv` is non-async again.
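A sketch of that change, continuing the hypothetical `EventEmitter` above (the return type is simplified to `Option`, which may differ from the real code):

```rust
impl EventEmitter {
    /// Tries to receive an event without blocking; non-async.
    ///
    /// Fails both when no event is available and when the receiver is
    /// held by a concurrent `recv()` or `try_recv()` call.
    pub fn try_recv(&self) -> Option<Event> {
        // try_lock() fails immediately instead of waiting for a
        // concurrent recv() to finish, so there is no lock-out.
        let mut lock = self.0.try_lock().ok()?;
        lock.try_recv().ok()
    }
}
```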
Force-pushed from 163959d to 2b6c35b
Force-pushed from 2b6c35b to d7a16c4
Force-pushed from 1357019 to 2c0592a
src/events.rs (Outdated)
    /// Tries to receive an event without blocking.
    ///
    /// Returns error if no events are available for reception
    /// or if receiver is blocked by a concurrent call to [`recv`].
Or a concurrent call to `try_recv()`, I guess, if it's done from another OS thread.
This makes `EventTracker` receive events immediately, instead of having them moved from the event emitter to the event tracker by a task spawned from `TestContext::new_internal`. This makes `EventTracker.clear_events` reliable, as it is guaranteed to remove all events emitted by the time it is called, rather than only the events that have already been moved.
Force-pushed from 2c0592a to 5241ed6
The purpose of this PR is to make emitted events immediately available to the `TestContext` "event tracker". Before this PR, the "event tracker" relied on a task copying events from `EventEmitter` to the "event tracker" and `LogSink`:

deltachat-core-rust/src/test_utils.rs, lines 367 to 376 in 65822e5

The problem with the previous approach is that events are not available to the event tracker immediately after they are emitted, because this task may not be woken up right away. This makes `clear_events` unreliable, as it only consumes events that have already been moved from the event emitter to the event tracker.

This PR replaces the event channel with a broadcast channel, so it is possible to have both the primary `EventEmitter` and the "event tracker" receive events immediately. `LogSink` now also gets its own broadcast receiver for each account and runs a task moving events from the receivers to the log channel. This may result in reordering of events in the logs, but that is not as critical as the event tracker not receiving events immediately, since it does not break tests.

Related comment: #5471 (comment)

There is another PR solving the problem of clearing events in the event tracker in Python by using special "checkpoint" events: #5477. In Python the approach from this PR is not possible, because events are polled via the `get_next_event` call, which is not guaranteed to return a result as soon as an event is available.