Skip to content

Commit

Permalink
Make try_recv non-blocking
Browse files Browse the repository at this point in the history
  • Loading branch information
link2xt committed Apr 21, 2024
1 parent d7a16c4 commit 1357019
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 38 deletions.
4 changes: 2 additions & 2 deletions src/contact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2730,7 +2730,7 @@ Hi."#;
let sent_msg = alice.send_text(chat.id, "moin").await;
let contact = Contact::get_by_id(&bob, *contacts.first().unwrap()).await?;
assert!(!contact.was_seen_recently());
bob.evtracker.clear_events().await;
bob.evtracker.clear_events();
bob.recv_msg(&sent_msg).await;
let contact = Contact::get_by_id(&bob, *contacts.first().unwrap()).await?;
assert!(contact.was_seen_recently());
Expand All @@ -2742,7 +2742,7 @@ Hi."#;
.await;

// Wait for `was_seen_recently()` to turn off.
bob.evtracker.clear_events().await;
bob.evtracker.clear_events();
SystemTime::shift(Duration::from_secs(SEEN_RECENTLY_SECONDS as u64 * 2));
recently_seen_loop.interrupt(ContactId::UNDEFINED, 0).await;
let contact = Contact::get_by_id(&bob, *contacts.first().unwrap()).await?;
Expand Down
18 changes: 14 additions & 4 deletions src/events.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! # Events specification.
use anyhow::Result;
use tokio::sync::Mutex;

pub(crate) mod chatlist_events;
Expand Down Expand Up @@ -66,17 +67,26 @@ pub struct EventEmitter(Mutex<async_broadcast::Receiver<Event>>);

