diff --git a/Cargo.lock b/Cargo.lock index 83b1b6c951..b373eabede 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1856,6 +1856,7 @@ dependencies = [ "es-entity", "futures", "google-fcm1", + "job-executor", "jsonwebtoken", "mongodb", "prost 0.12.3", @@ -1867,6 +1868,7 @@ dependencies = [ "serde_with 3.6.1", "serde_yaml 0.9.31", "sqlx", + "sqlxmq", "thiserror", "tokio", "tonic 0.10.2", diff --git a/core/notifications/Cargo.toml b/core/notifications/Cargo.toml index 0d9690e6e2..204b23c160 100644 --- a/core/notifications/Cargo.toml +++ b/core/notifications/Cargo.toml @@ -15,6 +15,7 @@ path = "src/bin/write_sdl.rs" fail-on-warnings = [] [dependencies] +job-executor = { path = "../../lib/job-executor-rs" } tracing = { path = "../../lib/tracing-rs" } es-entity = { path = "../../lib/es-entity-rs" } @@ -34,6 +35,7 @@ thiserror = { workspace = true } chrono = { workspace = true } futures = { workspace = true } sqlx = { workspace = true } +sqlxmq = { workspace = true } mongodb = { workspace = true } rand = { workspace = true } uuid = { workspace = true } diff --git a/core/notifications/locales/en.yml b/core/notifications/locales/en.yml index bf558fd3c3..8040f1f186 100644 --- a/core/notifications/locales/en.yml +++ b/core/notifications/locales/en.yml @@ -14,3 +14,6 @@ circle_threshold_reached.inner.body: "You have welcomed %{threshold} people to B circle_threshold_reached.outer.title: "Outer Circle gains 💪" circle_threshold_reached.outer.body: "Your Outer Circle reached %{threshold} people. You are driving Bitcoin adoption!" + +documents_submitted.title: "Documents Received" +documents_submitted.body: "The documents for your verification are being processed." diff --git a/core/notifications/proto/notifications.proto b/core/notifications/proto/notifications.proto index 960df5f2c0..3ac4c58689 100644 --- a/core/notifications/proto/notifications.proto +++ b/core/notifications/proto/notifications.proto @@ -133,6 +133,10 @@ message NotificationEvent { oneof data { CircleGrew circle_grew = 1; CircleThresholdReached circle_threshold_reached = 2; + DocumentsSubmitted documents_submitted = 3; + DocumentsApproved documents_approved = 4; + DocumentsRejected documents_rejected = 5; + DocumentsReviewPending documents_review_pending = 6; } } @@ -159,3 +163,19 @@ message CircleThresholdReached { CircleTimeFrame time_frame = 3; uint32 threshold = 4; } + +message DocumentsSubmitted { + string user_id = 1; +} + +message DocumentsApproved { + string user_id = 1; +} + +message DocumentsRejected { + string user_id = 1; +} + +message DocumentsReviewPending { + string user_id = 1; +} diff --git a/core/notifications/src/app/error.rs b/core/notifications/src/app/error.rs index 2f73796453..a51e51c027 100644 --- a/core/notifications/src/app/error.rs +++ b/core/notifications/src/app/error.rs @@ -1,11 +1,18 @@ use thiserror::Error; -use crate::{executor::error::*, user_notification_settings::error::*}; +use crate::{ + executor::error::ExecutorError, job::error::JobError, + user_notification_settings::error::UserNotificationSettingsError, +}; #[derive(Error, Debug)] pub enum ApplicationError { #[error("{0}")] UserNotificationSettingsError(#[from] UserNotificationSettingsError), #[error("{0}")] - Novu(#[from] ExecutorError), + JobError(#[from] JobError), + #[error("{0}")] + ExecutorError(#[from] ExecutorError), + #[error("{0}")] + Sqlx(#[from] sqlx::Error), } diff --git a/core/notifications/src/app/mod.rs b/core/notifications/src/app/mod.rs index bf3ad72926..ab96ba1d5b 100644 --- a/core/notifications/src/app/mod.rs +++ b/core/notifications/src/app/mod.rs @@ -2,9 +2,14 @@ mod config; pub mod error; use sqlx::{Pool, Postgres}; +use sqlxmq::JobRunnerHandle; use tracing::instrument; -use crate::{executor::*, notification_event::*, primitives::*, user_notification_settings::*}; +use std::sync::Arc; + +use crate::{ + executor::*, job, notification_event::*, primitives::*, user_notification_settings::*, +}; pub use config::*; use error::*; @@ -13,19 +18,20 @@ use error::*; pub struct NotificationsApp { _config: AppConfig, settings: UserNotificationSettingsRepo, - executor: Executor, - _pool: Pool, + pool: Pool, + _runner: Arc, } impl NotificationsApp { pub async fn init(pool: Pool, config: AppConfig) -> Result { let settings = UserNotificationSettingsRepo::new(&pool); let executor = Executor::init(config.executor.clone(), settings.clone()).await?; + let runner = job::start_job_runner(&pool, executor).await?; Ok(Self { _config: config, - _pool: pool, - executor, + pool, settings, + _runner: Arc::new(runner), }) } @@ -141,18 +147,14 @@ impl NotificationsApp { Ok(user_settings) } - #[instrument(name = "app.handle_circle_grew", skip(self), err)] - pub async fn handle_circle_grew(&self, event: CircleGrew) -> Result<(), ApplicationError> { - self.executor.notify(event).await?; - Ok(()) - } - - #[instrument(name = "app.handle_threshold_reached", skip(self), err)] - pub async fn handle_threshold_reached( + #[instrument(name = "app.handle_notification_event", skip(self), err)] + pub async fn handle_notification_event( &self, - event: CircleThresholdReached, + event: T, ) -> Result<(), ApplicationError> { - self.executor.notify(event).await?; + let mut tx = self.pool.begin().await?; + job::spawn_send_push_notification(&mut tx, event.into()).await?; + tx.commit().await?; Ok(()) } } diff --git a/core/notifications/src/executor/mod.rs b/core/notifications/src/executor/mod.rs index c95c4e7428..2c19d48b44 100644 --- a/core/notifications/src/executor/mod.rs +++ b/core/notifications/src/executor/mod.rs @@ -28,7 +28,7 @@ impl Executor { }) } - pub async fn notify(&self, event: T) -> Result<(), ExecutorError> { + pub async fn notify(&self, event: &T) -> Result<(), ExecutorError> { let settings = self.settings.find_for_user_id(event.user_id()).await?; if !settings.should_send_notification( UserNotificationChannel::Push, diff --git a/core/notifications/src/grpc/server/mod.rs b/core/notifications/src/grpc/server/mod.rs index 9dbed186e4..76024d8797 100644 --- a/core/notifications/src/grpc/server/mod.rs +++ b/core/notifications/src/grpc/server/mod.rs @@ -251,7 +251,7 @@ impl NotificationsService for Notifications { .map(primitives::CircleType::from) .map_err(|e| Status::invalid_argument(e.to_string()))?; self.app - .handle_circle_grew(notification_event::CircleGrew { + .handle_notification_event(notification_event::CircleGrew { user_id: GaloyUserId::from(user_id), circle_type, this_month_circle_size, @@ -277,7 +277,7 @@ impl NotificationsService for Notifications { .map(primitives::CircleTimeFrame::from) .map_err(|e| Status::invalid_argument(e.to_string()))?; self.app - .handle_threshold_reached(notification_event::CircleThresholdReached { + .handle_notification_event(notification_event::CircleThresholdReached { user_id: GaloyUserId::from(user_id), circle_type, time_frame, @@ -285,6 +285,54 @@ impl NotificationsService for Notifications { }) .await? } + Some(proto::NotificationEvent { + data: + Some(proto::notification_event::Data::DocumentsSubmitted( + proto::DocumentsSubmitted { user_id }, + )), + }) => { + self.app + .handle_notification_event(notification_event::DocumentsSubmitted { + user_id: GaloyUserId::from(user_id), + }) + .await? + } + Some(proto::NotificationEvent { + data: + Some(proto::notification_event::Data::DocumentsApproved(proto::DocumentsApproved { + user_id, + })), + }) => { + self.app + .handle_notification_event(notification_event::DocumentsApproved { + user_id: GaloyUserId::from(user_id), + }) + .await? + } + Some(proto::NotificationEvent { + data: + Some(proto::notification_event::Data::DocumentsRejected(proto::DocumentsRejected { + user_id, + })), + }) => { + self.app + .handle_notification_event(notification_event::DocumentsRejected { + user_id: GaloyUserId::from(user_id), + }) + .await? + } + Some(proto::NotificationEvent { + data: + Some(proto::notification_event::Data::DocumentsReviewPending( + proto::DocumentsReviewPending { user_id }, + )), + }) => { + self.app + .handle_notification_event(notification_event::DocumentsReviewPending { + user_id: GaloyUserId::from(user_id), + }) + .await? + } _ => return Err(Status::invalid_argument("event is required")), } diff --git a/core/notifications/src/job/error.rs b/core/notifications/src/job/error.rs new file mode 100644 index 0000000000..8c7bd5ec3c --- /dev/null +++ b/core/notifications/src/job/error.rs @@ -0,0 +1,13 @@ +use thiserror::Error; + +use crate::executor::error::ExecutorError; + +#[derive(Error, Debug)] +pub enum JobError { + #[error("JobError - Sqlx: {0}")] + Sqlx(#[from] sqlx::Error), + #[error("JobError - ExecutorError: {0}")] + Executor(#[from] ExecutorError), +} + +impl job_executor::JobExecutionError for JobError {} diff --git a/core/notifications/src/job/mod.rs b/core/notifications/src/job/mod.rs new file mode 100644 index 0000000000..55e86bbb63 --- /dev/null +++ b/core/notifications/src/job/mod.rs @@ -0,0 +1,63 @@ +mod send_push_notification; + +pub mod error; + +use sqlxmq::{job, CurrentJob, JobRegistry, JobRunnerHandle}; +use tracing::instrument; + +use job_executor::JobExecutor; + +use crate::executor::Executor; + +use error::JobError; + +use send_push_notification::SendPushNotificationData; + +pub async fn start_job_runner( + pool: &sqlx::PgPool, + executor: Executor, +) -> Result { + let mut registry = JobRegistry::new(&[send_push_notification]); + registry.set_context(executor); + + Ok(registry.runner(pool).set_keep_alive(false).run().await?) +} + +#[job( + name = "send_push_notification", + channel_name = "send_push_notification" +)] +async fn send_push_notification( + mut current_job: CurrentJob, + executor: Executor, +) -> Result<(), JobError> { + JobExecutor::builder(&mut current_job) + .build() + .expect("couldn't build JobExecutor") + .execute(|data| async move { + let data: SendPushNotificationData = + data.expect("no SendPushNotificationData available"); + send_push_notification::execute(data, executor).await + }) + .await?; + Ok(()) +} + +#[instrument(name = "job.spawn_send_push_notification", skip_all, fields(error, error.level, error.message), err)] +pub async fn spawn_send_push_notification( + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + data: impl Into, +) -> Result<(), JobError> { + let data = data.into(); + if let Err(e) = send_push_notification + .builder() + .set_json(&data) + .expect("Couldn't set json") + .spawn(&mut **tx) + .await + { + tracing::insert_error_fields(tracing::Level::WARN, &e); + return Err(e.into()); + } + Ok(()) +} diff --git a/core/notifications/src/job/send_push_notification.rs b/core/notifications/src/job/send_push_notification.rs new file mode 100644 index 0000000000..2a0d33aa45 --- /dev/null +++ b/core/notifications/src/job/send_push_notification.rs @@ -0,0 +1,32 @@ +use serde::{Deserialize, Serialize}; +use tracing::instrument; + +use std::collections::HashMap; + +use super::error::JobError; +use crate::{executor::Executor, notification_event::NotificationEventPayload}; + +#[derive(Debug, Serialize, Deserialize)] +pub(super) struct SendPushNotificationData { + payload: NotificationEventPayload, + #[serde(flatten)] + pub(super) tracing_data: HashMap, +} + +impl From for SendPushNotificationData { + fn from(payload: NotificationEventPayload) -> Self { + Self { + payload, + tracing_data: tracing::extract_tracing_data(), + } + } +} + +#[instrument(name = "job.send_push_notification", skip(executor), err)] +pub async fn execute( + data: SendPushNotificationData, + executor: Executor, +) -> Result { + executor.notify(&data.payload).await?; + Ok(data) +} diff --git a/core/notifications/src/lib.rs b/core/notifications/src/lib.rs index 400f5b4092..1cb5b71a63 100644 --- a/core/notifications/src/lib.rs +++ b/core/notifications/src/lib.rs @@ -5,6 +5,7 @@ rust_i18n::i18n!("locales", fallback = "en"); mod app; mod data_import; +mod job; mod messages; pub mod cli; diff --git a/core/notifications/src/messages/mod.rs b/core/notifications/src/messages/mod.rs index 0b34a43539..19e977a5cc 100644 --- a/core/notifications/src/messages/mod.rs +++ b/core/notifications/src/messages/mod.rs @@ -55,6 +55,32 @@ impl Messages { .to_string(); LocalizedMessage { title, body } } + + pub fn documents_submitted(locale: &str, _event: &DocumentsSubmitted) -> LocalizedMessage { + let title = t!("documents_submitted.title", locale = locale).to_string(); + let body = t!("documents_submitted.body", locale = locale).to_string(); + LocalizedMessage { title, body } + } + + pub fn documents_approved(locale: &str, _event: &DocumentsApproved) -> LocalizedMessage { + let title = t!("documents_approved.title", locale = locale).to_string(); + let body = t!("documents_approved.body", locale = locale).to_string(); + LocalizedMessage { title, body } + } + pub fn documents_rejected(locale: &str, _event: &DocumentsRejected) -> LocalizedMessage { + let title = t!("documents_rejected.title", locale = locale).to_string(); + let body = t!("documents_rejected.body", locale = locale).to_string(); + LocalizedMessage { title, body } + } + + pub fn documents_review_pending( + locale: &str, + _event: &DocumentsReviewPending, + ) -> LocalizedMessage { + let title = t!("documents_review_pending.title", locale = locale).to_string(); + let body = t!("documents_review_pending.body", locale = locale).to_string(); + LocalizedMessage { title, body } + } } #[cfg(test)] diff --git a/core/notifications/src/notification_event.rs b/core/notifications/src/notification_event.rs index f0870c288e..100056ac97 100644 --- a/core/notifications/src/notification_event.rs +++ b/core/notifications/src/notification_event.rs @@ -1,3 +1,5 @@ +use serde::{Deserialize, Serialize}; + use crate::{messages::*, primitives::*}; pub enum DeepLink { @@ -5,7 +7,63 @@ pub enum DeepLink { Circles, } -#[derive(Debug)] +pub trait NotificationEvent: std::fmt::Debug + Into { + fn user_id(&self) -> &GaloyUserId; + fn deep_link(&self) -> DeepLink; + fn to_localized_msg(&self, locale: GaloyLocale) -> LocalizedMessage; +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum NotificationEventPayload { + CircleGrew(CircleGrew), + CircleThresholdReached(CircleThresholdReached), + DocumentsSubmitted(DocumentsSubmitted), + DocumentsApproved(DocumentsApproved), + DocumentsRejected(DocumentsRejected), + DocumentsReviewPending(DocumentsReviewPending), +} + +impl NotificationEvent for NotificationEventPayload { + fn user_id(&self) -> &GaloyUserId { + match self { + NotificationEventPayload::CircleGrew(event) => event.user_id(), + NotificationEventPayload::CircleThresholdReached(event) => event.user_id(), + NotificationEventPayload::DocumentsSubmitted(event) => event.user_id(), + NotificationEventPayload::DocumentsApproved(event) => event.user_id(), + NotificationEventPayload::DocumentsRejected(event) => event.user_id(), + NotificationEventPayload::DocumentsReviewPending(event) => event.user_id(), + } + } + + fn deep_link(&self) -> DeepLink { + match self { + NotificationEventPayload::CircleGrew(event) => event.deep_link(), + NotificationEventPayload::CircleThresholdReached(event) => event.deep_link(), + NotificationEventPayload::DocumentsSubmitted(event) => event.deep_link(), + NotificationEventPayload::DocumentsApproved(event) => event.deep_link(), + NotificationEventPayload::DocumentsRejected(event) => event.deep_link(), + NotificationEventPayload::DocumentsReviewPending(event) => event.deep_link(), + } + } + + fn to_localized_msg(&self, locale: GaloyLocale) -> LocalizedMessage { + match self { + NotificationEventPayload::CircleGrew(event) => event.to_localized_msg(locale), + NotificationEventPayload::CircleThresholdReached(event) => { + event.to_localized_msg(locale) + } + NotificationEventPayload::DocumentsSubmitted(event) => event.to_localized_msg(locale), + NotificationEventPayload::DocumentsApproved(event) => event.to_localized_msg(locale), + NotificationEventPayload::DocumentsRejected(event) => event.to_localized_msg(locale), + NotificationEventPayload::DocumentsReviewPending(event) => { + event.to_localized_msg(locale) + } + } + } +} + +#[derive(Debug, Serialize, Deserialize)] pub struct CircleGrew { pub user_id: GaloyUserId, pub circle_type: CircleType, @@ -13,13 +71,7 @@ pub struct CircleGrew { pub all_time_circle_size: u32, } -pub trait NotificationEventNew { - fn user_id(&self) -> &GaloyUserId; - fn deep_link(&self) -> DeepLink; - fn to_localized_msg(&self, locale: GaloyLocale) -> LocalizedMessage; -} - -impl NotificationEventNew for CircleGrew { +impl NotificationEvent for CircleGrew { fn user_id(&self) -> &GaloyUserId { &self.user_id } @@ -33,7 +85,13 @@ impl NotificationEventNew for CircleGrew { } } -#[derive(Debug)] +impl From for NotificationEventPayload { + fn from(event: CircleGrew) -> Self { + NotificationEventPayload::CircleGrew(event) + } +} + +#[derive(Debug, Serialize, Deserialize)] pub struct CircleThresholdReached { pub user_id: GaloyUserId, pub circle_type: CircleType, @@ -41,7 +99,7 @@ pub struct CircleThresholdReached { pub threshold: u32, } -impl NotificationEventNew for CircleThresholdReached { +impl NotificationEvent for CircleThresholdReached { fn user_id(&self) -> &GaloyUserId { &self.user_id } @@ -54,3 +112,109 @@ impl NotificationEventNew for CircleThresholdReached { Messages::circle_threshold_reached(locale.as_ref(), self) } } + +impl From for NotificationEventPayload { + fn from(event: CircleThresholdReached) -> Self { + NotificationEventPayload::CircleThresholdReached(event) + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct DocumentsSubmitted { + pub user_id: GaloyUserId, +} + +impl NotificationEvent for DocumentsSubmitted { + fn user_id(&self) -> &GaloyUserId { + &self.user_id + } + + fn deep_link(&self) -> DeepLink { + DeepLink::None + } + + fn to_localized_msg(&self, locale: GaloyLocale) -> LocalizedMessage { + Messages::documents_submitted(locale.as_ref(), self) + } +} + +impl From for NotificationEventPayload { + fn from(event: DocumentsSubmitted) -> Self { + NotificationEventPayload::DocumentsSubmitted(event) + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct DocumentsApproved { + pub user_id: GaloyUserId, +} + +impl NotificationEvent for DocumentsApproved { + fn user_id(&self) -> &GaloyUserId { + &self.user_id + } + + fn deep_link(&self) -> DeepLink { + DeepLink::None + } + + fn to_localized_msg(&self, locale: GaloyLocale) -> LocalizedMessage { + Messages::documents_approved(locale.as_ref(), self) + } +} + +impl From for NotificationEventPayload { + fn from(event: DocumentsApproved) -> Self { + NotificationEventPayload::DocumentsApproved(event) + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct DocumentsRejected { + pub user_id: GaloyUserId, +} + +impl NotificationEvent for DocumentsRejected { + fn user_id(&self) -> &GaloyUserId { + &self.user_id + } + + fn deep_link(&self) -> DeepLink { + DeepLink::None + } + + fn to_localized_msg(&self, locale: GaloyLocale) -> LocalizedMessage { + Messages::documents_rejected(locale.as_ref(), self) + } +} + +impl From for NotificationEventPayload { + fn from(event: DocumentsRejected) -> Self { + NotificationEventPayload::DocumentsRejected(event) + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct DocumentsReviewPending { + pub user_id: GaloyUserId, +} + +impl NotificationEvent for DocumentsReviewPending { + fn user_id(&self) -> &GaloyUserId { + &self.user_id + } + + fn deep_link(&self) -> DeepLink { + DeepLink::None + } + + fn to_localized_msg(&self, locale: GaloyLocale) -> LocalizedMessage { + Messages::documents_review_pending(locale.as_ref(), self) + } +} + +impl From for NotificationEventPayload { + fn from(event: DocumentsReviewPending) -> Self { + NotificationEventPayload::DocumentsReviewPending(event) + } +} diff --git a/core/notifications/src/primitives.rs b/core/notifications/src/primitives.rs index 4ad4da9348..db2cdda597 100644 --- a/core/notifications/src/primitives.rs +++ b/core/notifications/src/primitives.rs @@ -95,7 +95,7 @@ pub enum UserNotificationCategory { AdminNotification, } -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub enum CircleType { Inner, Outer, @@ -110,7 +110,7 @@ impl std::fmt::Display for CircleType { } } -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub enum CircleTimeFrame { Month, AllTime, diff --git a/lib/tracing-rs/src/lib.rs b/lib/tracing-rs/src/lib.rs index 9850b1111d..0b94e06394 100644 --- a/lib/tracing-rs/src/lib.rs +++ b/lib/tracing-rs/src/lib.rs @@ -67,6 +67,12 @@ fn telemetry_resource(config: &TracingConfig) -> Resource { ])) } +pub fn insert_error_fields(level: tracing::Level, error: impl std::fmt::Display) { + Span::current().record("error", &tracing::field::display("true")); + Span::current().record("error.level", &tracing::field::display(level)); + Span::current().record("error.message", &tracing::field::display(error)); +} + pub fn extract_tracing_data() -> HashMap { let mut tracing_data = HashMap::new(); let propagator = TraceContextPropagator::new();