Skip to content

Commit

Permalink
removed 5 sec timer and use latest_user_read_receipt_timeline_event_id
Browse files Browse the repository at this point in the history
  • Loading branch information
alanpoon committed Dec 26, 2024
1 parent 1643fc9 commit 8b22003
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 78 deletions.
42 changes: 8 additions & 34 deletions src/home/room_screen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -993,13 +993,6 @@ pub struct RoomScreen {
#[rust] room_name: String,
/// The persistent UI-relevant states for the room that this widget is currently displaying.
#[rust] tl_state: Option<TimelineUiState>,
/// Timer used to send fully-read receipts for the current room.
///
/// We start a short (5 second) timer when the user scrolls down,
/// if `scrolled_past_read_marker` is true. Then, we send a fully-read receipt
/// for the last displayed event in this room.
#[rust] fully_read_timer: Timer,

}
impl Drop for RoomScreen {
fn drop(&mut self) {
Expand Down Expand Up @@ -1252,22 +1245,6 @@ impl Widget for RoomScreen {
});
}
}
// Send fully read receipt for last displayed event 5 Seconds after user scrolled past the read marker
if self.fully_read_timer.is_event(event).is_some() && !self.fully_read_timer.is_empty() {
if let (Some(ref mut tl_state), Some(ref room_id)) = (&mut self.tl_state, &self.room_id) {
if let Some((event_id, timestamp)) = tl_state
.last_display_event
.take()
{
submit_async_request(MatrixRequest::FullyReadReceipt {
room_id: room_id.clone(),
event_id,
timestamp,
});
}
}
cx.stop_timer(self.fully_read_timer);
}

