Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: handle read notifications from other clients #312

Merged
merged 2 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@

### Added

- Add `colored_messages` config option
- Add `colored_messages` config option ([#311])
- Handle read receipts from other clients ([#312])

[#311]: https://github.com/boxdot/gurk-rs/pull/311
[#312]: https://github.com/boxdot/gurk-rs/pull/312

## 0.5.1

Expand Down
10 changes: 7 additions & 3 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ impl App {
}

pub async fn on_message(&mut self, content: Content) -> anyhow::Result<()> {
tracing::info!(?content, "incoming");
// tracing::info!(?content, "incoming");

#[cfg(feature = "dev")]
if self.config.developer.dump_raw_messages {
Expand All @@ -491,6 +491,10 @@ impl App {

let user_id = self.user_id;

if let ContentBody::SynchronizeMessage(SyncMessage { ref read, .. }) = content.body {
self.handle_read(read);
}

let (channel_idx, message) = match (content.metadata, content.body) {
// Private note message
(
Expand Down Expand Up @@ -1559,7 +1563,7 @@ fn add_emoji_from_sticker(body: &mut Option<String>, sticker: Option<Sticker>) {
}

#[cfg(test)]
mod tests {
pub(crate) mod tests {
use super::*;

use crate::config::User;
Expand All @@ -1570,7 +1574,7 @@ mod tests {
use std::cell::RefCell;
use std::rc::Rc;

fn test_app() -> (
pub(crate) fn test_app() -> (
App,
mpsc::UnboundedReceiver<Event>,
Rc<RefCell<Vec<Message>>>,
Expand Down
74 changes: 71 additions & 3 deletions src/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::collections::BTreeMap;

use anyhow::Context;
use presage::libsignal_service::content::Metadata;
use presage::proto::sync_message::Sent;
use presage::proto::sync_message::{Read, Sent};
use presage::proto::{DataMessage, EditMessage, SyncMessage};
use tracing::debug;

Expand All @@ -19,8 +21,6 @@ impl App {
return Ok(());
};

tracing::info!(?sync_message, "#########");

// edit message
if let Some(Sent {
edit_message:
Expand Down Expand Up @@ -88,6 +88,36 @@ impl App {

Ok(())
}

/// Handles read notifications
pub(crate) fn handle_read(&mut self, read: &[Read]) {
// First collect all the read counters to avoid hitting the storage for the same channel
let read_counters: BTreeMap<ChannelId, u32> = read
.iter()
.filter_map(|read| {
let arrived_at = read.timestamp?;
let channel_id = self.storage.message_channel(arrived_at)?;
let num_unread = self
.storage
.messages(channel_id)
.rev()
.take_while(|msg| arrived_at < msg.arrived_at)
.count();
let num_unread: u32 = num_unread.try_into().ok()?;
Some((channel_id, num_unread))
})
.collect();
// Update the unread counters
for (channel_id, num_unread) in read_counters {
if let Some(channel) = self.storage.channel(channel_id) {
if channel.unread_messages > 0 {
let mut channel = channel.into_owned();
channel.unread_messages = num_unread;
self.storage.store_channel(channel);
}
}
}
}
}

trait MessageExt {
Expand Down Expand Up @@ -122,3 +152,41 @@ impl MessageExt for SyncMessage {
}
}
}

#[cfg(test)]
mod tests {
use crate::app::tests::test_app;

use super::*;

#[test]
#[ignore = "forgetful storage does not support lookup by arrived_at"]
fn test_handle_read() {
let (mut app, _events, _sent_messages) = test_app();

let channel_id = *app.channels.items.first().unwrap();

// new incoming message
let message = app
.storage
.store_message(
channel_id,
Message::text(app.user_id, 42, "unread message".to_string()),
)
.into_owned();

// mark as unread
app.storage
.channel(channel_id)
.unwrap()
.into_owned()
.unread_messages = 1;

app.handle_read(&[Read {
timestamp: Some(message.arrived_at),
..Default::default()
}]);

assert_eq!(app.storage.channel(channel_id).unwrap().unread_messages, 0);
}
}
4 changes: 4 additions & 0 deletions src/storage/forgetful.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,8 @@ impl Storage for ForgetfulStorage {
}

fn save(&mut self) {}

fn message_channel(&self, _arrived_at: u64) -> Option<ChannelId> {
None
}
}
23 changes: 23 additions & 0 deletions src/storage/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,16 @@ impl Storage for JsonStorage {
error!(error =% e, "failed to save json storage");
}
}

fn message_channel(&self, arrived_at: u64) -> Option<ChannelId> {
self.data.channels.items.iter().find_map(|channel| {
channel
.messages
.binary_search_by_key(&arrived_at, |msg| msg.arrived_at)
.is_ok()
.then_some(channel.id)
})
}
}

#[cfg(test)]
Expand Down Expand Up @@ -601,4 +611,17 @@ mod tests {
);
assert_eq!(storage.metadata().contacts_sync_request_at, Some(dt));
}

#[test]
fn test_json_storage_message_channel() {
let mut storage = json_storage_from_snapshot();
let channel_id = ChannelId::User(uuid!("966960e0-a8cd-43f1-ac7a-2c986dd470cd"));
let from_id = uuid!("00000000-0000-0000-0000-000000000000");
storage.store_message(
channel_id,
Message::text(from_id, 1664832050004, "hello".to_owned()),
);
assert_eq!(storage.message_channel(1664832050004), Some(channel_id));
assert_eq!(storage.message_channel(0), None);
}
}
5 changes: 5 additions & 0 deletions src/storage/memcache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,9 @@ impl<S: Storage> Storage for MemCache<S> {
fn save(&mut self) {
self.storage.save();
}

fn message_channel(&self, arrived_at: u64) -> Option<ChannelId> {
// message arrived_at to channel_id conversion is not cached
self.storage.message_channel(arrived_at)
}
}
2 changes: 2 additions & 0 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ pub trait Storage {
/// Gets the message by id
fn message(&self, message_id: MessageId) -> Option<Cow<Message>>;

fn message_channel(&self, arrived_at: u64) -> Option<ChannelId>;

fn edits(
&self,
message_id: MessageId,
Expand Down
43 changes: 43 additions & 0 deletions src/storage/sql/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,36 @@ impl Storage for SqliteStorage {
}

fn save(&mut self) {}

fn message_channel(&self, arrived_at: u64) -> Option<ChannelId> {
struct SqlChannelId {
channel_id: ChannelId,
}

let arrived_at: i64 = arrived_at
.try_into()
.map_err(|_| MessageConvertError::InvalidTimestamp)
.ok_logged()?;

self.execute(|ctx| {
Box::pin(
sqlx::query_as!(
SqlChannelId,
r#"
SELECT
m.channel_id AS "channel_id: _"
FROM messages AS m
WHERE m.arrived_at = ?
LIMIT 1
"#,
arrived_at
)
.fetch_optional(ctx.conn),
)
})
.ok_logged()?
.map(|channel_id| channel_id.channel_id)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -958,4 +988,17 @@ mod tests {

assert_eq!(is_sqlite_encrypted_heuristics(&url), Some(true));
}

#[test]
fn test_sqlite_storage_message_channel() {
let _ = tracing_subscriber::fmt().with_test_writer().try_init();
let mut storage = fixtures();
let from_id = uuid!("966960e0-a8cd-43f1-ac7a-2c986dd470cd");
let channel_id = ChannelId::User(uuid!("a955d20f-6b83-4e69-846e-a99b1779ff7a"));
storage.store_message(
channel_id,
Message::text(from_id, 1664832050000, "hello".to_owned()),
);
assert_eq!(storage.message_channel(1664832050000), Some(channel_id));
}
}