From 6d0725b616a450073655ad4bfc106d71a475fbdd Mon Sep 17 00:00:00 2001
From: link2xt
Date: Fri, 19 Apr 2024 02:19:21 +0000
Subject: [PATCH] Replace event channel with broadcast channel

---
 Cargo.lock                       |  13 ++++
 Cargo.toml                       |   1 +
 deltachat-ffi/deltachat.h        |   8 +-
 deltachat-jsonrpc/src/api.rs     |  14 +++-
 deltachat-rpc-server/src/main.rs |   2 +-
 src/contact.rs                   |   4 +-
 src/events.rs                    |  42 +++++-----
 src/receive_imf/tests.rs         |   4 +-
 src/sql.rs                       |  12 +--
 src/test_utils.rs                | 130 ++++++++++++++++---------------
 10 files changed, 129 insertions(+), 101 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index a81a842708..ec870a82e6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -227,6 +227,18 @@ dependencies = [
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "async-broadcast"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "258b52a1aa741b9f09783b2d86cf0aeeb617bbf847f6933340a39644227acbdb"
+dependencies = [
+ "event-listener 5.2.0",
+ "event-listener-strategy",
+ "futures-core",
+ "pin-project-lite",
+]
+
 [[package]]
 name = "async-channel"
 version = "1.9.0"
@@ -1149,6 +1161,7 @@ version = "1.137.3"
 dependencies = [
  "ansi_term",
  "anyhow",
+ "async-broadcast",
  "async-channel 2.2.0",
  "async-imap",
  "async-native-tls",
diff --git a/Cargo.toml b/Cargo.toml
index 58f6cd8bb5..3f95fb942c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -39,6 +39,7 @@ format-flowed = { path = "./format-flowed" }
 ratelimit = { path = "./deltachat-ratelimit" }
 
 anyhow = { workspace = true }
+async-broadcast = "0.7.0"
 async-channel = "2.0.0"
 async-imap = { version = "0.9.7", default-features = false, features = ["runtime-tokio"] }
 async-native-tls = { version = "0.5", default-features = false, features = ["runtime-tokio"] }
diff --git a/deltachat-ffi/deltachat.h b/deltachat-ffi/deltachat.h
index 9e1cd449cc..3a014f26a4 100644
--- a/deltachat-ffi/deltachat.h
+++ b/deltachat-ffi/deltachat.h
@@ -362,8 +362,12 @@ uint32_t dc_get_id (dc_context_t* context);
  * Must be freed using dc_event_emitter_unref() after usage.
  *
  * Note: Use only one event emitter per context.
- * Having more than one event emitter running at the same time on the same context
- * will result in events being randomly delivered to one of the emitters.
+ * The result of having multiple event emitters is unspecified.
+ * Currently, events are broadcast to all existing event emitters,
+ * but previous versions delivered each event to only one of them,
+ * and this behavior may change again in the future.
+ * Events emitted before an event emitter is created
+ * may or may not be delivered to that emitter.
  */
 dc_event_emitter_t* dc_get_event_emitter(dc_context_t* context);
 
diff --git a/deltachat-jsonrpc/src/api.rs b/deltachat-jsonrpc/src/api.rs
index c0c4b783c8..43c65d7382 100644
--- a/deltachat-jsonrpc/src/api.rs
+++ b/deltachat-jsonrpc/src/api.rs
@@ -29,6 +29,7 @@ use deltachat::reaction::{get_msg_reactions, send_reaction};
 use deltachat::securejoin;
 use deltachat::stock_str::StockMessage;
 use deltachat::webxdc::StatusUpdateSerial;
+use deltachat::EventEmitter;
 use sanitize_filename::is_sanitized;
 use tokio::fs;
 use tokio::sync::{watch, Mutex, RwLock};
@@ -77,25 +78,31 @@ impl Default for AccountState {
     }
 }
 
-#[derive(Clone, Debug)]
+#[derive(Debug, Clone)]
 pub struct CommandApi {
     pub(crate) accounts: Arc<RwLock<Accounts>>,
+    event_emitter: Arc<EventEmitter>,
+
     states: Arc<Mutex<BTreeMap<u32, AccountState>>>,
 }
 
 impl CommandApi {
     pub fn new(accounts: Accounts) -> Self {
+        let event_emitter = Arc::new(accounts.get_event_emitter());
         CommandApi {
             accounts: Arc::new(RwLock::new(accounts)),
+            event_emitter,
             states: Arc::new(Mutex::new(BTreeMap::new())),
         }
     }
 
     #[allow(dead_code)]
-    pub fn from_arc(accounts: Arc<RwLock<Accounts>>) -> Self {
+    pub async fn from_arc(accounts: Arc<RwLock<Accounts>>) -> Self {
+        let event_emitter = Arc::new(accounts.read().await.get_event_emitter());
         CommandApi {
             accounts,
+            event_emitter,
             states: Arc::new(Mutex::new(BTreeMap::new())),
         }
     }
 
@@ -158,8 +165,7 @@ impl CommandApi {
 
     /// Get the next event.
     async fn get_next_event(&self) -> Result<Event> {
-        let event_emitter = self.accounts.read().await.get_event_emitter();
-        event_emitter
+        self.event_emitter
             .recv()
             .await
             .map(|event| event.into())
diff --git a/deltachat-rpc-server/src/main.rs b/deltachat-rpc-server/src/main.rs
index 4be58760b5..0fd981e5bd 100644
--- a/deltachat-rpc-server/src/main.rs
+++ b/deltachat-rpc-server/src/main.rs
@@ -68,7 +68,7 @@ async fn main_impl() -> Result<()> {
 
     log::info!("Creating JSON-RPC API.");
     let accounts = Arc::new(RwLock::new(accounts));
-    let state = CommandApi::from_arc(accounts.clone());
+    let state = CommandApi::from_arc(accounts.clone()).await;
 
     let (client, mut out_receiver) = RpcClient::new();
     let session = RpcSession::new(client.clone(), state.clone());
diff --git a/src/contact.rs b/src/contact.rs
index 0c12d3b71d..1cf3afd868 100644
--- a/src/contact.rs
+++ b/src/contact.rs
@@ -2730,7 +2730,7 @@ Hi."#;
         let sent_msg = alice.send_text(chat.id, "moin").await;
         let contact = Contact::get_by_id(&bob, *contacts.first().unwrap()).await?;
         assert!(!contact.was_seen_recently());
-        while bob.evtracker.try_recv().is_ok() {}
+        bob.evtracker.clear_events().await;
         bob.recv_msg(&sent_msg).await;
         let contact = Contact::get_by_id(&bob, *contacts.first().unwrap()).await?;
         assert!(contact.was_seen_recently());
@@ -2742,7 +2742,7 @@ Hi."#;
             .await;
 
         // Wait for `was_seen_recently()` to turn off.
-        while bob.evtracker.try_recv().is_ok() {}
+        bob.evtracker.clear_events().await;
         SystemTime::shift(Duration::from_secs(SEEN_RECENTLY_SECONDS as u64 * 2));
         recently_seen_loop.interrupt(ContactId::UNDEFINED, 0).await;
         let contact = Contact::get_by_id(&bob, *contacts.first().unwrap()).await?;
diff --git a/src/events.rs b/src/events.rs
index dcbee99d3b..fcbcc853f4 100644
--- a/src/events.rs
+++ b/src/events.rs
@@ -1,6 +1,6 @@
 //! # Events specification.
 
-use async_channel::{self as channel, Receiver, Sender, TrySendError};
+use tokio::sync::Mutex;
 
 pub(crate) mod chatlist_events;
 mod payload;
@@ -10,8 +10,8 @@ pub use self::payload::EventType;
 /// Event channel.
 #[derive(Debug, Clone)]
 pub struct Events {
-    receiver: Receiver<Event>,
-    sender: Sender<Event>,
+    receiver: async_broadcast::Receiver<Event>,
+    sender: async_broadcast::Sender<Event>,
 }
 
 impl Default for Events {
@@ -23,7 +23,10 @@ impl Events {
     /// Creates a new event channel.
     pub fn new() -> Self {
-        let (sender, receiver) = channel::bounded(1_000);
+        let (mut sender, receiver) = async_broadcast::broadcast(1_000);
+
+        // Remove oldest event on overflow.
+        sender.set_overflow(true);
 
         Self { receiver, sender }
     }
 
@@ -32,24 +35,12 @@ impl Events {
     ///
     /// If the channel is full, deletes the oldest event first.
     pub fn emit(&self, event: Event) {
-        match self.sender.try_send(event) {
-            Ok(()) => {}
-            Err(TrySendError::Full(event)) => {
-                // when we are full, we pop remove the oldest event and push on the new one
-                let _ = self.receiver.try_recv();
-
-                // try again
-                self.emit(event);
-            }
-            Err(TrySendError::Closed(_)) => {
-                unreachable!("unable to emit event, channel disconnected");
-            }
-        }
+        self.sender.try_broadcast(event).ok();
     }
 
     /// Creates an event emitter.
     pub fn get_emitter(&self) -> EventEmitter {
-        EventEmitter(self.receiver.clone())
+        EventEmitter(Mutex::new(self.receiver.clone()))
     }
 }
 
@@ -61,13 +52,22 @@ impl Events {
 ///
 /// [`Context`]: crate::context::Context
 /// [`Context::get_event_emitter`]: crate::context::Context::get_event_emitter
-#[derive(Debug, Clone)]
-pub struct EventEmitter(Receiver<Event>);
+#[derive(Debug)]
+pub struct EventEmitter(Mutex<async_broadcast::Receiver<Event>>);
 
 impl EventEmitter {
     /// Async recv of an event. Return `None` if the `Sender` has been dropped.
     pub async fn recv(&self) -> Option<Event> {
-        self.0.recv().await.ok()
+        let mut lock = self.0.lock().await;
+        lock.recv().await.ok()
+    }
+
+    /// Tries to receive an event without blocking.
+    ///
+    /// Returns `None` if no event is currently available.
+    pub async fn try_recv(&self) -> Option<Event> {
+        let mut lock = self.0.lock().await;
+        lock.try_recv().ok()
     }
 }
diff --git a/src/receive_imf/tests.rs b/src/receive_imf/tests.rs
index f58d3f76b6..146855fdcb 100644
--- a/src/receive_imf/tests.rs
+++ b/src/receive_imf/tests.rs
@@ -1081,8 +1081,8 @@ async fn test_block_mailing_list() {
     receive_imf(&t.ctx, DC_MAILINGLIST2, false).await.unwrap();
 
     // Check that no notification is displayed for blocked mailing list message.
-    while let Ok(event) = t.evtracker.try_recv() {
-        assert!(!matches!(event.typ, EventType::IncomingMsg { .. }));
+    while let Ok(event) = tokio::time::timeout(Duration::from_secs(1), t.evtracker.recv()).await {
+        assert!(!matches!(event.unwrap().typ, EventType::IncomingMsg { .. }));
     }
 
     // Test that the mailing list stays disappeared
diff --git a/src/sql.rs b/src/sql.rs
index 1a43acf8e2..04417ca4a9 100644
--- a/src/sql.rs
+++ b/src/sql.rs
@@ -1007,10 +1007,9 @@ pub fn repeat_vars(count: usize) -> String {
 
 #[cfg(test)]
 mod tests {
-    use async_channel as channel;
-
     use super::*;
     use crate::{test_utils::TestContext, EventType};
+    use std::time::Duration;
 
     #[test]
     fn test_maybe_add_file() {
@@ -1085,8 +1084,7 @@ mod tests {
            .await
            .unwrap();
 
-        let (event_sink, event_source) = channel::unbounded();
-        t.add_event_sender(event_sink).await;
+        let event_source = t.get_event_emitter();
 
         let a = t.get_config(Config::Selfavatar).await.unwrap().unwrap();
         assert_eq!(avatar_bytes, &tokio::fs::read(&a).await.unwrap()[..]);
@@ -1098,8 +1096,10 @@ mod tests {
         let a = t.get_config(Config::Selfavatar).await.unwrap().unwrap();
         assert_eq!(avatar_bytes, &tokio::fs::read(&a).await.unwrap()[..]);
 
-        while let Ok(event) = event_source.try_recv() {
-            match event.typ {
+        while let Ok(event) =
+            tokio::time::timeout(Duration::from_secs(1), event_source.recv()).await
+        {
+            match event.unwrap().typ {
                 EventType::Info(s) => assert!(
                     !s.contains("Keeping new unreferenced file"),
                     "File {s} was almost deleted, only reason it was kept is that it was created recently (as the tests don't run for a long time)"
diff --git a/src/test_utils.rs b/src/test_utils.rs
index fe72cf34d3..00e3e47840 100644
--- a/src/test_utils.rs
+++ b/src/test_utils.rs
@@ -19,7 +19,6 @@ use pretty_assertions::assert_eq;
 use rand::Rng;
 use tempfile::{tempdir, TempDir};
 use tokio::runtime::Handle;
-use tokio::sync::RwLock;
 use tokio::{fs, task};
 
 use crate::chat::{
@@ -34,7 +33,7 @@ use crate::constants::{Blocked, Chattype};
 use crate::contact::{Contact, ContactId, Modifier, Origin};
 use crate::context::Context;
 use crate::e2ee::EncryptHelper;
-use crate::events::{Event, EventType, Events};
+use crate::events::{Event, EventEmitter, EventType, Events};
 use crate::key::{self, DcKey, KeyPairUse};
 use crate::message::{update_msg_state, Message, MessageState, MsgId, Viewtype};
 use crate::mimeparser::{MimeMessage, SystemMessage};
@@ -57,20 +56,19 @@ static CONTEXT_NAMES: Lazy<std::sync::RwLock<BTreeMap<u32, String>>> =
 /// occurred rather than grouped by context like would happen when you use separate
 /// [`TestContext`]s without managing your own [`LogSink`].
 pub struct TestContextManager {
-    log_tx: Sender<LogEvent>,
-    _log_sink: LogSink,
+    log_sink: LogSink,
 }
 
 impl TestContextManager {
     pub fn new() -> Self {
-        let (log_tx, _log_sink) = LogSink::create();
-        Self { log_tx, _log_sink }
+        let log_sink = LogSink::new();
+        Self { log_sink }
     }
 
     pub async fn alice(&mut self) -> TestContext {
         TestContext::builder()
             .configure_alice()
-            .with_log_sink(self.log_tx.clone())
+            .with_log_sink(self.log_sink.clone())
             .build()
             .await
     }
@@ -78,7 +76,7 @@ impl TestContextManager {
     pub async fn bob(&mut self) -> TestContext {
         TestContext::builder()
             .configure_bob()
-            .with_log_sink(self.log_tx.clone())
+            .with_log_sink(self.log_sink.clone())
             .build()
             .await
     }
@@ -86,7 +84,7 @@ impl TestContextManager {
     pub async fn fiona(&mut self) -> TestContext {
         TestContext::builder()
             .configure_fiona()
-            .with_log_sink(self.log_tx.clone())
+            .with_log_sink(self.log_sink.clone())
             .build()
             .await
     }
@@ -94,7 +92,7 @@ impl TestContextManager {
     /// Creates a new unconfigured test account.
     pub async fn unconfigured(&mut self) -> TestContext {
         TestContext::builder()
-            .with_log_sink(self.log_tx.clone())
+            .with_log_sink(self.log_sink.clone())
             .build()
             .await
     }
@@ -103,7 +101,8 @@ impl TestContextManager {
     ///
     /// ========== `msg` goes here ==========
     pub fn section(&self, msg: &str) {
-        self.log_tx
+        self.log_sink
+            .sender
             .try_send(LogEvent::Section(msg.to_string()))
             .expect(
                 "The events channel should be unbounded and not closed, so try_send() shouldn't fail",
             )
@@ -194,7 +193,7 @@ impl TestContextManager {
 #[derive(Debug, Clone, Default)]
 pub struct TestContextBuilder {
     key_pair: Option<KeyPair>,
-    log_sink: Option<Sender<LogEvent>>,
+    log_sink: LogSink,
 }
 
 impl TestContextBuilder {
@@ -234,8 +233,8 @@ impl TestContextBuilder {
     /// using a single [`LogSink`] for both contexts. This shows the log messages in
     /// sequence as they occurred rather than all messages from each context in a single
     /// block.
-    pub fn with_log_sink(mut self, sink: Sender<LogEvent>) -> Self {
-        self.log_sink = Some(sink);
+    pub fn with_log_sink(mut self, sink: LogSink) -> Self {
+        self.log_sink = sink;
         self
     }
 
@@ -243,7 +242,7 @@ impl TestContextBuilder {
     pub async fn build(self) -> TestContext {
         let name = self.key_pair.as_ref().map(|key| key.addr.local.clone());
 
-        let test_context = TestContext::new_internal(name, self.log_sink).await;
+        let test_context = TestContext::new_internal(name, Some(self.log_sink.clone())).await;
 
         if let Some(key_pair) = self.key_pair {
             test_context
@@ -266,8 +265,7 @@ pub struct TestContext {
     pub dir: TempDir,
     pub evtracker: EventTracker,
 
-    /// Channels which should receive events from this context.
-    event_senders: Arc<RwLock<Vec<Sender<Event>>>>,
+
     /// Reference to implicit [`LogSink`] so it is dropped together with the context.
     ///
     /// Only used if no explicit `log_sender` is passed into [`TestContext::new_internal`]
@@ -337,7 +335,7 @@ impl TestContext {
     /// `log_sender` is assumed to be the sender for a [`LogSink`]. If not supplied a new
     /// [`LogSink`] will be created so that events are logged to this test when the
     /// [`TestContext`] is dropped.
-    async fn new_internal(name: Option<String>, log_sender: Option<Sender<LogEvent>>) -> Self {
+    async fn new_internal(name: Option<String>, log_sink: Option<LogSink>) -> Self {
         let dir = tempdir().unwrap();
         let dbfile = dir.path().join("db.sqlite");
         let id = rand::thread_rng().gen();
@@ -349,30 +347,14 @@ impl TestContext {
             .await
             .expect("failed to create context");
 
-        let events = ctx.get_event_emitter();
-
-        let (log_sender, log_sink) = match log_sender {
-            Some(sender) => (sender, None),
-            None => {
-                let (sender, sink) = LogSink::create();
-                (sender, Some(sink))
-            }
+        if let Some(log_sink) = &log_sink {
+            log_sink.subscribe(ctx.get_event_emitter());
+        } else {
+            let log_sink = LogSink::new();
+            log_sink.subscribe(ctx.get_event_emitter());
         };
 
-        let (evtracker_sender, evtracker_receiver) = channel::unbounded();
-        let event_senders = Arc::new(RwLock::new(vec![evtracker_sender]));
-        let senders = Arc::clone(&event_senders);
-
-        task::spawn(async move {
-            while let Some(event) = events.recv().await {
-                for sender in senders.read().await.iter() {
-                    // Don't block because someone wanted to use a oneshot receiver, use
-                    // an unbounded channel if you want all events.
-                    sender.try_send(event.clone()).ok();
-                }
-                log_sender.try_send(LogEvent::Event(event.clone())).ok();
-            }
-        });
+        let evtracker_receiver = ctx.get_event_emitter();
 
         ctx.set_config(Config::SkipStartMessages, Some("1"))
             .await
@@ -383,7 +365,6 @@ impl TestContext {
             ctx,
             dir,
             evtracker: EventTracker(evtracker_receiver),
-            event_senders,
             log_sink,
         }
     }
@@ -407,14 +388,6 @@ impl TestContext {
         context_names.get(id).unwrap_or(&id.to_string()).to_string()
     }
 
-    /// Adds a new [`Event`]s sender.
-    ///
-    /// Once added, all events emitted by this context will be sent to this channel. This
-    /// is useful if you need to wait for events or make assertions on them.
-    pub async fn add_event_sender(&self, sink: Sender<Event>) {
-        self.event_senders.write().await.push(sink)
-    }
-
     /// Configure as a given email address.
     ///
     /// The context will be configured but the key will not be pre-generated so if a key is
@@ -851,20 +824,58 @@ pub enum LogEvent {
 ///
 /// To use this create an instance using [`LogSink::create`] and then use the
 /// [`TestContextBuilder::with_log_sink`].
+#[derive(Debug, Clone, Default)]
+pub struct LogSink(Arc<InnerLogSink>);
+
 #[derive(Debug)]
-pub struct LogSink {
+pub struct InnerLogSink {
     events: Receiver<LogEvent>,
+
+    /// Sender side of the log channel.
+    ///
+    /// It is not used directly, but cloned whenever
+    /// the log sink is subscribed to a new event emitter.
+    sender: Sender<LogEvent>,
 }
 
 impl LogSink {
     /// Creates a new [`LogSink`] and returns the attached event sink.
-    pub fn create() -> (Sender<LogEvent>, Self) {
+    pub fn new() -> Self {
+        Default::default()
+    }
+}
+
+impl Default for InnerLogSink {
+    fn default() -> Self {
         let (tx, rx) = channel::unbounded();
-        (tx, Self { events: rx })
+        Self {
+            events: rx,
+            sender: tx,
+        }
+    }
+}
+
+impl Deref for LogSink {
+    type Target = InnerLogSink;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
     }
 }
 
-impl Drop for LogSink {
+impl InnerLogSink {
+    /// Subscribes this log sink to an event emitter.
+    pub fn subscribe(&self, event_emitter: EventEmitter) {
+        let sender = self.sender.clone();
+        task::spawn(async move {
+            while let Some(event) = event_emitter.recv().await {
+                sender.try_send(LogEvent::Event(event.clone())).ok();
+            }
+        });
+    }
+}
+
+impl Drop for InnerLogSink {
     fn drop(&mut self) {
         while let Ok(event) = self.events.try_recv() {
             print_logevent(&event);
@@ -975,10 +986,10 @@ pub fn fiona_keypair() -> KeyPair {
 /// be attached to a single [`TestContext`] and therefore the context is already known as
 /// you will be accessing it as [`TestContext::evtracker`].
 #[derive(Debug)]
-pub struct EventTracker(Receiver<Event>);
+pub struct EventTracker(EventEmitter);
 
 impl Deref for EventTracker {
-    type Target = Receiver<Event>;
+    type Target = EventEmitter;
 
     fn deref(&self) -> &Self::Target {
         &self.0
@@ -1025,15 +1036,8 @@ impl EventTracker {
     }
 
     /// Clears event queue.
-    ///
-    /// This spends 1 second instead of using `try_recv`
-    /// to avoid accidentally leaving an event that
-    /// was emitted right before calling `clear_events()`.
-    ///
-    /// Avoid using this function if you can
-    /// by waiting for specific events you expect to receive.
     pub async fn clear_events(&self) {
-        while let Ok(_ev) = tokio::time::timeout(Duration::from_secs(1), self.recv()).await {}
+        while let Some(_ev) = self.try_recv().await {}
     }
 }
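
Editor's note (not part of the patch): the sketch below illustrates the async-broadcast semantics the new Events channel relies on — every receiver cloned from the channel observes every message sent after it exists, and set_overflow(true) drops the oldest queued message instead of failing when the channel is full. The u32 payload and the dependency versions (async-broadcast = "0.7", tokio = "1" with "rt" and "macros" features) are assumptions for illustration only; the patched code broadcasts deltachat Event values with the same capacity of 1_000.

// Standalone sketch, assuming async-broadcast 0.7 and tokio 1.
#[tokio::main(flavor = "current_thread")]
async fn main() {
    // Same capacity and overflow setting as `Events::new()` in the patch.
    let (mut sender, receiver) = async_broadcast::broadcast::<u32>(1_000);
    sender.set_overflow(true);

    // Each clone of the receiver acts like an independent event emitter:
    // both subscribers see every message broadcast after they exist.
    let mut emitter_a = receiver.clone();
    let mut emitter_b = receiver;

    // With overflow enabled, `try_broadcast` never fails with `Full`;
    // the oldest queued message would be discarded instead.
    sender.try_broadcast(1).ok();
    sender.try_broadcast(2).ok();

    assert_eq!(emitter_a.recv().await.ok(), Some(1));
    assert_eq!(emitter_b.recv().await.ok(), Some(1));
    assert_eq!(emitter_a.recv().await.ok(), Some(2));
    assert_eq!(emitter_b.recv().await.ok(), Some(2));
}

This is also why the FFI documentation change above can promise only unspecified behavior for multiple emitters: with the broadcast channel all emitters currently receive every event, whereas the old async-channel implementation delivered each event to exactly one receiver.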