From 0b2dff5de9c1a9a10346293491f83937a34d9562 Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Mon, 12 Feb 2024 09:45:29 +0100 Subject: [PATCH] chore(notifications): executor push notification in job --- core/notifications/src/app/mod.rs | 5 +-- core/notifications/src/executor/mod.rs | 2 +- core/notifications/src/job/error.rs | 4 ++ core/notifications/src/job/mod.rs | 33 ++++++++-------- .../src/job/send_push_notification.rs | 10 +++-- core/notifications/src/notification_event.rs | 39 +++++++++++++++++++ 6 files changed, 68 insertions(+), 25 deletions(-) diff --git a/core/notifications/src/app/mod.rs b/core/notifications/src/app/mod.rs index 0c17d24c29..ab96ba1d5b 100644 --- a/core/notifications/src/app/mod.rs +++ b/core/notifications/src/app/mod.rs @@ -18,7 +18,6 @@ use error::*; pub struct NotificationsApp { _config: AppConfig, settings: UserNotificationSettingsRepo, - executor: Executor, pool: Pool, _runner: Arc, } @@ -27,11 +26,10 @@ impl NotificationsApp { pub async fn init(pool: Pool, config: AppConfig) -> Result { let settings = UserNotificationSettingsRepo::new(&pool); let executor = Executor::init(config.executor.clone(), settings.clone()).await?; - let runner = job::start_job_runner(&pool).await?; + let runner = job::start_job_runner(&pool, executor).await?; Ok(Self { _config: config, pool, - executor, settings, _runner: Arc::new(runner), }) @@ -157,7 +155,6 @@ impl NotificationsApp { let mut tx = self.pool.begin().await?; job::spawn_send_push_notification(&mut tx, event.into()).await?; tx.commit().await?; - // self.executor.notify(event).await?; Ok(()) } } diff --git a/core/notifications/src/executor/mod.rs b/core/notifications/src/executor/mod.rs index 3f6339adb6..2c19d48b44 100644 --- a/core/notifications/src/executor/mod.rs +++ b/core/notifications/src/executor/mod.rs @@ -28,7 +28,7 @@ impl Executor { }) } - pub async fn notify(&self, event: T) -> Result<(), ExecutorError> { + pub async fn notify(&self, event: &T) -> Result<(), ExecutorError> { let settings = self.settings.find_for_user_id(event.user_id()).await?; if !settings.should_send_notification( UserNotificationChannel::Push, diff --git a/core/notifications/src/job/error.rs b/core/notifications/src/job/error.rs index 331eeb173e..8c7bd5ec3c 100644 --- a/core/notifications/src/job/error.rs +++ b/core/notifications/src/job/error.rs @@ -1,9 +1,13 @@ use thiserror::Error; +use crate::executor::error::ExecutorError; + #[derive(Error, Debug)] pub enum JobError { #[error("JobError - Sqlx: {0}")] Sqlx(#[from] sqlx::Error), + #[error("JobError - ExecutorError: {0}")] + Executor(#[from] ExecutorError), } impl job_executor::JobExecutionError for JobError {} diff --git a/core/notifications/src/job/mod.rs b/core/notifications/src/job/mod.rs index ce76f742b0..55e86bbb63 100644 --- a/core/notifications/src/job/mod.rs +++ b/core/notifications/src/job/mod.rs @@ -2,28 +2,23 @@ mod send_push_notification; pub mod error; -use sqlxmq::{job, CurrentJob, JobBuilder, JobRegistry, JobRunnerHandle}; +use sqlxmq::{job, CurrentJob, JobRegistry, JobRunnerHandle}; use tracing::instrument; use job_executor::JobExecutor; +use crate::executor::Executor; + use error::JobError; use send_push_notification::SendPushNotificationData; -pub async fn start_job_runner(pool: &sqlx::PgPool) -> Result { - let mut registry = JobRegistry::new(&[ - // sync_all_wallets, - // sync_wallet, - // process_all_payout_queues, - // schedule_process_payout_queue, - // process_payout_queue, - // batch_wallet_accounting, - // batch_signing, - // batch_broadcasting, - // respawn_all_outbox_handlers, - // populate_outbox, - ]); +pub async fn start_job_runner( + pool: &sqlx::PgPool, + executor: Executor, +) -> Result { + let mut registry = JobRegistry::new(&[send_push_notification]); + registry.set_context(executor); Ok(registry.runner(pool).set_keep_alive(false).run().await?) } @@ -32,13 +27,17 @@ pub async fn start_job_runner(pool: &sqlx::PgPool) -> Result Result<(), JobError> { +async fn send_push_notification( + mut current_job: CurrentJob, + executor: Executor, +) -> Result<(), JobError> { JobExecutor::builder(&mut current_job) .build() .expect("couldn't build JobExecutor") .execute(|data| async move { - let data: SendPushNotificationData = data.expect("no BatchBroadcastingData available"); - send_push_notification::execute(data).await + let data: SendPushNotificationData = + data.expect("no SendPushNotificationData available"); + send_push_notification::execute(data, executor).await }) .await?; Ok(()) diff --git a/core/notifications/src/job/send_push_notification.rs b/core/notifications/src/job/send_push_notification.rs index bca718b443..2a0d33aa45 100644 --- a/core/notifications/src/job/send_push_notification.rs +++ b/core/notifications/src/job/send_push_notification.rs @@ -4,7 +4,7 @@ use tracing::instrument; use std::collections::HashMap; use super::error::JobError; -use crate::notification_event::NotificationEventPayload; +use crate::{executor::Executor, notification_event::NotificationEventPayload}; #[derive(Debug, Serialize, Deserialize)] pub(super) struct SendPushNotificationData { @@ -22,7 +22,11 @@ impl From for SendPushNotificationData { } } -#[instrument(name = "job.send_push_notification", err)] -pub async fn execute(data: SendPushNotificationData) -> Result { +#[instrument(name = "job.send_push_notification", skip(executor), err)] +pub async fn execute( + data: SendPushNotificationData, + executor: Executor, +) -> Result { + executor.notify(&data.payload).await?; Ok(data) } diff --git a/core/notifications/src/notification_event.rs b/core/notifications/src/notification_event.rs index f883ab2333..7f5716d226 100644 --- a/core/notifications/src/notification_event.rs +++ b/core/notifications/src/notification_event.rs @@ -24,6 +24,45 @@ pub enum NotificationEventPayload { DocumentsReviewPending(DocumentsReviewPending), } +impl NotificationEvent for NotificationEventPayload { + fn user_id(&self) -> &GaloyUserId { + match self { + NotificationEventPayload::CircleGrew(event) => event.user_id(), + NotificationEventPayload::CircleThresholdReached(event) => event.user_id(), + NotificationEventPayload::DocumentsSubmitted(event) => event.user_id(), + NotificationEventPayload::DocumentsApproved(event) => event.user_id(), + NotificationEventPayload::DocumentsRejected(event) => event.user_id(), + NotificationEventPayload::DocumentsReviewPending(event) => event.user_id(), + } + } + + fn deep_link(&self) -> DeepLink { + match self { + NotificationEventPayload::CircleGrew(event) => event.deep_link(), + NotificationEventPayload::CircleThresholdReached(event) => event.deep_link(), + NotificationEventPayload::DocumentsSubmitted(event) => event.deep_link(), + NotificationEventPayload::DocumentsApproved(event) => event.deep_link(), + NotificationEventPayload::DocumentsRejected(event) => event.deep_link(), + NotificationEventPayload::DocumentsReviewPending(event) => event.deep_link(), + } + } + + fn to_localized_msg(&self, locale: GaloyLocale) -> LocalizedMessage { + match self { + NotificationEventPayload::CircleGrew(event) => event.to_localized_msg(locale), + NotificationEventPayload::CircleThresholdReached(event) => { + event.to_localized_msg(locale) + } + NotificationEventPayload::DocumentsSubmitted(event) => event.to_localized_msg(locale), + NotificationEventPayload::DocumentsApproved(event) => event.to_localized_msg(locale), + NotificationEventPayload::DocumentsRejected(event) => event.to_localized_msg(locale), + NotificationEventPayload::DocumentsReviewPending(event) => { + event.to_localized_msg(locale) + } + } + } +} + #[derive(Debug, Serialize, Deserialize)] pub struct CircleGrew { pub user_id: GaloyUserId,