From b90254349bf952a83a9c1b804cf50f210edaa27e Mon Sep 17 00:00:00 2001 From: sim Date: Tue, 17 Dec 2024 16:45:08 +0100 Subject: [PATCH 01/21] Update yanked futures-utils --- Cargo.lock | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index be9d2d08..280d1770 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1597,9 +1597,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", "futures-sink", @@ -1607,9 +1607,9 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] name = "futures-executor" @@ -1624,9 +1624,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] name = "futures-locks" @@ -1641,9 +1641,9 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", @@ -1652,21 +1652,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" [[package]] name = "futures-task" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" [[package]] name = "futures-util" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures 0.1.31", "futures-channel", From 1152f7f65ef8abb03191ec7f517f28a9bfd130df Mon Sep 17 00:00:00 2001 From: sim Date: Tue, 17 Dec 2024 16:49:15 +0100 Subject: [PATCH 02/21] Add redis backend --- Cargo.lock | 46 +- Cargo.toml | 2 +- autoconnect/Cargo.toml | 1 + autoconnect/autoconnect-settings/Cargo.toml | 1 + .../autoconnect-settings/src/app_state.rs | 7 +- autoendpoint/Cargo.toml | 2 + autoendpoint/src/server.rs | 7 +- autopush-common/Cargo.toml | 2 + autopush-common/build.rs | 4 +- autopush-common/src/db/mod.rs | 13 + autopush-common/src/db/redis/mod.rs | 64 ++ .../src/db/redis/redis_client/mod.rs | 780 ++++++++++++++++++ autopush-common/src/db/routing.rs | 4 + autopush-common/src/notification.rs | 3 + 14 files changed, 930 insertions(+), 6 deletions(-) create mode 100644 autopush-common/src/db/redis/mod.rs create mode 100644 autopush-common/src/db/redis/redis_client/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 280d1770..8dbfffd3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -807,6 +807,7 @@ dependencies = [ "openssl", "protobuf", "rand 0.8.5", + "redis", "regex", "reqwest 0.12.8", "sentry", @@ -1102,6 +1103,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "memchr", +] + [[package]] name = "config" version = "0.14.0" @@ -2207,6 +2218,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.11" @@ -2958,6 +2978,24 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "redis" +version = "0.27.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09d8f99a4090c89cc489a94833c901ead69bfbf3877b4867d5482e321ee875bc" +dependencies = [ + "arc-swap", + "combine", + "itertools 0.13.0", + "itoa", + "num-bigint", + "percent-encoding", + "ryu", + "sha1_smol", + "socket2", + "url", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -3539,6 +3577,12 @@ dependencies = [ "digest", ] +[[package]] +name = "sha1_smol" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" + [[package]] name = "sha2" version = "0.10.8" @@ -4699,7 +4743,7 @@ dependencies = [ "http 0.2.12", "hyper 0.14.30", "hyper-rustls 0.24.2", - "itertools", + "itertools 0.12.1", "log", "percent-encoding", "rustls 0.22.4", diff --git a/Cargo.toml b/Cargo.toml index 4e9875a6..0f22df15 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -103,7 +103,7 @@ autoconnect_settings = { path = "./autoconnect/autoconnect-settings" } autoconnect_web = { path = "./autoconnect/autoconnect-web" } autoconnect_ws = { path = "./autoconnect/autoconnect-ws" } autoconnect_ws_clientsm = { path = "./autoconnect/autoconnect-ws/autoconnect-ws-clientsm" } -autopush_common = { path = "./autopush-common", features = ["bigtable"] } +autopush_common = { path = "./autopush-common" } [profile.release] debug = 1 diff --git a/autoconnect/Cargo.toml b/autoconnect/Cargo.toml index c0ba4ba2..b2c95301 100644 --- a/autoconnect/Cargo.toml +++ b/autoconnect/Cargo.toml @@ -56,4 +56,5 @@ docopt = "1.1" default = ["bigtable"] bigtable = ["autopush_common/bigtable", "autoconnect_settings/bigtable"] emulator = ["bigtable"] +redis = ["autopush_common/redis", "autoconnect_settings/redis"] log_vapid = [] diff --git a/autoconnect/autoconnect-settings/Cargo.toml b/autoconnect/autoconnect-settings/Cargo.toml index 32c04aeb..4e0bc8bd 100644 --- a/autoconnect/autoconnect-settings/Cargo.toml +++ b/autoconnect/autoconnect-settings/Cargo.toml @@ -25,3 +25,4 @@ autopush_common.workspace = true # specify the default via the calling crate, in order to simplify default chains. bigtable = ["autopush_common/bigtable"] emulator = ["bigtable"] +redis = ["autopush_common/redis"] diff --git a/autoconnect/autoconnect-settings/src/app_state.rs b/autoconnect/autoconnect-settings/src/app_state.rs index 8383c6ee..1510571d 100644 --- a/autoconnect/autoconnect-settings/src/app_state.rs +++ b/autoconnect/autoconnect-settings/src/app_state.rs @@ -11,7 +11,7 @@ use autoconnect_common::{ broadcast::BroadcastChangeTracker, megaphone::init_and_spawn_megaphone_updater, registry::ClientRegistry, }; -use autopush_common::db::{client::DbClient, DbSettings, StorageType}; +use autopush_common::db::{client::DbClient, redis::RedisClientImpl, DbSettings, StorageType}; use crate::{Settings, ENV_PREFIX}; @@ -78,6 +78,11 @@ impl AppState { client.spawn_sweeper(Duration::from_secs(30)); Box::new(client) } + #[cfg(feature = "redis")] + StorageType::Redis => Box::new( + RedisClientImpl::new(metrics.clone(), &db_settings) + .map_err(|e| ConfigError::Message(e.to_string()))?, + ), _ => panic!( "Invalid Storage type {:?}. Check {}__DB_DSN.", storage_type, diff --git a/autoendpoint/Cargo.toml b/autoendpoint/Cargo.toml index 94586a0d..6dbde469 100644 --- a/autoendpoint/Cargo.toml +++ b/autoendpoint/Cargo.toml @@ -75,6 +75,8 @@ bigtable = ["autopush_common/bigtable"] # enable emulator to call locally run data store. emulator = ["bigtable"] +redis = ["autopush_common/redis"] + # Enable "stub" router for local testing purposes. # The "stub" will return specified error strings or success # depending on which `app_id` client is called based on the registration diff --git a/autoendpoint/src/server.rs b/autoendpoint/src/server.rs index 86444951..09afc5ba 100644 --- a/autoendpoint/src/server.rs +++ b/autoendpoint/src/server.rs @@ -14,7 +14,10 @@ use serde_json::json; #[cfg(feature = "bigtable")] use autopush_common::db::bigtable::BigTableClientImpl; use autopush_common::{ - db::{client::DbClient, spawn_pool_periodic_reporter, DbSettings, StorageType}, + db::{ + client::DbClient, redis::RedisClientImpl, spawn_pool_periodic_reporter, DbSettings, + StorageType, + }, middleware::sentry::SentryWrapper, }; @@ -77,6 +80,8 @@ impl Server { client.spawn_sweeper(Duration::from_secs(30)); Box::new(client) } + #[cfg(feature = "redis")] + StorageType::Redis => Box::new(RedisClientImpl::new(metrics.clone(), &db_settings)?), _ => { debug!("No idea what {:?} is", &db_settings.dsn); return Err(ApiErrorKind::General( diff --git a/autopush-common/Cargo.toml b/autopush-common/Cargo.toml index 053681e2..d86b448a 100644 --- a/autopush-common/Cargo.toml +++ b/autopush-common/Cargo.toml @@ -59,6 +59,7 @@ grpcio = { version = "=0.13.0", features = ["openssl"], optional = true } grpcio-sys = { version = "=0.13.0", optional = true } protobuf = { version = "=2.28.0", optional = true } # grpcio does not support protobuf 3+ form_urlencoded = { version = "1.2", optional = true } +redis = "0.27.6" [dev-dependencies] mockito = "0.31" @@ -80,3 +81,4 @@ bigtable = [ emulator = [ "bigtable", ] # used for testing big table, requires an external bigtable emulator running. +redis = [] diff --git a/autopush-common/build.rs b/autopush-common/build.rs index 06440274..a04ae546 100644 --- a/autopush-common/build.rs +++ b/autopush-common/build.rs @@ -1,5 +1,5 @@ pub fn main() { - if !cfg!(feature = "bigtable") { - panic!("No database defined! Please compile with `features=bigtable`"); + if !cfg!(feature = "bigtable") && !cfg!(feature = "redis") { + panic!("No database defined! Please compile with `features=bigtable` (or redis)"); } } diff --git a/autopush-common/src/db/mod.rs b/autopush-common/src/db/mod.rs index 70a2f025..c34aafff 100644 --- a/autopush-common/src/db/mod.rs +++ b/autopush-common/src/db/mod.rs @@ -24,6 +24,8 @@ pub mod bigtable; pub mod client; pub mod error; pub mod models; +#[cfg(feature = "redis")] +pub mod redis; pub mod reporter; pub mod routing; @@ -45,6 +47,8 @@ pub enum StorageType { INVALID, #[cfg(feature = "bigtable")] BigTable, + #[cfg(feature = "redis")] + Redis, } impl From<&str> for StorageType { @@ -52,6 +56,8 @@ impl From<&str> for StorageType { match name.to_lowercase().as_str() { #[cfg(feature = "bigtable")] "bigtable" => Self::BigTable, + #[cfg(feature = "redis")] + "redis" => Self::Redis, _ => Self::INVALID, } } @@ -65,6 +71,8 @@ impl StorageType { let mut result: Vec<&str> = Vec::new(); #[cfg(feature = "bigtable")] result.push("Bigtable"); + #[cfg(feature = "redis")] + result.push("Redis"); result } @@ -90,6 +98,11 @@ impl StorageType { } return Self::BigTable; } + #[cfg(feature = "redis")] + if dsn.starts_with("redis") { + trace!("Found redis"); + return Self::Redis; + } Self::INVALID } } diff --git a/autopush-common/src/db/redis/mod.rs b/autopush-common/src/db/redis/mod.rs new file mode 100644 index 00000000..bfce28bc --- /dev/null +++ b/autopush-common/src/db/redis/mod.rs @@ -0,0 +1,64 @@ +/// This uses redis as a storage and management +/// system for Autopush Notifications and Routing information. +/// +/// Keys for the data are +/// `autopush/user/{uaid}` String to store the user data +/// `autopush/co/{uaid}` u64 to store the last time the user has interacted with the server +/// `autopush/channels/{uaid}` List to store the list of the channels of the user +/// `autopush/msgs/{uaid}` SortedSet to store the list of the pending message ids for the user +/// `autopush/msg/{uaid}/{chidmessageid}`, with `{chidmessageid} == {chid}:{version}` String to store +/// the content of the messages +/// `autopush/topic/{uaid}/{chid}/{topic}` String to store the (last) message id of a given topic +/// +mod redis_client; + +pub use redis_client::RedisClientImpl; + +use serde::Deserialize; +use std::time::Duration; + +use crate::db::error::DbError; +use crate::util::deserialize_opt_u32_to_duration; + +/// The settings for accessing the redis contents. +#[derive(Clone, Debug, Deserialize)] +pub struct RedisDbSettings { + #[serde(default)] + #[serde(deserialize_with = "deserialize_opt_u32_to_duration")] + pub timeout: Option, +} + +// Used by test, but we don't want available for release. +#[allow(clippy::derivable_impls)] +impl Default for RedisDbSettings { + fn default() -> Self { + Self { + timeout: Default::default(), + } + } +} + +impl TryFrom<&str> for RedisDbSettings { + type Error = DbError; + fn try_from(setting_string: &str) -> Result { + let me: Self = match serde_json::from_str(setting_string) { + Ok(me) => me, + Err(e) if e.is_eof() => Self::default(), + Err(e) => Err(DbError::General(format!( + "Could not parse DdbSettings: {:?}", + e + )))?, + }; + Ok(me) + } +} + +mod tests { + + #[test] + fn test_settings_parse() -> Result<(), crate::db::error::DbError> { + let settings = super::RedisDbSettings::try_from("{\"timeout\": 123}")?; + assert_eq!(settings.timeout, Some(std::time::Duration::from_secs(123))); + Ok(()) + } +} diff --git a/autopush-common/src/db/redis/redis_client/mod.rs b/autopush-common/src/db/redis/redis_client/mod.rs new file mode 100644 index 00000000..0231c74c --- /dev/null +++ b/autopush-common/src/db/redis/redis_client/mod.rs @@ -0,0 +1,780 @@ +use std::collections::HashSet; +use std::fmt; +use std::fmt::Display; +use std::str::FromStr; +use std::sync::Arc; +use std::time::{Duration, SystemTime}; + +use async_trait::async_trait; +use cadence::{CountedExt, StatsdClient}; +use redis::{Commands, ConnectionLike, SetExpiry, SetOptions}; +use uuid::Uuid; + +use crate::db::NotificationRecord; +use crate::db::{ + client::{DbClient, FetchMessageResponse}, + error::{DbError, DbResult}, + DbSettings, Notification, User, MAX_ROUTER_TTL, +}; +use crate::util::ms_since_epoch; + +use super::RedisDbSettings; + +/// Semi convenience wrapper to ensure that the UAID is formatted and displayed consistently. +// TODO:Should we create something similar for ChannelID? +struct Uaid(Uuid); + +impl Display for Uaid { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.0.as_simple()) + } +} + +impl From for String { + fn from(uaid: Uaid) -> String { + uaid.0.as_simple().to_string() + } +} + +#[derive(Clone)] +/// Wrapper for the Redis connection +pub struct RedisClientImpl { + /// Database connector string + pub client: redis::Client, + pub(crate) settings: RedisDbSettings, + /// Metrics client + metrics: Arc, + redis_opts: SetOptions, +} + +impl RedisClientImpl { + pub fn new(metrics: Arc, settings: &DbSettings) -> DbResult { + // let env = Arc::new(EnvBuilder::new().build()); + debug!("🏊 BT Pool new"); + let dsn = settings + .dsn + .clone() + .ok_or(DbError::General("Could not find DSN".to_owned()))?; + let client = redis::Client::open(dsn).unwrap(); + let db_settings = RedisDbSettings::try_from(settings.db_settings.as_ref())?; + info!("🉑 {:#?}", db_settings); + + Ok(Self { + client, + settings: db_settings, + metrics, + redis_opts: SetOptions::default().with_expiration(SetExpiry::EX(MAX_ROUTER_TTL)), + }) + } + + /** + Return a [ConnectionLike], which implement redis [Commands] and can be + used in pipes. + + Pools also return a ConnectionLike, so we can add support for pools later. + */ + fn connection(&self) -> DbResult { + if let Some(timeout) = self.settings.timeout { + if timeout.is_zero() { + return match self.client.get_connection_with_timeout(timeout) { + Ok(r) => Ok(r), + Err(_) => Err(DbError::ConnectionError( + "Cannot connect to redis".to_owned(), + )), + }; + } + } + match self.client.get_connection() { + Ok(r) => Ok(r), + Err(_) => Err(DbError::ConnectionError( + "Cannot connect to redis".to_owned(), + )), + } + } + + fn user_key(&self, uaid: &Uuid) -> String { + format!("autopush/user/{}", uaid.as_hyphenated()) + } + + /// This store the last connection record, but doesn't update User + fn last_co_key(&self, uaid: &Uuid) -> String { + format!("autopush/co/{}", uaid.as_hyphenated()) + } + + fn channel_list_key(&self, uaid: &Uuid) -> String { + format!("autopush/channels/{}", uaid.as_hyphenated()) + } + + fn message_list_key(&self, uaid: &Uuid) -> String { + format!("autopush/msgs/{}", uaid.as_hyphenated()) + } + + fn message_key(&self, uaid: &Uuid, chidmessageid: &str) -> String { + format!("autopush/msg/{}/{}", uaid.as_hyphenated(), chidmessageid) + } + + fn topic_key(&self, uaid: &Uuid, chan_id: &Uuid, topic: &str) -> String { + format!( + "autopush/topic/{}/{}/{}", + uaid.as_hyphenated(), + chan_id.as_hyphenated(), + topic + ) + } +} + +#[async_trait] +impl DbClient for RedisClientImpl { + /// add user to the database + async fn add_user(&self, user: &User) -> DbResult<()> { + trace!("🉑 Adding user"); + trace!("Logged at {}", &user.connected_at); + let mut con = self.connection()?; + let user_key = self.user_key(&user.uaid); + let co_key = self.last_co_key(&user.uaid); + let _: () = redis::pipe() + .set_options(co_key, ms_since_epoch(), self.redis_opts) + .ignore() + .set_options( + user_key, + serde_json::to_string(user).unwrap(), + self.redis_opts, + ) + .ignore() + .exec(&mut con) + .unwrap(); + Ok(()) + } + + /// To update the TTL of the Redis entry we just have to SET again, with the new expiry + /// + /// NOTE: This function is called by mobile during the daily + /// [autoendpoint::routes::update_token_route] handling, and by desktop + /// [autoconnect-ws-sm::get_or_create_user]` which is called + /// during the `HELLO` handler. This should be enough to ensure that the ROUTER records + /// are properly refreshed for "lively" clients. + /// + /// NOTE: There is some, very small, potential risk that a desktop client that can + /// somehow remain connected the duration of MAX_ROUTER_TTL, may be dropped as not being + /// "lively". + async fn update_user(&self, user: &mut User) -> DbResult { + trace!("🉑 Updating user"); + let mut con = self.connection()?; + let co_key = self.last_co_key(&user.uaid); + let last_co: Option = con.get(&co_key).unwrap(); + if last_co.is_some_and(|c| c < user.connected_at) { + trace!( + "🉑 Was connected at {}, now at {}", + last_co.unwrap(), + &user.connected_at + ); + self.add_user(&user).await?; + Ok(true) + } else { + Ok(false) + } + } + + async fn get_user(&self, uaid: &Uuid) -> DbResult> { + let mut con = self.connection()?; + let user_key = self.user_key(uaid); + let user: Option = con + .get::<&str, Option>(&user_key) + .unwrap() + .and_then(|s| serde_json::from_str(s.as_ref()).unwrap()); + if user.is_some() { + trace!("🉑 Found a record for {}", &uaid); + } + Ok(user) + } + + async fn remove_user(&self, uaid: &Uuid) -> DbResult<()> { + let mut con = self.connection()?; + let user_key = self.user_key(&uaid); + let co_key = self.last_co_key(&uaid); + let chan_list_key = self.channel_list_key(&uaid); + let msg_list_key = self.message_list_key(&uaid); + redis::pipe() + .del(&user_key) + .ignore() + .del(&co_key) + .ignore() + .del(&chan_list_key) + .ignore() + .del(&msg_list_key) + .ignore() + .exec(&mut con) + .unwrap(); + Ok(()) + } + + async fn add_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<()> { + let mut con = self.connection()?; + let co_key = self.last_co_key(&uaid); + let chan_list_key = self.channel_list_key(&uaid); + + let _: () = redis::pipe() + .rpush(chan_list_key, channel_id.as_hyphenated().to_string()) + .ignore() + .set_options(co_key, ms_since_epoch(), self.redis_opts) + .ignore() + .exec(&mut con) + .unwrap(); + Ok(()) + } + + /// Add channels in bulk (used mostly during migration) + async fn add_channels(&self, uaid: &Uuid, channels: HashSet) -> DbResult<()> { + // channel_ids are stored as a set within a single redis key + let mut con = self.connection()?; + let co_key = self.last_co_key(&uaid); + let chan_list_key = self.channel_list_key(&uaid); + redis::pipe() + .set_options(co_key, ms_since_epoch(), self.redis_opts) + .ignore() + .rpush( + chan_list_key, + channels + .into_iter() + .map(|c| c.as_hyphenated().to_string()) + .collect::>(), + ) + .ignore() + .exec(&mut con) + .unwrap(); + Ok(()) + } + + async fn get_channels(&self, uaid: &Uuid) -> DbResult> { + let mut con = self.connection()?; + let chan_list_key = self.channel_list_key(&uaid); + let channels: HashSet = con + .lrange::<&str, HashSet>(&chan_list_key, 0, -1) + .unwrap() + .into_iter() + .map(|s| Uuid::from_str(&s).unwrap()) + .collect(); + trace!("🉑 Found {} channels for {}", channels.len(), &uaid); + Ok(channels) + } + + /// Delete the channel. Does not delete its associated pending messages. + async fn remove_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult { + let mut con = self.connection()?; + let co_key = self.last_co_key(&uaid); + let chan_list_key = self.channel_list_key(&uaid); + // Remove {channel_id} from autopush/channel/{auid} + trace!("🉑 Removing channel {}", channel_id.as_hyphenated()); + let (status,): (bool,) = redis::pipe() + .set_options(co_key, ms_since_epoch(), self.redis_opts) + .ignore() + .lrem(&chan_list_key, 1, channel_id.as_hyphenated().to_string()) + .query(&mut con) + .unwrap(); + Ok(status) + } + + /// Remove the node_id + async fn remove_node_id( + &self, + uaid: &Uuid, + _node_id: &str, + _connected_at: u64, + _version: &Option, + ) -> DbResult { + if let Some(mut user) = self.get_user(&uaid).await? { + user.node_id = None; + self.update_user(&mut user).await?; + } + Ok(true) + } + + /** + Write the notification to storage. + + If the message contains a topic, we remove the old message + */ + async fn save_message(&self, uaid: &Uuid, message: Notification) -> DbResult<()> { + let mut con = self.connection()?; + let msg_list_key = self.message_list_key(&uaid); + let msg_key = self.message_key(&uaid, &message.chidmessageid()); + // message.ttl is already min(headers.ttl, MAX_NOTIFICATION_TTL) + // see autoendpoint/src/extractors/notification_headers.rs + let opts = SetOptions::default().with_expiration(SetExpiry::EX(message.ttl)); + + debug!("🗄️ Saving message {} :: {:?}", &msg_key, &message); + trace!( + "🉑 timestamp: {:?}", + &message.timestamp.to_be_bytes().to_vec() + ); + + // Remember, `timestamp` is effectively the time to kill the message, not the + // current time. + let expiry = SystemTime::now() + Duration::from_secs(message.ttl); + trace!( + "🉑 Message Expiry {}", + expiry + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() + ); + + let mut pipe = redis::pipe(); + + let is_topic = if let Some(topic) = &message.topic { + let topic_key = self.topic_key(&uaid, &message.channel_id, &topic); + // We check if a message is already saved for this topic + let old_msg_id: Option = con.get(&topic_key).unwrap(); + // If a message is already stored for that topic, we remove it + if let Some(id) = old_msg_id { + trace!("🉑 The topic had a message: {}", &id); + pipe.zrem(&msg_list_key, &id).ignore(); + pipe.del(self.message_key(&uaid, &id)).ignore(); + } + // Setting the key replace the old one if any + pipe.set_options(&topic_key, &message.chidmessageid(), opts) + .ignore(); + true + } else { + false + }; + + // Store notification record in autopush/msg/{aud}/{chidmessageid} + // And store {chidmessageid} in autopush/msgs/{aud} + let msg_id = &message.chidmessageid(); + let msg_key = self.message_key(&uaid, &msg_id); + pipe.set_options( + msg_key, + serde_json::to_string(&NotificationRecord::from_notif(&uaid, message)).unwrap(), + opts, + ) + .ignore() + // The function [fecth_timestamp_messages] takes a timestamp in input, + // here we use the timestamp of the record (in ms) + .zadd(msg_list_key, &msg_id, ms_since_epoch()) + .ignore(); + + let _: () = pipe.exec(&mut con).unwrap(); + self.metrics + .incr_with_tags("notification.message.stored") + .with_tag("topic", &is_topic.to_string()) + .with_tag("database", &self.name()) + .send(); + Ok(()) + } + + /// Save a batch of messages to the database. + /// + /// Currently just iterating through the list and saving one at a time. There's a bulk way + /// to save messages, but there are other considerations (e.g. mutation limits) + async fn save_messages(&self, uaid: &Uuid, messages: Vec) -> DbResult<()> { + // plate simple way of solving this: + for message in messages { + self.save_message(uaid, message).await?; + } + Ok(()) + } + + /// Doesn't seem to be useful for redis + async fn increment_storage(&self, _uaid: &Uuid, _timestamp: u64) -> DbResult<()> { + Ok(()) + } + + /// Delete the notification from storage. + async fn remove_message(&self, uaid: &Uuid, chidmessageid: &str) -> DbResult<()> { + trace!( + "🉑 attemping to delete {:?} :: {:?}", + uaid.to_string(), + chidmessageid + ); + let msg_key = self.message_key(&uaid, &chidmessageid); + let msg_list_key = self.message_list_key(&uaid); + debug!("🉑🔥 Deleting message {}", &msg_key); + let mut con = self.connection()?; + redis::pipe() + .zrem(&msg_list_key, &chidmessageid) + .ignore() + .del(&msg_key) + .ignore() + .exec(&mut con) + .unwrap(); + self.metrics + .incr_with_tags("notification.message.deleted") + .with_tag("database", &self.name()) + .send(); + Ok(()) + } + + /** + Topic messages are handled as other messages with redis, we return nothing. + */ + async fn fetch_topic_messages( + &self, + _uaid: &Uuid, + _limit: usize, + ) -> DbResult { + Ok(FetchMessageResponse { + messages: vec![], + timestamp: None, + }) + } + + /// Return [`limit`] messages pending for a [`uaid`] that have a record timestamp + /// after [`timestamp`] (millisecs). + /// + /// If [`limit`] = 0, we fetch all messages after [`timestamp`]. + /// + /// Note: Bigtable uses [`Notification.sortkey_timestamp`] instead, and the timestamp + /// is in seconds. + /// + /// This can return expired messages, following bigtables behavior + async fn fetch_timestamp_messages( + &self, + uaid: &Uuid, + timestamp: Option, + limit: usize, + ) -> DbResult { + trace!("Fecthing {} messages since {:?}", limit, timestamp); + let mut con = self.connection()?; + let msg_list_key = self.message_list_key(&uaid); + let (messages_id, mut scores): (Vec, Vec) = con + .zrangebyscore_limit_withscores::<&str, u64, &str, Vec<(String, u64)>>( + &msg_list_key, + timestamp.unwrap_or(0), + "+inf", + 0, + limit as isize, + ) + .unwrap() + .into_iter() + .map(|(id, s): (String, u64)| (self.message_key(&uaid, &id), s)) + .unzip(); + if messages_id.len() == 0 { + trace!("No message found"); + return Ok(FetchMessageResponse { + messages: vec![], + timestamp: None, + }); + } + let messages: Vec = if messages_id.len() == 0 { + vec![] + } else { + con.mget::<&Vec, Vec>>(&messages_id) + .unwrap() + .into_iter() + .filter_map(|opt: Option| { + if opt.is_none() { + // We return dummy expired event if we can't fetch the say event, + // it means the event has expired + Some(Notification { + timestamp: 1, + ..Default::default() + }) + } else { + opt.and_then(|m| serde_json::from_str(&m).ok()) + .and_then(|m: NotificationRecord| m.into_notif().ok()) + } + }) + .collect() + }; + let timestamp = scores.pop(); + trace!("Found {} messages until {:?}", messages.len(), timestamp); + Ok(FetchMessageResponse { + messages, + timestamp, + }) + } + + async fn health_check(&self) -> DbResult { + let status = self.connection()?.check_connection(); + Ok(status) + } + + /// Returns true, because there's no table in Redis + async fn router_table_exists(&self) -> DbResult { + Ok(true) + } + + /// Returns true, because there's no table in Redis + async fn message_table_exists(&self) -> DbResult { + Ok(true) + } + + fn box_clone(&self) -> Box { + Box::new(self.clone()) + } + + fn name(&self) -> String { + "Redis".to_owned() + } + + fn pool_status(&self) -> Option { + None + } +} + +#[cfg(test)] +mod tests { + use crate::{logging::init_test_logging, util::ms_since_epoch}; + + use super::*; + const TEST_USER: &str = "DEADBEEF-0000-0000-0000-0123456789AB"; + const TEST_CHID: &str = "DECAFBAD-0000-0000-0000-0123456789AB"; + const TOPIC_CHID: &str = "DECAFBAD-1111-0000-0000-0123456789AB"; + + fn now() -> u64 { + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs() + } + + fn new_client() -> DbResult { + let env_dsn = "redis://localhost".into(); // We force localhost to force test environment + let settings = DbSettings { + dsn: Some(env_dsn), + db_settings: "".into(), + }; + let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build()); + RedisClientImpl::new(metrics, &settings) + } + + #[actix_rt::test] + async fn health_check() { + let client = new_client().unwrap(); + + let result = client.health_check().await; + assert!(result.is_ok()); + assert!(result.unwrap()); + } + + /// run a gauntlet of testing. These are a bit linear because they need + /// to run in sequence. + #[actix_rt::test] + async fn run_gauntlet() -> DbResult<()> { + init_test_logging(); + let client = new_client()?; + + let connected_at = ms_since_epoch(); + + let uaid = Uuid::parse_str(TEST_USER).unwrap(); + let chid = Uuid::parse_str(TEST_CHID).unwrap(); + let topic_chid = Uuid::parse_str(TOPIC_CHID).unwrap(); + + let node_id = "test_node".to_owned(); + + // purge the user record if it exists. + let _ = client.remove_user(&uaid).await; + + let test_user = User { + uaid, + router_type: "webpush".to_owned(), + connected_at, + router_data: None, + node_id: Some(node_id.clone()), + ..Default::default() + }; + + // purge the old user (if present) + // in case a prior test failed for whatever reason. + let _ = client.remove_user(&uaid).await; + + // can we add the user? + client.add_user(&test_user).await?; + let fetched = client.get_user(&uaid).await?; + assert!(fetched.is_some()); + let fetched = fetched.unwrap(); + assert_eq!(fetched.router_type, "webpush".to_owned()); + + // Simulate a connected_at occuring before the following writes + let connected_at = ms_since_epoch(); + + // can we add channels? + client.add_channel(&uaid, &chid).await?; + let channels = client.get_channels(&uaid).await?; + assert!(channels.contains(&chid)); + + // can we add lots of channels? + let mut new_channels: HashSet = HashSet::new(); + new_channels.insert(chid); + for _ in 1..10 { + new_channels.insert(uuid::Uuid::new_v4()); + } + let chid_to_remove = uuid::Uuid::new_v4(); + new_channels.insert(chid_to_remove); + client.add_channels(&uaid, new_channels.clone()).await?; + let channels = client.get_channels(&uaid).await?; + assert_eq!(channels, new_channels); + + // can we remove a channel? + assert!(client.remove_channel(&uaid, &chid_to_remove).await?); + assert!(!client.remove_channel(&uaid, &chid_to_remove).await?); + new_channels.remove(&chid_to_remove); + let channels = client.get_channels(&uaid).await?; + assert_eq!(channels, new_channels); + + // now ensure that we can update a user that's after the time we set + // prior. first ensure that we can't update a user that's before the + // time we set prior to the last write + let mut updated = User { + connected_at, + ..test_user.clone() + }; + let result = client.update_user(&mut updated).await; + assert!(result.is_ok()); + assert!(!result.unwrap()); + + // Make sure that the `connected_at` wasn't modified + let fetched2 = client.get_user(&fetched.uaid).await?.unwrap(); + assert_eq!(fetched.connected_at, fetched2.connected_at); + + // and make sure we can update a record with a later connected_at time. + let mut updated = User { + connected_at: fetched.connected_at + 300, + ..fetched2 + }; + let result = client.update_user(&mut updated).await; + assert!(result.is_ok()); + assert!(result.unwrap()); + assert_ne!( + fetched2.connected_at, + client.get_user(&uaid).await?.unwrap().connected_at + ); + + // can we increment the storage for the user? + client + .increment_storage( + &fetched.uaid, + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(), + ) + .await?; + + let test_data = "An_encrypted_pile_of_crap".to_owned(); + let timestamp = now(); + let sort_key = now(); + // Unlike Bigtable, [fetch_timestamp_messages] uses and return a + // timestamp in milliseconds + let fetch_timestamp = ms_since_epoch(); + // Can we store a message? + let test_notification = crate::db::Notification { + channel_id: chid, + version: "test".to_owned(), + ttl: 300, + timestamp, + data: Some(test_data.clone()), + sortkey_timestamp: Some(sort_key), + ..Default::default() + }; + let res = client.save_message(&uaid, test_notification.clone()).await; + assert!(res.is_ok()); + + let mut fetched = client.fetch_timestamp_messages(&uaid, None, 999).await?; + assert_ne!(fetched.messages.len(), 0); + let fm = fetched.messages.pop().unwrap(); + assert_eq!(fm.channel_id, test_notification.channel_id); + assert_eq!(fm.data, Some(test_data)); + + // Grab all 1 of the messages that were submmited within the past 10 seconds. + let fetched = client + .fetch_timestamp_messages(&uaid, Some(fetch_timestamp - 10), 999) + .await?; + assert_ne!(fetched.messages.len(), 0); + + // Try grabbing a message for 10 seconds from now. + let fetched = client + .fetch_timestamp_messages(&uaid, Some(fetch_timestamp + 10), 999) + .await?; + assert_eq!(fetched.messages.len(), 0); + + // can we clean up our toys? + assert!(client + .remove_message(&uaid, &test_notification.chidmessageid()) + .await + .is_ok()); + + assert!(client.remove_channel(&uaid, &chid).await.is_ok()); + + let msgs = client + .fetch_timestamp_messages(&uaid, None, 999) + .await? + .messages; + assert!(msgs.is_empty()); + + // Now, can we do all that with topic messages + // Unlike bigtable, we don't use [fetch_topic_messages]: it always return None: + // they are handled as usuals messages. + client.add_channel(&uaid, &topic_chid).await?; + let test_data = "An_encrypted_pile_of_crap_with_a_topic".to_owned(); + let timestamp = now(); + let sort_key = now(); + + // We store 2 messages, with a single topic + let test_notification_0 = crate::db::Notification { + channel_id: topic_chid, + version: "version0".to_owned(), + ttl: 300, + topic: Some("topic".to_owned()), + timestamp, + data: Some(test_data.clone()), + sortkey_timestamp: Some(sort_key), + ..Default::default() + }; + assert!(client + .save_message(&uaid, test_notification_0.clone()) + .await + .is_ok()); + + let test_notification = crate::db::Notification { + timestamp: now(), + version: "version1".to_owned(), + ..test_notification_0 + }; + + assert!(client + .save_message(&uaid, test_notification.clone()) + .await + .is_ok()); + + let mut fetched = client.fetch_timestamp_messages(&uaid, None, 999).await?; + assert_eq!(fetched.messages.len(), 1); + let fm = fetched.messages.pop().unwrap(); + assert_eq!(fm.channel_id, test_notification.channel_id); + assert_eq!(fm.data, Some(test_data)); + + // Grab the message that was submmited. + let fetched = client.fetch_timestamp_messages(&uaid, None, 999).await?; + assert_ne!(fetched.messages.len(), 0); + + // can we clean up our toys? + assert!(client + .remove_message(&uaid, &test_notification.chidmessageid()) + .await + .is_ok()); + + assert!(client.remove_channel(&uaid, &topic_chid).await.is_ok()); + + let msgs = client + .fetch_timestamp_messages(&uaid, None, 999) + .await? + .messages; + assert!(msgs.is_empty()); + + let fetched = client.get_user(&uaid).await?.unwrap(); + assert!(client + .remove_node_id(&uaid, &node_id, connected_at, &fetched.version) + .await + .is_ok()); + // did we remove it? + let fetched = client.get_user(&uaid).await?.unwrap(); + assert_eq!(fetched.node_id, None); + + assert!(client.remove_user(&uaid).await.is_ok()); + + assert!(client.get_user(&uaid).await?.is_none()); + + Ok(()) + } +} diff --git a/autopush-common/src/db/routing.rs b/autopush-common/src/db/routing.rs index 7999552a..ca40a87d 100644 --- a/autopush-common/src/db/routing.rs +++ b/autopush-common/src/db/routing.rs @@ -1,6 +1,7 @@ #[derive(Clone, Eq, PartialEq, Debug)] pub(crate) enum StorageType { BigTable, + Redis, None, } @@ -8,6 +9,8 @@ impl Default for StorageType { fn default() -> StorageType { if cfg!(feature = "bigtable") { StorageType::BigTable + } else if cfg!(feature = "redis") { + StorageType::Redis } else { StorageType::None } @@ -18,6 +21,7 @@ impl From<&str> for StorageType { fn from(str: &str) -> StorageType { match str.to_lowercase().as_str() { "bigtable" => StorageType::BigTable, + "redis" => StorageType::Redis, _ => { warn!("Using default StorageType for {str}"); StorageType::default() diff --git a/autopush-common/src/notification.rs b/autopush-common/src/notification.rs index ecbb7f42..3889a39c 100644 --- a/autopush-common/src/notification.rs +++ b/autopush-common/src/notification.rs @@ -47,6 +47,9 @@ impl Notification { /// {chid}:{message_id} pub fn chidmessageid(&self) -> String { let chid = self.channel_id.as_hyphenated(); + #[cfg(feature = "redis")] + return format!("{}:{}", chid, self.version); + if let Some(ref topic) = self.topic { format!("{TOPIC_NOTIFICATION_PREFIX}:{chid}:{topic}") } else if let Some(sortkey_timestamp) = self.sortkey_timestamp { From 26590437a8a5b4a13a623d2a5853a5c6c67b2564 Mon Sep 17 00:00:00 2001 From: sim Date: Tue, 17 Dec 2024 16:50:34 +0100 Subject: [PATCH 03/21] Add docker-compose to self host the project --- Dockerfile | 3 ++- docker-compose.yml | 51 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 1 deletion(-) create mode 100644 docker-compose.yml diff --git a/Dockerfile b/Dockerfile index 21e3ffa9..69f3ba86 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,6 +2,7 @@ # RUST_VER FROM rust:1.81-bookworm AS builder ARG CRATE +ARG BUILD_ARGS ADD . /app WORKDIR /app @@ -16,7 +17,7 @@ RUN \ cargo --version && \ rustc --version && \ mkdir -m 755 bin && \ - cargo install --path $CRATE --locked --root /app + cargo install --path $CRATE $BUILD_ARGS --locked --root /app FROM debian:bookworm-slim diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 00000000..6819ea32 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,51 @@ +services: + autoconnect: + build: + context: . + args: + BUILD_ARGS: "--no-default-features --features redis" + CRATE: autoconnect + BINARY: autoconnect + environment: + - "AUTOCONNECT__DB_DSN=redis://redis" + - "AUTOCONNECT__CRYPTO_KEY=[tlLWgjoAT-vV4q0nR0uiU3ANhI5uQ10GH2fKCgWrxaU=]" # Replace with output of `./scripts/fernet_key.py` + - "AUTOCONNECT__ENDPOINT_SCHEME=http" # The ENDPOINT* var are for the public facing autoendpoint url + - "AUTOCONNECT__ENDPOINT_HOSTNAME=localhost" + - "AUTOCONNECT__ENDPOINT_PORT=8000" + - "AUTOCONNECT__ROUTER_HOSTNAME=autoconnect" # This is used by autoendpoint to reach this autoconnect + - "RUST_BACKTRACE=1" + - "RUST_LOG=trace" + ports: + - "8080:8080" + - "8081:8081" + depends_on: + - redis + + + autoendpoint: + build: + context: . + args: + BUILD_ARGS: "--no-default-features --features redis" + CRATE: autoendpoint + BINARY: autoendpoint + environment: + - "AUTOEND__DB_DSN=redis://redis" + - 'AUTOEND__CRYPTO_KEYS=[tlLWgjoAT-vV4q0nR0uiU3ANhI5uQ10GH2fKCgWrxaU=]' # This is the same value as AUTOCONNECT__CRYPTO_KEY + - "RUST_BACKTRACE=1" + - "RUST_LOG=trace" + - "AUTOEND__HOST=0.0.0.0" # autoendpoint must listen on 0.0.0.0 with docker + - "AUTOEND__PORT=8000" # This is the port we listen on + - "AUTOEND__ENDPOINT_URL=http://localhost:8000" # This is the public facing url to reach autoendpoint + ports: + - "8000:8000" + depends_on: + - redis + - autoconnect + + redis: + image: redis:latest + restart: unless-stopped + command: redis-server + ports: + - "6379:6379" From 7118d5c721733c041ed1471c48fed2cf5b5d4244 Mon Sep 17 00:00:00 2001 From: sim Date: Thu, 19 Dec 2024 08:53:30 +0000 Subject: [PATCH 04/21] Remove useless ignore() --- .../src/db/redis/redis_client/mod.rs | 23 ++++--------------- 1 file changed, 4 insertions(+), 19 deletions(-) diff --git a/autopush-common/src/db/redis/redis_client/mod.rs b/autopush-common/src/db/redis/redis_client/mod.rs index 0231c74c..0081b776 100644 --- a/autopush-common/src/db/redis/redis_client/mod.rs +++ b/autopush-common/src/db/redis/redis_client/mod.rs @@ -134,13 +134,11 @@ impl DbClient for RedisClientImpl { let co_key = self.last_co_key(&user.uaid); let _: () = redis::pipe() .set_options(co_key, ms_since_epoch(), self.redis_opts) - .ignore() .set_options( user_key, serde_json::to_string(user).unwrap(), self.redis_opts, ) - .ignore() .exec(&mut con) .unwrap(); Ok(()) @@ -196,13 +194,9 @@ impl DbClient for RedisClientImpl { let msg_list_key = self.message_list_key(&uaid); redis::pipe() .del(&user_key) - .ignore() .del(&co_key) - .ignore() .del(&chan_list_key) - .ignore() .del(&msg_list_key) - .ignore() .exec(&mut con) .unwrap(); Ok(()) @@ -215,9 +209,7 @@ impl DbClient for RedisClientImpl { let _: () = redis::pipe() .rpush(chan_list_key, channel_id.as_hyphenated().to_string()) - .ignore() .set_options(co_key, ms_since_epoch(), self.redis_opts) - .ignore() .exec(&mut con) .unwrap(); Ok(()) @@ -231,7 +223,6 @@ impl DbClient for RedisClientImpl { let chan_list_key = self.channel_list_key(&uaid); redis::pipe() .set_options(co_key, ms_since_epoch(), self.redis_opts) - .ignore() .rpush( chan_list_key, channels @@ -239,7 +230,6 @@ impl DbClient for RedisClientImpl { .map(|c| c.as_hyphenated().to_string()) .collect::>(), ) - .ignore() .exec(&mut con) .unwrap(); Ok(()) @@ -328,12 +318,11 @@ impl DbClient for RedisClientImpl { // If a message is already stored for that topic, we remove it if let Some(id) = old_msg_id { trace!("🉑 The topic had a message: {}", &id); - pipe.zrem(&msg_list_key, &id).ignore(); - pipe.del(self.message_key(&uaid, &id)).ignore(); + pipe.zrem(&msg_list_key, &id) + .del(self.message_key(&uaid, &id)); } // Setting the key replace the old one if any - pipe.set_options(&topic_key, &message.chidmessageid(), opts) - .ignore(); + pipe.set_options(&topic_key, &message.chidmessageid(), opts); true } else { false @@ -348,11 +337,9 @@ impl DbClient for RedisClientImpl { serde_json::to_string(&NotificationRecord::from_notif(&uaid, message)).unwrap(), opts, ) - .ignore() // The function [fecth_timestamp_messages] takes a timestamp in input, // here we use the timestamp of the record (in ms) - .zadd(msg_list_key, &msg_id, ms_since_epoch()) - .ignore(); + .zadd(&msg_list_key, &msg_id, ms_since_epoch()); let _: () = pipe.exec(&mut con).unwrap(); self.metrics @@ -393,9 +380,7 @@ impl DbClient for RedisClientImpl { let mut con = self.connection()?; redis::pipe() .zrem(&msg_list_key, &chidmessageid) - .ignore() .del(&msg_key) - .ignore() .exec(&mut con) .unwrap(); self.metrics From 32098b3baf404390f4c9f6b41edf8d71cf9c3031 Mon Sep 17 00:00:00 2001 From: sim Date: Thu, 19 Dec 2024 09:21:04 +0000 Subject: [PATCH 05/21] Wipe expired messages --- autopush-common/src/db/redis/mod.rs | 1 + .../src/db/redis/redis_client/mod.rs | 103 +++++++++++++++--- 2 files changed, 91 insertions(+), 13 deletions(-) diff --git a/autopush-common/src/db/redis/mod.rs b/autopush-common/src/db/redis/mod.rs index bfce28bc..f906a0b9 100644 --- a/autopush-common/src/db/redis/mod.rs +++ b/autopush-common/src/db/redis/mod.rs @@ -6,6 +6,7 @@ /// `autopush/co/{uaid}` u64 to store the last time the user has interacted with the server /// `autopush/channels/{uaid}` List to store the list of the channels of the user /// `autopush/msgs/{uaid}` SortedSet to store the list of the pending message ids for the user +/// `autopush/msgs_exp/{uaid}` SortedSet to store the list of the pending message ids, ordered by expiry date, this is because SortedSet elements can't have independant expiry date /// `autopush/msg/{uaid}/{chidmessageid}`, with `{chidmessageid} == {chid}:{version}` String to store /// the content of the messages /// `autopush/topic/{uaid}/{chid}/{topic}` String to store the (last) message id of a given topic diff --git a/autopush-common/src/db/redis/redis_client/mod.rs b/autopush-common/src/db/redis/redis_client/mod.rs index 0081b776..6ebba652 100644 --- a/autopush-common/src/db/redis/redis_client/mod.rs +++ b/autopush-common/src/db/redis/redis_client/mod.rs @@ -109,6 +109,10 @@ impl RedisClientImpl { format!("autopush/msgs/{}", uaid.as_hyphenated()) } + fn message_exp_list_key(&self, uaid: &Uuid) -> String { + format!("autopush/msgs_exp/{}", uaid.as_hyphenated()) + } + fn message_key(&self, uaid: &Uuid, chidmessageid: &str) -> String { format!("autopush/msg/{}/{}", uaid.as_hyphenated(), chidmessageid) } @@ -192,11 +196,13 @@ impl DbClient for RedisClientImpl { let co_key = self.last_co_key(&uaid); let chan_list_key = self.channel_list_key(&uaid); let msg_list_key = self.message_list_key(&uaid); + let exp_list_key = self.message_exp_list_key(&uaid); redis::pipe() .del(&user_key) .del(&co_key) .del(&chan_list_key) .del(&msg_list_key) + .del(&exp_list_key) .exec(&mut con) .unwrap(); Ok(()) @@ -287,6 +293,7 @@ impl DbClient for RedisClientImpl { async fn save_message(&self, uaid: &Uuid, message: Notification) -> DbResult<()> { let mut con = self.connection()?; let msg_list_key = self.message_list_key(&uaid); + let exp_list_key = self.message_exp_list_key(&uaid); let msg_key = self.message_key(&uaid, &message.chidmessageid()); // message.ttl is already min(headers.ttl, MAX_NOTIFICATION_TTL) // see autoendpoint/src/extractors/notification_headers.rs @@ -300,14 +307,11 @@ impl DbClient for RedisClientImpl { // Remember, `timestamp` is effectively the time to kill the message, not the // current time. - let expiry = SystemTime::now() + Duration::from_secs(message.ttl); - trace!( - "🉑 Message Expiry {}", - expiry - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap_or_default() - .as_millis() - ); + let expiry = (SystemTime::now() + Duration::from_secs(message.ttl)) + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + trace!("🉑 Message Expiry {}", expiry); let mut pipe = redis::pipe(); @@ -318,8 +322,11 @@ impl DbClient for RedisClientImpl { // If a message is already stored for that topic, we remove it if let Some(id) = old_msg_id { trace!("🉑 The topic had a message: {}", &id); - pipe.zrem(&msg_list_key, &id) - .del(self.message_key(&uaid, &id)); + // We remove the id from the exp list at the end, to be sure + // it can't be removed from the list before the message is removed + pipe.del(self.message_key(&uaid, &id)) + .zrem(&msg_list_key, &id) + .zrem(&exp_list_key, &id); } // Setting the key replace the old one if any pipe.set_options(&topic_key, &message.chidmessageid(), opts); @@ -339,6 +346,7 @@ impl DbClient for RedisClientImpl { ) // The function [fecth_timestamp_messages] takes a timestamp in input, // here we use the timestamp of the record (in ms) + .zadd(&exp_list_key, &msg_id, expiry) .zadd(&msg_list_key, &msg_id, ms_since_epoch()); let _: () = pipe.exec(&mut con).unwrap(); @@ -362,8 +370,22 @@ impl DbClient for RedisClientImpl { Ok(()) } - /// Doesn't seem to be useful for redis - async fn increment_storage(&self, _uaid: &Uuid, _timestamp: u64) -> DbResult<()> { + /// Delete expired messages + async fn increment_storage(&self, uaid: &Uuid, timestamp: u64) -> DbResult<()> { + debug!("🉑🔥 Incrementing storage to {}", timestamp); + let msg_list_key = self.message_list_key(&uaid); + let exp_list_key = self.message_exp_list_key(&uaid); + let mut con = self.connection()?; + let exp_id_list: Vec = con.zrangebyscore(&exp_list_key, 0, timestamp).unwrap(); + if exp_id_list.len() > 0 { + trace!("🉑🔥 Deleting {} expired msgs", exp_id_list.len()); + redis::pipe() + .del(&exp_id_list) + .zrem(&msg_list_key, &exp_id_list) + .zrem(&exp_list_key, &exp_id_list) + .exec(&mut con) + .unwrap(); + } Ok(()) } @@ -376,11 +398,15 @@ impl DbClient for RedisClientImpl { ); let msg_key = self.message_key(&uaid, &chidmessageid); let msg_list_key = self.message_list_key(&uaid); + let exp_list_key = self.message_exp_list_key(&uaid); debug!("🉑🔥 Deleting message {}", &msg_key); let mut con = self.connection()?; + // We remove the id from the exp list at the end, to be sure + // it can't be removed from the list before the message is removed redis::pipe() - .zrem(&msg_list_key, &chidmessageid) .del(&msg_key) + .zrem(&msg_list_key, &chidmessageid) + .zrem(&exp_list_key, &chidmessageid) .exec(&mut con) .unwrap(); self.metrics @@ -533,6 +559,57 @@ mod tests { assert!(result.unwrap()); } + /// Test if [increment_storage] correctly wipe expired messages + #[actix_rt::test] + async fn wipe_expired() -> DbResult<()> { + init_test_logging(); + let client = new_client()?; + + let connected_at = ms_since_epoch(); + + let uaid = Uuid::parse_str(TEST_USER).unwrap(); + let chid = Uuid::parse_str(TEST_CHID).unwrap(); + + let node_id = "test_node".to_owned(); + + // purge the user record if it exists. + let _ = client.remove_user(&uaid).await; + + let test_user = User { + uaid, + router_type: "webpush".to_owned(), + connected_at, + router_data: None, + node_id: Some(node_id.clone()), + ..Default::default() + }; + + // purge the old user (if present) + // in case a prior test failed for whatever reason. + let _ = client.remove_user(&uaid).await; + + // can we add the user? + let timestamp = now(); + let fetch_timestamp = ms_since_epoch(); + client.add_user(&test_user).await?; + let test_notification = crate::db::Notification { + channel_id: chid, + version: "test".to_owned(), + ttl: 1, + timestamp, + data: Some("Encrypted".into()), + sortkey_timestamp: Some(timestamp), + ..Default::default() + }; + client.save_message(&uaid, test_notification).await?; + client + .increment_storage(&uaid, fetch_timestamp + 10000) + .await?; + let msgs = client.fetch_timestamp_messages(&uaid, None, 999).await?; + assert_eq!(msgs.messages.len(), 0); + Ok(()) + } + /// run a gauntlet of testing. These are a bit linear because they need /// to run in sequence. #[actix_rt::test] From 7df553360b25287cf058f629c7d30e6d264923d5 Mon Sep 17 00:00:00 2001 From: sim Date: Sat, 21 Dec 2024 11:22:47 +0000 Subject: [PATCH 06/21] Don't use `unwrap()` --- autopush-common/src/db/error.rs | 4 ++ .../src/db/redis/redis_client/error.rs | 7 +++ .../src/db/redis/redis_client/mod.rs | 57 +++++++------------ 3 files changed, 33 insertions(+), 35 deletions(-) create mode 100644 autopush-common/src/db/redis/redis_client/error.rs diff --git a/autopush-common/src/db/error.rs b/autopush-common/src/db/error.rs index a30dc915..2bb82895 100644 --- a/autopush-common/src/db/error.rs +++ b/autopush-common/src/db/error.rs @@ -26,6 +26,10 @@ pub enum DbError { #[error("BigTable error: {0}")] BTError(#[from] BigTableError), + #[cfg(feature = "redis")] + #[error("Redis error {0}")] + RedisError(#[from] redis::RedisError), + #[error("Connection failure: {0}")] ConnectionError(String), diff --git a/autopush-common/src/db/redis/redis_client/error.rs b/autopush-common/src/db/redis/redis_client/error.rs new file mode 100644 index 00000000..2513eca8 --- /dev/null +++ b/autopush-common/src/db/redis/redis_client/error.rs @@ -0,0 +1,7 @@ +use crate::db::error::DbError; + +impl From for DbError { + fn from(err: serde_json::Error) -> Self { + DbError::Serialization(err.to_string()) + } +} diff --git a/autopush-common/src/db/redis/redis_client/mod.rs b/autopush-common/src/db/redis/redis_client/mod.rs index 6ebba652..12ccc73b 100644 --- a/autopush-common/src/db/redis/redis_client/mod.rs +++ b/autopush-common/src/db/redis/redis_client/mod.rs @@ -18,6 +18,8 @@ use crate::db::{ }; use crate::util::ms_since_epoch; +mod error; + use super::RedisDbSettings; /// Semi convenience wrapper to ensure that the UAID is formatted and displayed consistently. @@ -55,7 +57,7 @@ impl RedisClientImpl { .dsn .clone() .ok_or(DbError::General("Could not find DSN".to_owned()))?; - let client = redis::Client::open(dsn).unwrap(); + let client = redis::Client::open(dsn)?; let db_settings = RedisDbSettings::try_from(settings.db_settings.as_ref())?; info!("🉑 {:#?}", db_settings); @@ -138,13 +140,8 @@ impl DbClient for RedisClientImpl { let co_key = self.last_co_key(&user.uaid); let _: () = redis::pipe() .set_options(co_key, ms_since_epoch(), self.redis_opts) - .set_options( - user_key, - serde_json::to_string(user).unwrap(), - self.redis_opts, - ) - .exec(&mut con) - .unwrap(); + .set_options(user_key, serde_json::to_string(user)?, self.redis_opts) + .exec(&mut con)?; Ok(()) } @@ -163,7 +160,7 @@ impl DbClient for RedisClientImpl { trace!("🉑 Updating user"); let mut con = self.connection()?; let co_key = self.last_co_key(&user.uaid); - let last_co: Option = con.get(&co_key).unwrap(); + let last_co: Option = con.get(&co_key)?; if last_co.is_some_and(|c| c < user.connected_at) { trace!( "🉑 Was connected at {}, now at {}", @@ -181,9 +178,8 @@ impl DbClient for RedisClientImpl { let mut con = self.connection()?; let user_key = self.user_key(uaid); let user: Option = con - .get::<&str, Option>(&user_key) - .unwrap() - .and_then(|s| serde_json::from_str(s.as_ref()).unwrap()); + .get::<&str, Option>(&user_key)? + .and_then(|s| serde_json::from_str(s.as_ref()).ok()); if user.is_some() { trace!("🉑 Found a record for {}", &uaid); } @@ -203,8 +199,7 @@ impl DbClient for RedisClientImpl { .del(&chan_list_key) .del(&msg_list_key) .del(&exp_list_key) - .exec(&mut con) - .unwrap(); + .exec(&mut con)?; Ok(()) } @@ -216,8 +211,7 @@ impl DbClient for RedisClientImpl { let _: () = redis::pipe() .rpush(chan_list_key, channel_id.as_hyphenated().to_string()) .set_options(co_key, ms_since_epoch(), self.redis_opts) - .exec(&mut con) - .unwrap(); + .exec(&mut con)?; Ok(()) } @@ -236,8 +230,7 @@ impl DbClient for RedisClientImpl { .map(|c| c.as_hyphenated().to_string()) .collect::>(), ) - .exec(&mut con) - .unwrap(); + .exec(&mut con)?; Ok(()) } @@ -245,10 +238,9 @@ impl DbClient for RedisClientImpl { let mut con = self.connection()?; let chan_list_key = self.channel_list_key(&uaid); let channels: HashSet = con - .lrange::<&str, HashSet>(&chan_list_key, 0, -1) - .unwrap() + .lrange::<&str, HashSet>(&chan_list_key, 0, -1)? .into_iter() - .map(|s| Uuid::from_str(&s).unwrap()) + .filter_map(|s| Uuid::from_str(&s).ok()) .collect(); trace!("🉑 Found {} channels for {}", channels.len(), &uaid); Ok(channels) @@ -265,8 +257,7 @@ impl DbClient for RedisClientImpl { .set_options(co_key, ms_since_epoch(), self.redis_opts) .ignore() .lrem(&chan_list_key, 1, channel_id.as_hyphenated().to_string()) - .query(&mut con) - .unwrap(); + .query(&mut con)?; Ok(status) } @@ -318,7 +309,7 @@ impl DbClient for RedisClientImpl { let is_topic = if let Some(topic) = &message.topic { let topic_key = self.topic_key(&uaid, &message.channel_id, &topic); // We check if a message is already saved for this topic - let old_msg_id: Option = con.get(&topic_key).unwrap(); + let old_msg_id: Option = con.get(&topic_key)?; // If a message is already stored for that topic, we remove it if let Some(id) = old_msg_id { trace!("🉑 The topic had a message: {}", &id); @@ -341,7 +332,7 @@ impl DbClient for RedisClientImpl { let msg_key = self.message_key(&uaid, &msg_id); pipe.set_options( msg_key, - serde_json::to_string(&NotificationRecord::from_notif(&uaid, message)).unwrap(), + serde_json::to_string(&NotificationRecord::from_notif(&uaid, message))?, opts, ) // The function [fecth_timestamp_messages] takes a timestamp in input, @@ -349,7 +340,7 @@ impl DbClient for RedisClientImpl { .zadd(&exp_list_key, &msg_id, expiry) .zadd(&msg_list_key, &msg_id, ms_since_epoch()); - let _: () = pipe.exec(&mut con).unwrap(); + let _: () = pipe.exec(&mut con)?; self.metrics .incr_with_tags("notification.message.stored") .with_tag("topic", &is_topic.to_string()) @@ -376,15 +367,14 @@ impl DbClient for RedisClientImpl { let msg_list_key = self.message_list_key(&uaid); let exp_list_key = self.message_exp_list_key(&uaid); let mut con = self.connection()?; - let exp_id_list: Vec = con.zrangebyscore(&exp_list_key, 0, timestamp).unwrap(); + let exp_id_list: Vec = con.zrangebyscore(&exp_list_key, 0, timestamp)?; if exp_id_list.len() > 0 { trace!("🉑🔥 Deleting {} expired msgs", exp_id_list.len()); redis::pipe() .del(&exp_id_list) .zrem(&msg_list_key, &exp_id_list) .zrem(&exp_list_key, &exp_id_list) - .exec(&mut con) - .unwrap(); + .exec(&mut con)?; } Ok(()) } @@ -407,8 +397,7 @@ impl DbClient for RedisClientImpl { .del(&msg_key) .zrem(&msg_list_key, &chidmessageid) .zrem(&exp_list_key, &chidmessageid) - .exec(&mut con) - .unwrap(); + .exec(&mut con)?; self.metrics .incr_with_tags("notification.message.deleted") .with_tag("database", &self.name()) @@ -455,8 +444,7 @@ impl DbClient for RedisClientImpl { "+inf", 0, limit as isize, - ) - .unwrap() + )? .into_iter() .map(|(id, s): (String, u64)| (self.message_key(&uaid, &id), s)) .unzip(); @@ -470,8 +458,7 @@ impl DbClient for RedisClientImpl { let messages: Vec = if messages_id.len() == 0 { vec![] } else { - con.mget::<&Vec, Vec>>(&messages_id) - .unwrap() + con.mget::<&Vec, Vec>>(&messages_id)? .into_iter() .filter_map(|opt: Option| { if opt.is_none() { From f676ee7db5dec0061111cf2c5370d63a3e457b7c Mon Sep 17 00:00:00 2001 From: sim Date: Sat, 21 Dec 2024 11:26:53 +0000 Subject: [PATCH 07/21] Move redis import under feature flag --- autoconnect/autoconnect-settings/src/app_state.rs | 4 +++- autoendpoint/src/server.rs | 7 +++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/autoconnect/autoconnect-settings/src/app_state.rs b/autoconnect/autoconnect-settings/src/app_state.rs index 1510571d..b62bef93 100644 --- a/autoconnect/autoconnect-settings/src/app_state.rs +++ b/autoconnect/autoconnect-settings/src/app_state.rs @@ -2,6 +2,8 @@ use std::{sync::Arc, time::Duration}; #[cfg(feature = "bigtable")] use autopush_common::db::bigtable::BigTableClientImpl; +#[cfg(feature = "redis")] +use autopush_common::db::redis::RedisClientImpl; use cadence::StatsdClient; use config::ConfigError; use fernet::{Fernet, MultiFernet}; @@ -11,7 +13,7 @@ use autoconnect_common::{ broadcast::BroadcastChangeTracker, megaphone::init_and_spawn_megaphone_updater, registry::ClientRegistry, }; -use autopush_common::db::{client::DbClient, redis::RedisClientImpl, DbSettings, StorageType}; +use autopush_common::db::{client::DbClient, DbSettings, StorageType}; use crate::{Settings, ENV_PREFIX}; diff --git a/autoendpoint/src/server.rs b/autoendpoint/src/server.rs index 09afc5ba..a05f3fbe 100644 --- a/autoendpoint/src/server.rs +++ b/autoendpoint/src/server.rs @@ -13,11 +13,10 @@ use serde_json::json; #[cfg(feature = "bigtable")] use autopush_common::db::bigtable::BigTableClientImpl; +#[cfg(feature = "redis")] +use autopush_common::db::redis::RedisClientImpl; use autopush_common::{ - db::{ - client::DbClient, redis::RedisClientImpl, spawn_pool_periodic_reporter, DbSettings, - StorageType, - }, + db::{client::DbClient, spawn_pool_periodic_reporter, DbSettings, StorageType}, middleware::sentry::SentryWrapper, }; From e86452c1987056b6481ca8a3e99da133dac712e9 Mon Sep 17 00:00:00 2001 From: sim Date: Sat, 21 Dec 2024 11:51:57 +0000 Subject: [PATCH 08/21] Actually use Uaid and Chanid wrappers --- .../src/db/redis/redis_client/mod.rs | 85 ++++++++++++------- 1 file changed, 52 insertions(+), 33 deletions(-) diff --git a/autopush-common/src/db/redis/redis_client/mod.rs b/autopush-common/src/db/redis/redis_client/mod.rs index 12ccc73b..3fa68661 100644 --- a/autopush-common/src/db/redis/redis_client/mod.rs +++ b/autopush-common/src/db/redis/redis_client/mod.rs @@ -23,18 +23,31 @@ mod error; use super::RedisDbSettings; /// Semi convenience wrapper to ensure that the UAID is formatted and displayed consistently. -// TODO:Should we create something similar for ChannelID? -struct Uaid(Uuid); +struct Uaid<'a>(&'a Uuid); -impl Display for Uaid { +impl<'a> Display for Uaid<'a> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", self.0.as_simple()) + write!(f, "{}", self.0.as_hyphenated()) } } -impl From for String { +impl<'a> From> for String { fn from(uaid: Uaid) -> String { - uaid.0.as_simple().to_string() + uaid.0.as_hyphenated().to_string() + } +} + +struct Chanid<'a>(&'a Uuid); + +impl<'a> Display for Chanid<'a> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.0.as_hyphenated()) + } +} + +impl<'a> From> for String { + fn from(uaid: Chanid) -> String { + uaid.0.as_hyphenated().to_string() } } @@ -94,38 +107,33 @@ impl RedisClientImpl { } } - fn user_key(&self, uaid: &Uuid) -> String { - format!("autopush/user/{}", uaid.as_hyphenated()) + fn user_key(&self, uaid: &Uaid) -> String { + format!("autopush/user/{}", uaid) } /// This store the last connection record, but doesn't update User - fn last_co_key(&self, uaid: &Uuid) -> String { - format!("autopush/co/{}", uaid.as_hyphenated()) + fn last_co_key(&self, uaid: &Uaid) -> String { + format!("autopush/co/{}", uaid) } - fn channel_list_key(&self, uaid: &Uuid) -> String { - format!("autopush/channels/{}", uaid.as_hyphenated()) + fn channel_list_key(&self, uaid: &Uaid) -> String { + format!("autopush/channels/{}", uaid) } - fn message_list_key(&self, uaid: &Uuid) -> String { - format!("autopush/msgs/{}", uaid.as_hyphenated()) + fn message_list_key(&self, uaid: &Uaid) -> String { + format!("autopush/msgs/{}", uaid) } - fn message_exp_list_key(&self, uaid: &Uuid) -> String { - format!("autopush/msgs_exp/{}", uaid.as_hyphenated()) + fn message_exp_list_key(&self, uaid: &Uaid) -> String { + format!("autopush/msgs_exp/{}", uaid) } - fn message_key(&self, uaid: &Uuid, chidmessageid: &str) -> String { - format!("autopush/msg/{}/{}", uaid.as_hyphenated(), chidmessageid) + fn message_key(&self, uaid: &Uaid, chidmessageid: &str) -> String { + format!("autopush/msg/{}/{}", uaid, chidmessageid) } - fn topic_key(&self, uaid: &Uuid, chan_id: &Uuid, topic: &str) -> String { - format!( - "autopush/topic/{}/{}/{}", - uaid.as_hyphenated(), - chan_id.as_hyphenated(), - topic - ) + fn topic_key(&self, uaid: &Uaid, chan_id: &Chanid, topic: &str) -> String { + format!("autopush/topic/{}/{}/{}", uaid, chan_id, topic) } } @@ -136,8 +144,9 @@ impl DbClient for RedisClientImpl { trace!("🉑 Adding user"); trace!("Logged at {}", &user.connected_at); let mut con = self.connection()?; - let user_key = self.user_key(&user.uaid); - let co_key = self.last_co_key(&user.uaid); + let uaid = Uaid(&user.uaid); + let user_key = self.user_key(&uaid); + let co_key = self.last_co_key(&uaid); let _: () = redis::pipe() .set_options(co_key, ms_since_epoch(), self.redis_opts) .set_options(user_key, serde_json::to_string(user)?, self.redis_opts) @@ -159,7 +168,7 @@ impl DbClient for RedisClientImpl { async fn update_user(&self, user: &mut User) -> DbResult { trace!("🉑 Updating user"); let mut con = self.connection()?; - let co_key = self.last_co_key(&user.uaid); + let co_key = self.last_co_key(&Uaid(&user.uaid)); let last_co: Option = con.get(&co_key)?; if last_co.is_some_and(|c| c < user.connected_at) { trace!( @@ -176,7 +185,7 @@ impl DbClient for RedisClientImpl { async fn get_user(&self, uaid: &Uuid) -> DbResult> { let mut con = self.connection()?; - let user_key = self.user_key(uaid); + let user_key = self.user_key(&Uaid(uaid)); let user: Option = con .get::<&str, Option>(&user_key)? .and_then(|s| serde_json::from_str(s.as_ref()).ok()); @@ -187,6 +196,7 @@ impl DbClient for RedisClientImpl { } async fn remove_user(&self, uaid: &Uuid) -> DbResult<()> { + let uaid = Uaid(uaid); let mut con = self.connection()?; let user_key = self.user_key(&uaid); let co_key = self.last_co_key(&uaid); @@ -204,6 +214,7 @@ impl DbClient for RedisClientImpl { } async fn add_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<()> { + let uaid = Uaid(uaid); let mut con = self.connection()?; let co_key = self.last_co_key(&uaid); let chan_list_key = self.channel_list_key(&uaid); @@ -217,6 +228,7 @@ impl DbClient for RedisClientImpl { /// Add channels in bulk (used mostly during migration) async fn add_channels(&self, uaid: &Uuid, channels: HashSet) -> DbResult<()> { + let uaid = Uaid(uaid); // channel_ids are stored as a set within a single redis key let mut con = self.connection()?; let co_key = self.last_co_key(&uaid); @@ -235,6 +247,7 @@ impl DbClient for RedisClientImpl { } async fn get_channels(&self, uaid: &Uuid) -> DbResult> { + let uaid = Uaid(uaid); let mut con = self.connection()?; let chan_list_key = self.channel_list_key(&uaid); let channels: HashSet = con @@ -248,15 +261,17 @@ impl DbClient for RedisClientImpl { /// Delete the channel. Does not delete its associated pending messages. async fn remove_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult { + let uaid = Uaid(uaid); + let channel_id = Chanid(channel_id); let mut con = self.connection()?; let co_key = self.last_co_key(&uaid); let chan_list_key = self.channel_list_key(&uaid); // Remove {channel_id} from autopush/channel/{auid} - trace!("🉑 Removing channel {}", channel_id.as_hyphenated()); + trace!("🉑 Removing channel {}", channel_id); let (status,): (bool,) = redis::pipe() .set_options(co_key, ms_since_epoch(), self.redis_opts) .ignore() - .lrem(&chan_list_key, 1, channel_id.as_hyphenated().to_string()) + .lrem(&chan_list_key, 1, channel_id.to_string()) .query(&mut con)?; Ok(status) } @@ -282,6 +297,7 @@ impl DbClient for RedisClientImpl { If the message contains a topic, we remove the old message */ async fn save_message(&self, uaid: &Uuid, message: Notification) -> DbResult<()> { + let uaid = Uaid(uaid); let mut con = self.connection()?; let msg_list_key = self.message_list_key(&uaid); let exp_list_key = self.message_exp_list_key(&uaid); @@ -307,7 +323,7 @@ impl DbClient for RedisClientImpl { let mut pipe = redis::pipe(); let is_topic = if let Some(topic) = &message.topic { - let topic_key = self.topic_key(&uaid, &message.channel_id, &topic); + let topic_key = self.topic_key(&uaid, &Chanid(&message.channel_id), &topic); // We check if a message is already saved for this topic let old_msg_id: Option = con.get(&topic_key)?; // If a message is already stored for that topic, we remove it @@ -332,7 +348,7 @@ impl DbClient for RedisClientImpl { let msg_key = self.message_key(&uaid, &msg_id); pipe.set_options( msg_key, - serde_json::to_string(&NotificationRecord::from_notif(&uaid, message))?, + serde_json::to_string(&NotificationRecord::from_notif(&uaid.0, message))?, opts, ) // The function [fecth_timestamp_messages] takes a timestamp in input, @@ -363,6 +379,7 @@ impl DbClient for RedisClientImpl { /// Delete expired messages async fn increment_storage(&self, uaid: &Uuid, timestamp: u64) -> DbResult<()> { + let uaid = Uaid(uaid); debug!("🉑🔥 Incrementing storage to {}", timestamp); let msg_list_key = self.message_list_key(&uaid); let exp_list_key = self.message_exp_list_key(&uaid); @@ -381,6 +398,7 @@ impl DbClient for RedisClientImpl { /// Delete the notification from storage. async fn remove_message(&self, uaid: &Uuid, chidmessageid: &str) -> DbResult<()> { + let uaid = Uaid(uaid); trace!( "🉑 attemping to delete {:?} :: {:?}", uaid.to_string(), @@ -434,6 +452,7 @@ impl DbClient for RedisClientImpl { timestamp: Option, limit: usize, ) -> DbResult { + let uaid = Uaid(uaid); trace!("Fecthing {} messages since {:?}", limit, timestamp); let mut con = self.connection()?; let msg_list_key = self.message_list_key(&uaid); From 4529e74c66c24e6a01d2a7fabeaf5079599a2661 Mon Sep 17 00:00:00 2001 From: sim Date: Sat, 21 Dec 2024 12:03:45 +0000 Subject: [PATCH 09/21] Change emoji for redis logs --- .../src/db/redis/redis_client/mod.rs | 40 +++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/autopush-common/src/db/redis/redis_client/mod.rs b/autopush-common/src/db/redis/redis_client/mod.rs index 3fa68661..908434fd 100644 --- a/autopush-common/src/db/redis/redis_client/mod.rs +++ b/autopush-common/src/db/redis/redis_client/mod.rs @@ -65,14 +65,14 @@ pub struct RedisClientImpl { impl RedisClientImpl { pub fn new(metrics: Arc, settings: &DbSettings) -> DbResult { // let env = Arc::new(EnvBuilder::new().build()); - debug!("🏊 BT Pool new"); + debug!("🐰 New redis client"); let dsn = settings .dsn .clone() .ok_or(DbError::General("Could not find DSN".to_owned()))?; let client = redis::Client::open(dsn)?; let db_settings = RedisDbSettings::try_from(settings.db_settings.as_ref())?; - info!("🉑 {:#?}", db_settings); + info!("🐰 {:#?}", db_settings); Ok(Self { client, @@ -141,8 +141,8 @@ impl RedisClientImpl { impl DbClient for RedisClientImpl { /// add user to the database async fn add_user(&self, user: &User) -> DbResult<()> { - trace!("🉑 Adding user"); - trace!("Logged at {}", &user.connected_at); + trace!("🐰 Adding user"); + trace!("🐰 Logged at {}", &user.connected_at); let mut con = self.connection()?; let uaid = Uaid(&user.uaid); let user_key = self.user_key(&uaid); @@ -166,13 +166,13 @@ impl DbClient for RedisClientImpl { /// somehow remain connected the duration of MAX_ROUTER_TTL, may be dropped as not being /// "lively". async fn update_user(&self, user: &mut User) -> DbResult { - trace!("🉑 Updating user"); + trace!("🐰 Updating user"); let mut con = self.connection()?; let co_key = self.last_co_key(&Uaid(&user.uaid)); let last_co: Option = con.get(&co_key)?; if last_co.is_some_and(|c| c < user.connected_at) { trace!( - "🉑 Was connected at {}, now at {}", + "🐰 Was connected at {}, now at {}", last_co.unwrap(), &user.connected_at ); @@ -190,7 +190,7 @@ impl DbClient for RedisClientImpl { .get::<&str, Option>(&user_key)? .and_then(|s| serde_json::from_str(s.as_ref()).ok()); if user.is_some() { - trace!("🉑 Found a record for {}", &uaid); + trace!("🐰 Found a record for {}", &uaid); } Ok(user) } @@ -255,7 +255,7 @@ impl DbClient for RedisClientImpl { .into_iter() .filter_map(|s| Uuid::from_str(&s).ok()) .collect(); - trace!("🉑 Found {} channels for {}", channels.len(), &uaid); + trace!("🐰 Found {} channels for {}", channels.len(), &uaid); Ok(channels) } @@ -267,7 +267,7 @@ impl DbClient for RedisClientImpl { let co_key = self.last_co_key(&uaid); let chan_list_key = self.channel_list_key(&uaid); // Remove {channel_id} from autopush/channel/{auid} - trace!("🉑 Removing channel {}", channel_id); + trace!("🐰 Removing channel {}", channel_id); let (status,): (bool,) = redis::pipe() .set_options(co_key, ms_since_epoch(), self.redis_opts) .ignore() @@ -306,9 +306,9 @@ impl DbClient for RedisClientImpl { // see autoendpoint/src/extractors/notification_headers.rs let opts = SetOptions::default().with_expiration(SetExpiry::EX(message.ttl)); - debug!("🗄️ Saving message {} :: {:?}", &msg_key, &message); + debug!("🐰 Saving message {} :: {:?}", &msg_key, &message); trace!( - "🉑 timestamp: {:?}", + "🐰 timestamp: {:?}", &message.timestamp.to_be_bytes().to_vec() ); @@ -318,7 +318,7 @@ impl DbClient for RedisClientImpl { .duration_since(SystemTime::UNIX_EPOCH) .unwrap_or_default() .as_millis() as u64; - trace!("🉑 Message Expiry {}", expiry); + trace!("🐰 Message Expiry {}", expiry); let mut pipe = redis::pipe(); @@ -328,7 +328,7 @@ impl DbClient for RedisClientImpl { let old_msg_id: Option = con.get(&topic_key)?; // If a message is already stored for that topic, we remove it if let Some(id) = old_msg_id { - trace!("🉑 The topic had a message: {}", &id); + trace!("🐰 The topic had a message: {}", &id); // We remove the id from the exp list at the end, to be sure // it can't be removed from the list before the message is removed pipe.del(self.message_key(&uaid, &id)) @@ -380,13 +380,13 @@ impl DbClient for RedisClientImpl { /// Delete expired messages async fn increment_storage(&self, uaid: &Uuid, timestamp: u64) -> DbResult<()> { let uaid = Uaid(uaid); - debug!("🉑🔥 Incrementing storage to {}", timestamp); + debug!("🐰🔥 Incrementing storage to {}", timestamp); let msg_list_key = self.message_list_key(&uaid); let exp_list_key = self.message_exp_list_key(&uaid); let mut con = self.connection()?; let exp_id_list: Vec = con.zrangebyscore(&exp_list_key, 0, timestamp)?; if exp_id_list.len() > 0 { - trace!("🉑🔥 Deleting {} expired msgs", exp_id_list.len()); + trace!("🐰🔥 Deleting {} expired msgs", exp_id_list.len()); redis::pipe() .del(&exp_id_list) .zrem(&msg_list_key, &exp_id_list) @@ -400,14 +400,14 @@ impl DbClient for RedisClientImpl { async fn remove_message(&self, uaid: &Uuid, chidmessageid: &str) -> DbResult<()> { let uaid = Uaid(uaid); trace!( - "🉑 attemping to delete {:?} :: {:?}", + "🐰 attemping to delete {:?} :: {:?}", uaid.to_string(), chidmessageid ); let msg_key = self.message_key(&uaid, &chidmessageid); let msg_list_key = self.message_list_key(&uaid); let exp_list_key = self.message_exp_list_key(&uaid); - debug!("🉑🔥 Deleting message {}", &msg_key); + debug!("🐰🔥 Deleting message {}", &msg_key); let mut con = self.connection()?; // We remove the id from the exp list at the end, to be sure // it can't be removed from the list before the message is removed @@ -453,7 +453,7 @@ impl DbClient for RedisClientImpl { limit: usize, ) -> DbResult { let uaid = Uaid(uaid); - trace!("Fecthing {} messages since {:?}", limit, timestamp); + trace!("🐰 Fecthing {} messages since {:?}", limit, timestamp); let mut con = self.connection()?; let msg_list_key = self.message_list_key(&uaid); let (messages_id, mut scores): (Vec, Vec) = con @@ -468,7 +468,7 @@ impl DbClient for RedisClientImpl { .map(|(id, s): (String, u64)| (self.message_key(&uaid, &id), s)) .unzip(); if messages_id.len() == 0 { - trace!("No message found"); + trace!("🐰 No message found"); return Ok(FetchMessageResponse { messages: vec![], timestamp: None, @@ -495,7 +495,7 @@ impl DbClient for RedisClientImpl { .collect() }; let timestamp = scores.pop(); - trace!("Found {} messages until {:?}", messages.len(), timestamp); + trace!("🐰 Found {} messages until {:?}", messages.len(), timestamp); Ok(FetchMessageResponse { messages, timestamp, From 193defb025c701cb687f7128f655dfb7f6740eb3 Mon Sep 17 00:00:00 2001 From: sim Date: Sat, 21 Dec 2024 12:05:18 +0000 Subject: [PATCH 10/21] Clean up some comments --- autopush-common/src/db/redis/redis_client/mod.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/autopush-common/src/db/redis/redis_client/mod.rs b/autopush-common/src/db/redis/redis_client/mod.rs index 908434fd..1a48ea37 100644 --- a/autopush-common/src/db/redis/redis_client/mod.rs +++ b/autopush-common/src/db/redis/redis_client/mod.rs @@ -64,7 +64,6 @@ pub struct RedisClientImpl { impl RedisClientImpl { pub fn new(metrics: Arc, settings: &DbSettings) -> DbResult { - // let env = Arc::new(EnvBuilder::new().build()); debug!("🐰 New redis client"); let dsn = settings .dsn @@ -442,9 +441,6 @@ impl DbClient for RedisClientImpl { /// /// If [`limit`] = 0, we fetch all messages after [`timestamp`]. /// - /// Note: Bigtable uses [`Notification.sortkey_timestamp`] instead, and the timestamp - /// is in seconds. - /// /// This can return expired messages, following bigtables behavior async fn fetch_timestamp_messages( &self, From 1d34450cf65b8116a9104e6b0aeaeed8f7849312 Mon Sep 17 00:00:00 2001 From: sim Date: Sat, 21 Dec 2024 12:09:39 +0000 Subject: [PATCH 11/21] Use standard chidmessageid --- autopush-common/src/notification.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/autopush-common/src/notification.rs b/autopush-common/src/notification.rs index 3889a39c..298960a4 100644 --- a/autopush-common/src/notification.rs +++ b/autopush-common/src/notification.rs @@ -47,8 +47,6 @@ impl Notification { /// {chid}:{message_id} pub fn chidmessageid(&self) -> String { let chid = self.channel_id.as_hyphenated(); - #[cfg(feature = "redis")] - return format!("{}:{}", chid, self.version); if let Some(ref topic) = self.topic { format!("{TOPIC_NOTIFICATION_PREFIX}:{chid}:{topic}") From 03bdbe86b85f22e669071053170262f0f6ebbd51 Mon Sep 17 00:00:00 2001 From: sim Date: Sat, 21 Dec 2024 12:10:35 +0000 Subject: [PATCH 12/21] Rename docker-compose for redis --- docker-compose.yml => redis-docker-compose.yml | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename docker-compose.yml => redis-docker-compose.yml (100%) diff --git a/docker-compose.yml b/redis-docker-compose.yml similarity index 100% rename from docker-compose.yml rename to redis-docker-compose.yml From a8167cd75224a6e824cc720e228b2e06a0e2cf87 Mon Sep 17 00:00:00 2001 From: sim Date: Sat, 21 Dec 2024 12:24:40 +0000 Subject: [PATCH 13/21] Fix settings tests with redis --- autoconnect/autoconnect-settings/src/lib.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/autoconnect/autoconnect-settings/src/lib.rs b/autoconnect/autoconnect-settings/src/lib.rs index 6ac34f8e..530b624c 100644 --- a/autoconnect/autoconnect-settings/src/lib.rs +++ b/autoconnect/autoconnect-settings/src/lib.rs @@ -218,6 +218,7 @@ impl Settings { Ok(()) } + #[cfg(feature = "bigtable")] pub fn test_settings() -> Self { let db_dsn = Some("grpc://localhost:8086".to_string()); // BigTable DB_SETTINGS. @@ -234,6 +235,17 @@ impl Settings { ..Default::default() } } + + #[cfg(all(feature = "redis", not(feature = "bigtable")))] + pub fn test_settings() -> Self { + let db_dsn = Some("redis://localhost".to_string()); + let db_settings = "".to_string(); + Self { + db_dsn, + db_settings, + ..Default::default() + } + } } fn deserialize_f64_to_duration<'de, D>(deserializer: D) -> Result From 12c741dd5b78e5690249213f9f5f8d2a9c5cd732 Mon Sep 17 00:00:00 2001 From: sim Date: Sat, 21 Dec 2024 13:23:42 +0000 Subject: [PATCH 14/21] Use multiplexed connection with redis --- Cargo.lock | 10 ++ autopush-common/Cargo.toml | 2 +- .../src/db/redis/redis_client/mod.rs | 101 ++++++++++-------- 3 files changed, 65 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8dbfffd3..6c2c9f9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1110,7 +1110,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" dependencies = [ "bytes", + "futures-core", "memchr", + "pin-project-lite", + "tokio", + "tokio-util", ] [[package]] @@ -2985,14 +2989,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09d8f99a4090c89cc489a94833c901ead69bfbf3877b4867d5482e321ee875bc" dependencies = [ "arc-swap", + "async-trait", + "bytes", "combine", + "futures-util", "itertools 0.13.0", "itoa", "num-bigint", "percent-encoding", + "pin-project-lite", "ryu", "sha1_smol", "socket2", + "tokio", + "tokio-util", "url", ] diff --git a/autopush-common/Cargo.toml b/autopush-common/Cargo.toml index d86b448a..3af94abd 100644 --- a/autopush-common/Cargo.toml +++ b/autopush-common/Cargo.toml @@ -59,7 +59,7 @@ grpcio = { version = "=0.13.0", features = ["openssl"], optional = true } grpcio-sys = { version = "=0.13.0", optional = true } protobuf = { version = "=2.28.0", optional = true } # grpcio does not support protobuf 3+ form_urlencoded = { version = "1.2", optional = true } -redis = "0.27.6" +redis = { version = "0.27.6", features = ["aio", "tokio-comp"]} [dev-dependencies] mockito = "0.31" diff --git a/autopush-common/src/db/redis/redis_client/mod.rs b/autopush-common/src/db/redis/redis_client/mod.rs index 1a48ea37..621c72dc 100644 --- a/autopush-common/src/db/redis/redis_client/mod.rs +++ b/autopush-common/src/db/redis/redis_client/mod.rs @@ -7,7 +7,7 @@ use std::time::{Duration, SystemTime}; use async_trait::async_trait; use cadence::{CountedExt, StatsdClient}; -use redis::{Commands, ConnectionLike, SetExpiry, SetOptions}; +use redis::{AsyncCommands, SetExpiry, SetOptions}; use uuid::Uuid; use crate::db::NotificationRecord; @@ -87,23 +87,18 @@ impl RedisClientImpl { Pools also return a ConnectionLike, so we can add support for pools later. */ - fn connection(&self) -> DbResult { - if let Some(timeout) = self.settings.timeout { - if timeout.is_zero() { - return match self.client.get_connection_with_timeout(timeout) { - Ok(r) => Ok(r), - Err(_) => Err(DbError::ConnectionError( - "Cannot connect to redis".to_owned(), - )), - }; - } - } - match self.client.get_connection() { - Ok(r) => Ok(r), - Err(_) => Err(DbError::ConnectionError( - "Cannot connect to redis".to_owned(), - )), - } + async fn connection(&self) -> DbResult { + let config = if self.settings.timeout.is_some_and(|t| !t.is_zero()) { + redis::AsyncConnectionConfig::new() + .set_connection_timeout(self.settings.timeout.unwrap()) + } else { + redis::AsyncConnectionConfig::new() + }; + Ok(self + .client + .get_multiplexed_async_connection_with_config(&config) + .await + .map_err(|e| DbError::ConnectionError(format!("Cannot connect to redis: {}", e)))?) } fn user_key(&self, uaid: &Uaid) -> String { @@ -142,14 +137,15 @@ impl DbClient for RedisClientImpl { async fn add_user(&self, user: &User) -> DbResult<()> { trace!("🐰 Adding user"); trace!("🐰 Logged at {}", &user.connected_at); - let mut con = self.connection()?; + let mut con = self.connection().await?; let uaid = Uaid(&user.uaid); let user_key = self.user_key(&uaid); let co_key = self.last_co_key(&uaid); let _: () = redis::pipe() .set_options(co_key, ms_since_epoch(), self.redis_opts) .set_options(user_key, serde_json::to_string(user)?, self.redis_opts) - .exec(&mut con)?; + .exec_async(&mut con) + .await?; Ok(()) } @@ -166,9 +162,9 @@ impl DbClient for RedisClientImpl { /// "lively". async fn update_user(&self, user: &mut User) -> DbResult { trace!("🐰 Updating user"); - let mut con = self.connection()?; + let mut con = self.connection().await?; let co_key = self.last_co_key(&Uaid(&user.uaid)); - let last_co: Option = con.get(&co_key)?; + let last_co: Option = con.get(&co_key).await?; if last_co.is_some_and(|c| c < user.connected_at) { trace!( "🐰 Was connected at {}, now at {}", @@ -183,10 +179,11 @@ impl DbClient for RedisClientImpl { } async fn get_user(&self, uaid: &Uuid) -> DbResult> { - let mut con = self.connection()?; + let mut con = self.connection().await?; let user_key = self.user_key(&Uaid(uaid)); let user: Option = con - .get::<&str, Option>(&user_key)? + .get::<&str, Option>(&user_key) + .await? .and_then(|s| serde_json::from_str(s.as_ref()).ok()); if user.is_some() { trace!("🐰 Found a record for {}", &uaid); @@ -196,7 +193,7 @@ impl DbClient for RedisClientImpl { async fn remove_user(&self, uaid: &Uuid) -> DbResult<()> { let uaid = Uaid(uaid); - let mut con = self.connection()?; + let mut con = self.connection().await?; let user_key = self.user_key(&uaid); let co_key = self.last_co_key(&uaid); let chan_list_key = self.channel_list_key(&uaid); @@ -208,20 +205,22 @@ impl DbClient for RedisClientImpl { .del(&chan_list_key) .del(&msg_list_key) .del(&exp_list_key) - .exec(&mut con)?; + .exec_async(&mut con) + .await?; Ok(()) } async fn add_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<()> { let uaid = Uaid(uaid); - let mut con = self.connection()?; + let mut con = self.connection().await?; let co_key = self.last_co_key(&uaid); let chan_list_key = self.channel_list_key(&uaid); let _: () = redis::pipe() .rpush(chan_list_key, channel_id.as_hyphenated().to_string()) .set_options(co_key, ms_since_epoch(), self.redis_opts) - .exec(&mut con)?; + .exec_async(&mut con) + .await?; Ok(()) } @@ -229,7 +228,7 @@ impl DbClient for RedisClientImpl { async fn add_channels(&self, uaid: &Uuid, channels: HashSet) -> DbResult<()> { let uaid = Uaid(uaid); // channel_ids are stored as a set within a single redis key - let mut con = self.connection()?; + let mut con = self.connection().await?; let co_key = self.last_co_key(&uaid); let chan_list_key = self.channel_list_key(&uaid); redis::pipe() @@ -241,16 +240,19 @@ impl DbClient for RedisClientImpl { .map(|c| c.as_hyphenated().to_string()) .collect::>(), ) - .exec(&mut con)?; + .exec_async(&mut con) + .await?; Ok(()) } async fn get_channels(&self, uaid: &Uuid) -> DbResult> { let uaid = Uaid(uaid); - let mut con = self.connection()?; + let mut con = self.client.get_multiplexed_async_connection().await?; + //let mut con = self.connection().await?; let chan_list_key = self.channel_list_key(&uaid); let channels: HashSet = con - .lrange::<&str, HashSet>(&chan_list_key, 0, -1)? + .lrange::<&str, HashSet>(&chan_list_key, 0, -1) + .await? .into_iter() .filter_map(|s| Uuid::from_str(&s).ok()) .collect(); @@ -262,7 +264,7 @@ impl DbClient for RedisClientImpl { async fn remove_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult { let uaid = Uaid(uaid); let channel_id = Chanid(channel_id); - let mut con = self.connection()?; + let mut con = self.connection().await?; let co_key = self.last_co_key(&uaid); let chan_list_key = self.channel_list_key(&uaid); // Remove {channel_id} from autopush/channel/{auid} @@ -271,7 +273,8 @@ impl DbClient for RedisClientImpl { .set_options(co_key, ms_since_epoch(), self.redis_opts) .ignore() .lrem(&chan_list_key, 1, channel_id.to_string()) - .query(&mut con)?; + .query_async(&mut con) + .await?; Ok(status) } @@ -297,7 +300,7 @@ impl DbClient for RedisClientImpl { */ async fn save_message(&self, uaid: &Uuid, message: Notification) -> DbResult<()> { let uaid = Uaid(uaid); - let mut con = self.connection()?; + let mut con = self.connection().await?; let msg_list_key = self.message_list_key(&uaid); let exp_list_key = self.message_exp_list_key(&uaid); let msg_key = self.message_key(&uaid, &message.chidmessageid()); @@ -324,7 +327,7 @@ impl DbClient for RedisClientImpl { let is_topic = if let Some(topic) = &message.topic { let topic_key = self.topic_key(&uaid, &Chanid(&message.channel_id), &topic); // We check if a message is already saved for this topic - let old_msg_id: Option = con.get(&topic_key)?; + let old_msg_id: Option = con.get(&topic_key).await?; // If a message is already stored for that topic, we remove it if let Some(id) = old_msg_id { trace!("🐰 The topic had a message: {}", &id); @@ -355,7 +358,7 @@ impl DbClient for RedisClientImpl { .zadd(&exp_list_key, &msg_id, expiry) .zadd(&msg_list_key, &msg_id, ms_since_epoch()); - let _: () = pipe.exec(&mut con)?; + let _: () = pipe.exec_async(&mut con).await?; self.metrics .incr_with_tags("notification.message.stored") .with_tag("topic", &is_topic.to_string()) @@ -382,15 +385,16 @@ impl DbClient for RedisClientImpl { debug!("🐰🔥 Incrementing storage to {}", timestamp); let msg_list_key = self.message_list_key(&uaid); let exp_list_key = self.message_exp_list_key(&uaid); - let mut con = self.connection()?; - let exp_id_list: Vec = con.zrangebyscore(&exp_list_key, 0, timestamp)?; + let mut con = self.connection().await?; + let exp_id_list: Vec = con.zrangebyscore(&exp_list_key, 0, timestamp).await?; if exp_id_list.len() > 0 { trace!("🐰🔥 Deleting {} expired msgs", exp_id_list.len()); redis::pipe() .del(&exp_id_list) .zrem(&msg_list_key, &exp_id_list) .zrem(&exp_list_key, &exp_id_list) - .exec(&mut con)?; + .exec_async(&mut con) + .await?; } Ok(()) } @@ -407,14 +411,15 @@ impl DbClient for RedisClientImpl { let msg_list_key = self.message_list_key(&uaid); let exp_list_key = self.message_exp_list_key(&uaid); debug!("🐰🔥 Deleting message {}", &msg_key); - let mut con = self.connection()?; + let mut con = self.connection().await?; // We remove the id from the exp list at the end, to be sure // it can't be removed from the list before the message is removed redis::pipe() .del(&msg_key) .zrem(&msg_list_key, &chidmessageid) .zrem(&exp_list_key, &chidmessageid) - .exec(&mut con)?; + .exec_async(&mut con) + .await?; self.metrics .incr_with_tags("notification.message.deleted") .with_tag("database", &self.name()) @@ -450,7 +455,7 @@ impl DbClient for RedisClientImpl { ) -> DbResult { let uaid = Uaid(uaid); trace!("🐰 Fecthing {} messages since {:?}", limit, timestamp); - let mut con = self.connection()?; + let mut con = self.connection().await?; let msg_list_key = self.message_list_key(&uaid); let (messages_id, mut scores): (Vec, Vec) = con .zrangebyscore_limit_withscores::<&str, u64, &str, Vec<(String, u64)>>( @@ -459,7 +464,8 @@ impl DbClient for RedisClientImpl { "+inf", 0, limit as isize, - )? + ) + .await? .into_iter() .map(|(id, s): (String, u64)| (self.message_key(&uaid, &id), s)) .unzip(); @@ -473,7 +479,8 @@ impl DbClient for RedisClientImpl { let messages: Vec = if messages_id.len() == 0 { vec![] } else { - con.mget::<&Vec, Vec>>(&messages_id)? + con.mget::<&Vec, Vec>>(&messages_id) + .await? .into_iter() .filter_map(|opt: Option| { if opt.is_none() { @@ -499,8 +506,8 @@ impl DbClient for RedisClientImpl { } async fn health_check(&self) -> DbResult { - let status = self.connection()?.check_connection(); - Ok(status) + let _ = self.connection().await?; + Ok(true) } /// Returns true, because there's no table in Redis From 8a9b1466e99158ce3091f1f23c87add169906e22 Mon Sep 17 00:00:00 2001 From: sim Date: Sat, 21 Dec 2024 13:34:21 +0000 Subject: [PATCH 15/21] Remove unused dep --- autoconnect/autoconnect-settings/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/autoconnect/autoconnect-settings/src/lib.rs b/autoconnect/autoconnect-settings/src/lib.rs index 530b624c..7fbe8e9d 100644 --- a/autoconnect/autoconnect-settings/src/lib.rs +++ b/autoconnect/autoconnect-settings/src/lib.rs @@ -11,7 +11,6 @@ use config::{Config, ConfigError, Environment, File}; use fernet::Fernet; use lazy_static::lazy_static; use serde::{Deserialize, Deserializer}; -use serde_json::json; use autopush_common::util::deserialize_u32_to_duration; From e133a00cad3ac9b77b16cb118fb5c1d06d95bd87 Mon Sep 17 00:00:00 2001 From: sim Date: Sat, 21 Dec 2024 13:44:15 +0000 Subject: [PATCH 16/21] Set potentially different chidmessageid for tests notifs --- autopush-common/src/db/redis/redis_client/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/autopush-common/src/db/redis/redis_client/mod.rs b/autopush-common/src/db/redis/redis_client/mod.rs index 621c72dc..5d0d1481 100644 --- a/autopush-common/src/db/redis/redis_client/mod.rs +++ b/autopush-common/src/db/redis/redis_client/mod.rs @@ -801,6 +801,7 @@ mod tests { let test_notification = crate::db::Notification { timestamp: now(), version: "version1".to_owned(), + sortkey_timestamp: Some(sort_key + 10), ..test_notification_0 }; From 83ebc3c0a72f18c96c8c46a5290ef1151eea8433 Mon Sep 17 00:00:00 2001 From: sim Date: Sat, 21 Dec 2024 14:02:57 +0000 Subject: [PATCH 17/21] Remove unnecessary topic entry If this is a topic message: zadd(msg_list_key) and zadd(exp_list_key) will replace their old entry in the sorted sets if one already exists and set(msg_key, message) will override it too --- autopush-common/src/db/redis/mod.rs | 1 - .../src/db/redis/redis_client/mod.rs | 32 ++++--------------- 2 files changed, 7 insertions(+), 26 deletions(-) diff --git a/autopush-common/src/db/redis/mod.rs b/autopush-common/src/db/redis/mod.rs index f906a0b9..2921590b 100644 --- a/autopush-common/src/db/redis/mod.rs +++ b/autopush-common/src/db/redis/mod.rs @@ -9,7 +9,6 @@ /// `autopush/msgs_exp/{uaid}` SortedSet to store the list of the pending message ids, ordered by expiry date, this is because SortedSet elements can't have independant expiry date /// `autopush/msg/{uaid}/{chidmessageid}`, with `{chidmessageid} == {chid}:{version}` String to store /// the content of the messages -/// `autopush/topic/{uaid}/{chid}/{topic}` String to store the (last) message id of a given topic /// mod redis_client; diff --git a/autopush-common/src/db/redis/redis_client/mod.rs b/autopush-common/src/db/redis/redis_client/mod.rs index 5d0d1481..0e58405b 100644 --- a/autopush-common/src/db/redis/redis_client/mod.rs +++ b/autopush-common/src/db/redis/redis_client/mod.rs @@ -125,10 +125,6 @@ impl RedisClientImpl { fn message_key(&self, uaid: &Uaid, chidmessageid: &str) -> String { format!("autopush/msg/{}/{}", uaid, chidmessageid) } - - fn topic_key(&self, uaid: &Uaid, chan_id: &Chanid, topic: &str) -> String { - format!("autopush/topic/{}/{}/{}", uaid, chan_id, topic) - } } #[async_trait] @@ -303,7 +299,8 @@ impl DbClient for RedisClientImpl { let mut con = self.connection().await?; let msg_list_key = self.message_list_key(&uaid); let exp_list_key = self.message_exp_list_key(&uaid); - let msg_key = self.message_key(&uaid, &message.chidmessageid()); + let msg_id = &message.chidmessageid(); + let msg_key = self.message_key(&uaid, &msg_id); // message.ttl is already min(headers.ttl, MAX_NOTIFICATION_TTL) // see autoendpoint/src/extractors/notification_headers.rs let opts = SetOptions::default().with_expiration(SetExpiry::EX(message.ttl)); @@ -324,29 +321,14 @@ impl DbClient for RedisClientImpl { let mut pipe = redis::pipe(); - let is_topic = if let Some(topic) = &message.topic { - let topic_key = self.topic_key(&uaid, &Chanid(&message.channel_id), &topic); - // We check if a message is already saved for this topic - let old_msg_id: Option = con.get(&topic_key).await?; - // If a message is already stored for that topic, we remove it - if let Some(id) = old_msg_id { - trace!("🐰 The topic had a message: {}", &id); - // We remove the id from the exp list at the end, to be sure - // it can't be removed from the list before the message is removed - pipe.del(self.message_key(&uaid, &id)) - .zrem(&msg_list_key, &id) - .zrem(&exp_list_key, &id); - } - // Setting the key replace the old one if any - pipe.set_options(&topic_key, &message.chidmessageid(), opts); - true - } else { - false - }; + // If this is a topic message: + // zadd(msg_list_key) and zadd(exp_list_key) will replace their old entry + // in the hashset if one already exists + // and set(msg_key, message) will override it too: nothing to do. + let is_topic = message.topic.is_some(); // Store notification record in autopush/msg/{aud}/{chidmessageid} // And store {chidmessageid} in autopush/msgs/{aud} - let msg_id = &message.chidmessageid(); let msg_key = self.message_key(&uaid, &msg_id); pipe.set_options( msg_key, From 4dfd29defb9360cbd02e7733cb928c0dd63ac06f Mon Sep 17 00:00:00 2001 From: sim Date: Sun, 22 Dec 2024 16:46:19 +0000 Subject: [PATCH 18/21] Store MultiplexedConnection as expected by redis --- .../src/db/redis/redis_client/mod.rs | 26 ++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/autopush-common/src/db/redis/redis_client/mod.rs b/autopush-common/src/db/redis/redis_client/mod.rs index 0e58405b..c3b6f2f0 100644 --- a/autopush-common/src/db/redis/redis_client/mod.rs +++ b/autopush-common/src/db/redis/redis_client/mod.rs @@ -2,11 +2,12 @@ use std::collections::HashSet; use std::fmt; use std::fmt::Display; use std::str::FromStr; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime}; use async_trait::async_trait; use cadence::{CountedExt, StatsdClient}; +use redis::aio::MultiplexedConnection; use redis::{AsyncCommands, SetExpiry, SetOptions}; use uuid::Uuid; @@ -56,6 +57,7 @@ impl<'a> From> for String { pub struct RedisClientImpl { /// Database connector string pub client: redis::Client, + pub conn: Arc>>, pub(crate) settings: RedisDbSettings, /// Metrics client metrics: Arc, @@ -75,6 +77,7 @@ impl RedisClientImpl { Ok(Self { client, + conn: Arc::new(Mutex::new(None)), settings: db_settings, metrics, redis_opts: SetOptions::default().with_expiration(SetExpiry::EX(MAX_ROUTER_TTL)), @@ -88,17 +91,34 @@ impl RedisClientImpl { Pools also return a ConnectionLike, so we can add support for pools later. */ async fn connection(&self) -> DbResult { + { + let conn = self + .conn + .lock() + .map_err(|e| DbError::General(e.to_string()))? + .clone(); + + if let Some(co) = conn { + return Ok(co); + } + } let config = if self.settings.timeout.is_some_and(|t| !t.is_zero()) { redis::AsyncConnectionConfig::new() .set_connection_timeout(self.settings.timeout.unwrap()) } else { redis::AsyncConnectionConfig::new() }; - Ok(self + let co = self .client .get_multiplexed_async_connection_with_config(&config) .await - .map_err(|e| DbError::ConnectionError(format!("Cannot connect to redis: {}", e)))?) + .map_err(|e| DbError::ConnectionError(format!("Cannot connect to redis: {}", e)))?; + let mut conn = self + .conn + .lock() + .map_err(|e| DbError::General(e.to_string()))?; + *conn = Some(co.clone()); + Ok(co) } fn user_key(&self, uaid: &Uaid) -> String { From f2f6d032c690ff1d09aec51296f4d2a9be9fc5ae Mon Sep 17 00:00:00 2001 From: sim Date: Sun, 22 Dec 2024 20:10:02 +0000 Subject: [PATCH 19/21] Fix loop when fetching pending messages Exclude lower band from redis range --- autopush-common/src/db/redis/redis_client/mod.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/autopush-common/src/db/redis/redis_client/mod.rs b/autopush-common/src/db/redis/redis_client/mod.rs index c3b6f2f0..6a57d6d9 100644 --- a/autopush-common/src/db/redis/redis_client/mod.rs +++ b/autopush-common/src/db/redis/redis_client/mod.rs @@ -459,10 +459,11 @@ impl DbClient for RedisClientImpl { trace!("🐰 Fecthing {} messages since {:?}", limit, timestamp); let mut con = self.connection().await?; let msg_list_key = self.message_list_key(&uaid); + // ZRANGE Key (x +inf LIMIT 0 limit let (messages_id, mut scores): (Vec, Vec) = con - .zrangebyscore_limit_withscores::<&str, u64, &str, Vec<(String, u64)>>( + .zrangebyscore_limit_withscores::<&str, &str, &str, Vec<(String, u64)>>( &msg_list_key, - timestamp.unwrap_or(0), + &format!("({}", timestamp.unwrap_or(0)), "+inf", 0, limit as isize, From 00866acddd4bc1a87b2f27dee0176e9c809c209b Mon Sep 17 00:00:00 2001 From: sim Date: Tue, 14 Jan 2025 09:05:25 +0100 Subject: [PATCH 20/21] Fix comments --- .../src/db/redis/redis_client/mod.rs | 24 +++++++------------ 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/autopush-common/src/db/redis/redis_client/mod.rs b/autopush-common/src/db/redis/redis_client/mod.rs index 6a57d6d9..f0743c8f 100644 --- a/autopush-common/src/db/redis/redis_client/mod.rs +++ b/autopush-common/src/db/redis/redis_client/mod.rs @@ -84,12 +84,10 @@ impl RedisClientImpl { }) } - /** - Return a [ConnectionLike], which implement redis [Commands] and can be - used in pipes. - - Pools also return a ConnectionLike, so we can add support for pools later. - */ + /// Return a [ConnectionLike], which implement redis [Commands] and can be + /// used in pipes. + /// + /// Pools also return a ConnectionLike, so we can add support for pools later. async fn connection(&self) -> DbResult { { let conn = self @@ -309,11 +307,9 @@ impl DbClient for RedisClientImpl { Ok(true) } - /** - Write the notification to storage. - - If the message contains a topic, we remove the old message - */ + /// Write the notification to storage. + /// + /// If the message contains a topic, we remove the old message async fn save_message(&self, uaid: &Uuid, message: Notification) -> DbResult<()> { let uaid = Uaid(uaid); let mut con = self.connection().await?; @@ -429,9 +425,7 @@ impl DbClient for RedisClientImpl { Ok(()) } - /** - Topic messages are handled as other messages with redis, we return nothing. - */ + /// Topic messages are handled as other messages with redis, we return nothing. async fn fetch_topic_messages( &self, _uaid: &Uuid, @@ -487,7 +481,7 @@ impl DbClient for RedisClientImpl { .into_iter() .filter_map(|opt: Option| { if opt.is_none() { - // We return dummy expired event if we can't fetch the say event, + // We return dummy expired event if we can't fetch the said event, // it means the event has expired Some(Notification { timestamp: 1, From f82bc8ab65515e48ea1245dbcb49756673b500a6 Mon Sep 17 00:00:00 2001 From: sim Date: Thu, 16 Jan 2025 13:44:41 +0100 Subject: [PATCH 21/21] Fix redis healthcheck --- Cargo.lock | 17 +++-------------- autopush-common/Cargo.toml | 2 +- .../src/db/redis/redis_client/mod.rs | 3 ++- 3 files changed, 6 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e0cb0f32..882439ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2386,15 +2386,6 @@ dependencies = [ "either", ] -[[package]] -name = "itertools" -version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" -dependencies = [ - "either", -] - [[package]] name = "itoa" version = "1.0.14" @@ -3137,16 +3128,14 @@ dependencies = [ [[package]] name = "redis" -version = "0.27.6" +version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09d8f99a4090c89cc489a94833c901ead69bfbf3877b4867d5482e321ee875bc" +checksum = "9f89727cba9cec05cc579942321ff6dd09fe57a8b3217f52f952301efa010da5" dependencies = [ "arc-swap", - "async-trait", "bytes", "combine", "futures-util", - "itertools 0.13.0", "itoa", "num-bigint", "percent-encoding", @@ -4962,7 +4951,7 @@ dependencies = [ "http 0.2.12", "hyper 0.14.32", "hyper-rustls 0.24.2", - "itertools 0.12.1", + "itertools", "log", "percent-encoding", "rustls 0.22.4", diff --git a/autopush-common/Cargo.toml b/autopush-common/Cargo.toml index 3af94abd..6c9d473f 100644 --- a/autopush-common/Cargo.toml +++ b/autopush-common/Cargo.toml @@ -59,7 +59,7 @@ grpcio = { version = "=0.13.0", features = ["openssl"], optional = true } grpcio-sys = { version = "=0.13.0", optional = true } protobuf = { version = "=2.28.0", optional = true } # grpcio does not support protobuf 3+ form_urlencoded = { version = "1.2", optional = true } -redis = { version = "0.27.6", features = ["aio", "tokio-comp"]} +redis = { version = "0.28.1", features = ["aio", "tokio-comp"]} [dev-dependencies] mockito = "0.31" diff --git a/autopush-common/src/db/redis/redis_client/mod.rs b/autopush-common/src/db/redis/redis_client/mod.rs index f0743c8f..e247e36a 100644 --- a/autopush-common/src/db/redis/redis_client/mod.rs +++ b/autopush-common/src/db/redis/redis_client/mod.rs @@ -503,7 +503,8 @@ impl DbClient for RedisClientImpl { } async fn health_check(&self) -> DbResult { - let _ = self.connection().await?; + let mut con = self.connection().await?; + let _: () = con.ping().await?; Ok(true) }