Skip to content

Commit

Permalink
feat: replace event channel with broadcast channel
Browse files Browse the repository at this point in the history
This makes `EventTracker` receive events immediately
instead of being moved from event emitter to event tracker
by a task spawned from `TestContext::new_internal`.

This makes `EventTracker.clear_events` reliable
as it is guaranteed to remove all events emitted
by the time it is called rather than only events
that have been moved already.
  • Loading branch information
link2xt committed Apr 22, 2024
1 parent 72d5a38 commit 34f4ec0
Show file tree
Hide file tree
Showing 11 changed files with 192 additions and 137 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ format-flowed = { path = "./format-flowed" }
ratelimit = { path = "./deltachat-ratelimit" }

anyhow = { workspace = true }
async-broadcast = "0.7.0"
async-channel = "2.0.0"
async-imap = { version = "0.9.7", default-features = false, features = ["runtime-tokio"] }
async-native-tls = { version = "0.5", default-features = false, features = ["runtime-tokio"] }
Expand Down
8 changes: 6 additions & 2 deletions deltachat-ffi/deltachat.h
Original file line number Diff line number Diff line change
Expand Up @@ -362,8 +362,12 @@ uint32_t dc_get_id (dc_context_t* context);
* Must be freed using dc_event_emitter_unref() after usage.
*
* Note: Use only one event emitter per context.
* Having more than one event emitter running at the same time on the same context
* will result in events being randomly delivered to one of the emitters.
* The result of having multiple event emitters is unspecified.
* Currently events are broadcasted to all existing event emitters,
* but previous versions delivered events to only one event emitter
* and this behavior may change again in the future.
* Events emitted before creation of event emitter
* may or may not be available to event emitter.
*/
dc_event_emitter_t* dc_get_event_emitter(dc_context_t* context);

Expand Down
4 changes: 3 additions & 1 deletion deltachat-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4922,7 +4922,9 @@ mod jsonrpc {
}

let account_manager = &*account_manager;
let cmd_api = deltachat_jsonrpc::api::CommandApi::from_arc(account_manager.inner.clone());
let cmd_api = block_on(deltachat_jsonrpc::api::CommandApi::from_arc(
account_manager.inner.clone(),
));

