From 8b220032e810fe94ff0cfea0314ee2173e265fe5 Mon Sep 17 00:00:00 2001 From: alanpoon Date: Thu, 26 Dec 2024 20:45:47 +0800 Subject: [PATCH] removed 5 sec timer and use latest_user_read_receipt_timeline_event_id --- src/home/room_screen.rs | 42 ++++--------------- src/sliding_sync.rs | 93 ++++++++++++++++++++++------------------- 2 files changed, 57 insertions(+), 78 deletions(-) diff --git a/src/home/room_screen.rs b/src/home/room_screen.rs index a6cd2d7..112d7a9 100644 --- a/src/home/room_screen.rs +++ b/src/home/room_screen.rs @@ -993,13 +993,6 @@ pub struct RoomScreen { #[rust] room_name: String, /// The persistent UI-relevant states for the room that this widget is currently displaying. #[rust] tl_state: Option, - /// Timer used to send fully-read receipts for the current room. - /// - /// We start a short (5 second) timer when the user scrolls down, - /// if `scrolled_past_read_marker` is true. Then, we send a fully-read receipt - /// for the last displayed event in this room. - #[rust] fully_read_timer: Timer, - } impl Drop for RoomScreen { fn drop(&mut self) { @@ -1252,22 +1245,6 @@ impl Widget for RoomScreen { }); } } - // Send fully read receipt for last displayed event 5 Seconds after user scrolled past the read marker - if self.fully_read_timer.is_event(event).is_some() && !self.fully_read_timer.is_empty() { - if let (Some(ref mut tl_state), Some(ref room_id)) = (&mut self.tl_state, &self.room_id) { - if let Some((event_id, timestamp)) = tl_state - .last_display_event - .take() - { - submit_async_request(MatrixRequest::FullyReadReceipt { - room_id: room_id.clone(), - event_id, - timestamp, - }); - } - } - cx.stop_timer(self.fully_read_timer); - } if self.animator_handle_event(cx, event).must_redraw() { self.redraw(cx); @@ -1531,7 +1508,6 @@ impl RoomScreen { portal_list.set_first_id_and_scroll(new_item_idx, new_item_scroll); tl.prev_first_index = Some(new_item_idx); // Reset the fully read timer when we jump to a new event - cx.stop_timer(self.fully_read_timer); tl.scrolled_past_read_marker = false; } } @@ -1960,7 +1936,7 @@ impl RoomScreen { // Query for User's fully read event submit_async_request(MatrixRequest::GetFullyReadEvent { room_id: room_id.clone() }); - submit_async_request(MatrixRequest::SubscribeToUpdates { room_id: room_id.clone(), subscribe: true }); + submit_async_request(MatrixRequest::SubscribeToOwnUserReadReceiptsChanged { room_id: room_id.clone(), subscribe: true }); // Kick off a back pagination request for this room. This is "urgent", // because we want to show the user some messages as soon as possible // when they first open the room, and there might not be any messages yet. @@ -2094,7 +2070,7 @@ impl RoomScreen { /// Sends read receipts based on the current scroll position of the timeline. fn send_user_read_receipts_based_on_scroll_pos( &mut self, - cx: &mut Cx, + _cx: &mut Cx, actions: &ActionsBuf, portal_list: &PortalListRef, ) { @@ -2109,8 +2085,6 @@ impl RoomScreen { if let Some(ref mut index) = tl_state.prev_first_index { // to detect change of scroll when scroll ends if *index != first_index { - // Any scrolling, resets the fully_read_timer - cx.stop_timer(self.fully_read_timer); if first_index >= *index { // Get event_id and timestamp for the last visible event let Some((last_event_id, last_timestamp)) = tl_state @@ -2128,15 +2102,13 @@ impl RoomScreen { }); // If scrolled_past_read_marker is true, set last_display_event to last_event_id and start the timer if tl_state.scrolled_past_read_marker { - self.fully_read_timer = cx.start_timeout(5.0); tl_state.last_display_event = Some((last_event_id.to_owned(), last_timestamp)); } else { // Get event_id and timestamp for the first visible event // If scrolled_past_read_marker is false, check if the saved fully read event's timestamp in between the first and last visible event // If true, set scrolled_past_read_marker to true // If true, set last_event_id to last_display_event - // If true, start the 5 seconds timer - let Some((_fully_read_event, fully_read_timestamp)) = + let Some((_, fully_read_timestamp)) = get_fully_read_event(room_id) else { *index = first_index; @@ -2155,9 +2127,11 @@ impl RoomScreen { && fully_read_timestamp <= last_timestamp { tl_state.scrolled_past_read_marker = true; - tl_state.last_display_event = - Some((last_event_id.to_owned(), last_timestamp)); - self.fully_read_timer = cx.start_timeout(5.0); + submit_async_request(MatrixRequest::FullyReadReceipt { + room_id: room_id.clone(), + event_id: last_event_id.to_owned(), + timestamp: last_timestamp, + }); } } } diff --git a/src/sliding_sync.rs b/src/sliding_sync.rs index 1718146..bd181d7 100644 --- a/src/sliding_sync.rs +++ b/src/sliding_sync.rs @@ -13,12 +13,12 @@ use matrix_sdk::{ message::{ForwardThread, RoomMessageEventContent}, MediaSource }, FullStateEventContent }, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri, OwnedRoomAliasId, OwnedRoomId, OwnedUserId, UserId - }, sliding_sync::VersionBuilder, sync::RoomUpdate, Client, Error, Room + }, sliding_sync::VersionBuilder, Client, Error, Room }; use matrix_sdk_ui::{ room_list_service::{self, RoomListLoadingState}, sync_service::{self, SyncService}, - timeline::{AnyOtherFullStateEventContent, EventTimelineItem, RepliedToInfo, RoomExt, TimelineDetails, TimelineItem, TimelineItemContent}, + timeline::{AnyOtherFullStateEventContent, EventTimelineItem, RepliedToInfo, TimelineDetails, TimelineItem, TimelineItemContent}, Timeline, }; use robius_open::Uri; @@ -38,7 +38,6 @@ use crate::{ }, utils::MEDIA_THUMBNAIL_FORMAT, verification::add_verification_event_handlers_and_sync_client }; - #[derive(Parser, Debug, Default)] struct Cli { /// The user name that should be used for the login. @@ -357,12 +356,12 @@ pub enum MatrixRequest { /// Whether to subscribe or unsubscribe from typing notices for this room. subscribe: bool, }, - /// Subscribe to Updates for the given room. + /// Subscribe to Updates own user read receipts changed for the given room. /// /// This request does not return a response or notify the UI thread. - SubscribeToUpdates { + SubscribeToOwnUserReadReceiptsChanged { room_id: OwnedRoomId, - /// Whether to subscribe or unsubscribe to Updates for this room. + /// Whether to subscribe or unsubscribe to own user read receipts changed for this room. subscribe: bool, }, /// Sends a read receipt for the given event in the given room. @@ -417,7 +416,7 @@ async fn async_worker( login_sender: Sender, ) -> Result<()> { log!("Started async_worker task."); - + let subscribe_to_owned_user_read_receipt_changed: std::sync::Arc>> = Arc::new(tokio::sync::Mutex::new(BTreeMap::new())); while let Some(request) = request_receiver.recv().await { match request { MatrixRequest::Login(login_request) => { @@ -753,51 +752,60 @@ async fn async_worker( // log!("Note: typing notifications recv loop has ended for room {}", room_id); }); } - MatrixRequest::SubscribeToUpdates { room_id, subscribe } => { - let mut update_receiver = { + MatrixRequest::SubscribeToOwnUserReadReceiptsChanged { room_id, subscribe } => { + let Some(room) = CLIENT.get().and_then(|c| c.get_room(&room_id)) else { + error!("BUG: client/room not found when subscribing to typing notices request, room: {room_id}"); + continue; + }; + + let timeline = { let mut all_room_info = ALL_ROOM_INFO.lock().unwrap(); let Some(room_info) = all_room_info.get_mut(&room_id) else { - log!("BUG: room info not found for subscribe to updates, room {room_id}"); - continue; - }; - let recv = if subscribe { - if room_info.update_subscriber_initialised { - warning!("Note: room {room_id} is already subscribed to updates."); - continue - } - let Some(room) = CLIENT.get().and_then(|c| c.get_room(&room_id)) else { - error!("BUG: client/room not found when subscribing to updates, room: {room_id}"); - continue; - }; - room_info.update_subscriber_initialised = true; - room.subscribe_to_updates() - } else { + log!("BUG: room info not found for send message request {room_id}"); continue; }; - recv + room_info.timeline.clone() }; + let subscribe_to_owned_user_read_receipt_changed = subscribe_to_owned_user_read_receipt_changed.clone(); let _to_updates_task = Handle::current().spawn(async move { - while let Ok(room_update) = update_receiver.recv().await { - if let RoomUpdate::Joined { room, updates: _ } = room_update { - if let Ok(timeline) = room.timeline().await { - if let Some(client_user_id) = current_user_id() { - let updated_event_id = timeline.latest_user_read_receipt(&client_user_id).await; - let fully_read_event = get_fully_read_event(&room.room_id().to_owned()); - if let (Some((updated_event_id, _receipt)), Some((fully_read_event_id, _time))) = (updated_event_id, fully_read_event) { - if updated_event_id != fully_read_event_id { - if let Err(e) = get_event_timestamp(&room, &room_id, fully_read_event_id).await - .map(|(read_event, timestamp)| { - set_fully_read_event(&room_id, read_event, timestamp) - }) { - error!("Error: couldn't set fully read event for room {room_id}: {e:?}"); - } + let update_receiver = timeline.subscribe_own_user_read_receipts_changed().await; + let read_receipt_change_mutex = subscribe_to_owned_user_read_receipt_changed.clone(); + let mut read_receipt_change_mutex_guard = read_receipt_change_mutex.lock().await; + if let Some(subscription) = read_receipt_change_mutex_guard.get(&room_id) { + if *subscription && subscribe { + return + } + } else if subscribe { + read_receipt_change_mutex_guard.insert(room_id.clone(), true); + } + pin_mut!(update_receiver); + if let Some(client_user_id) = current_user_id() { + while (update_receiver.next().await).is_some() { + let read_receipt_change = subscribe_to_owned_user_read_receipt_changed.clone(); + let read_receipt_change = read_receipt_change.lock().await; + let Some(subscribed_to_user_read_receipt) = read_receipt_change.get(&room_id) else { continue; }; + if !subscribed_to_user_read_receipt { + break; + } + + let updated_event_id = timeline.latest_user_read_receipt_timeline_event_id(&client_user_id).await; + let fully_read_event = get_fully_read_event(&room_id); + if let (Some(updated_event_id), Some((event_id, fully_read_event_timestamp))) = (updated_event_id, fully_read_event) { + if updated_event_id != event_id { + if let Err(e) = get_event_timestamp(&room, &room_id, updated_event_id).await + .map(|(read_event, timestamp)| { + if timestamp > fully_read_event_timestamp { + set_fully_read_event(&room_id, read_event, timestamp) + } else { + bail!("updated event's timestamp is older than current fully read event"); } + }) + { + error!("Error: couldn't set fully read event for room {room_id}: {e:?}"); } } - } - } } }); @@ -1039,8 +1047,6 @@ struct RoomInfo { timeline_subscriber_handler_task: JoinHandle<()>, /// A drop guard for the event handler that represents a subscription to typing notices for this room. typing_notice_subscriber: Option, - /// A boolean indicating if the update subsciber has been initialised - update_subscriber_initialised: bool, /// The ID of the old tombstoned room that this room has replaced, if any. replaces_tombstoned_room: Option, /// Event_id and timestamp for read marker @@ -1624,7 +1630,6 @@ async fn add_new_room(room: &room_list_service::Room) -> Result<()> { timeline_update_sender, timeline_subscriber_handler_task, typing_notice_subscriber: None, - update_subscriber_initialised: false, replaces_tombstoned_room: tombstoned_room_replaced_by_this_room, fully_read_event: None },