if self.animator_handle_event(cx, event).must_redraw() {
self.redraw(cx);
Expand Down Expand Up @@ -1531,7 +1508,6 @@ impl RoomScreen {
portal_list.set_first_id_and_scroll(new_item_idx, new_item_scroll);
tl.prev_first_index = Some(new_item_idx);
// Reset the fully read timer when we jump to a new event
cx.stop_timer(self.fully_read_timer);
tl.scrolled_past_read_marker = false;
}
}
Expand Down Expand Up @@ -1960,7 +1936,7 @@ impl RoomScreen {

// Query for User's fully read event
submit_async_request(MatrixRequest::GetFullyReadEvent { room_id: room_id.clone() });
submit_async_request(MatrixRequest::SubscribeToUpdates { room_id: room_id.clone(), subscribe: true });
submit_async_request(MatrixRequest::SubscribeToOwnUserReadReceiptsChanged { room_id: room_id.clone(), subscribe: true });
// Kick off a back pagination request for this room. This is "urgent",
// because we want to show the user some messages as soon as possible
// when they first open the room, and there might not be any messages yet.
Expand Down Expand Up @@ -2094,7 +2070,7 @@ impl RoomScreen {
/// Sends read receipts based on the current scroll position of the timeline.
fn send_user_read_receipts_based_on_scroll_pos(
&mut self,
cx: &mut Cx,
_cx: &mut Cx,
actions: &ActionsBuf,
portal_list: &PortalListRef,
) {
Expand All @@ -2109,8 +2085,6 @@ impl RoomScreen {
if let Some(ref mut index) = tl_state.prev_first_index {
// to detect change of scroll when scroll ends
if *index != first_index {
// Any scrolling, resets the fully_read_timer
cx.stop_timer(self.fully_read_timer);
if first_index >= *index {
// Get event_id and timestamp for the last visible event
let Some((last_event_id, last_timestamp)) = tl_state
Expand All @@ -2128,15 +2102,13 @@ impl RoomScreen {
});
// If scrolled_past_read_marker is true, set last_display_event to last_event_id and start the timer
if tl_state.scrolled_past_read_marker {
self.fully_read_timer = cx.start_timeout(5.0);
tl_state.last_display_event = Some((last_event_id.to_owned(), last_timestamp));
} else {
// Get event_id and timestamp for the first visible event
// If scrolled_past_read_marker is false, check if the saved fully read event's timestamp in between the first and last visible event
// If true, set scrolled_past_read_marker to true
// If true, set last_event_id to last_display_event
// If true, start the 5 seconds timer
let Some((_fully_read_event, fully_read_timestamp)) =
let Some((_, fully_read_timestamp)) =
get_fully_read_event(room_id)
else {
*index = first_index;
Expand All @@ -2155,9 +2127,11 @@ impl RoomScreen {
&& fully_read_timestamp <= last_timestamp
{
tl_state.scrolled_past_read_marker = true;
tl_state.last_display_event =
Some((last_event_id.to_owned(), last_timestamp));
self.fully_read_timer = cx.start_timeout(5.0);
submit_async_request(MatrixRequest::FullyReadReceipt {
room_id: room_id.clone(),
event_id: last_event_id.to_owned(),
timestamp: last_timestamp,
});
}
}
}
Expand Down
93 changes: 49 additions & 44 deletions src/sliding_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ use matrix_sdk::{
message::{ForwardThread, RoomMessageEventContent}, MediaSource
}, FullStateEventContent
}, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri, OwnedRoomAliasId, OwnedRoomId, OwnedUserId, UserId
}, sliding_sync::VersionBuilder, sync::RoomUpdate, Client, Error, Room
}, sliding_sync::VersionBuilder, Client, Error, Room
};
use matrix_sdk_ui::{
room_list_service::{self, RoomListLoadingState},
sync_service::{self, SyncService},
timeline::{AnyOtherFullStateEventContent, EventTimelineItem, RepliedToInfo, RoomExt, TimelineDetails, TimelineItem, TimelineItemContent},
timeline::{AnyOtherFullStateEventContent, EventTimelineItem, RepliedToInfo, TimelineDetails, TimelineItem, TimelineItemContent},
Timeline,
};
use robius_open::Uri;
Expand All @@ -38,7 +38,6 @@ use crate::{
}, utils::MEDIA_THUMBNAIL_FORMAT, verification::add_verification_event_handlers_and_sync_client
};


#[derive(Parser, Debug, Default)]
struct Cli {
/// The user name that should be used for the login.
Expand Down Expand Up @@ -357,12 +356,12 @@ pub enum MatrixRequest {
/// Whether to subscribe or unsubscribe from typing notices for this room.
subscribe: bool,
},
/// Subscribe to Updates for the given room.
/// Subscribe to Updates own user read receipts changed for the given room.
///
/// This request does not return a response or notify the UI thread.
SubscribeToUpdates {
SubscribeToOwnUserReadReceiptsChanged {
room_id: OwnedRoomId,
/// Whether to subscribe or unsubscribe to Updates for this room.
/// Whether to subscribe or unsubscribe to own user read receipts changed for this room.
subscribe: bool,
},
/// Sends a read receipt for the given event in the given room.
Expand Down Expand Up @@ -417,7 +416,7 @@ async fn async_worker(
login_sender: Sender<LoginRequest>,
) -> Result<()> {
log!("Started async_worker task.");

let subscribe_to_owned_user_read_receipt_changed: std::sync::Arc<tokio::sync::Mutex<BTreeMap<OwnedRoomId, bool>>> = Arc::new(tokio::sync::Mutex::new(BTreeMap::new()));
while let Some(request) = request_receiver.recv().await {
match request {
MatrixRequest::Login(login_request) => {
Expand Down Expand Up @@ -753,51 +752,60 @@ async fn async_worker(
// log!("Note: typing notifications recv loop has ended for room {}", room_id);
});
}
MatrixRequest::SubscribeToUpdates { room_id, subscribe } => {
let mut update_receiver = {
MatrixRequest::SubscribeToOwnUserReadReceiptsChanged { room_id, subscribe } => {
let Some(room) = CLIENT.get().and_then(|c| c.get_room(&room_id)) else {
error!("BUG: client/room not found when subscribing to typing notices request, room: {room_id}");
continue;
};

let timeline = {
let mut all_room_info = ALL_ROOM_INFO.lock().unwrap();
let Some(room_info) = all_room_info.get_mut(&room_id) else {
log!("BUG: room info not found for subscribe to updates, room {room_id}");
continue;
};
let recv = if subscribe {
if room_info.update_subscriber_initialised {
warning!("Note: room {room_id} is already subscribed to updates.");
continue
}
let Some(room) = CLIENT.get().and_then(|c| c.get_room(&room_id)) else {
error!("BUG: client/room not found when subscribing to updates, room: {room_id}");
continue;
};
room_info.update_subscriber_initialised = true;
room.subscribe_to_updates()
} else {
log!("BUG: room info not found for send message request {room_id}");
continue;
};
recv
room_info.timeline.clone()
};
let subscribe_to_owned_user_read_receipt_changed = subscribe_to_owned_user_read_receipt_changed.clone();

let _to_updates_task = Handle::current().spawn(async move {
while let Ok(room_update) = update_receiver.recv().await {
if let RoomUpdate::Joined { room, updates: _ } = room_update {
if let Ok(timeline) = room.timeline().await {
if let Some(client_user_id) = current_user_id() {
let updated_event_id = timeline.latest_user_read_receipt(&client_user_id).await;
let fully_read_event = get_fully_read_event(&room.room_id().to_owned());
if let (Some((updated_event_id, _receipt)), Some((fully_read_event_id, _time))) = (updated_event_id, fully_read_event) {
if updated_event_id != fully_read_event_id {
if let Err(e) = get_event_timestamp(&room, &room_id, fully_read_event_id).await
.map(|(read_event, timestamp)| {
set_fully_read_event(&room_id, read_event, timestamp)
}) {
error!("Error: couldn't set fully read event for room {room_id}: {e:?}");
}
let update_receiver = timeline.subscribe_own_user_read_receipts_changed().await;
let read_receipt_change_mutex = subscribe_to_owned_user_read_receipt_changed.clone();
let mut read_receipt_change_mutex_guard = read_receipt_change_mutex.lock().await;
if let Some(subscription) = read_receipt_change_mutex_guard.get(&room_id) {
if *subscription && subscribe {
return
}
} else if subscribe {
read_receipt_change_mutex_guard.insert(room_id.clone(), true);
}
pin_mut!(update_receiver);
if let Some(client_user_id) = current_user_id() {
while (update_receiver.next().await).is_some() {
let read_receipt_change = subscribe_to_owned_user_read_receipt_changed.clone();
let read_receipt_change = read_receipt_change.lock().await;
let Some(subscribed_to_user_read_receipt) = read_receipt_change.get(&room_id) else { continue; };
if !subscribed_to_user_read_receipt {
break;
}

let updated_event_id = timeline.latest_user_read_receipt_timeline_event_id(&client_user_id).await;
let fully_read_event = get_fully_read_event(&room_id);
if let (Some(updated_event_id), Some((event_id, fully_read_event_timestamp))) = (updated_event_id, fully_read_event) {
if updated_event_id != event_id {
if let Err(e) = get_event_timestamp(&room, &room_id, updated_event_id).await
.map(|(read_event, timestamp)| {
if timestamp > fully_read_event_timestamp {
set_fully_read_event(&room_id, read_event, timestamp)
} else {
bail!("updated event's timestamp is older than current fully read event");
}
})
{
error!("Error: couldn't set fully read event for room {room_id}: {e:?}");
}
}

}

}
}
});
Expand Down Expand Up @@ -1039,8 +1047,6 @@ struct RoomInfo {
timeline_subscriber_handler_task: JoinHandle<()>,
/// A drop guard for the event handler that represents a subscription to typing notices for this room.
typing_notice_subscriber: Option<EventHandlerDropGuard>,
/// A boolean indicating if the update subsciber has been initialised
update_subscriber_initialised: bool,
/// The ID of the old tombstoned room that this room has replaced, if any.
replaces_tombstoned_room: Option<OwnedRoomId>,
/// Event_id and timestamp for read marker
Expand Down Expand Up @@ -1624,7 +1630,6 @@ async fn add_new_room(room: &room_list_service::Room) -> Result<()> {
timeline_update_sender,
timeline_subscriber_handler_task,
typing_notice_subscriber: None,
update_subscriber_initialised: false,
replaces_tombstoned_room: tombstoned_room_replaced_by_this_room,
fully_read_event: None
},
Expand Down

0 comments on commit 8b22003

Please sign in to comment.