diff --git a/core/notifications/src/executor/error.rs b/core/notifications/src/executor/error.rs index bc74703460..8c0471b2c5 100644 --- a/core/notifications/src/executor/error.rs +++ b/core/notifications/src/executor/error.rs @@ -5,7 +5,7 @@ use crate::user_notification_settings::error::*; #[derive(Error, Debug)] pub enum ExecutorError { - #[error("ExecutorError - Novu: {0}")] + #[error("ExecutorError - FcmError: {0}")] Fcm(#[from] FcmError), #[error("ExecutorError - UserNotificationSettingsError: {0}")] UserNotificationSettingsError(#[from] UserNotificationSettingsError), diff --git a/core/notifications/src/executor/fcm/error.rs b/core/notifications/src/executor/fcm/error.rs index c2eaefaf36..39bf4af6b8 100644 --- a/core/notifications/src/executor/fcm/error.rs +++ b/core/notifications/src/executor/fcm/error.rs @@ -5,5 +5,5 @@ pub enum FcmError { #[error("FcmError - I/O Error: {0}")] IOError(#[from] std::io::Error), #[error("FcmError - GoogleFcm1Error: {0}")] - GoogleFcm1Error(#[from] google_fcm1::Error), // should rename to a better error + GoogleFcm1Error(#[from] google_fcm1::Error), } diff --git a/core/notifications/src/executor/fcm/mod.rs b/core/notifications/src/executor/fcm/mod.rs index 6c2d8ea055..66a3fe64e6 100644 --- a/core/notifications/src/executor/fcm/mod.rs +++ b/core/notifications/src/executor/fcm/mod.rs @@ -9,7 +9,7 @@ use google_fcm1::{ FirebaseCloudMessaging, }; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use crate::{messages::LocalizedMessage, notification_event::*, primitives::PushDeviceToken}; @@ -62,38 +62,36 @@ impl FcmClient { pub async fn send( &self, - device_tokens: HashSet, - msg: LocalizedMessage, + device_token: &PushDeviceToken, + msg: &LocalizedMessage, deep_link: DeepLink, ) -> Result<(), FcmError> { let mut data = HashMap::new(); deep_link.add_to_data(&mut data); - for device_token in device_tokens { - let notification = Notification { - title: Some(msg.title.clone()), - body: Some(msg.body.clone()), - ..Default::default() - }; - let message = Message { - notification: Some(notification), - token: Some(device_token.into_inner()), - data: Some(data.clone()), - ..Default::default() - }; + let notification = Notification { + title: Some(msg.title.clone()), + body: Some(msg.body.clone()), + ..Default::default() + }; + let message = Message { + notification: Some(notification), + token: Some(device_token.clone().into_inner()), + data: Some(data.clone()), + ..Default::default() + }; - let parent = format!("projects/{}", self.fcm_project_id); - let request = SendMessageRequest { - message: Some(message), - ..Default::default() - }; - let _response = self - .client - .projects() - .messages_send(request, &parent) - .doit() - .await?; - } + let parent = format!("projects/{}", self.fcm_project_id); + let request = SendMessageRequest { + message: Some(message), + ..Default::default() + }; + let _response = self + .client + .projects() + .messages_send(request, &parent) + .doit() + .await?; Ok(()) } } diff --git a/core/notifications/src/executor/mod.rs b/core/notifications/src/executor/mod.rs index 2c19d48b44..fa37beb76f 100644 --- a/core/notifications/src/executor/mod.rs +++ b/core/notifications/src/executor/mod.rs @@ -2,7 +2,8 @@ mod config; pub mod error; mod fcm; -use fcm::FcmClient; +use fcm::{error::FcmError, FcmClient}; +use tracing::{error, instrument}; use crate::{notification_event::*, primitives::*, user_notification_settings::*}; @@ -28,21 +29,53 @@ impl Executor { }) } + #[instrument( + name = "executor.notify", + skip(self), + fields(n_errors, n_removed_tokens), + err + )] pub async fn notify(&self, event: &T) -> Result<(), ExecutorError> { - let settings = self.settings.find_for_user_id(event.user_id()).await?; - if !settings.should_send_notification( - UserNotificationChannel::Push, - UserNotificationCategory::Circles, - ) { + let mut settings = self.settings.find_for_user_id(event.user_id()).await?; + if !settings.should_send_notification(UserNotificationChannel::Push, event.category()) { return Ok(()); } let msg = event.to_localized_msg(settings.locale().unwrap_or_default()); - self.fcm - .send(settings.push_device_tokens(), msg, event.deep_link()) - .await?; + let mut should_persist = false; + let mut last_err = None; + let mut n_errs = 0; + let mut n_removed_tokens = 0; + for device_token in settings.push_device_tokens() { + match self.fcm.send(&device_token, &msg, event.deep_link()).await { + Err(FcmError::GoogleFcm1Error(google_fcm1::Error::BadRequest(e))) => { + n_errs += 1; + n_removed_tokens += 1; + error!("BadRequest sending to device: {}", e); + should_persist = true; + settings.remove_push_device_token(device_token) + } + Err(e) => { + n_errs += 1; + error!("Unexpected error sending to device: {}", e); + last_err = Some(e.into()) + } + _ => continue, + } + } + + if should_persist { + let _ = self.settings.persist(&mut settings).await; + } - Ok(()) + tracing::Span::current().record("n_errors", n_errs); + tracing::Span::current().record("n_removed_tokens", n_removed_tokens); + + if let Some(e) = last_err { + Err(e) + } else { + Ok(()) + } } } diff --git a/core/notifications/src/notification_event.rs b/core/notifications/src/notification_event.rs index e30447310a..cf55cf5073 100644 --- a/core/notifications/src/notification_event.rs +++ b/core/notifications/src/notification_event.rs @@ -8,6 +8,7 @@ pub enum DeepLink { } pub trait NotificationEvent: std::fmt::Debug + Into { + fn category(&self) -> UserNotificationCategory; fn user_id(&self) -> &GaloyUserId; fn deep_link(&self) -> DeepLink; fn to_localized_msg(&self, locale: GaloyLocale) -> LocalizedMessage; @@ -24,6 +25,16 @@ pub enum NotificationEventPayload { } impl NotificationEvent for NotificationEventPayload { + fn category(&self) -> UserNotificationCategory { + match self { + NotificationEventPayload::CircleGrew(e) => e.category(), + NotificationEventPayload::CircleThresholdReached(e) => e.category(), + NotificationEventPayload::IdentityVerificationApproved(e) => e.category(), + NotificationEventPayload::IdentityVerificationDeclined(e) => e.category(), + NotificationEventPayload::IdentityVerificationReviewPending(e) => e.category(), + } + } + fn user_id(&self) -> &GaloyUserId { match self { NotificationEventPayload::CircleGrew(event) => event.user_id(), @@ -72,6 +83,10 @@ pub struct CircleGrew { } impl NotificationEvent for CircleGrew { + fn category(&self) -> UserNotificationCategory { + UserNotificationCategory::Circles + } + fn user_id(&self) -> &GaloyUserId { &self.user_id } @@ -100,6 +115,10 @@ pub struct CircleThresholdReached { } impl NotificationEvent for CircleThresholdReached { + fn category(&self) -> UserNotificationCategory { + UserNotificationCategory::Circles + } + fn user_id(&self) -> &GaloyUserId { &self.user_id } @@ -125,6 +144,10 @@ pub struct IdentityVerificationApproved { } impl NotificationEvent for IdentityVerificationApproved { + fn category(&self) -> UserNotificationCategory { + UserNotificationCategory::AdminNotification + } + fn user_id(&self) -> &GaloyUserId { &self.user_id } @@ -157,6 +180,10 @@ pub struct IdentityVerificationDeclined { } impl NotificationEvent for IdentityVerificationDeclined { + fn category(&self) -> UserNotificationCategory { + UserNotificationCategory::AdminNotification + } + fn user_id(&self) -> &GaloyUserId { &self.user_id } @@ -182,6 +209,10 @@ pub struct IdentityVerificationReviewPending { } impl NotificationEvent for IdentityVerificationReviewPending { + fn category(&self) -> UserNotificationCategory { + UserNotificationCategory::AdminNotification + } + fn user_id(&self) -> &GaloyUserId { &self.user_id }