Skip to content

Commit

Permalink
Replace event channel with broadcast channel
Browse files Browse the repository at this point in the history
  • Loading branch information
link2xt committed Apr 20, 2024
1 parent 8f48313 commit 6d0725b
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 101 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ format-flowed = { path = "./format-flowed" }
ratelimit = { path = "./deltachat-ratelimit" }

anyhow = { workspace = true }
async-broadcast = "0.7.0"
async-channel = "2.0.0"
async-imap = { version = "0.9.7", default-features = false, features = ["runtime-tokio"] }
async-native-tls = { version = "0.5", default-features = false, features = ["runtime-tokio"] }
Expand Down
8 changes: 6 additions & 2 deletions deltachat-ffi/deltachat.h
Original file line number Diff line number Diff line change
Expand Up @@ -362,8 +362,12 @@ uint32_t dc_get_id (dc_context_t* context);
* Must be freed using dc_event_emitter_unref() after usage.
*
* Note: Use only one event emitter per context.
* Having more than one event emitter running at the same time on the same context
* will result in events being randomly delivered to one of the emitters.
* The result of having multiple event emitters is unspecified.
* Currently events are broadcasted to all existing event emitters,
* but previous versions delivered events to only one event emitter
* and this behavior may change again in the future.
* Events emitted before creation of event emitter
* may or may not be available to event emitter.
*/
dc_event_emitter_t* dc_get_event_emitter(dc_context_t* context);

Expand Down
14 changes: 10 additions & 4 deletions deltachat-jsonrpc/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use deltachat::reaction::{get_msg_reactions, send_reaction};
use deltachat::securejoin;
use deltachat::stock_str::StockMessage;
use deltachat::webxdc::StatusUpdateSerial;
use deltachat::EventEmitter;
use sanitize_filename::is_sanitized;
use tokio::fs;
use tokio::sync::{watch, Mutex, RwLock};
Expand Down Expand Up @@ -77,25 +78,31 @@ impl Default for AccountState {
}
}

#[derive(Clone, Debug)]
#[derive(Debug, Clone)]
pub struct CommandApi {
pub(crate) accounts: Arc<RwLock<Accounts>>,

event_emitter: Arc<EventEmitter>,

states: Arc<Mutex<BTreeMap<u32, AccountState>>>,
}

