From 27b802d4b74f0d894005ae88b6b052e684b2d760 Mon Sep 17 00:00:00 2001 From: sim Date: Thu, 19 Dec 2024 09:21:04 +0000 Subject: [PATCH] Wipe expired messages --- autopush-common/src/db/redis/mod.rs | 1 + .../src/db/redis/redis_client/mod.rs | 103 +++++++++++++++--- 2 files changed, 91 insertions(+), 13 deletions(-) diff --git a/autopush-common/src/db/redis/mod.rs b/autopush-common/src/db/redis/mod.rs index bfce28bc..f906a0b9 100644 --- a/autopush-common/src/db/redis/mod.rs +++ b/autopush-common/src/db/redis/mod.rs @@ -6,6 +6,7 @@ /// `autopush/co/{uaid}` u64 to store the last time the user has interacted with the server /// `autopush/channels/{uaid}` List to store the list of the channels of the user /// `autopush/msgs/{uaid}` SortedSet to store the list of the pending message ids for the user +/// `autopush/msgs_exp/{uaid}` SortedSet to store the list of the pending message ids, ordered by expiry date, this is because SortedSet elements can't have independant expiry date /// `autopush/msg/{uaid}/{chidmessageid}`, with `{chidmessageid} == {chid}:{version}` String to store /// the content of the messages /// `autopush/topic/{uaid}/{chid}/{topic}` String to store the (last) message id of a given topic diff --git a/autopush-common/src/db/redis/redis_client/mod.rs b/autopush-common/src/db/redis/redis_client/mod.rs index 0081b776..6ebba652 100644 --- a/autopush-common/src/db/redis/redis_client/mod.rs +++ b/autopush-common/src/db/redis/redis_client/mod.rs @@ -109,6 +109,10 @@ impl RedisClientImpl { format!("autopush/msgs/{}", uaid.as_hyphenated()) } + fn message_exp_list_key(&self, uaid: &Uuid) -> String { + format!("autopush/msgs_exp/{}", uaid.as_hyphenated()) + } + fn message_key(&self, uaid: &Uuid, chidmessageid: &str) -> String { format!("autopush/msg/{}/{}", uaid.as_hyphenated(), chidmessageid) } @@ -192,11 +196,13 @@ impl DbClient for RedisClientImpl { let co_key = self.last_co_key(&uaid); let chan_list_key = self.channel_list_key(&uaid); let msg_list_key = self.message_list_key(&uaid); + let exp_list_key = self.message_exp_list_key(&uaid); redis::pipe() .del(&user_key) .del(&co_key) .del(&chan_list_key) .del(&msg_list_key) + .del(&exp_list_key) .exec(&mut con) .unwrap(); Ok(()) @@ -287,6 +293,7 @@ impl DbClient for RedisClientImpl { async fn save_message(&self, uaid: &Uuid, message: Notification) -> DbResult<()> { let mut con = self.connection()?; let msg_list_key = self.message_list_key(&uaid); + let exp_list_key = self.message_exp_list_key(&uaid); let msg_key = self.message_key(&uaid, &message.chidmessageid()); // message.ttl is already min(headers.ttl, MAX_NOTIFICATION_TTL) // see autoendpoint/src/extractors/notification_headers.rs @@ -300,14 +307,11 @@ impl DbClient for RedisClientImpl { // Remember, `timestamp` is effectively the time to kill the message, not the // current time. - let expiry = SystemTime::now() + Duration::from_secs(message.ttl); - trace!( - "🉑 Message Expiry {}", - expiry - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap_or_default() - .as_millis() - ); + let expiry = (SystemTime::now() + Duration::from_secs(message.ttl)) + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + trace!("🉑 Message Expiry {}", expiry); let mut pipe = redis::pipe(); @@ -318,8 +322,11 @@ impl DbClient for RedisClientImpl { // If a message is already stored for that topic, we remove it if let Some(id) = old_msg_id { trace!("🉑 The topic had a message: {}", &id); - pipe.zrem(&msg_list_key, &id) - .del(self.message_key(&uaid, &id)); + // We remove the id from the exp list at the end, to be sure + // it can't be removed from the list before the message is removed + pipe.del(self.message_key(&uaid, &id)) + .zrem(&msg_list_key, &id) + .zrem(&exp_list_key, &id); } // Setting the key replace the old one if any pipe.set_options(&topic_key, &message.chidmessageid(), opts); @@ -339,6 +346,7 @@ impl DbClient for RedisClientImpl { ) // The function [fecth_timestamp_messages] takes a timestamp in input, // here we use the timestamp of the record (in ms) + .zadd(&exp_list_key, &msg_id, expiry) .zadd(&msg_list_key, &msg_id, ms_since_epoch()); let _: () = pipe.exec(&mut con).unwrap(); @@ -362,8 +370,22 @@ impl DbClient for RedisClientImpl { Ok(()) } - /// Doesn't seem to be useful for redis - async fn increment_storage(&self, _uaid: &Uuid, _timestamp: u64) -> DbResult<()> { + /// Delete expired messages + async fn increment_storage(&self, uaid: &Uuid, timestamp: u64) -> DbResult<()> { + debug!("🉑🔥 Incrementing storage to {}", timestamp); + let msg_list_key = self.message_list_key(&uaid); + let exp_list_key = self.message_exp_list_key(&uaid); + let mut con = self.connection()?; + let exp_id_list: Vec = con.zrangebyscore(&exp_list_key, 0, timestamp).unwrap(); + if exp_id_list.len() > 0 { + trace!("🉑🔥 Deleting {} expired msgs", exp_id_list.len()); + redis::pipe() + .del(&exp_id_list) + .zrem(&msg_list_key, &exp_id_list) + .zrem(&exp_list_key, &exp_id_list) + .exec(&mut con) + .unwrap(); + } Ok(()) } @@ -376,11 +398,15 @@ impl DbClient for RedisClientImpl { ); let msg_key = self.message_key(&uaid, &chidmessageid); let msg_list_key = self.message_list_key(&uaid); + let exp_list_key = self.message_exp_list_key(&uaid); debug!("🉑🔥 Deleting message {}", &msg_key); let mut con = self.connection()?; + // We remove the id from the exp list at the end, to be sure + // it can't be removed from the list before the message is removed redis::pipe() - .zrem(&msg_list_key, &chidmessageid) .del(&msg_key) + .zrem(&msg_list_key, &chidmessageid) + .zrem(&exp_list_key, &chidmessageid) .exec(&mut con) .unwrap(); self.metrics @@ -533,6 +559,57 @@ mod tests { assert!(result.unwrap()); } + /// Test if [increment_storage] correctly wipe expired messages + #[actix_rt::test] + async fn wipe_expired() -> DbResult<()> { + init_test_logging(); + let client = new_client()?; + + let connected_at = ms_since_epoch(); + + let uaid = Uuid::parse_str(TEST_USER).unwrap(); + let chid = Uuid::parse_str(TEST_CHID).unwrap(); + + let node_id = "test_node".to_owned(); + + // purge the user record if it exists. + let _ = client.remove_user(&uaid).await; + + let test_user = User { + uaid, + router_type: "webpush".to_owned(), + connected_at, + router_data: None, + node_id: Some(node_id.clone()), + ..Default::default() + }; + + // purge the old user (if present) + // in case a prior test failed for whatever reason. + let _ = client.remove_user(&uaid).await; + + // can we add the user? + let timestamp = now(); + let fetch_timestamp = ms_since_epoch(); + client.add_user(&test_user).await?; + let test_notification = crate::db::Notification { + channel_id: chid, + version: "test".to_owned(), + ttl: 1, + timestamp, + data: Some("Encrypted".into()), + sortkey_timestamp: Some(timestamp), + ..Default::default() + }; + client.save_message(&uaid, test_notification).await?; + client + .increment_storage(&uaid, fetch_timestamp + 10000) + .await?; + let msgs = client.fetch_timestamp_messages(&uaid, None, 999).await?; + assert_eq!(msgs.messages.len(), 0); + Ok(()) + } + /// run a gauntlet of testing. These are a bit linear because they need /// to run in sequence. #[actix_rt::test]