From 469ff799addcd76fab4b1080c76a03b715cef89d Mon Sep 17 00:00:00 2001 From: link2xt Date: Tue, 21 May 2024 19:14:41 +0000 Subject: [PATCH] api: add event channel overflow event --- deltachat-ffi/deltachat.h | 8 +++++ deltachat-ffi/src/lib.rs | 10 ++++-- deltachat-jsonrpc/src/api/types/events.rs | 4 +++ node/constants.js | 1 + node/events.js | 3 +- node/lib/constants.ts | 2 ++ src/events.rs | 38 +++++++++++------------ src/events/payload.rs | 6 ++++ 8 files changed, 48 insertions(+), 24 deletions(-) diff --git a/deltachat-ffi/deltachat.h b/deltachat-ffi/deltachat.h index 9a31f2ff90..8d12301618 100644 --- a/deltachat-ffi/deltachat.h +++ b/deltachat-ffi/deltachat.h @@ -6324,6 +6324,14 @@ void dc_event_unref(dc_event_t* event); #define DC_EVENT_CHATLIST_ITEM_CHANGED 2301 + +/** + * Inform that some events have been skipped due to event channel overflow. + * + * @param data1 (int) number of events that have been skipped + */ +#define DC_EVENT_CHANNEL_OVERFLOW 2400 + /** * @} */ diff --git a/deltachat-ffi/src/lib.rs b/deltachat-ffi/src/lib.rs index 61eba53fff..f3e71e2f91 100644 --- a/deltachat-ffi/src/lib.rs +++ b/deltachat-ffi/src/lib.rs @@ -566,6 +566,7 @@ pub unsafe extern "C" fn dc_event_get_id(event: *mut dc_event_t) -> libc::c_int EventType::AccountsBackgroundFetchDone => 2200, EventType::ChatlistChanged => 2300, EventType::ChatlistItemChanged { .. } => 2301, + EventType::EventChannelOverflow { .. } => 2400, } } @@ -624,6 +625,7 @@ pub unsafe extern "C" fn dc_event_get_data1_int(event: *mut dc_event_t) -> libc: EventType::ChatlistItemChanged { chat_id } => { chat_id.unwrap_or_default().to_u32() as libc::c_int } + EventType::EventChannelOverflow { n } => *n as libc::c_int, } } @@ -662,8 +664,9 @@ pub unsafe extern "C" fn dc_event_get_data2_int(event: *mut dc_event_t) -> libc: | EventType::AccountsBackgroundFetchDone | EventType::ChatlistChanged | EventType::ChatlistItemChanged { .. } - | EventType::ConfigSynced { .. } => 0, - EventType::ChatModified(_) => 0, + | EventType::ConfigSynced { .. } + | EventType::ChatModified(_) + | EventType::EventChannelOverflow { .. } => 0, EventType::MsgsChanged { msg_id, .. } | EventType::ReactionsChanged { msg_id, .. } | EventType::IncomingMsg { msg_id, .. } @@ -729,7 +732,8 @@ pub unsafe extern "C" fn dc_event_get_data2_str(event: *mut dc_event_t) -> *mut | EventType::ChatEphemeralTimerModified { .. } | EventType::IncomingMsgBunch { .. } | EventType::ChatlistItemChanged { .. } - | EventType::ChatlistChanged => ptr::null_mut(), + | EventType::ChatlistChanged + | EventType::EventChannelOverflow { .. } => ptr::null_mut(), EventType::ConfigureProgress { comment, .. } => { if let Some(comment) = comment { comment.to_c_string().unwrap_or_default().into_raw() diff --git a/deltachat-jsonrpc/src/api/types/events.rs b/deltachat-jsonrpc/src/api/types/events.rs index 59b89cfbd9..f72a183e0f 100644 --- a/deltachat-jsonrpc/src/api/types/events.rs +++ b/deltachat-jsonrpc/src/api/types/events.rs @@ -263,6 +263,9 @@ pub enum EventType { /// If `chat_id` is set to None, then all currently visible chats need to be rerendered, and all not-visible items need to be cleared from cache if the UI has a cache. #[serde(rename_all = "camelCase")] ChatlistItemChanged { chat_id: Option }, + + /// Inform than some events have been skipped due to event channel overflow. + EventChannelOverflow { n: u64 }, } impl From for EventType { @@ -378,6 +381,7 @@ impl From for EventType { chat_id: chat_id.map(|id| id.to_u32()), }, CoreEventType::ChatlistChanged => ChatlistChanged, + CoreEventType::EventChannelOverflow { n } => EventChannelOverflow { n }, } } } diff --git a/node/constants.js b/node/constants.js index f753d2a388..afdd5c99e7 100644 --- a/node/constants.js +++ b/node/constants.js @@ -30,6 +30,7 @@ module.exports = { DC_DOWNLOAD_IN_PROGRESS: 1000, DC_DOWNLOAD_UNDECIPHERABLE: 30, DC_EVENT_ACCOUNTS_BACKGROUND_FETCH_DONE: 2200, + DC_EVENT_CHANNEL_OVERFLOW: 2400, DC_EVENT_CHATLIST_CHANGED: 2300, DC_EVENT_CHATLIST_ITEM_CHANGED: 2301, DC_EVENT_CHAT_EPHEMERAL_TIMER_MODIFIED: 2021, diff --git a/node/events.js b/node/events.js index c87f31490d..611628b051 100644 --- a/node/events.js +++ b/node/events.js @@ -40,5 +40,6 @@ module.exports = { 2150: 'DC_EVENT_WEBXDC_REALTIME_DATA', 2200: 'DC_EVENT_ACCOUNTS_BACKGROUND_FETCH_DONE', 2300: 'DC_EVENT_CHATLIST_CHANGED', - 2301: 'DC_EVENT_CHATLIST_ITEM_CHANGED' + 2301: 'DC_EVENT_CHATLIST_ITEM_CHANGED', + 2400: 'DC_EVENT_CHANNEL_OVERFLOW' } diff --git a/node/lib/constants.ts b/node/lib/constants.ts index e86f41d742..4689757b13 100644 --- a/node/lib/constants.ts +++ b/node/lib/constants.ts @@ -30,6 +30,7 @@ export enum C { DC_DOWNLOAD_IN_PROGRESS = 1000, DC_DOWNLOAD_UNDECIPHERABLE = 30, DC_EVENT_ACCOUNTS_BACKGROUND_FETCH_DONE = 2200, + DC_EVENT_CHANNEL_OVERFLOW = 2400, DC_EVENT_CHATLIST_CHANGED = 2300, DC_EVENT_CHATLIST_ITEM_CHANGED = 2301, DC_EVENT_CHAT_EPHEMERAL_TIMER_MODIFIED = 2021, @@ -343,4 +344,5 @@ export const EventId2EventName: { [key: number]: string } = { 2200: 'DC_EVENT_ACCOUNTS_BACKGROUND_FETCH_DONE', 2300: 'DC_EVENT_CHATLIST_CHANGED', 2301: 'DC_EVENT_CHATLIST_ITEM_CHANGED', + 2400: 'DC_EVENT_CHANNEL_OVERFLOW', } diff --git a/src/events.rs b/src/events.rs index 296785a23f..d95a16d21e 100644 --- a/src/events.rs +++ b/src/events.rs @@ -71,16 +71,13 @@ impl EventEmitter { /// [`try_recv`]: Self::try_recv pub async fn recv(&self) -> Option { let mut lock = self.0.lock().await; - loop { - match lock.recv().await { - Err(async_broadcast::RecvError::Overflowed(_)) => { - // Some events have been lost, - // but the channel is not closed. - continue; - } - Err(async_broadcast::RecvError::Closed) => return None, - Ok(event) => return Some(event), - } + match lock.recv().await { + Err(async_broadcast::RecvError::Overflowed(n)) => Some(Event { + id: 0, + typ: EventType::EventChannelOverflow { n }, + }), + Err(async_broadcast::RecvError::Closed) => None, + Ok(event) => Some(event), } } @@ -96,17 +93,18 @@ impl EventEmitter { // to avoid blocking // in case there is a concurrent call to `recv`. let mut lock = self.0.try_lock()?; - loop { - match lock.try_recv() { - Err(async_broadcast::TryRecvError::Overflowed(_)) => { - // Some events have been lost, - // but the channel is not closed. - continue; - } - res @ (Err(async_broadcast::TryRecvError::Empty) - | Err(async_broadcast::TryRecvError::Closed) - | Ok(_)) => return Ok(res?), + match lock.try_recv() { + Err(async_broadcast::TryRecvError::Overflowed(n)) => { + // Some events have been lost, + // but the channel is not closed. + Ok(Event { + id: 0, + typ: EventType::EventChannelOverflow { n }, + }) } + res @ (Err(async_broadcast::TryRecvError::Empty) + | Err(async_broadcast::TryRecvError::Closed) + | Ok(_)) => Ok(res?), } } } diff --git a/src/events/payload.rs b/src/events/payload.rs index cd61942a32..d8d842edfc 100644 --- a/src/events/payload.rs +++ b/src/events/payload.rs @@ -315,4 +315,10 @@ pub enum EventType { /// Event for using in tests, e.g. as a fence between normally generated events. #[cfg(test)] Test, + + /// Inform than some events have been skipped due to event channel overflow. + EventChannelOverflow { + /// Number of events skipped. + n: u64, + }, }