let (request_handle, receiver) = RpcClient::new();
let handle = RpcSession::new(request_handle, cmd_api);
Expand Down
15 changes: 12 additions & 3 deletions deltachat-jsonrpc/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use deltachat::reaction::{get_msg_reactions, send_reaction};
use deltachat::securejoin;
use deltachat::stock_str::StockMessage;
use deltachat::webxdc::StatusUpdateSerial;
use deltachat::EventEmitter;
use sanitize_filename::is_sanitized;
use tokio::fs;
use tokio::sync::{watch, Mutex, RwLock};
Expand Down Expand Up @@ -81,21 +82,30 @@ impl Default for AccountState {
pub struct CommandApi {
pub(crate) accounts: Arc<RwLock<Accounts>>,

/// Receiver side of the event channel.
///
/// Events from it can be received by calling `get_next_event` method.
event_emitter: Arc<EventEmitter>,

states: Arc<Mutex<BTreeMap<u32, AccountState>>>,
}

impl CommandApi {
pub fn new(accounts: Accounts) -> Self {
let event_emitter = Arc::new(accounts.get_event_emitter());
CommandApi {
accounts: Arc::new(RwLock::new(accounts)),
event_emitter,
states: Arc::new(Mutex::new(BTreeMap::new())),
}
}

#[allow(dead_code)]
pub fn from_arc(accounts: Arc<RwLock<Accounts>>) -> Self {
pub async fn from_arc(accounts: Arc<RwLock<Accounts>>) -> Self {
let event_emitter = Arc::new(accounts.read().await.get_event_emitter());
CommandApi {
accounts,
event_emitter,
states: Arc::new(Mutex::new(BTreeMap::new())),
}
}
Expand Down Expand Up @@ -158,8 +168,7 @@ impl CommandApi {

/// Get the next event.
async fn get_next_event(&self) -> Result<Event> {
let event_emitter = self.accounts.read().await.get_event_emitter();
event_emitter
self.event_emitter
.recv()
.await
.map(|event| event.into())
Expand Down
2 changes: 1 addition & 1 deletion deltachat-rpc-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ async fn main_impl() -> Result<()> {

log::info!("Creating JSON-RPC API.");
let accounts = Arc::new(RwLock::new(accounts));
let state = CommandApi::from_arc(accounts.clone());
let state = CommandApi::from_arc(accounts.clone()).await;

let (client, mut out_receiver) = RpcClient::new();
let session = RpcSession::new(client.clone(), state.clone());
Expand Down
4 changes: 2 additions & 2 deletions src/contact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2730,7 +2730,7 @@ Hi."#;
let sent_msg = alice.send_text(chat.id, "moin").await;
let contact = Contact::get_by_id(&bob, *contacts.first().unwrap()).await?;
assert!(!contact.was_seen_recently());
while bob.evtracker.try_recv().is_ok() {}
bob.evtracker.clear_events();
bob.recv_msg(&sent_msg).await;
let contact = Contact::get_by_id(&bob, *contacts.first().unwrap()).await?;
assert!(contact.was_seen_recently());
Expand All @@ -2742,7 +2742,7 @@ Hi."#;
.await;

// Wait for `was_seen_recently()` to turn off.
while bob.evtracker.try_recv().is_ok() {}
bob.evtracker.clear_events();
SystemTime::shift(Duration::from_secs(SEEN_RECENTLY_SECONDS as u64 * 2));
recently_seen_loop.interrupt(ContactId::UNDEFINED, 0).await;
let contact = Contact::get_by_id(&bob, *contacts.first().unwrap()).await?;
Expand Down
64 changes: 42 additions & 22 deletions src/events.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! # Events specification.
use async_channel::{self as channel, Receiver, Sender, TrySendError};
use anyhow::Result;
use tokio::sync::Mutex;

pub(crate) mod chatlist_events;
mod payload;
Expand All @@ -10,8 +11,11 @@ pub use self::payload::EventType;
/// Event channel.
#[derive(Debug, Clone)]
pub struct Events {
receiver: Receiver<Event>,
sender: Sender<Event>,
/// Unused receiver to prevent the channel from closing.
_receiver: async_broadcast::InactiveReceiver<Event>,

/// Sender side of the event channel.
sender: async_broadcast::Sender<Event>,
}

impl Default for Events {
Expand All @@ -23,33 +27,30 @@ impl Default for Events {
impl Events {
/// Creates a new event channel.
pub fn new() -> Self {
let (sender, receiver) = channel::bounded(1_000);
let (mut sender, _receiver) = async_broadcast::broadcast(1_000);

// We only keep this receiver around
// to prevent the channel from closing.
// Deactivating it to prevent it from consuming memory
// holding events that are not going to be received.
let _receiver = _receiver.deactivate();

// Remove oldest event on overflow.
sender.set_overflow(true);

Self { receiver, sender }
Self { _receiver, sender }
}

/// Emits an event into event channel.
///
/// If the channel is full, deletes the oldest event first.
pub fn emit(&self, event: Event) {
match self.sender.try_send(event) {
Ok(()) => {}
Err(TrySendError::Full(event)) => {
// when we are full, we pop remove the oldest event and push on the new one
let _ = self.receiver.try_recv();

// try again
self.emit(event);
}
Err(TrySendError::Closed(_)) => {
unreachable!("unable to emit event, channel disconnected");
}
}
self.sender.try_broadcast(event).ok();
}

/// Creates an event emitter.
pub fn get_emitter(&self) -> EventEmitter {
EventEmitter(self.receiver.clone())
EventEmitter(Mutex::new(self.sender.new_receiver()))
}
}

Expand All @@ -61,13 +62,32 @@ impl Events {
///
/// [`Context`]: crate::context::Context
/// [`Context::get_event_emitter`]: crate::context::Context::get_event_emitter
#[derive(Debug, Clone)]
pub struct EventEmitter(Receiver<Event>);
#[derive(Debug)]
pub struct EventEmitter(Mutex<async_broadcast::Receiver<Event>>);

impl EventEmitter {
/// Async recv of an event. Return `None` if the `Sender` has been dropped.
///
/// [`try_recv`]: Self::try_recv
pub async fn recv(&self) -> Option<Event> {
self.0.recv().await.ok()
let mut lock = self.0.lock().await;
lock.recv().await.ok()
}

/// Tries to receive an event without blocking.
///
/// Returns error if no events are available for reception
/// or if receiver mutex is locked by a concurrent call to [`recv`]
/// or `try_recv`.
///
/// [`recv`]: Self::recv
pub fn try_recv(&self) -> Result<Event> {
// Using `try_lock` instead of `lock`
// to avoid blocking
// in case there is a concurrent call to `recv`.
let mut lock = self.0.try_lock()?;
let event = lock.try_recv()?;
Ok(event)
}
}

Expand Down
Loading

0 comments on commit 34f4ec0

Please sign in to comment.