From bd52d4c5b08dd321cea487e16dce8a3aed39a48e Mon Sep 17 00:00:00 2001 From: Jade Ellis Date: Fri, 3 Jan 2025 20:04:22 +0000 Subject: [PATCH 1/3] automatically retry returning data in syncv3 --- src/api/client/sync/v3.rs | 42 +++++++++++++++++++++++++-------------- 1 file changed, 27 insertions(+), 15 deletions(-) diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index 406b497dc..edb2b9452 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -124,6 +124,33 @@ pub(crate) async fn sync_events_route( // Setup watchers, so if there's no response, we can wait for them let watcher = services.sync.watch(sender_user, sender_device); + let response = build_sync_events(services, &body).await?; + if body.body.full_state + || !(response.rooms.is_empty() + && response.presence.is_empty() + && response.account_data.is_empty() + && response.device_lists.is_empty() + && response.to_device.is_empty()) + { + return Ok(response); + } + + // Hang a few seconds so requests are not spammed + // Stop hanging if new info arrives + let default = Duration::from_secs(30); + let duration = cmp::min(body.body.timeout.unwrap_or(default), default); + _ = tokio::time::timeout(duration, watcher).await; + + // Retry returning data + build_sync_events(services, &body).await +} + +pub(crate) async fn build_sync_events( + services: crate::State, + body: &Ruma, +) -> Result> { + let (sender_user, sender_device) = body.sender(); + let next_batch = services.globals.current_count()?; let next_batch_string = next_batch.to_string(); @@ -327,21 +354,6 @@ pub(crate) async fn sync_events_route( to_device: ToDevice { events: to_device_events }, }; - // TODO: Retry the endpoint instead of returning - if !full_state - && response.rooms.is_empty() - && response.presence.is_empty() - && response.account_data.is_empty() - && response.device_lists.is_empty() - && response.to_device.is_empty() - { - // Hang a few seconds so requests are not spammed - // Stop hanging if new info arrives - let default = Duration::from_secs(30); - let duration = cmp::min(body.body.timeout.unwrap_or(default), default); - _ = tokio::time::timeout(duration, watcher).await; - } - Ok(response) } From 95c556d5a78e290d78630d708d25517983b6f24e Mon Sep 17 00:00:00 2001 From: Jade Ellis Date: Fri, 3 Jan 2025 20:15:02 +0000 Subject: [PATCH 2/3] reference service --- src/api/client/sync/v3.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index edb2b9452..057ca6c8d 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -124,7 +124,7 @@ pub(crate) async fn sync_events_route( // Setup watchers, so if there's no response, we can wait for them let watcher = services.sync.watch(sender_user, sender_device); - let response = build_sync_events(services, &body).await?; + let response = build_sync_events(&services, &body).await?; if body.body.full_state || !(response.rooms.is_empty() && response.presence.is_empty() @@ -142,11 +142,11 @@ pub(crate) async fn sync_events_route( _ = tokio::time::timeout(duration, watcher).await; // Retry returning data - build_sync_events(services, &body).await + build_sync_events(&services, &body).await } pub(crate) async fn build_sync_events( - services: crate::State, + services: &Services, body: &Ruma, ) -> Result> { let (sender_user, sender_device) = body.sender(); From 2b4211f367446892ed8979f39f486432bfad0976 Mon Sep 17 00:00:00 2001 From: Jade Ellis Date: Fri, 3 Jan 2025 21:04:34 +0000 Subject: [PATCH 3/3] clippy fixes --- src/api/client/sync/v3.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index 057ca6c8d..9f9ccfab4 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -190,7 +190,7 @@ pub(crate) async fn build_sync_events( .map(ToOwned::to_owned) .broad_filter_map(|room_id| { load_joined_room( - &services, + services, sender_user, sender_device, room_id.clone(), @@ -223,7 +223,7 @@ pub(crate) async fn build_sync_events( .rooms_left(sender_user) .broad_filter_map(|(room_id, _)| { handle_left_room( - &services, + services, since, room_id.clone(), sender_user, @@ -269,7 +269,7 @@ pub(crate) async fn build_sync_events( let presence_updates: OptionFuture<_> = services .globals .allow_local_presence() - .then(|| process_presence_updates(&services, since, sender_user)) + .then(|| process_presence_updates(services, since, sender_user)) .into(); let account_data = services @@ -319,7 +319,7 @@ pub(crate) async fn build_sync_events( .stream() .broad_filter_map(|user_id| async move { let no_shared_encrypted_room = - !share_encrypted_room(&services, sender_user, &user_id, None).await; + !share_encrypted_room(services, sender_user, &user_id, None).await; no_shared_encrypted_room.then_some(user_id) }) .ready_fold(HashSet::new(), |mut device_list_left, user_id| {