diff --git a/Cargo.lock b/Cargo.lock index e3e32aeb0..7876663f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -819,6 +819,7 @@ dependencies = [ "openssl", "protobuf", "rand 0.8.5", + "redis", "regex", "reqwest 0.12.9", "sentry", @@ -1114,6 +1115,16 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "memchr", +] + [[package]] name = "config" version = "0.14.1" @@ -2371,6 +2382,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.14" @@ -2549,9 +2569,9 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" [[package]] name = "miniz_oxide" -version = "0.8.0" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1" +checksum = "a2ef2593ffb6958c941575cee70c8e257438749971869c4ae5acf6f91a168a61" dependencies = [ "adler2", ] @@ -3111,6 +3131,24 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "redis" +version = "0.27.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09d8f99a4090c89cc489a94833c901ead69bfbf3877b4867d5482e321ee875bc" +dependencies = [ + "arc-swap", + "combine", + "itertools 0.13.0", + "itoa", + "num-bigint", + "percent-encoding", + "ryu", + "sha1_smol", + "socket2", + "url", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -3692,6 +3730,12 @@ dependencies = [ "digest", ] +[[package]] +name = "sha1_smol" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" + [[package]] name = "sha2" version = "0.10.8" @@ -4908,7 +4952,7 @@ dependencies = [ "http 0.2.12", "hyper 0.14.32", "hyper-rustls 0.24.2", - "itertools", + "itertools 0.12.1", "log", "percent-encoding", "rustls 0.22.4", diff --git a/Makefile b/Makefile index 756d27c0e..6ca9fac06 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,11 @@ CARGO = cargo # Let's be very explicit about it for now. TESTS_DIR := `pwd`/tests TEST_RESULTS_DIR ?= workspace/test-results -PYTEST_ARGS ?= $(if $(SKIP_SENTRY),-m "not sentry") $(if $(TEST_STUB),,-m "not stub") # Stub tests do not work in CI +# NOTE: Do not be clever. +# The integration tests (and a few others) use pytest markers to control +# the tests that are being run. These markers are set and defined within +# the `./pyproject.toml`. That is the single source of truth. +PYTEST_ARGS := ${PYTEST_ARGS} INTEGRATION_TEST_DIR := $(TESTS_DIR)/integration INTEGRATION_TEST_FILE := $(INTEGRATION_TEST_DIR)/test_integration_all_rust.py NOTIFICATION_TEST_DIR := $(TESTS_DIR)/notification @@ -46,17 +50,17 @@ integration-test-clean: $(DOCKER_COMPOSE) -f $(INTEGRATION_TEST_DIR)/docker-compose.yml down docker rm integration-tests -integration-test-legacy: +integration-test-legacy: ## pytest markers are stored in `tests/pytest.ini` $(POETRY) -V $(POETRY) install --without dev,load,notification --no-root $(POETRY) run pytest $(INTEGRATION_TEST_FILE) \ --junit-xml=$(TEST_RESULTS_DIR)/integration_test_legacy_results.xml \ -v $(PYTEST_ARGS) -integration-test-local: +integration-test-local: ## pytest markers are stored in `tests/pytest.ini` $(POETRY) -V $(POETRY) install --without dev,load,notification --no-root - $(POETRY) run pytest $(INTEGRATION_TEST_FILE) \ + $(POETRY) run pytest $(INTEGRATION_TEST_FILE) \ --junit-xml=$(TEST_RESULTS_DIR)/integration_test_results.xml \ -v $(PYTEST_ARGS) diff --git a/autoconnect/Cargo.toml b/autoconnect/Cargo.toml index c0ba4ba27..927959b97 100644 --- 
a/autoconnect/Cargo.toml +++ b/autoconnect/Cargo.toml @@ -53,7 +53,13 @@ actix-service = "2.0" docopt = "1.1" [features] -default = ["bigtable"] +default = ["bigtable", "reliable_report"] bigtable = ["autopush_common/bigtable", "autoconnect_settings/bigtable"] emulator = ["bigtable"] log_vapid = [] +reliable_report = [ + "autoconnect_settings/reliable_report", + "autoconnect_web/reliable_report", + "autoconnect_ws/reliable_report", + "autopush_common/reliable_report", +] diff --git a/autoconnect/autoconnect-common/src/broadcast.rs b/autoconnect/autoconnect-common/src/broadcast.rs index b5f589aeb..63ce0a12a 100644 --- a/autoconnect/autoconnect-common/src/broadcast.rs +++ b/autoconnect/autoconnect-common/src/broadcast.rs @@ -199,11 +199,11 @@ impl BroadcastChangeTracker { } *ver = broadcast.version; } else { - trace!("ðŸ“Ē Not found: {}", &b_id); + trace!("ðŸ“Ē Not found: {b_id}"); return Err(ApcErrorKind::BroadcastError("Broadcast not found".into()).into()); } - trace!("ðŸ“Ē New version of {}", &b_id); + trace!("ðŸ“Ē New version of {b_id}"); // Check to see if this broadcast has been updated since initialization let bcast_index = self .broadcast_list diff --git a/autoconnect/autoconnect-settings/Cargo.toml b/autoconnect/autoconnect-settings/Cargo.toml index 32c04aeb0..5b1e10239 100644 --- a/autoconnect/autoconnect-settings/Cargo.toml +++ b/autoconnect/autoconnect-settings/Cargo.toml @@ -25,3 +25,4 @@ autopush_common.workspace = true # specify the default via the calling crate, in order to simplify default chains. 
bigtable = ["autopush_common/bigtable"] emulator = ["bigtable"] +reliable_report = ["autopush_common/reliable_report"] diff --git a/autoconnect/autoconnect-settings/src/app_state.rs b/autoconnect/autoconnect-settings/src/app_state.rs index 8383c6ee8..7999ac604 100644 --- a/autoconnect/autoconnect-settings/src/app_state.rs +++ b/autoconnect/autoconnect-settings/src/app_state.rs @@ -12,6 +12,8 @@ use autoconnect_common::{ registry::ClientRegistry, }; use autopush_common::db::{client::DbClient, DbSettings, StorageType}; +#[cfg(feature = "reliable_report")] +use autopush_common::reliability::PushReliability; use crate::{Settings, ENV_PREFIX}; @@ -32,6 +34,9 @@ pub struct AppState { pub settings: Settings, pub router_url: String, pub endpoint_url: String, + + #[cfg(feature = "reliable_report")] + pub reliability: Arc, } impl AppState { @@ -84,6 +89,13 @@ impl AppState { ENV_PREFIX.to_uppercase() ), }; + + #[cfg(feature = "reliable_report")] + let reliability = Arc::new( + PushReliability::new(&settings.reliability_dsn, db.clone()).map_err(|e| { + ConfigError::Message(format!("Could not start Reliability connection: {:?}", e)) + })?, + ); let http = reqwest::Client::builder() .timeout(Duration::from_secs(1)) .build() @@ -103,6 +115,8 @@ impl AppState { settings, router_url, endpoint_url, + #[cfg(feature = "reliable_report")] + reliability, }) } diff --git a/autoconnect/autoconnect-settings/src/lib.rs b/autoconnect/autoconnect-settings/src/lib.rs index f022fe161..a37f71a35 100644 --- a/autoconnect/autoconnect-settings/src/lib.rs +++ b/autoconnect/autoconnect-settings/src/lib.rs @@ -108,6 +108,11 @@ pub struct Settings { /// /// By default, the number of available physical CPUs is used as the worker count. pub actix_workers: Option, + #[cfg(feature = "reliable_report")] + /// The DNS for the reliability data store. This is normally a Redis compatible + /// storage system. 
See [Connection Parameters](https://docs.rs/redis/latest/redis/#connection-parameters) + /// for details. + pub reliability_dsn: Option, } impl Default for Settings { @@ -139,6 +144,8 @@ impl Default for Settings { msg_limit: 150, actix_max_connections: None, actix_workers: None, + #[cfg(feature = "reliable_report")] + reliability_dsn: None, } } } diff --git a/autoconnect/autoconnect-web/Cargo.toml b/autoconnect/autoconnect-web/Cargo.toml index 08f5f4ba8..2df9d5aba 100644 --- a/autoconnect/autoconnect-web/Cargo.toml +++ b/autoconnect/autoconnect-web/Cargo.toml @@ -36,3 +36,7 @@ tokio.workspace = true autoconnect_common = { workspace = true, features = ["test-support"] } [features] +reliable_report = [ + "autopush_common/reliable_report", + "autoconnect_ws/reliable_report", +] diff --git a/autoconnect/autoconnect-web/src/routes.rs b/autoconnect/autoconnect-web/src/routes.rs index fbb59ef05..62f06389e 100644 --- a/autoconnect/autoconnect-web/src/routes.rs +++ b/autoconnect/autoconnect-web/src/routes.rs @@ -16,21 +16,45 @@ pub async fn ws_route( } /// Deliver a Push notification directly to a connected client +#[allow(unused_mut)] pub async fn push_route( uaid: web::Path, - notif: web::Json, + mut notif: web::Json, app_state: web::Data, ) -> HttpResponse { trace!( - "âĐ push_route, uaid: {} channel_id: {}", + "âĐ in push_route, uaid: {} channel_id: {}", uaid, - notif.channel_id + notif.channel_id, ); + #[cfg(feature = "reliable_report")] + { + notif + .record_reliability( + &app_state.reliability, + autopush_common::reliability::ReliabilityState::IntAccepted, + ) + .await; + notif + .record_reliability( + &app_state.reliability, + autopush_common::reliability::ReliabilityState::Transmitted, + ) + .await; + } + // Attempt to send the notification to the UA using WebSocket protocol, or store on failure. 
let result = app_state .clients - .notify(uaid.into_inner(), notif.into_inner()) + .notify(uaid.into_inner(), notif.clone()) .await; if result.is_ok() { + #[cfg(feature = "reliable_report")] + notif + .record_reliability( + &app_state.reliability, + autopush_common::reliability::ReliabilityState::Accepted, + ) + .await; HttpResponse::Ok().finish() } else { HttpResponse::NotFound().body("Client not available") diff --git a/autoconnect/autoconnect-ws/Cargo.toml b/autoconnect/autoconnect-ws/Cargo.toml index 194d79742..d4561dcaa 100644 --- a/autoconnect/autoconnect-ws/Cargo.toml +++ b/autoconnect/autoconnect-ws/Cargo.toml @@ -35,3 +35,7 @@ ctor.workspace = true autoconnect_common = { workspace = true, features = ["test-support"] } [features] +reliable_report = [ + "autopush_common/reliable_report", + "autoconnect_ws_sm/reliable_report", +] diff --git a/autoconnect/autoconnect-ws/autoconnect-ws-sm/Cargo.toml b/autoconnect/autoconnect-ws/autoconnect-ws-sm/Cargo.toml index bc76baf62..0f02874e9 100644 --- a/autoconnect/autoconnect-ws/autoconnect-ws-sm/Cargo.toml +++ b/autoconnect/autoconnect-ws/autoconnect-ws-sm/Cargo.toml @@ -32,3 +32,4 @@ serde_json.workspace = true autoconnect_common = { workspace = true, features = ["test-support"] } [features] +reliable_report = [] diff --git a/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/identified/mod.rs b/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/identified/mod.rs index d7488a248..136335d16 100644 --- a/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/identified/mod.rs +++ b/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/identified/mod.rs @@ -138,6 +138,11 @@ impl WebPushClient { &self.app_state.settings } + #[cfg(feature = "reliable_report")] + pub fn app_reliability(&self) -> &autopush_common::reliability::PushReliability { + &self.app_state.reliability + } + /// Connect this `WebPushClient` to the `ClientRegistry` /// /// Returning a `Stream` of `ServerNotification`s from the `ClientRegistry` @@ -233,6 +238,7 @@ 
impl WebPushClient { let connected_at = self.connected_at; rt::spawn(async move { app_state.db.save_messages(&uaid, notifs).await?; + // XXX: record reliability debug!("Finished saving unacked direct notifs, checking for reconnect"); let Some(user) = app_state.db.get_user(&uaid).await? else { return Err(SMErrorKind::Internal(format!( diff --git a/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/identified/on_server_notif.rs b/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/identified/on_server_notif.rs index 7e6b6f62f..06094dbf5 100644 --- a/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/identified/on_server_notif.rs +++ b/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/identified/on_server_notif.rs @@ -124,6 +124,7 @@ impl WebPushClient { if msg.sortkey_timestamp.is_none() { expired_topic_sort_keys.push(msg.chidmessageid()); } + // XXX: record ReliabilityState::Expired? false }); // TODO: A batch remove_messages would be nicer @@ -163,6 +164,22 @@ impl WebPushClient { Ok(smsgs) } + #[cfg(feature = "reliable_report")] + /// Record and transition the state for trackable messages. + async fn record_state( + &self, + messages: &mut Vec, + state: autopush_common::reliability::ReliabilityState, + ) { + // *Note* because `.map()` is sync + // we can't call the async func without additional hoops. + for message in messages { + message + .record_reliability(&self.app_state.reliability, state) + .await; + } + } + /// Read a chunk (max count 10 returned) of Notifications from storage /// /// This alternates between reading Topic Notifications and Timestamp @@ -186,10 +203,20 @@ impl WebPushClient { let topic_resp = if self.flags.include_topic { trace!("🗄ïļ WebPushClient::do_check_storage: fetch_topic_messages"); // Get the most recent max 11 messages. - self.app_state + #[allow(unused_mut)] + let mut messages = self + .app_state .db .fetch_topic_messages(&self.uaid, 11) - .await? 
+ .await?; + #[cfg(feature = "reliable_report")] + // Since we pulled these from storage, mark them as "retrieved" + self.record_state( + &mut messages.messages, + autopush_common::reliability::ReliabilityState::Retrieved, + ) + .await; + messages } else { Default::default() }; @@ -226,7 +253,8 @@ impl WebPushClient { "🗄ïļ WebPushClient::do_check_storage: fetch_timestamp_messages timestamp: {:?}", timestamp ); - let timestamp_resp = self + #[allow(unused_mut)] + let mut timestamp_resp = self .app_state .db .fetch_timestamp_messages(&self.uaid, timestamp, 10) @@ -244,6 +272,13 @@ impl WebPushClient { ) .with_tag("topic", "false") .send(); + #[cfg(feature = "reliable_report")] + // Since we pulled these from storage, mark them as "retrieved" + self.record_state( + &mut timestamp_resp.messages, + autopush_common::reliability::ReliabilityState::Retrieved, + ) + .await; } Ok(CheckStorageResponse { diff --git a/autoendpoint/Cargo.toml b/autoendpoint/Cargo.toml index a541fc444..32d991043 100644 --- a/autoendpoint/Cargo.toml +++ b/autoendpoint/Cargo.toml @@ -68,7 +68,7 @@ tempfile = "3.2.0" tokio = { workspace = true, features = ["fs", "macros"] } [features] -default = ["bigtable"] +default = ["bigtable", "reliable_report"] # data store types bigtable = ["autopush_common/bigtable"] @@ -82,3 +82,5 @@ emulator = ["bigtable"] stub = [] # Verbosely log vapid assertions (NOT ADVISED FOR WIDE PRODUCTION USE) log_vapid = [] + +reliable_report = ["autopush_common/reliable_report"] diff --git a/autoendpoint/src/extractors/notification.rs b/autoendpoint/src/extractors/notification.rs index f29143c8e..45c359734 100644 --- a/autoendpoint/src/extractors/notification.rs +++ b/autoendpoint/src/extractors/notification.rs @@ -26,6 +26,12 @@ pub struct Notification { pub sort_key_timestamp: u64, /// The encrypted notification body pub data: Option, + #[cfg(feature = "reliable_report")] + /// The current state the message was in (if tracked) + pub reliable_state: Option, + #[cfg(feature = 
"reliable_report")] + #[cfg(feature = "reliable_report")] + pub reliability_id: Option, } impl FromRequest for Notification { @@ -68,9 +74,35 @@ impl FromRequest for Notification { sort_key_timestamp, ); + #[cfg(feature = "reliable_report")] + let reliability_id = subscription.reliability_id.clone(); + + #[allow(unused_mut)] + let mut notif = Notification { + message_id, + subscription, + headers, + timestamp, + sort_key_timestamp, + data, + #[cfg(feature = "reliable_report")] + reliable_state: None, + #[cfg(feature = "reliable_report")] + reliability_id, + }; + + #[cfg(feature = "reliable_report")] + // Brand new notification, so record it as "Received" + notif + .record_reliability( + &app_state.reliability, + autopush_common::reliability::ReliabilityState::Received, + ) + .await; + // Record the encoding if we have an encrypted payload - if let Some(encoding) = &headers.encoding { - if data.is_some() { + if let Some(encoding) = ¬if.headers.encoding { + if notif.data.is_some() { app_state .metrics .incr(&format!("updates.notification.encoding.{encoding}")) @@ -78,14 +110,7 @@ impl FromRequest for Notification { } } - Ok(Notification { - message_id, - subscription, - headers, - timestamp, - sort_key_timestamp, - data, - }) + Ok(notif) } .boxed_local() } @@ -112,6 +137,8 @@ impl From for autopush_common::notification::Notification { Some(headers) } }, + #[cfg(feature = "reliable_report")] + reliable_state: notification.reliable_state, } } } @@ -172,10 +199,18 @@ impl Notification { map.insert("ttl", serde_json::to_value(self.headers.ttl)?); map.insert("topic", serde_json::to_value(&self.headers.topic)?); map.insert("timestamp", serde_json::to_value(self.timestamp)?); - if let Some(reliability_id) = &self.subscription.reliability_id { - map.insert("reliability_id", serde_json::to_value(reliability_id)?); + #[cfg(feature = "reliable_report")] + { + if let Some(reliability_id) = &self.subscription.reliability_id { + map.insert("reliability_id", 
serde_json::to_value(reliability_id)?); + } + if let Some(reliable_state) = self.reliable_state { + map.insert( + "reliable_state", + serde_json::to_value(reliable_state.to_string())?, + ); + } } - if let Some(data) = &self.data { map.insert("data", serde_json::to_value(data)?); @@ -185,4 +220,20 @@ impl Notification { Ok(map) } + + #[cfg(feature = "reliable_report")] + pub async fn record_reliability( + &mut self, + reliability: &autopush_common::reliability::PushReliability, + state: autopush_common::reliability::ReliabilityState, + ) { + self.reliable_state = reliability + .record( + &self.reliability_id, + state, + &self.reliable_state, + Some(self.timestamp + self.headers.ttl as u64), + ) + .await; + } } diff --git a/autoendpoint/src/extractors/routers.rs b/autoendpoint/src/extractors/routers.rs index c4c012a29..5ef4e3b00 100644 --- a/autoendpoint/src/extractors/routers.rs +++ b/autoendpoint/src/extractors/routers.rs @@ -79,6 +79,8 @@ impl FromRequest for Routers { metrics: app_state.metrics.clone(), http: app_state.http.clone(), endpoint_url: app_state.settings.endpoint_url(), + #[cfg(feature = "reliable_report")] + reliability: app_state.reliability.clone(), }, fcm: app_state.fcm_router.clone(), apns: app_state.apns_router.clone(), diff --git a/autoendpoint/src/extractors/subscription.rs b/autoendpoint/src/extractors/subscription.rs index 767088d33..23433d4fc 100644 --- a/autoendpoint/src/extractors/subscription.rs +++ b/autoendpoint/src/extractors/subscription.rs @@ -7,6 +7,7 @@ use autopush_common::{ tags::Tags, util::{b64_decode_std, b64_decode_url}, }; + use cadence::{CountedExt, StatsdClient}; use futures::{future::LocalBoxFuture, FutureExt}; use jsonwebtoken::{Algorithm, DecodingKey, Validation}; @@ -22,7 +23,6 @@ use crate::headers::{ crypto_key::CryptoKeyHeader, vapid::{VapidClaims, VapidError, VapidHeader, VapidHeaderWithKey, VapidVersionData}, }; -use crate::metrics::Metrics; use crate::server::AppState; use crate::settings::Settings; @@ -54,7 
+54,6 @@ impl FromRequest for Subscription { trace!("🔐 Token info: {:?}", &token_info); let app_state: Data = Data::extract(&req).await.expect("No server state found"); - let metrics = Metrics::from(&app_state); // Decrypt the token let token = app_state @@ -71,20 +70,30 @@ impl FromRequest for Subscription { let vapid: Option = parse_vapid(&token_info, &app_state.metrics)? .map(|vapid| extract_public_key(vapid, &token_info)) .transpose()?; - trace!("raw vapid: {:?}", &vapid); + // Validate the VAPID JWT token, fetch the claims, and record the version + if let Some(with_key) = &vapid { + // Validate the VAPID JWT token and record the version + validate_vapid_jwt(with_key, &app_state.settings, &app_state.metrics)?; + app_state.metrics.incr(&format!( + "updates.vapid.draft{:02}", + with_key.vapid.version() + ))?; + }; + // If this is a known VAPID key, create a reliability_id from + // either the content of the vapid assertions, or the request + // header value, or just make one up. let reliability_id: Option = vapid.as_ref().and_then(|v| { app_state - .vapid_tracker + .reliability_filter .is_trackable(v) - .then(|| app_state.vapid_tracker.get_id(req.headers())) + .then(|| app_state.reliability_filter.get_id(req.headers())) }); - debug!("🔍 Assigning Reliability ID: {reliability_id:?}"); - + trace!("🔍 track_id: {:?}", reliability_id); // Capturing the vapid sub right now will cause too much cardinality. Instead, // let's just capture if we have a valid VAPID, as well as what sort of bad sub // values we get. 
- if let Some(ref header) = vapid { + if let Some(header) = &vapid { let sub = header .vapid .insecure_sub() @@ -93,13 +102,15 @@ impl FromRequest for Subscription { let mut tags = Tags::default(); tags.tags .insert("error".to_owned(), e.as_metric().to_owned()); - metrics - .clone() - .incr_with_tags("notification.auth.error", Some(tags)); + app_state + .metrics + .incr_with_tags("notification.auth.error") + .with_tag("error", e.as_metric()) + .send(); }) .unwrap_or_default(); // For now, record that we had a good (?) VAPID sub, - metrics.clone().incr("notification.auth.ok"); + app_state.metrics.incr("notification.auth.ok")?; info!("VAPID sub: {:?}", sub) }; @@ -127,7 +138,7 @@ impl FromRequest for Subscription { // Validate the VAPID JWT token and record the version if let Some(vapid) = &vapid { - validate_vapid_jwt(vapid, &app_state.settings, &metrics)?; + validate_vapid_jwt(vapid, &app_state.settings, &app_state.metrics)?; app_state .metrics @@ -284,8 +295,8 @@ fn term_to_label(term: &str) -> String { fn validate_vapid_jwt( vapid: &VapidHeaderWithKey, settings: &Settings, - metrics: &Metrics, -) -> ApiResult<()> { + metrics: &StatsdClient, +) -> ApiResult { let VapidHeaderWithKey { vapid, public_key } = vapid; let public_key = decode_public_key(public_key)?; @@ -304,20 +315,18 @@ fn validate_vapid_jwt( Err(e) => match e.kind() { // NOTE: This will fail if `exp` is specified as anything instead of a numeric or if a required field is empty jsonwebtoken::errors::ErrorKind::Json(e) => { - let mut tags = Tags::default(); - tags.tags.insert( - "error".to_owned(), - match e.classify() { - serde_json::error::Category::Io => "IO_ERROR", - serde_json::error::Category::Syntax => "SYNTAX_ERROR", - serde_json::error::Category::Data => "DATA_ERROR", - serde_json::error::Category::Eof => "EOF_ERROR", - } - .to_owned(), - ); metrics - .clone() - .incr_with_tags("notification.auth.bad_vapid.json", Some(tags)); + .incr_with_tags("notification.auth.bad_vapid.json") + .with_tag( + 
"error", + match e.classify() { + serde_json::error::Category::Io => "IO_ERROR", + serde_json::error::Category::Syntax => "SYNTAX_ERROR", + serde_json::error::Category::Data => "DATA_ERROR", + serde_json::error::Category::Eof => "EOF_ERROR", + }, + ) + .send(); if e.is_data() { debug!("VAPID data warning: {:?}", e); return Err(VapidError::InvalidVapid( @@ -339,7 +348,6 @@ fn validate_vapid_jwt( // Attempt to match up the majority of ErrorKind variants. // The third-party errors all defer to the source, so we can // use that to differentiate for actual errors. - let mut tags = Tags::default(); let label = if e.source().is_none() { // These two have the most cardinality, so we need to handle // them separately. @@ -358,10 +366,10 @@ fn validate_vapid_jwt( // If you need to dig into these, there's always the logs. "Other".to_owned() }; - tags.tags.insert("error".to_owned(), label); metrics - .clone() - .incr_with_tags("notification.auth.bad_vapid.other", Some(tags)); + .incr_with_tags("notification.auth.bad_vapid.other") + .with_tag("error", &label) + .send(); error!("Bad Aud: Unexpected VAPID error: {:?}", &e); return Err(e.into()); } @@ -390,7 +398,7 @@ fn validate_vapid_jwt( return Err(VapidError::FutureExpirationToken.into()); } - Ok(()) + Ok(token_data.claims) } #[cfg(test)] @@ -467,7 +475,7 @@ pub mod tests { VapidClaims::default_exp() - 100, public_key, ); - let result = validate_vapid_jwt(&header, &test_settings, &Metrics::noop()); + let result = validate_vapid_jwt(&header, &test_settings, &Metrics::sink()); assert!(result.is_ok()); } @@ -485,7 +493,7 @@ pub mod tests { PUB_KEY.to_owned(), ); assert!(matches!( - validate_vapid_jwt(&header, &test_settings, &Metrics::noop()) + validate_vapid_jwt(&header, &test_settings, &Metrics::sink()) .unwrap_err() .kind, ApiErrorKind::VapidError(VapidError::InvalidAudience) @@ -505,7 +513,7 @@ pub mod tests { VapidClaims::default_exp() - 100, PUB_KEY.to_owned(), ); - let result = validate_vapid_jwt(&header, &test_settings, 
&Metrics::noop()); + let result = validate_vapid_jwt(&header, &test_settings, &Metrics::sink()); assert!(result.is_ok()); } @@ -539,7 +547,7 @@ pub mod tests { version_data: VapidVersionData::Version1, }, }; - let vv = validate_vapid_jwt(&header, &test_settings, &Metrics::noop()) + let vv = validate_vapid_jwt(&header, &test_settings, &Metrics::sink()) .unwrap_err() .kind; assert!(matches![ @@ -582,7 +590,7 @@ pub mod tests { version_data: VapidVersionData::Version1, }, }; - assert!(validate_vapid_jwt(&header, &test_settings, &Metrics::noop()).is_ok()); + assert!(validate_vapid_jwt(&header, &test_settings, &Metrics::sink()).is_ok()); // try standard form with no padding let header = VapidHeaderWithKey { public_key: public_key_standard.trim_end_matches('=').to_owned(), @@ -592,7 +600,7 @@ pub mod tests { version_data: VapidVersionData::Version1, }, }; - assert!(validate_vapid_jwt(&header, &test_settings, &Metrics::noop()).is_ok()); + assert!(validate_vapid_jwt(&header, &test_settings, &Metrics::sink()).is_ok()); // try URL safe form with padding let header = VapidHeaderWithKey { public_key: public_key_url_safe.clone(), @@ -602,7 +610,7 @@ pub mod tests { version_data: VapidVersionData::Version1, }, }; - assert!(validate_vapid_jwt(&header, &test_settings, &Metrics::noop()).is_ok()); + assert!(validate_vapid_jwt(&header, &test_settings, &Metrics::sink()).is_ok()); // try URL safe form without padding let header = VapidHeaderWithKey { public_key: public_key_url_safe.trim_end_matches('=').to_owned(), @@ -612,7 +620,7 @@ pub mod tests { version_data: VapidVersionData::Version1, }, }; - assert!(validate_vapid_jwt(&header, &test_settings, &Metrics::noop()).is_ok()); + assert!(validate_vapid_jwt(&header, &test_settings, &Metrics::sink()).is_ok()); } #[test] @@ -645,7 +653,7 @@ pub mod tests { version_data: VapidVersionData::Version1, }, }; - let vv = validate_vapid_jwt(&header, &test_settings, &Metrics::noop()) + let vv = validate_vapid_jwt(&header, &test_settings, 
&Metrics::sink()) .unwrap_err() .kind; assert!(matches![ diff --git a/autoendpoint/src/headers/vapid.rs b/autoendpoint/src/headers/vapid.rs index e46bed6c8..a255c98bd 100644 --- a/autoendpoint/src/headers/vapid.rs +++ b/autoendpoint/src/headers/vapid.rs @@ -1,3 +1,4 @@ +use core::str; use std::collections::HashMap; use std::fmt; @@ -127,6 +128,8 @@ impl VapidHeader { (data, VapidVersionData::Version1) }; + // Validate the JWT here + Ok(Self { scheme, token, @@ -164,6 +167,7 @@ impl VapidHeader { info!("🔐 Vapid: sub: {:?}", sub); return Ok(sub.to_owned()); } + Err(VapidError::SubMissing) } @@ -267,4 +271,13 @@ mod tests { Ok("mailto:admin@example.com".to_owned()) ) } + + #[test] + fn extract_sub() { + let header = VapidHeader::parse(VALID_HEADER).unwrap(); + assert_eq!( + header.insecure_sub().unwrap(), + "mailto:admin@example.com".to_string() + ); + } } diff --git a/autoendpoint/src/routers/apns/router.rs b/autoendpoint/src/routers/apns/router.rs index 48c497a43..1a47f4b12 100644 --- a/autoendpoint/src/routers/apns/router.rs +++ b/autoendpoint/src/routers/apns/router.rs @@ -1,4 +1,6 @@ use autopush_common::db::client::DbClient; +#[cfg(feature = "reliable_report")] +use autopush_common::reliability::{PushReliability, ReliabilityState}; use crate::error::{ApiError, ApiResult}; use crate::extractors::notification::Notification; @@ -34,6 +36,8 @@ pub struct ApnsRouter { endpoint_url: Url, metrics: Arc, db: Box, + #[cfg(feature = "reliable_report")] + reliability: Arc, } struct ApnsClientData { @@ -115,6 +119,7 @@ impl ApnsRouter { endpoint_url: Url, metrics: Arc, db: Box, + #[cfg(feature = "reliable_report")] reliability: Arc, ) -> Result { let channels = settings.channels()?; @@ -130,6 +135,8 @@ impl ApnsRouter { endpoint_url, metrics, db, + #[cfg(feature = "reliable_report")] + reliability, }) } @@ -397,7 +404,11 @@ impl Router for ApnsRouter { Ok(router_data) } - async fn route_notification(&self, notification: &Notification) -> ApiResult { + #[allow(unused_mut)] + 
async fn route_notification( + &self, + mut notification: Notification, + ) -> ApiResult { debug!( "Sending APNS notification to UAID {}", notification.subscription.user.uaid @@ -420,7 +431,7 @@ impl Router for ApnsRouter { .and_then(Value::as_str) .ok_or(ApnsError::NoReleaseChannel)?; let aps_json = router_data.get("aps").cloned(); - let mut message_data = build_message_data(notification)?; + let mut message_data = build_message_data(¬ification)?; message_data.insert("ver", notification.message_id.clone()); // Get client and build payload @@ -473,9 +484,18 @@ impl Router for ApnsRouter { .await); } - // Sent successfully, update metrics and make response trace!("APNS request was successful"); - incr_success_metrics(&self.metrics, "apns", channel, notification); + incr_success_metrics(&self.metrics, "apns", channel, ¬ification); + #[cfg(feature = "reliable_report")] + { + // Record that we've sent the message out to APNS. + // We can't set the state here because the notification isn't + // mutable, but we are also essentially consuming the + // notification nothing else should modify it. 
+ notification + .record_reliability(&self.reliability, ReliabilityState::Transmitted) + .await; + } Ok(RouterResponse::success( self.endpoint_url @@ -501,6 +521,8 @@ mod tests { use async_trait::async_trait; use autopush_common::db::client::DbClient; use autopush_common::db::mock::MockDbClient; + #[cfg(feature = "reliable_report")] + use autopush_common::reliability::PushReliability; use cadence::StatsdClient; use mockall::predicate; use std::collections::HashMap; @@ -561,7 +583,9 @@ mod tests { settings: ApnsSettings::default(), endpoint_url: Url::parse("http://localhost:8080/").unwrap(), metrics: Arc::new(StatsdClient::from_sink("autopush", cadence::NopMetricSink)), - db, + db: db.clone(), + #[cfg(feature = "reliable_report")] + reliability: Arc::new(PushReliability::new(&None, db.clone()).unwrap()), } } @@ -607,7 +631,7 @@ mod tests { let router = make_router(client, db); let notification = make_notification(default_router_data(), None, RouterType::APNS); - let result = router.route_notification(¬ification).await; + let result = router.route_notification(notification).await; assert!(result.is_ok(), "result = {result:?}"); assert_eq!( result.unwrap(), @@ -646,7 +670,7 @@ mod tests { let data = "test-data".to_string(); let notification = make_notification(default_router_data(), Some(data), RouterType::APNS); - let result = router.route_notification(¬ification).await; + let result = router.route_notification(notification).await; assert!(result.is_ok(), "result = {result:?}"); assert_eq!( result.unwrap(), @@ -668,7 +692,7 @@ mod tests { ); let notification = make_notification(router_data, None, RouterType::APNS); - let result = router.route_notification(¬ification).await; + let result = router.route_notification(notification).await; assert!(result.is_err()); assert!( matches!( @@ -701,7 +725,7 @@ mod tests { .return_once(|_| Ok(())); let router = make_router(client, db.into_boxed_arc()); - let result = router.route_notification(¬ification).await; + let result = 
router.route_notification(notification).await; assert!(result.is_err()); assert!( matches!( @@ -729,7 +753,7 @@ mod tests { let router = make_router(client, db); let notification = make_notification(default_router_data(), None, RouterType::APNS); - let result = router.route_notification(¬ification).await; + let result = router.route_notification(notification).await; assert!(result.is_err()); assert!( matches!( @@ -762,7 +786,7 @@ mod tests { ); let notification = make_notification(router_data, None, RouterType::APNS); - let result = router.route_notification(¬ification).await; + let result = router.route_notification(notification).await; assert!(result.is_err()); assert!( matches!( diff --git a/autoendpoint/src/routers/common.rs b/autoendpoint/src/routers/common.rs index 3a0cf49fd..fe8393aa6 100644 --- a/autoendpoint/src/routers/common.rs +++ b/autoendpoint/src/routers/common.rs @@ -23,7 +23,8 @@ pub fn build_message_data(notification: &Notification) -> ApiResult, /// A map from application ID to an authenticated FCM client clients: HashMap, + #[cfg(feature = "reliable_report")] + reliability: Arc, } impl FcmRouter { @@ -34,6 +38,7 @@ impl FcmRouter { http: reqwest::Client, metrics: Arc, db: Box, + #[cfg(feature = "reliable_report")] reliability: Arc, ) -> Result { let server_credentials = settings.credentials()?; let clients = Self::create_clients(&settings, server_credentials, http.clone()) @@ -45,6 +50,8 @@ impl FcmRouter { metrics, db, clients, + #[cfg(feature = "reliable_report")] + reliability, }) } @@ -137,7 +144,11 @@ impl Router for FcmRouter { Ok(router_data) } - async fn route_notification(&self, notification: &Notification) -> ApiResult { + #[allow(unused_mut)] + async fn route_notification( + &self, + mut notification: Notification, + ) -> ApiResult { debug!( "Sending FCM notification to UAID {}", notification.subscription.user.uaid @@ -162,7 +173,7 @@ impl Router for FcmRouter { .get(&app_id) .ok_or_else(|| FcmError::InvalidAppId(app_id.clone()))?; - 
let message_data = build_message_data(notification)?; + let message_data = build_message_data(¬ification)?; let platform = "fcmv1"; trace!("Sending message to {platform}: [{:?}]", &app_id); if let Err(e) = client.send(message_data, routing_token, ttl).await { @@ -177,7 +188,15 @@ impl Router for FcmRouter { ) .await); }; - incr_success_metrics(&self.metrics, platform, &app_id, notification); + incr_success_metrics(&self.metrics, platform, &app_id, ¬ification); + #[cfg(feature = "reliable_report")] + // Record that we've sent the message out to FCM. + // We can't set the state here because the notification isn't + // mutable, but we are also essentially consuming the + // notification nothing else should modify it. + notification + .record_reliability(&self.reliability, ReliabilityState::Transmitted) + .await; // Sent successfully, update metrics and make response trace!("Send request was successful"); @@ -207,6 +226,8 @@ mod tests { use crate::routers::{Router, RouterResponse}; use autopush_common::db::client::DbClient; use autopush_common::db::mock::MockDbClient; + #[cfg(feature = "reliable_report")] + use autopush_common::reliability::PushReliability; use std::sync::Arc; use cadence::StatsdClient; @@ -244,7 +265,9 @@ mod tests { Url::parse("http://localhost:8080/").unwrap(), reqwest::Client::new(), Arc::new(StatsdClient::from_sink("autopush", cadence::NopMetricSink)), - db, + db.clone(), + #[cfg(feature = "reliable_report")] + Arc::new(PushReliability::new(&None, db.clone()).unwrap()), ) .await .unwrap() @@ -291,7 +314,7 @@ mod tests { .create(); let notification = make_notification(default_router_data(), None, RouterType::FCM); - let result = router.route_notification(¬ification).await; + let result = router.route_notification(notification).await; assert!(result.is_ok(), "result = {result:?}"); assert_eq!( result.unwrap(), @@ -335,7 +358,7 @@ mod tests { let data = "test-data".to_string(); let notification = make_notification(default_router_data(), Some(data), 
RouterType::FCM); - let result = router.route_notification(¬ification).await; + let result = router.route_notification(notification).await; assert!(result.is_ok(), "result = {result:?}"); assert_eq!( result.unwrap(), @@ -366,7 +389,7 @@ mod tests { ); let notification = make_notification(router_data, None, RouterType::FCM); - let result = router.route_notification(¬ification).await; + let result = router.route_notification(notification).await; assert!(result.is_err()); assert!( matches!( @@ -405,7 +428,7 @@ mod tests { .create_async() .await; - let result = router.route_notification(¬ification).await; + let result = router.route_notification(notification).await; assert!(result.is_err()); assert!( matches!( diff --git a/autoendpoint/src/routers/mod.rs b/autoendpoint/src/routers/mod.rs index 9fd6ce48d..310995bad 100644 --- a/autoendpoint/src/routers/mod.rs +++ b/autoendpoint/src/routers/mod.rs @@ -35,7 +35,7 @@ pub trait Router { ) -> Result, RouterError>; /// Route a notification to the user - async fn route_notification(&self, notification: &Notification) -> ApiResult; + async fn route_notification(&self, notification: Notification) -> ApiResult; } /// The response returned when a router routes a notification diff --git a/autoendpoint/src/routers/stub/router.rs b/autoendpoint/src/routers/stub/router.rs index 94fbf9f2c..fcc6b5392 100644 --- a/autoendpoint/src/routers/stub/router.rs +++ b/autoendpoint/src/routers/stub/router.rs @@ -89,7 +89,7 @@ impl Router for StubRouter { Ok(router_data) } - async fn route_notification(&self, notification: &Notification) -> ApiResult { + async fn route_notification(&self, notification: Notification) -> ApiResult { debug!( "Sending Test notification to UAID {}", notification.subscription.user.uaid diff --git a/autoendpoint/src/routers/webpush.rs b/autoendpoint/src/routers/webpush.rs index bdcd915c5..bd06355f2 100644 --- a/autoendpoint/src/routers/webpush.rs +++ b/autoendpoint/src/routers/webpush.rs @@ -1,4 +1,6 @@ use 
async_trait::async_trait; +#[cfg(feature = "reliable_report")] +use autopush_common::reliability::PushReliability; use cadence::{Counted, CountedExt, StatsdClient, Timed}; use reqwest::{Response, StatusCode}; use serde_json::Value; @@ -24,6 +26,8 @@ pub struct WebPushRouter { pub metrics: Arc, pub http: reqwest::Client, pub endpoint_url: Url, + #[cfg(feature = "reliable_report")] + pub reliability: Arc, } #[async_trait(?Send)] @@ -37,12 +41,17 @@ impl Router for WebPushRouter { Ok(HashMap::new()) } - async fn route_notification(&self, notification: &Notification) -> ApiResult { + async fn route_notification( + &self, + mut notification: Notification, + ) -> ApiResult { // The notification contains the original subscription information - let user = ¬ification.subscription.user; + let user = ¬ification.subscription.user.clone(); + // A clone of the notification used only for the responses + // The canonical Notification is consumed by the various functions. debug!( - "✉ Routing WebPush notification to UAID {}", - notification.subscription.user.uaid + "✉ Routing WebPush notification to UAID {} :: {:?}", + notification.subscription.user.uaid, notification.subscription.reliability_id, ); trace!("✉ Notification = {:?}", notification); @@ -53,16 +62,23 @@ impl Router for WebPushRouter { &node_id ); - // Try to send the notification to the node - match self.send_notification(notification, node_id).await { + #[cfg(feature = "reliable_report")] + let revert_state = notification.reliable_state; + #[cfg(feature = "reliable_report")] + notification + .record_reliability( + &self.reliability, + autopush_common::reliability::ReliabilityState::IntTransmitted, + ) + .await; + match self.send_notification(¬ification, node_id).await { Ok(response) => { // The node might be busy, make sure it accepted the notification if response.status() == 200 { // The node has received the notification trace!("✉ Node received notification"); - return Ok(self.make_delivered_response(notification)); 
+ return Ok(self.make_delivered_response(¬ification)); } - trace!( "✉ Node did not receive the notification, response = {:?}", response @@ -81,6 +97,20 @@ impl Router for WebPushRouter { self.remove_node_id(user, node_id).await? } } + + #[cfg(feature = "reliable_report")] + // Couldn't send the message! So revert to the prior state if we have one + if let Some(revert_state) = revert_state { + trace!( + "🔎 Revert {:?} from {:?} to {:?}", + ¬ification.reliability_id, + ¬ification.reliable_state, + revert_state + ); + notification + .record_reliability(&self.reliability, revert_state) + .await; + } } if notification.headers.ttl == 0 { @@ -94,12 +124,19 @@ impl Router for WebPushRouter { // TODO: include `internal` if meta is set. .with_tag("topic", &topic) .send(); - return Ok(self.make_delivered_response(notification)); + #[cfg(feature = "reliable_report")] + notification + .record_reliability( + &self.reliability, + autopush_common::reliability::ReliabilityState::Expired, + ) + .await; + return Ok(self.make_delivered_response(¬ification)); } // Save notification, node is not present or busy trace!("✉ Node is not present or busy, storing notification"); - self.store_notification(notification).await?; + self.store_notification(&mut notification).await?; // Retrieve the user data again, they may have reconnected or the node // is no longer busy. 
@@ -115,7 +152,7 @@ impl Router for WebPushRouter { Err(e) => { // Database error, but we already stored the message so it's ok debug!("✉ Database error while re-fetching user: {}", e); - return Ok(self.make_stored_response(notification)); + return Ok(self.make_stored_response(¬ification)); } }; @@ -125,7 +162,7 @@ impl Router for WebPushRouter { // The user is not connected to a node, nothing more to do None => { trace!("✉ User is not connected to a node, returning stored response"); - return Ok(self.make_stored_response(notification)); + return Ok(self.make_stored_response(¬ification)); } }; @@ -146,17 +183,17 @@ impl Router for WebPushRouter { .with_tag("app_id", "direct") .send(); - Ok(self.make_delivered_response(notification)) + Ok(self.make_delivered_response(¬ification)) } else { trace!("✉ Node has not delivered the message, returning stored response"); - Ok(self.make_stored_response(notification)) + Ok(self.make_stored_response(¬ification)) } } Err(error) => { // Can't communicate with the node, attempt to stop using it debug!("✉ Error while triggering notification check: {}", error); self.remove_node_id(&user, node_id).await?; - Ok(self.make_stored_response(notification)) + Ok(self.make_stored_response(¬ification)) } } } @@ -176,16 +213,23 @@ impl WebPushRouter { err } - /// Send the notification to the node + /// Consume and send the notification to the node async fn send_notification( &self, notification: &Notification, node_id: &str, ) -> ApiResult { let url = format!("{}/push/{}", node_id, notification.subscription.user.uaid); - let notification = notification.serialize_for_delivery()?; - Ok(self.http.put(&url).json(¬ification).send().await?) + let notification_out = notification.serialize_for_delivery()?; + + trace!( + "âĐ out: Notification: {}, channel_id: {} :: {:?}", + ¬ification.subscription.user.uaid, + ¬ification.subscription.channel_id, + ¬ification_out, + ); + Ok(self.http.put(&url).json(¬ification_out).send().await?) 
} /// Notify the node to check for notifications for the user @@ -200,8 +244,9 @@ impl WebPushRouter { } /// Store a notification in the database - async fn store_notification(&self, notification: &Notification) -> ApiResult<()> { - self.db + async fn store_notification(&self, notification: &mut Notification) -> ApiResult<()> { + let result = self + .db .save_message( ¬ification.subscription.user.uaid, notification.clone().into(), @@ -223,7 +268,15 @@ impl WebPushRouter { )), notification.subscription.vapid.clone(), ) - }) + }); + #[cfg(feature = "reliable_report")] + notification + .record_reliability( + &self.reliability, + autopush_common::reliability::ReliabilityState::Stored, + ) + .await; + result } /// Remove the node ID from a user. This is done if the user is no longer @@ -296,22 +349,27 @@ mod test { use crate::extractors::subscription::tests::{make_vapid, PUB_KEY}; use crate::headers::vapid::VapidClaims; use autopush_common::errors::ReportableError; + #[cfg(feature = "reliable_report")] + use autopush_common::reliability::PushReliability; use super::*; use autopush_common::db::mock::MockDbClient; fn make_router(db: Box) -> WebPushRouter { WebPushRouter { - db, + db: db.clone(), metrics: Arc::new(StatsdClient::from_sink("autopush", cadence::NopMetricSink)), http: reqwest::Client::new(), endpoint_url: Url::parse("http://localhost:8080/").unwrap(), + #[cfg(feature = "reliable_report")] + reliability: Arc::new(PushReliability::new(&None, db).unwrap()), } } #[tokio::test] async fn pass_extras() { - let router = make_router(Box::new(MockDbClient::new())); + let db = MockDbClient::new().into_boxed_arc(); + let router = make_router(db); let sub = "foo@example.com"; let vapid = make_vapid( sub, diff --git a/autoendpoint/src/routes/mod.rs b/autoendpoint/src/routes/mod.rs index 76e10aeda..0cd8ad202 100644 --- a/autoendpoint/src/routes/mod.rs +++ b/autoendpoint/src/routes/mod.rs @@ -1,3 +1,5 @@ pub mod health; pub mod registration; +#[cfg(feature = 
"reliable_report")] +pub mod reliability; pub mod webpush; diff --git a/autoendpoint/src/routes/reliability.rs b/autoendpoint/src/routes/reliability.rs new file mode 100644 index 000000000..27bc4824a --- /dev/null +++ b/autoendpoint/src/routes/reliability.rs @@ -0,0 +1,28 @@ +use actix_web::{web::Data, HttpResponse}; +use serde_json::json; + +use crate::server::AppState; + +pub async fn report_handler(app_state: Data) -> HttpResponse { + let reliability = app_state.reliability.clone(); + match reliability.report().await { + Ok(Some(v)) => { + debug!("🔍 Reporting {:?}", &v); + HttpResponse::Ok() + .content_type("application/json") + .body(json!(v).to_string()) + } + Ok(None) => { + debug!("🔍 Reporting, but nothing to report"); + HttpResponse::Ok() + .content_type("application/json") + .body(json!({"error": "No data"}).to_string()) + } + Err(e) => { + debug!("🔍ðŸŸĨ Reporting, Error {:?}", &e); + HttpResponse::InternalServerError() + .content_type("application/json") + .body(json!({"error": e.to_string()}).to_string()) + } + } +} diff --git a/autoendpoint/src/routes/webpush.rs b/autoendpoint/src/routes/webpush.rs index 44f0a2d8b..161bcd55c 100644 --- a/autoendpoint/src/routes/webpush.rs +++ b/autoendpoint/src/routes/webpush.rs @@ -9,12 +9,12 @@ use actix_web::web::Data; use actix_web::HttpResponse; /// Handle the `POST /wpush/{api_version}/{token}` and `POST /wpush/{token}` routes +/// This is the endpoint for all incoming Push subscription updates. 
pub async fn webpush_route( notification: Notification, routers: Routers, _app_state: Data, ) -> ApiResult { - // TODO: sentry::configure_scope(|scope| { scope.set_extra( "uaid", @@ -25,7 +25,7 @@ pub async fn webpush_route( RouterType::from_str(¬ification.subscription.user.router_type) .map_err(|_| ApiErrorKind::InvalidRouterType)?, ); - Ok(router.route_notification(¬ification).await?.into()) + Ok(router.route_notification(notification).await?.into()) } /// Handle the `DELETE /m/{message_id}` route diff --git a/autoendpoint/src/server.rs b/autoendpoint/src/server.rs index 864449510..25c2b5702 100644 --- a/autoendpoint/src/server.rs +++ b/autoendpoint/src/server.rs @@ -13,11 +13,14 @@ use serde_json::json; #[cfg(feature = "bigtable")] use autopush_common::db::bigtable::BigTableClientImpl; +#[cfg(feature = "reliable_report")] +use autopush_common::reliability::PushReliability; use autopush_common::{ db::{client::DbClient, spawn_pool_periodic_reporter, DbSettings, StorageType}, middleware::sentry::SentryWrapper, }; +use crate::error::{ApiError, ApiErrorKind, ApiResult}; use crate::metrics; #[cfg(feature = "stub")] use crate::routers::stub::router::StubRouter; @@ -31,10 +34,7 @@ use crate::routes::{ webpush::{delete_notification_route, webpush_route}, }; use crate::settings::Settings; -use crate::{ - error::{ApiError, ApiErrorKind, ApiResult}, - settings::VapidTracker, -}; +use crate::settings::VapidTracker; #[derive(Clone)] pub struct AppState { @@ -48,7 +48,9 @@ pub struct AppState { pub apns_router: Arc, #[cfg(feature = "stub")] pub stub_router: Arc, - pub vapid_tracker: Arc, + #[cfg(feature = "reliable_report")] + pub reliability: Arc, + pub reliability_filter: VapidTracker, } pub struct Server; @@ -59,6 +61,7 @@ impl Server { let bind_address = format!("{}:{}", settings.host, settings.port); let fernet = settings.make_fernet(); let endpoint_url = settings.endpoint_url(); + let reliability_filter = VapidTracker(settings.tracking_keys()); let db_settings = 
DbSettings { dsn: settings.db_dsn.clone(), db_settings: if settings.db_settings.is_empty() { @@ -85,6 +88,12 @@ impl Server { .into()); } }; + #[cfg(feature = "reliable_report")] + let reliability = Arc::new( + PushReliability::new(&settings.reliability_dsn, db.clone()).map_err(|e| { + ApiErrorKind::General(format!("Could not initialize Reliability Report: {:?}", e)) + })?, + ); let http = reqwest::ClientBuilder::new() .connect_timeout(Duration::from_millis(settings.connection_timeout_millis)) .timeout(Duration::from_millis(settings.request_timeout_millis)) @@ -97,6 +106,8 @@ impl Server { http.clone(), metrics.clone(), db.clone(), + #[cfg(feature = "reliable_report")] + reliability.clone(), ) .await?, ); @@ -106,10 +117,11 @@ impl Server { endpoint_url.clone(), metrics.clone(), db.clone(), + #[cfg(feature = "reliable_report")] + reliability.clone(), ) .await?, ); - let vapid_tracker = Arc::new(VapidTracker(settings.tracking_keys())); #[cfg(feature = "stub")] let stub_router = Arc::new(StubRouter::new(settings.stub.clone())?); let app_state = AppState { @@ -122,7 +134,9 @@ impl Server { apns_router, #[cfg(feature = "stub")] stub_router, - vapid_tracker, + #[cfg(feature = "reliable_report")] + reliability, + reliability_filter, }; spawn_pool_periodic_reporter( @@ -143,7 +157,7 @@ impl Server { actix_web::http::Method::PUT, ]) .max_age(3600); - App::new() + let app = App::new() // Actix 4 recommends wrapping structures wtih web::Data (internally an Arc) .app_data(Data::new(app_state.clone())) // Extractor configuration @@ -196,7 +210,13 @@ impl Server { // Dockerflow .service(web::resource("/__heartbeat__").route(web::get().to(health_route))) .service(web::resource("/__lbheartbeat__").route(web::get().to(lb_heartbeat_route))) - .service(web::resource("/__version__").route(web::get().to(version_route))) + .service(web::resource("/__version__").route(web::get().to(version_route))); + #[cfg(feature = "reliable_report")] + let app = app.service( + 
web::resource("/__milestones__") + .route(web::get().to(crate::routes::reliability::report_handler)), + ); + app }) .bind(bind_address)? .run(); diff --git a/autoendpoint/src/settings.rs b/autoendpoint/src/settings.rs index 9d5997185..b374fb803 100644 --- a/autoendpoint/src/settings.rs +++ b/autoendpoint/src/settings.rs @@ -54,7 +54,14 @@ pub struct Settings { pub fcm: FcmSettings, pub apns: ApnsSettings, #[cfg(feature = "stub")] + /// "Stub" is a predictable Mock bridge that allows us to "send" data and return an expected + /// result. pub stub: StubSettings, + #[cfg(feature = "reliable_report")] + /// The DSN for the reliability data store. This is normally a Redis compatible + /// storage system. See [Connection Parameters](https://docs.rs/redis/latest/redis/#connection-parameters) + /// for details. + pub reliability_dsn: Option, } impl Default for Settings { @@ -86,6 +93,8 @@ impl Default for Settings { apns: ApnsSettings::default(), #[cfg(feature = "stub")] stub: StubSettings::default(), + #[cfg(feature = "reliable_report")] + reliability_dsn: None, } } } diff --git a/autopush-common/Cargo.toml b/autopush-common/Cargo.toml index 053681e2d..339f067da 100644 --- a/autopush-common/Cargo.toml +++ b/autopush-common/Cargo.toml @@ -60,6 +60,8 @@ grpcio-sys = { version = "=0.13.0", optional = true } protobuf = { version = "=2.28.0", optional = true } # grpcio does not support protobuf 3+ form_urlencoded = { version = "1.2", optional = true } +redis = { version = "0.27", optional = true } + [dev-dependencies] mockito = "0.31" tempfile = "3.2.0" @@ -80,3 +82,4 @@ bigtable = [ emulator = [ "bigtable", ] # used for testing big table, requires an external bigtable emulator running. 
+reliable_report = ["dep:redis"] diff --git a/autopush-common/src/db/bigtable/bigtable_client/mod.rs b/autopush-common/src/db/bigtable/bigtable_client/mod.rs index 3cf7297fe..15304d45f 100644 --- a/autopush-common/src/db/bigtable/bigtable_client/mod.rs +++ b/autopush-common/src/db/bigtable/bigtable_client/mod.rs @@ -51,6 +51,10 @@ pub type FamilyId = String; const ROUTER_FAMILY: &str = "router"; const MESSAGE_FAMILY: &str = "message"; // The default family for messages const MESSAGE_TOPIC_FAMILY: &str = "message_topic"; +#[cfg(feature = "reliable_report")] +const RELIABLE_LOG_FAMILY: &str = "reliability"; +#[cfg(feature = "reliable_report")] +const RELIABLE_LOG_TTL: u64 = crate::db::MAX_NOTIFICATION_TTL * 2; pub(crate) const RETRY_COUNT: usize = 5; @@ -751,6 +755,26 @@ impl BigTableClientImpl { if let Some(cell) = row.take_cell("data") { notif.data = Some(to_string(cell.value, "data")?); } + #[cfg(feature = "reliable_report")] + { + if let Some(cell) = row.take_cell("reliability_id") { + notif.reliability_id = Some(to_string(cell.value, "reliability_id")?); + } + if let Some(cell) = row.take_cell("reliable_state") { + notif.reliable_state = Some( + crate::reliability::ReliabilityState::from_str(&to_string( + cell.value, + "reliable_state", + )?) + .map_err(|e| { + DbError::DeserializeString(format!( + "Could not parse reliable_state {:?}", + e + )) + })?, + ); + } + } if let Some(cell) = row.take_cell("headers") { notif.headers = Some( serde_json::from_str::>(&to_string(cell.value, "headers")?) @@ -1149,6 +1173,7 @@ impl DbClient for BigTableClientImpl { // Remember, `timestamp` is effectively the time to kill the message, not the // current time. 
+ // TODO: use message.expiry() let expiry = SystemTime::now() + Duration::from_secs(message.ttl); trace!( "🉑 Message Expiry {}", @@ -1196,6 +1221,26 @@ impl DbClient for BigTableClientImpl { }); } } + #[cfg(feature = "reliable_report")] + { + if let Some(reliability_id) = message.reliability_id { + trace!("🔍 FOUND RELIABILITY ID: {}", reliability_id); + cells.push(cell::Cell { + qualifier: "reliability_id".to_owned(), + value: reliability_id.into_bytes(), + timestamp: expiry, + ..Default::default() + }); + } + if let Some(reliable_state) = message.reliable_state { + cells.push(cell::Cell { + qualifier: "reliable_state".to_owned(), + value: reliable_state.to_string().into_bytes(), + timestamp: expiry, + ..Default::default() + }); + } + } if let Some(data) = message.data { cells.push(cell::Cell { qualifier: "data".to_owned(), @@ -1205,14 +1250,6 @@ impl DbClient for BigTableClientImpl { }); } - if let Some(reliability_id) = message.reliability_id { - cells.push(cell::Cell { - qualifier: "reliability_id".to_owned(), - value: reliability_id.into_bytes(), - timestamp: expiry, - ..Default::default() - }); - } row.add_cells(family, cells); trace!("🉑 Adding row"); self.write_row(row).await?; @@ -1431,6 +1468,32 @@ impl DbClient for BigTableClientImpl { Ok(true) } + #[cfg(feature = "reliable_report")] + async fn log_report( + &self, + reliability_id: &str, + new_state: crate::reliability::ReliabilityState, + ) -> DbResult<()> { + let row_key = reliability_id.to_owned(); + + let mut row = Row::new(row_key); + let expiry = SystemTime::now() + Duration::from_secs(RELIABLE_LOG_TTL); + + // Log the latest transition time for this id. 
+ let cells: Vec = vec![cell::Cell { + qualifier: new_state.to_string(), + value: crate::util::ms_since_epoch().to_be_bytes().to_vec(), + timestamp: expiry, + ..Default::default() + }]; + + row.add_cells(RELIABLE_LOG_FAMILY, cells); + + self.write_row(row).await?; + + Ok(()) + } + fn box_clone(&self) -> Box { Box::new(self.clone()) } diff --git a/autopush-common/src/db/client.rs b/autopush-common/src/db/client.rs index f447b26ea..bac32d32a 100644 --- a/autopush-common/src/db/client.rs +++ b/autopush-common/src/db/client.rs @@ -107,6 +107,14 @@ pub trait DbClient: Send + Sync { None } + /// Record the Reliability Report to long term storage. + #[cfg(feature = "reliable_report")] + async fn log_report( + &self, + reliability_id: &str, + state: crate::reliability::ReliabilityState, + ) -> DbResult<()>; + fn box_clone(&self) -> Box; } diff --git a/autopush-common/src/db/mock.rs b/autopush-common/src/db/mock.rs index a3d5db7b6..cdad4ddd2 100644 --- a/autopush-common/src/db/mock.rs +++ b/autopush-common/src/db/mock.rs @@ -95,6 +95,15 @@ impl DbClient for Arc { Arc::as_ref(self).remove_message(uaid, sort_key).await } + #[cfg(feature = "reliable_report")] + async fn log_report( + &self, + reliability_id: &str, + state: crate::reliability::ReliabilityState, + ) -> DbResult<()> { + Arc::as_ref(self).log_report(reliability_id, state).await + } + async fn router_table_exists(&self) -> DbResult { Arc::as_ref(self).router_table_exists().await } diff --git a/autopush-common/src/db/mod.rs b/autopush-common/src/db/mod.rs index 70a2f0254..e265d68ea 100644 --- a/autopush-common/src/db/mod.rs +++ b/autopush-common/src/db/mod.rs @@ -338,6 +338,8 @@ impl NotificationRecord { headers: self.headers.map(|m| m.into()), sortkey_timestamp: key.sortkey_timestamp, reliability_id: None, + #[cfg(feature = "reliable_report")] + reliable_state: None, }) } diff --git a/autopush-common/src/lib.rs b/autopush-common/src/lib.rs index f59243981..3ef611afb 100644 --- a/autopush-common/src/lib.rs +++ 
b/autopush-common/src/lib.rs @@ -13,6 +13,8 @@ pub mod logging; pub mod metrics; pub mod middleware; pub mod notification; +#[cfg(feature = "reliable_report")] +pub mod reliability; pub mod sentry; pub mod tags; pub mod test_support; diff --git a/autopush-common/src/notification.rs b/autopush-common/src/notification.rs index ecbb7f423..043702396 100644 --- a/autopush-common/src/notification.rs +++ b/autopush-common/src/notification.rs @@ -29,6 +29,8 @@ pub struct Notification { pub headers: Option>, #[serde(skip_serializing_if = "Option::is_none")] pub reliability_id: Option, + #[cfg(feature = "reliable_report")] + pub reliable_state: Option, } pub const TOPIC_NOTIFICATION_PREFIX: &str = "01"; @@ -66,10 +68,30 @@ impl Notification { } } + pub fn expiry(&self) -> u64 { + self.timestamp + self.ttl + } + /// Convenience function to determine if the notification /// has aged out. pub fn expired(&self, at_sec: u64) -> bool { - at_sec >= self.timestamp + self.ttl + at_sec >= self.expiry() + } + + #[cfg(feature = "reliable_report")] + pub async fn record_reliability( + &mut self, + reliability: &crate::reliability::PushReliability, + state: crate::reliability::ReliabilityState, + ) { + self.reliable_state = reliability + .record( + &self.reliability_id, + state, + &self.reliable_state, + Some(self.expiry()), + ) + .await; } } diff --git a/autopush-common/src/reliability.rs b/autopush-common/src/reliability.rs new file mode 100644 index 000000000..b3e04d164 --- /dev/null +++ b/autopush-common/src/reliability.rs @@ -0,0 +1,181 @@ +/// Push Reliability Recorder +/// +/// This allows us to track messages from select, known parties (currently, just +/// mozilla generated and consumed) so that we can identify potential trouble spots +/// and where messages expire early. 
Message expiration can lead to message loss +use std::collections::HashMap; +use std::sync::Arc; + +use redis::Commands; + +use crate::db::client::DbClient; +use crate::errors::{ApcError, ApcErrorKind, Result}; + +pub const COUNTS: &str = "state_counts"; +pub const EXPIRY: &str = "expiry"; + +/// The various states that a message may transit on the way from reception to delivery. +// Note: "Message" in this context refers to the Subscription Update. +#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Deserialize)] +pub enum ReliabilityState { + #[serde(rename = "received")] + Received, // Message was received by the Push Server + #[serde(rename = "stored")] + Stored, // Message was stored because it could not be delivered immediately + #[serde(rename = "retrieved")] + Retrieved, // Message was taken from storage for delivery + #[serde(rename = "transmitted_webpush")] + IntTransmitted, // Message was handed off between autoendpoint and autoconnect + #[serde(rename = "accepted_webpush")] + IntAccepted, // Message was accepted by autoconnect from autoendpoint + #[serde(rename = "transmitted")] + Transmitted, // Message was handed off for delivery to the UA + #[serde(rename = "accepted")] + Accepted, // Message was accepted for delivery by the UA + #[serde(rename = "delivered")] + Delivered, // Message was provided to the WebApp recipient by the UA + #[serde(rename = "expired")] + Expired, // Message expired naturally (e.g. TTL=0) +} + +// TODO: Differentiate between "transmitted via webpush" and "transmitted via bridge"? 
+impl std::fmt::Display for ReliabilityState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(match self { + Self::Received => "received", + Self::Stored => "stored", + Self::Retrieved => "retrieved", + Self::Transmitted => "transmitted", + Self::IntTransmitted => "transmitted_webpush", + Self::IntAccepted => "accepted_webpush", + Self::Accepted => "accepted", + Self::Delivered => "delivered", + Self::Expired => "expired", + }) + } +} + +impl std::str::FromStr for ReliabilityState { + type Err = ApcError; + + fn from_str(s: &str) -> std::result::Result { + Ok(match s.to_lowercase().as_str() { + "received" => Self::Received, + "stored" => Self::Stored, + "retrieved" => Self::Retrieved, + "transmitted" => Self::Transmitted, + "accepted" => Self::Accepted, + "transmitted_webpush" => Self::IntTransmitted, + "accepted_webpush" => Self::IntAccepted, + "delivered" => Self::Delivered, + "expired" => Self::Expired, + _ => { + return Err( + ApcErrorKind::GeneralError(format!("Unknown tracker state \"{}\"", s)).into(), + ); + } + }) + } +} + +impl serde::Serialize for ReliabilityState { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + serializer.serialize_str(&self.to_string()) + } +} + +#[derive(Clone)] +pub struct PushReliability { + client: Option>, + db: Box, +} + +impl PushReliability { + // Do the magic to make a report instance, whatever that will be. 
+ pub fn new(reliability_dsn: &Option, db: Box) -> Result { + if reliability_dsn.is_none() { + debug!("🔍 No reliability DSN declared."); + return Ok(Self { + client: None, + db: db.clone(), + }); + }; + + let client = if let Some(dsn) = reliability_dsn { + let rclient = redis::Client::open(dsn.clone()).map_err(|e| { + ApcErrorKind::GeneralError(format!("Could not connect to redis server: {:?}", e)) + })?; + Some(Arc::new(rclient)) + } else { + None + }; + + Ok(Self { + client, + db: db.clone(), + }) + } + + // Record the record state change to storage. + pub async fn record( + &self, + reliability_id: &Option, + new: ReliabilityState, + old: &Option, + expr: Option, + ) -> Option { + let Some(id) = reliability_id else { + return None; + }; + if let Some(client) = &self.client { + debug!( + "🔍 {} from {} to {}", + id, + old.map(|v| v.to_string()) + .unwrap_or_else(|| "None".to_owned()), + new + ); + if let Ok(mut con) = client.get_connection() { + let mut pipeline = redis::Pipeline::new(); + let pipeline = pipeline.hincr(COUNTS, new.to_string(), 1); + let pipeline = if let Some(old) = old { + pipeline + .hincr(COUNTS, old.to_string(), -1) + .zrem(EXPIRY, format!("{}#{}", &old, id)) + } else { + pipeline + }; + // Errors are not fatal, and should not impact message flow, but + // we should record them somewhere. + let _ = pipeline + .zadd(EXPIRY, format!("{}#{}", new, id), expr.unwrap_or_default()) + .exec(&mut con) + .inspect_err(|e| { + warn!("🔍 Failed to write to storage: {:?}", e); + }); + } + }; + // Errors are not fatal, and should not impact message flow, but + // we should record them somewhere. + let _ = self.db.log_report(id, new).await.inspect_err(|e| { + warn!("🔍 Unable to record reliability state: {:?}", e); + }); + Some(new) + } + + // Return a snapshot of milestone states + // This will probably not be called directly, but useful for debugging. 
+ pub async fn report(&self) -> Result>> { + if let Some(client) = &self.client { + if let Ok(mut conn) = client.get_connection() { + return Ok(Some(conn.hgetall(COUNTS).map_err(|e| { + ApcErrorKind::GeneralError(format!("Could not read report {:?}", e)) + })?)); + } + } + Ok(None) + } +} diff --git a/docs/src/reliability.md b/docs/src/reliability.md new file mode 100644 index 000000000..6bab56650 --- /dev/null +++ b/docs/src/reliability.md @@ -0,0 +1,29 @@ +# Push Reliability Tracing + +AutoPush does a lot to ensure that messages and subscriptions stay private. There are many reasons for this, aside from simply respecting user privacy, which include the high cost of data capture and retention. Tracking this information can easily increase costs by thousands of dollars a month. + +Traditionally, AutoPush has relied on "external" tracking to determine how reliably it can deliver a message. "External" in this case, means a Mozilla internal group that tracks how "successful" a Send Tab to Device is. There are some complications with this, as that group has a different qualification for "success" than AutoPush does (delivery within a very short timeframe). Autopush should be able to determine if there are any points of loss internally without incurring the sizable costs for full tracking. + +This can be done via sampling, but it's important to only sample push messages that have agreed to this. Fortunately, we can use those same Mozilla generated and consumed push notification messages for "Send Tab". (The action of sending a tab is done by the user, but the use of the AutoPush system is an internal detail. It would be like tracking a dollar bill through a vending machine. We don't know or care who put the bill into the machine, nor do we care what they purchased. We are only watching to make sure the bill doesn't get stuck or fall out somewhere.) 
+ +## Configuration + +In order to start the process, the AutoEndpoint configuration file gets a list of VAPID public keys to look for. The key format is not a standard PEM format, so the `convert_pem_to_x962.py` file is used to convert the public key format to something that's more scannable. + +That key is registered using the `autoendpoint.tracking_keys` configuration setting. + +Push Reliability requires a Redis-like memory storage system to manage the various milestone transactions. Milestones are tracked using two internal stores, the first being a Hash Incrementor (HINCR) "state_counts" table, which records the count of messages at a given state, and an "expiry" table to record the expiration timestamp for the subscription. + +Push Reliability also includes a bigtable `reliability` column family, which is used to create a long term record which can be used for more "in depth" analysis of a given message's path. + +## Operation + +If an incoming subscription is validated and contains a matching VAPID public key, then a `reliability_id` is assigned to the message. This ID is a random UUID that is attached to the message. All tracking is tied to this ID alone, and only performed if the `reliable_report` feature is enabled in the code. + +Various milestones have been set along a given message's path. These are defined in `autopush_common::reliability::PushReliabilityState`. When a message transitions from one state to the next, the new `state_count` is incremented, and the old `state_count` is decremented. In addition, the "expiry" table, which is a scored hash table, records the message expiration. The "expiry" table uses a combination of "`state`#`reliability_id`" as the key, with the expiration timestamp as the value. This table is scanned by the separate `scripts/reliability_cron.py` script, which looks for expired keys, decrements their counts, and logs the expired status to bigtable.
This script is required because there are no known data stores that provide this function automatically (recommendations are very welcome). The use of an outside script is to ensure that only a single application decrements values and logs data, preventing possible race conditions. + +"Live" counts can be displayed using any tool that can read the Redis data store, or the AutoEndpoint `/__milestones__` endpoint can be queried. Historical tracking is not part of this, and should be done using external tooling and the `reliability_cron.py` script. + +## Data Retention + +Push Subscriptions have a maximum lifetime of 30 days. The Redis Reliability tracking information will last as long as the message TTL. The Bigtable Reliability Log information will be retained for twice the maximum subscription lifetime, or 60 days, to allow for some longer term trend analysis. (Note, this reliability information does not have any record of the user, subscription provider, or message content. It only includes the milestone, and timestamp when the message crossed that milestone.) diff --git a/docs/src/testing.md b/docs/src/testing.md index b5688caa2..1c9da86b8 100644 --- a/docs/src/testing.md +++ b/docs/src/testing.md @@ -85,18 +85,20 @@ $ pyenv activate push-312 ### Running Integration Tests To run the integration tests, simply run `make integration-tests-local` from your terminal at the root of the project. -You can alter the verbosity and logging output by adding command line flags to the `PYTEST_ARGS ?=` variable in the root project Makefile. For example, for greater verbosity and stdout printing, add `-vv -s`. +You can alter the verbosity and logging output by adding command line flags to the `PYTEST_ARGS ?=` variable in the root project Makefile. For example, for greater verbosity and stdout printing, add `-vv -s`. (Note: This may be unreliable due to several hand-offs during the +make / docker build process.
+ For reliability, you may wish to modify the `pyproject.toml`:`[tool.pytest.ini_options]` file to include the options.) The test output is then emitted in your terminal instance. This includes the name of the tests, whether they pass or fail and any exceptions that are triggered during the test run. The integration tests make use of [pytest markers][pytest_markers] for filtering tests. These can be -used with the `-m` pytest option, or can be used through the following environment variables and -`integration-test-local` make command. - -| ENVIRONMENT VARIABLE | RELATED MARKER | DESCRIPTION | -|----------------------|----------------|-------------------------------------------------------------------| -| SKIP_SENTRY | sentry | If set will exclude all tests marked with `sentry` from execution | -| TEST_STUB | stub | If set will include all tests marked with `stub` in execution | +used with the `-m` pytest option specified in the `pyproject.toml`:`[tool.pytest.ini_options]` file, or can be used through the +following environment variables and `integration-test-local` make command. + +| ENVIRONMENT VARIABLE | RELATED MARKER | DESCRIPTION | +|----------------------|-----------------|--------------------------------------------------------------------------| +| SKIP_SENTRY | sentry | If set will exclude all tests marked with `sentry` from execution | +| TEST_STUB | stub | If set will include all tests marked with `stub` in execution | +| TEST_RELIABILITY | reliable_report | If set will include all tests marked with `reliable_report` in execution | Integration tests in CI will be triggered automatically whenever a commit is pushed to a branch as a part of the CI PR workflow. diff --git a/scripts/reliablity_cron.py b/scripts/reliablity_cron.py new file mode 100644 index 000000000..a2d7f0ece --- /dev/null +++ b/scripts/reliablity_cron.py @@ -0,0 +1,258 @@ +#! python3 + +""" +This program reaps expired records and adjusts counts.
+ +Specifying "--nap=0" will cause this app to only run once. + +""" + +import argparse +import asyncio +import json +import logging +import os +import time +from typing import cast + +import redis +import statsd +import toml +from google.cloud.bigtable.data import ( + BigtableDataClientAsync, + RowMutationEntry, + SetCell, +) + + +class Counter: + """Manage Redis-like storage counts + + Current milestone counts are managed in a Redis-like storage system. + There are two parts required, one is the active milestone count (as + an HINCR). The other is a ZHash that contains the expiration + timestamp for records. + Our 'garbage collection' goes through the ZHash looking for expired + records and removes them while decrementing the associated HINCR count, + indicating that the record expired "in place". + + We also update the Bigtable message log indicating that a message + failed to be delivered. + """ + + def __init__(self, log: logging.Logger, settings: argparse.Namespace): + try: + self.redis = redis.Redis.from_url(settings.reliability_dsn) + self.bigtable = BigtableDataClientAsync( + project=settings.bigtable["project"] + ) + self.log = log + self.settings = settings + except Exception as e: + log.error(e) + + async def gc(self) -> dict[str, int | float]: + """Prune expired elements, decrementing counters and logging result""" + start = time.time() + # The table of counts + counts = self.settings.count_table + # The table of expirations + expiry = self.settings.expiry_table + # the BigTable reliability family + log_family = self.settings.log_family + + # Fetch the candidates to purge. + mutations = list() + purged = cast( + list[bytes], self.redis.zrange(expiry, -1, int(start), byscore=True) + ) + # Fix up the counts + with self.redis.pipeline() as pipeline: + for key in purged: + # clean up the counts. 
+ parts = key.split(b"#", 2) + state = parts[0] + self.log.debug(f"ðŸŠĶ decr {state.decode()}") + pipeline.hincrby(counts, state.decode(), -1) + pipeline.zrem(expiry, key) + # and add the log info. + mutations.append( + RowMutationEntry( + key, SetCell(log_family, "expired", int(start * 1000)) + ) + ) + mutations.append( + RowMutationEntry( + key, + SetCell( + log_family, + "error", + "expired", + ), + ) + ) + if len(purged) > 0: + # make the changes to redis, + pipeline.execute() + # then add the bigtable logs + table = self.bigtable.get_table( + self.settings.bigtable.get("instance"), + self.settings.bigtable.get("table"), + ) + await table.bulk_mutate_rows(mutations) + + result = { + "trimmed": len(purged), + "time": int(start * 1000) - (time.time() * 1000), + } + if len(purged): + self.log.info( + f"ðŸŠĶ Trimmed {result.get("trimmed")} in {result.get("time")}ms" + ) + return result + + def counts(self) -> dict[str, int]: + """Return the current milestone counts (this should happen shortly after a gc)""" + return cast(dict[str, int], self.redis.hgetall(self.settings.count_table)) + + +def record_metrics( + log: logging.Logger, settings: argparse.Namespace, counts: dict[str, int] +): + """Record the counts to metrics""" + log.info(f"📈 Recording metrics: {counts}") + for label, count in counts.items(): + cast(statsd.StatsClient, settings.metric).gauge(label, count) + + +def config(env_args: os._Environ = os.environ) -> argparse.Namespace: + """Read the configuration from the args and environment.""" + parser = argparse.ArgumentParser( + description="Manage Autopush Reliability Tracking Redis data." 
+ ) + parser.add_argument("-c", "--config", help="configuration_file", action="append") + parser.add_argument( + "--reliability_dsn", + "-r", + help="DSN to connect to the Redis like service.", + default=env_args.get( + "AUTOEND_RELIABILITY_DSN", env_args.get("AUTOCONNECT_RELIABILITY_DSN") + ), + ) + parser.add_argument( + "--db_dsn", + "-b", + help="User Agent ID", + default=env_args.get("AUTOEND_DB_DSN", env_args.get("AUTOCONNECT_DB_DSN")), + ) + parser.add_argument( + "--db_settings", + "-s", + help="User Agent ID", + default=env_args.get( + "AUTOEND_DB_SETTINGS", env_args.get("AUTOCONNECT_DB_SETTINGS") + ), + ) + parser.add_argument( + "--count_table", + help="Name of Redis table of milestone counts", + default=env_args.get("AUTOTRACK_COUNTS", "state_counts"), + ) + parser.add_argument( + "--expiry_table", + help="Name of Redis table of milestone expirations", + default=env_args.get("AUTOTRACK_EXPIRY", "expiry"), + ) + parser.add_argument( + "--log_family", + help="Name of Bigtable log family", + default=env_args.get("AUTOTRACK_EXPIRY", "reliability"), + ) + parser.add_argument( + "--statsd_host", + help="Metric host name", + default=env_args.get( + "AUTOEND_STATSD_HOST", env_args.get("AUTOCONNECT_STATSD_HOST") + ), + ) + parser.add_argument( + "--statsd_port", + help="Metric host port", + default=env_args.get( + "AUTOEND_STATSD_HOST", env_args.get("AUTOCONNECT_STATSD_HOST", 8125) + ), + ) + parser.add_argument( + "--statsd_label", + help="Metric root namespace", + default=env_args.get( + "AUTOEND_STATSD_LABEL", + env_args.get("AUTOCONNECT_STATSD_LABEL", "autotrack"), + ), + ) + parser.add_argument( + "--nap", + help="seconds to nap between each gc cycle (smaller number is more accurate measurements)", + default=60, + ) + args = parser.parse_args() + + # if we have a config file, read from that and then reload. 
+ if args.config is not None: + for filename in args.config: + with open(filename, "r") as f: + args = parser.set_defaults(**toml.load(f)) + args = parser.parse_args() + + # fixup the bigtable settings so that they're easier for this script to deal with. + if args.db_settings is not None: + bt_settings = json.loads(args.db_settings) + parts = bt_settings.get("table_name").split("/") + for i in range(0, len(parts), 2): + # remember: the `tablename` dsn uses plurals for + # `projects`, `instances`, & `tables` + bt_settings[parts[i].rstrip("s")] = parts[i + 1] + args.bigtable = bt_settings + + if args.statsd_host or args.statsd_port: + args.metrics = statsd.StatsClient( + args.statsd_host, args.statsd_port, prefix=args.statsd_label + ) + else: + args.metrics = None + + return args + + +def init_logs(): + """Initialize logging (based on `PYTHON_LOG` environ)""" + level = getattr(logging, os.environ.get("PYTHON_LOG", "INFO").upper(), None) + logging.basicConfig(level=level) + log = logging.getLogger("autotrack") + return log + + +async def amain(log: logging.Logger, settings: argparse.Namespace): + """Async main loop""" + counter = Counter(log, settings) + while True: + _result = await counter.gc() + record_metrics(log, settings, counter.counts()) + # TODO: adjust timing loop based on result time. + # Ideally, this would have a loop that it runs on that + # becomes tighter the more items were purged, and adjusts + # based on the time it took to run. 
+ if settings.nap == 0: + return + time.sleep(settings.nap) + + +def main(): + """Configure and start async main loop""" + log = init_logs() + log.info("Starting up...") + asyncio.run(amain(log, config())) + + +if __name__ == "__main__": + main() diff --git a/scripts/setup_bt.sh b/scripts/setup_bt.sh index 938bb53c6..ff8f73b6f 100755 --- a/scripts/setup_bt.sh +++ b/scripts/setup_bt.sh @@ -12,11 +12,15 @@ TABLE_NAME=${TABLE_NAME:-"autopush"} MESSAGE_FAMILY=${MESSAGE_FAMILY:-"message"} MESSAGE_TOPIC_FAMILY=${MESSAGE_TOPIC_FAMILY:-"message_topic"} ROUTER_FAMILY=${ROUTER_FAMILY:-"router"} +RELIABILITY_FAMILY=${RELIABILITY_FAMILY:-"reliability"} cbt -project $PROJECT -instance $INSTANCE createtable $TABLE_NAME cbt -project $PROJECT -instance $INSTANCE createfamily $TABLE_NAME $MESSAGE_FAMILY cbt -project $PROJECT -instance $INSTANCE createfamily $TABLE_NAME $MESSAGE_TOPIC_FAMILY cbt -project $PROJECT -instance $INSTANCE createfamily $TABLE_NAME $ROUTER_FAMILY +cbt -project $PROJECT -instance $INSTANCE createfamily $TABLE_NAME $RELIABILITY_FAMILY + cbt -project $PROJECT -instance $INSTANCE setgcpolicy $TABLE_NAME $MESSAGE_FAMILY "maxage=1s or maxversions=1" cbt -project $PROJECT -instance $INSTANCE setgcpolicy $TABLE_NAME $MESSAGE_TOPIC_FAMILY "maxage=1s or maxversions=1" cbt -project $PROJECT -instance $INSTANCE setgcpolicy $TABLE_NAME $ROUTER_FAMILY maxversions=1 +cbt -project $PROJECT -instance $INSTANCE setgcpolicy $TABLE_NAME $RELIABILITY_FAMILY "maxage=1s or maxversions=1" diff --git a/tests/integration/Dockerfile b/tests/integration/Dockerfile index 939474be9..279f85a8f 100644 --- a/tests/integration/Dockerfile +++ b/tests/integration/Dockerfile @@ -24,13 +24,13 @@ FROM chef AS cacher COPY --from=planner /app/recipe.json recipe.json RUN \ - apt-get -qq update && \ - apt-get -qq install --no-install-recommends -y \ - cmake \ - libssl-dev \ - ca-certificates \ - pkg-config \ - build-essential + apt-get -qq update && \ + apt-get -qq install --no-install-recommends -y 
\ + cmake \ + libssl-dev \ + ca-certificates \ + pkg-config \ + build-essential RUN cargo chef cook --recipe-path recipe.json @@ -41,15 +41,15 @@ ARG APT_CACHE_BUST RUN mkdir -m 755 bin RUN apt-get -qq update && \ - apt-get -qq upgrade && apt-get -qq install --no-install-recommends -y \ - cmake \ - libssl-dev \ - ca-certificates \ - libstdc++6 \ - libstdc++-12-dev + apt-get -qq upgrade && apt-get -qq install --no-install-recommends -y \ + cmake \ + libssl-dev \ + ca-certificates \ + libstdc++6 \ + libstdc++-12-dev RUN cargo --version && \ - rustc --version + rustc --version COPY . . COPY --from=cacher /app/target target COPY --from=cacher $CARGO_HOME $CARGO_HOME @@ -68,17 +68,16 @@ ENV PYTHONUNBUFFERED=1 ENV PATH=$PATH:/root/.cargo/bin ENV PYTHON_VENV=/venv -ENV PYTEST_ARGS="" ENV RUST_LOG="autopush=debug,autopush_common=debug,autoendpoint=debug,autoconnect=debug,slog_mozlog_json=info,warn" ENV DB_DSN=grpc://localhost:8086 # Add gcc since there are no wheels for some packages for arm64/aarch64 # (g++/make for gevent on pypy) RUN apt-get update && apt install -y --no-install-recommends \ - git \ - build-essential \ - curl \ - gpg + git \ + build-essential \ + curl \ + gpg RUN python -m venv ${PYTHON_VENV} ENV PATH="${PYTHON_VENV}/bin:${PATH}" @@ -106,4 +105,5 @@ WORKDIR /code RUN chmod +x scripts/setup_bt.sh -CMD ["sh", "-c", "./scripts/setup_bt.sh && poetry run pytest tests/integration/test_integration_all_rust.py --junit-xml=integration_test_results.xml -v -m 'not stub' ${PYTEST_ARGS}"] +# Pytest markers are defined in the `pyproject.toml`:`[tool.pytest.ini_options]` file. 
+CMD ["sh", "-c", "./scripts/setup_bt.sh && poetry run pytest tests/integration/test_integration_all_rust.py --junit-xml=integration_test_results.xml -v"] diff --git a/tests/integration/test_integration_all_rust.py b/tests/integration/test_integration_all_rust.py index 8dde7d4c0..df0cdaf45 100644 --- a/tests/integration/test_integration_all_rust.py +++ b/tests/integration/test_integration_all_rust.py @@ -154,6 +154,11 @@ def base64url_encode(value: bytes | str) -> str: # new autoconnect db_dsn=os.environ.get("DB_DSN", "grpc://localhost:8086"), db_settings=get_db_settings(), + tracking_keys="[{}]".format( + base64.urlsafe_b64encode( + cast(ecdsa.VerifyingKey, TRACKING_KEY.get_verifying_key()).to_string() + ).decode() + ), ) """Connection Megaphone Config: @@ -190,6 +195,15 @@ def base64url_encode(value: bytes | str) -> str: tracking_keys=f"[{base64.urlsafe_b64encode((b"\4" + TRACKING_PUB_KEY.to_string())).decode()}]", ) +# Note: These are only used by the `reliable_report` feature, however, specifying them +# will trigger autopush to attempt to connect, and may fail if the Redis server is not present. +# Since pytest marks do not provide a way to examine mark states within the python executable, +# we have to rely on an environment variable. +# Once this feature goes stable, we can move these lines to the appropriate configurations. +if os.environ.get("RELIABLE_REPORT") is not None: + CONNECTION_CONFIG["reliability_dsn"] = "redis://localhost:6379" + ENDPOINT_CONFIG["reliability_dsn"] = "redis://localhost:6379" + def _get_vapid( key: ecdsa.SigningKey | None = None, @@ -800,6 +814,26 @@ async def test_basic_delivery_with_vapid( ) -> None: """Test delivery of a basic push message with a VAPID header.""" uuid_data: str = str(uuid.uuid4()) + # Since we are not explicity setting the TRACKING_KEY, we should not + # track this message. 
+ vapid_info = _get_vapid(payload=vapid_payload) + result = await registered_test_client.send_notification(data=uuid_data, vapid=vapid_info) + # the following presumes that only `salt` is padded. + clean_header = registered_test_client._crypto_key.replace('"', "").rstrip("=") + assert result["headers"]["encryption"] == clean_header + assert result["data"] == base64url_encode(uuid_data) + assert result["messageType"] == ClientMessageType.NOTIFICATION.value + + +@pytest.mark.reliable_report +async def test_basic_delivery_with_vapid_reliable( + registered_test_client: AsyncPushTestClient, + vapid_payload: dict[str, int | str], +) -> None: + """Test delivery of a basic push message with a VAPID header.""" + uuid_data: str = str(uuid.uuid4()) + # Since we are not explicity setting the TRACKING_KEY, we should not + # track this message. vapid_info = _get_vapid(payload=vapid_payload) result = await registered_test_client.send_notification(data=uuid_data, vapid=vapid_info) # the following presumes that only `salt` is padded. @@ -809,15 +843,16 @@ async def test_basic_delivery_with_vapid( assert result["messageType"] == ClientMessageType.NOTIFICATION.value # The key we used should not have been registered, so no tracking should # be occurring. - log.debug(f"🔍 Reliability: {result.get("reliability_id")}") - assert result.get("reliability_id") is None + assert result.get("reliability_id") is None, "Tracking unknown message" +@pytest.mark.reliable_report async def test_basic_delivery_with_tracked_vapid( registered_test_client: AsyncPushTestClient, vapid_payload: dict[str, int | str], ) -> None: """Test delivery of a basic push message with a VAPID header.""" + # TODO: connect to test redis server and redis.flushall() uuid_data: str = str(uuid.uuid4()) vapid_info = _get_vapid(key=TRACKING_KEY, payload=vapid_payload) # quick sanity check to ensure that the keys match. 
@@ -840,8 +875,15 @@ async def test_basic_delivery_with_tracked_vapid( assert result["headers"]["encryption"] == clean_header assert result["data"] == base64url_encode(uuid_data) assert result["messageType"] == ClientMessageType.NOTIFICATION.value - log.debug(f"🔍 reliability {result}") - assert result["reliability_id"] is not None + assert result.get("reliability_id") is not None, "missing reliability_id" + endpoint = registered_test_client.get_host_client_endpoint() + async with httpx.AsyncClient() as httpx_client: + resp = await httpx_client.get(f"{endpoint}/__milestones__", timeout=5) + log.debug(f"🔍 Milestones: {resp.text}") + jresp = json.loads(resp.text) + assert jresp["accepted"] == 1 + for other in ["accepted_webpush", "received", "transmitted_webpush", "transmitted"]: + assert jresp[other] == 0, f"reliablity state '{other}' was not 0" async def test_basic_delivery_with_invalid_vapid( @@ -1293,8 +1335,6 @@ async def test_big_message(registered_test_client: AsyncPushTestClient) -> None: block that was 5624 bytes long. We'll skip the binary bit for a 4216 block of "text" we then b64 encode to send. 
""" - import base64 - bulk = "".join( random.choice(string.ascii_letters + string.digits + string.punctuation) for _ in range(0, 4216) diff --git a/tests/load/locustfiles/stored.py b/tests/load/locustfiles/stored.py index 7febf8af3..f2f12b400 100644 --- a/tests/load/locustfiles/stored.py +++ b/tests/load/locustfiles/stored.py @@ -230,7 +230,7 @@ def recv_message(self) -> None: assert self.ws data = self.ws.recv() if not isinstance(data, str): - logger.error("recv_message unexpectedly recieved bytes") + logger.error("recv_message unexpectedly received bytes") data = str(data) self.on_ws_message(self.ws, data) diff --git a/tests/pyproject.toml b/tests/pyproject.toml index 7db34a580..47d357d1a 100644 --- a/tests/pyproject.toml +++ b/tests/pyproject.toml @@ -40,7 +40,7 @@ add-select = ["D212"] # D203 as it conflicts with D211 https://github.com/PyCQA/pydocstyle/issues/141 # D205 1 blank line required between summary line and description, awkward spacing # D400 First line should end with a period, doesn't work when sentence spans 2 lines -add-ignore = ["D105","D107","D203", "D205", "D400"] +add-ignore = ["D105", "D107", "D203", "D205", "D400"] [tool.poetry] name = "tests" @@ -75,7 +75,7 @@ pytest-order = "^1.3.0" python-jose = "^3.3.0" httpx = "^0.27.2" fastapi = "^0.111.1" -uvicorn = {extras = ["standard"], version="^0.29.0"} +uvicorn = { extras = ["standard"], version = "^0.29.0" } [tool.poetry.group.load.dependencies] locust = "^2.32.1" @@ -95,9 +95,16 @@ requires = ["poetry-core>=1.9.1"] build-backend = "poetry.core.masonry.api" [tool.pytest.ini_options] +addopts = "-m 'not stub and not sentry and not reliable_report'" asyncio_mode = "auto" +faulthandler_timeout = 99999 +log_cli = 1 +# 10 DEBUG +# 20 INFO +log_cli_level = 10 # Pytest marker documentation: https://docs.pytest.org/en/7.1.x/example/markers.html markers = [ "stub: mark a test for the stub system", - "sentry: mark a test for the sentry integration" + "sentry: mark a test for the sentry integration", + 
"reliable_report: enable tests for reliability reporting", ]