impl CommandApi {
pub fn new(accounts: Accounts) -> Self {
let event_emitter = Arc::new(accounts.get_event_emitter());
CommandApi {
accounts: Arc::new(RwLock::new(accounts)),
event_emitter,
states: Arc::new(Mutex::new(BTreeMap::new())),
}
}

#[allow(dead_code)]
pub fn from_arc(accounts: Arc<RwLock<Accounts>>) -> Self {
pub async fn from_arc(accounts: Arc<RwLock<Accounts>>) -> Self {
let event_emitter = Arc::new(accounts.read().await.get_event_emitter());
CommandApi {
accounts,
event_emitter,
states: Arc::new(Mutex::new(BTreeMap::new())),
}
}
Expand Down Expand Up @@ -158,8 +165,7 @@ impl CommandApi {

/// Get the next event.
async fn get_next_event(&self) -> Result<Event> {
let event_emitter = self.accounts.read().await.get_event_emitter();
event_emitter
self.event_emitter
.recv()
.await
.map(|event| event.into())
Expand Down
2 changes: 1 addition & 1 deletion deltachat-rpc-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ async fn main_impl() -> Result<()> {

log::info!("Creating JSON-RPC API.");
let accounts = Arc::new(RwLock::new(accounts));
let state = CommandApi::from_arc(accounts.clone());
let state = CommandApi::from_arc(accounts.clone()).await;

let (client, mut out_receiver) = RpcClient::new();
let session = RpcSession::new(client.clone(), state.clone());
Expand Down
4 changes: 2 additions & 2 deletions src/contact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2730,7 +2730,7 @@ Hi."#;
let sent_msg = alice.send_text(chat.id, "moin").await;
let contact = Contact::get_by_id(&bob, *contacts.first().unwrap()).await?;
assert!(!contact.was_seen_recently());
while bob.evtracker.try_recv().is_ok() {}
bob.evtracker.clear_events().await;
bob.recv_msg(&sent_msg).await;
let contact = Contact::get_by_id(&bob, *contacts.first().unwrap()).await?;
assert!(contact.was_seen_recently());
Expand All @@ -2742,7 +2742,7 @@ Hi."#;
.await;

// Wait for `was_seen_recently()` to turn off.
while bob.evtracker.try_recv().is_ok() {}
bob.evtracker.clear_events().await;
SystemTime::shift(Duration::from_secs(SEEN_RECENTLY_SECONDS as u64 * 2));
recently_seen_loop.interrupt(ContactId::UNDEFINED, 0).await;
let contact = Contact::get_by_id(&bob, *contacts.first().unwrap()).await?;
Expand Down
42 changes: 21 additions & 21 deletions src/events.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! # Events specification.
use async_channel::{self as channel, Receiver, Sender, TrySendError};
use tokio::sync::Mutex;

pub(crate) mod chatlist_events;
mod payload;
Expand All @@ -10,8 +10,8 @@ pub use self::payload::EventType;
/// Event channel.
#[derive(Debug, Clone)]
pub struct Events {
receiver: Receiver<Event>,
sender: Sender<Event>,
receiver: async_broadcast::Receiver<Event>,
sender: async_broadcast::Sender<Event>,
}

impl Default for Events {
Expand All @@ -23,7 +23,10 @@ impl Default for Events {
impl Events {
/// Creates a new event channel.
pub fn new() -> Self {
let (sender, receiver) = channel::bounded(1_000);
let (mut sender, receiver) = async_broadcast::broadcast(1_000);

// Remove oldest event on overflow.
sender.set_overflow(true);

Self { receiver, sender }
}
Expand All @@ -32,24 +35,12 @@ impl Events {
///
/// If the channel is full, deletes the oldest event first.
pub fn emit(&self, event: Event) {
match self.sender.try_send(event) {
Ok(()) => {}
Err(TrySendError::Full(event)) => {
// when we are full, we pop remove the oldest event and push on the new one
let _ = self.receiver.try_recv();

// try again
self.emit(event);
}
Err(TrySendError::Closed(_)) => {
unreachable!("unable to emit event, channel disconnected");
}
}
self.sender.try_broadcast(event).ok();
}

/// Creates an event emitter.
pub fn get_emitter(&self) -> EventEmitter {
EventEmitter(self.receiver.clone())
EventEmitter(Mutex::new(self.receiver.clone()))
}
}

Expand All @@ -61,13 +52,22 @@ impl Events {
///
/// [`Context`]: crate::context::Context
/// [`Context::get_event_emitter`]: crate::context::Context::get_event_emitter
#[derive(Debug, Clone)]
pub struct EventEmitter(Receiver<Event>);
#[derive(Debug)]
pub struct EventEmitter(Mutex<async_broadcast::Receiver<Event>>);

impl EventEmitter {
/// Async recv of an event. Return `None` if the `Sender` has been dropped.
pub async fn recv(&self) -> Option<Event> {
self.0.recv().await.ok()
let mut lock = self.0.lock().await;
lock.recv().await.ok()
}

/// Tries to receive an event without blocking.
///
/// Returns `None` if no events are available for reception.
pub async fn try_recv(&self) -> Option<Event> {
let mut lock = self.0.lock().await;
lock.try_recv().ok()
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/receive_imf/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1081,8 +1081,8 @@ async fn test_block_mailing_list() {
receive_imf(&t.ctx, DC_MAILINGLIST2, false).await.unwrap();

// Check that no notification is displayed for blocked mailing list message.
while let Ok(event) = t.evtracker.try_recv() {
assert!(!matches!(event.typ, EventType::IncomingMsg { .. }));
while let Ok(event) = tokio::time::timeout(Duration::from_secs(1), t.evtracker.recv()).await {
assert!(!matches!(event.unwrap().typ, EventType::IncomingMsg { .. }));
}

// Test that the mailing list stays disappeared
Expand Down
12 changes: 6 additions & 6 deletions src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1007,10 +1007,9 @@ pub fn repeat_vars(count: usize) -> String {

#[cfg(test)]
mod tests {
use async_channel as channel;

use super::*;
use crate::{test_utils::TestContext, EventType};
use std::time::Duration;

#[test]
fn test_maybe_add_file() {
Expand Down Expand Up @@ -1085,8 +1084,7 @@ mod tests {
.await
.unwrap();

let (event_sink, event_source) = channel::unbounded();
t.add_event_sender(event_sink).await;
let event_source = t.get_event_emitter();

let a = t.get_config(Config::Selfavatar).await.unwrap().unwrap();
assert_eq!(avatar_bytes, &tokio::fs::read(&a).await.unwrap()[..]);
Expand All @@ -1098,8 +1096,10 @@ mod tests {
let a = t.get_config(Config::Selfavatar).await.unwrap().unwrap();
assert_eq!(avatar_bytes, &tokio::fs::read(&a).await.unwrap()[..]);

while let Ok(event) = event_source.try_recv() {
match event.typ {
while let Ok(event) =
tokio::time::timeout(Duration::from_secs(1), event_source.recv()).await
{
match event.unwrap().typ {
EventType::Info(s) => assert!(
!s.contains("Keeping new unreferenced file"),
"File {s} was almost deleted, only reason it was kept is that it was created recently (as the tests don't run for a long time)"
Expand Down
Loading

0 comments on commit 6d0725b

Please sign in to comment.