impl EventEmitter {
/// Async recv of an event. Return `None` if the `Sender` has been dropped.
///
/// [`try_recv`]: Self::try_recv
pub async fn recv(&self) -> Option<Event> {
let mut lock = self.0.lock().await;
lock.recv().await.ok()
}

/// Tries to receive an event without blocking.
///
/// Returns `None` if no events are available for reception.
pub async fn try_recv(&self) -> Option<Event> {
let mut lock = self.0.lock().await;
lock.try_recv().ok()
/// Returns error if no events are available for reception
/// or if receiver is blocked by a concurrent call to [`recv`].
///
/// [`recv`]: Self::recv
pub fn try_recv(&self) -> Result<Event> {
// Using `try_lock` instead of `lock`
// to avoid blocking
// in case there is a concurrent call to `recv`.
let mut lock = self.0.try_lock()?;
let event = lock.try_recv()?;
Ok(event)
}
}

Expand Down
56 changes: 28 additions & 28 deletions src/events/chatlist_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ mod test_chatlist_events {
.await?;
set_muted(&bob, bob_chat.id, MuteDuration::Forever).await?;

bob.evtracker.clear_events().await;
bob.evtracker.clear_events();

let sent_msg = alice.send_text(chat.id, "moin2").await;
bob.recv_msg(&sent_msg).await;
Expand Down Expand Up @@ -216,7 +216,7 @@ mod test_chatlist_events {
let sent_msg = alice.send_text(chat.id, "moin2").await;
bob.recv_msg(&sent_msg).await;

bob.evtracker.clear_events().await;
bob.evtracker.clear_events();
chat::marknoticed_chat(&bob, DC_CHAT_ID_ARCHIVED_LINK).await?;
wait_for_chatlist_specific_item(&bob, DC_CHAT_ID_ARCHIVED_LINK).await;

Expand All @@ -233,7 +233,7 @@ mod test_chatlist_events {
let sent_msg = alice.send_text(alice_to_bob_chat.id, "hello").await;
bob.recv_msg(&sent_msg).await;

bob.evtracker.clear_events().await;
bob.evtracker.clear_events();
// set alice name then receive messagefrom her with bob
alice.set_config(Config::Displayname, Some("Alice")).await?;
let sent_msg = alice
Expand All @@ -245,7 +245,7 @@ mod test_chatlist_events {

wait_for_chatlist_all_items(&bob).await;

bob.evtracker.clear_events().await;
bob.evtracker.clear_events();
// set name
let addr = alice_on_bob.get_addr();
Contact::create(&bob, "Alice2", addr).await?;
Expand All @@ -266,7 +266,7 @@ mod test_chatlist_events {
let sent_msg = alice.send_text(alice_to_bob_chat.id, "hello").await;
bob.recv_msg(&sent_msg).await;

bob.evtracker.clear_events().await;
bob.evtracker.clear_events();
// set alice avatar then receive messagefrom her with bob
let file = alice.dir.path().join("avatar.png");
let bytes = include_bytes!("../../test-data/image/avatar64x64.png");
Expand All @@ -292,7 +292,7 @@ mod test_chatlist_events {
let alice = tcm.alice().await;
let chat = create_group_chat(&alice, ProtectionStatus::Protected, "My Group").await?;

alice.evtracker.clear_events().await;
alice.evtracker.clear_events();
chat.delete(&alice).await?;
wait_for_chatlist(&alice).await;
Ok(())
Expand All @@ -303,7 +303,7 @@ mod test_chatlist_events {
async fn test_create_group_chat() -> Result<()> {
let mut tcm = TestContextManager::new();
let alice = tcm.alice().await;
alice.evtracker.clear_events().await;
alice.evtracker.clear_events();
let chat = create_group_chat(&alice, ProtectionStatus::Protected, "My Group").await?;
wait_for_chatlist_and_specific_item(&alice, chat).await;
Ok(())
Expand All @@ -314,7 +314,7 @@ mod test_chatlist_events {
async fn test_create_broadcastlist() -> Result<()> {
let mut tcm = TestContextManager::new();
let alice = tcm.alice().await;
alice.evtracker.clear_events().await;
alice.evtracker.clear_events();
create_broadcast_list(&alice).await?;
wait_for_chatlist(&alice).await;
Ok(())
Expand All @@ -327,11 +327,11 @@ mod test_chatlist_events {
let alice = tcm.alice().await;
let chat = create_group_chat(&alice, ProtectionStatus::Protected, "My Group").await?;

alice.evtracker.clear_events().await;
alice.evtracker.clear_events();
chat::set_muted(&alice, chat, MuteDuration::Forever).await?;
wait_for_chatlist_specific_item(&alice, chat).await;

alice.evtracker.clear_events().await;
alice.evtracker.clear_events();
chat::set_muted(&alice, chat, MuteDuration::NotMuted).await?;
wait_for_chatlist_specific_item(&alice, chat).await;

Expand All @@ -352,7 +352,7 @@ mod test_chatlist_events {
.unwrap(),
);
chat::set_muted(&alice, chat, mute_duration).await?;
alice.evtracker.clear_events().await;
alice.evtracker.clear_events();
SystemTime::shift(Duration::from_secs(3));
wait_for_chatlist_specific_item(&alice, chat).await;

Expand All @@ -366,7 +366,7 @@ mod test_chatlist_events {
let alice = tcm.alice().await;
let chat = create_group_chat(&alice, ProtectionStatus::Protected, "My Group").await?;

alice.evtracker.clear_events().await;
alice.evtracker.clear_events();
chat::set_chat_name(&alice, chat, "New Name").await?;
wait_for_chatlist_specific_item(&alice, chat).await;

Expand All @@ -380,7 +380,7 @@ mod test_chatlist_events {
let alice = tcm.alice().await;
let chat = create_group_chat(&alice, ProtectionStatus::Protected, "My Group").await?;

alice.evtracker.clear_events().await;
alice.evtracker.clear_events();
let file = alice.dir.path().join("avatar.png");
let bytes = include_bytes!("../../test-data/image/avatar64x64.png");
tokio::fs::write(&file, bytes).await?;
Expand All @@ -405,7 +405,7 @@ mod test_chatlist_events {
wait_for_chatlist_specific_item(&bob, chat_id_for_bob).await;
chat_id_for_bob.accept(&bob).await?;

bob.evtracker.clear_events().await;
bob.evtracker.clear_events();
chat::set_chat_name(&alice, chat, "New Name").await?;
let sent_msg = alice.send_text(chat, "Hello").await;
bob.recv_msg(&sent_msg).await;
Expand All @@ -426,7 +426,7 @@ mod test_chatlist_events {
let sent_msg = alice.send_text(chat, "Hello").await;
let chat_id_for_bob = bob.recv_msg(&sent_msg).await.chat_id;

bob.evtracker.clear_events().await;
bob.evtracker.clear_events();
chat_id_for_bob.accept(&bob).await?;
wait_for_chatlist_specific_item(&bob, chat_id_for_bob).await;

Expand All @@ -445,7 +445,7 @@ mod test_chatlist_events {
let sent_msg = alice.send_text(chat, "Hello").await;
let chat_id_for_bob = bob.recv_msg(&sent_msg).await.chat_id;

bob.evtracker.clear_events().await;
bob.evtracker.clear_events();
chat_id_for_bob.block(&bob).await?;
wait_for_chatlist(&bob).await;

Expand All @@ -460,7 +460,7 @@ mod test_chatlist_events {
let chat = create_group_chat(&alice, ProtectionStatus::Protected, "My Group").await?;
let message = chat::send_text_msg(&alice, chat, "Hello World".to_owned()).await?;

alice.evtracker.clear_events().await;
alice.evtracker.clear_events();
message::delete_msgs(&alice, &[message]).await?;
wait_for_chatlist_specific_item(&alice, chat).await;

Expand All @@ -485,7 +485,7 @@ mod test_chatlist_events {
let chat_id_for_bob = bob.recv_msg(&sent_msg).await.chat_id;
assert!(chat_id_for_bob.get_fresh_msg_cnt(&bob).await? >= 1);

bob.evtracker.clear_events().await;
bob.evtracker.clear_events();
chat::marknoticed_chat(&bob, chat_id_for_bob).await?;
wait_for_chatlist_specific_item(&bob, chat_id_for_bob).await;

Expand All @@ -500,11 +500,11 @@ mod test_chatlist_events {
let contact_id = Contact::create(&alice, "example", "[email protected]").await?;
let _ = ChatId::create_for_contact(&alice, contact_id).await;

alice.evtracker.clear_events().await;
alice.evtracker.clear_events();
Contact::block(&alice, contact_id).await?;
wait_for_chatlist(&alice).await;

alice.evtracker.clear_events().await;
alice.evtracker.clear_events();
Contact::unblock(&alice, contact_id).await?;
wait_for_chatlist(&alice).await;

Expand Down Expand Up @@ -547,7 +547,7 @@ Content-Type: text/plain; charset=utf-8; format=flowed; delsp=no
First thread."#;

alice.evtracker.clear_events().await;
alice.evtracker.clear_events();
receive_imf(&alice, mime, false).await?;
wait_for_chatlist(&alice).await;

Expand All @@ -568,34 +568,34 @@ First thread."#;
let qr = get_securejoin_qr(&alice.ctx, Some(alice_chatid)).await?;

// Step 2: Bob scans QR-code, sends vg-request
bob.evtracker.clear_events().await;
bob.evtracker.clear_events();
let bob_chatid = join_securejoin(&bob.ctx, &qr).await?;
wait_for_chatlist(&bob).await;

let sent = bob.pop_sent_msg().await;

// Step 3: Alice receives vg-request, sends vg-auth-required
alice.evtracker.clear_events().await;
alice.evtracker.clear_events();
alice.recv_msg_trash(&sent).await;

let sent = alice.pop_sent_msg().await;

// Step 4: Bob receives vg-auth-required, sends vg-request-with-auth
bob.evtracker.clear_events().await;
bob.evtracker.clear_events();
bob.recv_msg_trash(&sent).await;
wait_for_chatlist_and_specific_item(&bob, bob_chatid).await;

let sent = bob.pop_sent_msg().await;

// Step 5+6: Alice receives vg-request-with-auth, sends vg-member-added
alice.evtracker.clear_events().await;
alice.evtracker.clear_events();
alice.recv_msg_trash(&sent).await;
wait_for_chatlist_and_specific_item(&alice, alice_chatid).await;

let sent = alice.pop_sent_msg().await;

// Step 7: Bob receives vg-member-added
bob.evtracker.clear_events().await;
bob.evtracker.clear_events();
bob.recv_msg(&sent).await;
wait_for_chatlist_and_specific_item(&bob, bob_chatid).await;

Expand All @@ -617,7 +617,7 @@ First thread."#;
let message = Message::load_from_db(&alice, msg_id).await?;
assert_eq!(message.get_state(), MessageState::OutDelivered);

alice.evtracker.clear_events().await;
alice.evtracker.clear_events();
chat::resend_msgs(&alice, &[msg_id]).await?;
wait_for_chatlist_specific_item(&alice, chat).await;

Expand All @@ -633,7 +633,7 @@ First thread."#;
let msg_id = chat::send_text_msg(&alice, chat, "Hello".to_owned()).await?;
let _ = alice.pop_sent_msg().await;

alice.evtracker.clear_events().await;
alice.evtracker.clear_events();
reaction::send_reaction(&alice, msg_id, "👍").await?;
let _ = alice.pop_sent_msg().await;
wait_for_chatlist_specific_item(&alice, chat).await;
Expand Down
2 changes: 1 addition & 1 deletion src/receive_imf/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1081,7 +1081,7 @@ async fn test_block_mailing_list() {
receive_imf(&t.ctx, DC_MAILINGLIST2, false).await.unwrap();

// Check that no notification is displayed for blocked mailing list message.
while let Some(event) = t.evtracker.try_recv().await {
while let Ok(event) = t.evtracker.try_recv() {
assert!(!matches!(event.typ, EventType::IncomingMsg { .. }));
}

Expand Down
2 changes: 1 addition & 1 deletion src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1095,7 +1095,7 @@ mod tests {
let a = t.get_config(Config::Selfavatar).await.unwrap().unwrap();
assert_eq!(avatar_bytes, &tokio::fs::read(&a).await.unwrap()[..]);

while let Some(event) = event_source.try_recv().await {
while let Ok(event) = event_source.try_recv() {
match event.typ {
EventType::Info(s) => assert!(
!s.contains("Keeping new unreferenced file"),
Expand Down
4 changes: 2 additions & 2 deletions src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1041,8 +1041,8 @@ impl EventTracker {
}

/// Clears event queue.
pub async fn clear_events(&self) {
while let Some(_ev) = self.try_recv().await {}
pub fn clear_events(&self) {
while let Ok(_ev) = self.try_recv() {}
}
}

Expand Down

0 comments on commit 1357019

Please sign in to comment.