Skip to content

Commit

Permalink
chore(notifications): executor push notification in job
Browse files Browse the repository at this point in the history
  • Loading branch information
bodymindarts committed Feb 12, 2024
1 parent 87b3e2c commit 0b2dff5
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 25 deletions.
5 changes: 1 addition & 4 deletions core/notifications/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use error::*;
pub struct NotificationsApp {
_config: AppConfig,
settings: UserNotificationSettingsRepo,
executor: Executor,
pool: Pool<Postgres>,
_runner: Arc<JobRunnerHandle>,
}
Expand All @@ -27,11 +26,10 @@ impl NotificationsApp {
pub async fn init(pool: Pool<Postgres>, config: AppConfig) -> Result<Self, ApplicationError> {
let settings = UserNotificationSettingsRepo::new(&pool);
let executor = Executor::init(config.executor.clone(), settings.clone()).await?;
let runner = job::start_job_runner(&pool).await?;
let runner = job::start_job_runner(&pool, executor).await?;
Ok(Self {
_config: config,
pool,
executor,
settings,
_runner: Arc::new(runner),
})
Expand Down Expand Up @@ -157,7 +155,6 @@ impl NotificationsApp {
let mut tx = self.pool.begin().await?;
job::spawn_send_push_notification(&mut tx, event.into()).await?;
tx.commit().await?;
// self.executor.notify(event).await?;
Ok(())
}
}
2 changes: 1 addition & 1 deletion core/notifications/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl Executor {
})
}

pub async fn notify<T: NotificationEvent>(&self, event: T) -> Result<(), ExecutorError> {
pub async fn notify<T: NotificationEvent>(&self, event: &T) -> Result<(), ExecutorError> {
let settings = self.settings.find_for_user_id(event.user_id()).await?;
if !settings.should_send_notification(
UserNotificationChannel::Push,
Expand Down
4 changes: 4 additions & 0 deletions core/notifications/src/job/error.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use thiserror::Error;

use crate::executor::error::ExecutorError;

#[derive(Error, Debug)]
pub enum JobError {
#[error("JobError - Sqlx: {0}")]
Sqlx(#[from] sqlx::Error),
#[error("JobError - ExecutorError: {0}")]
Executor(#[from] ExecutorError),
}

impl job_executor::JobExecutionError for JobError {}
33 changes: 16 additions & 17 deletions core/notifications/src/job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,23 @@ mod send_push_notification;

pub mod error;

use sqlxmq::{job, CurrentJob, JobBuilder, JobRegistry, JobRunnerHandle};
use sqlxmq::{job, CurrentJob, JobRegistry, JobRunnerHandle};
use tracing::instrument;

use job_executor::JobExecutor;

use crate::executor::Executor;

use error::JobError;

use send_push_notification::SendPushNotificationData;

pub async fn start_job_runner(pool: &sqlx::PgPool) -> Result<JobRunnerHandle, JobError> {
let mut registry = JobRegistry::new(&[
// sync_all_wallets,
// sync_wallet,
// process_all_payout_queues,
// schedule_process_payout_queue,
// process_payout_queue,
// batch_wallet_accounting,
// batch_signing,
// batch_broadcasting,
// respawn_all_outbox_handlers,
// populate_outbox,
]);
pub async fn start_job_runner(
pool: &sqlx::PgPool,
executor: Executor,
) -> Result<JobRunnerHandle, JobError> {
let mut registry = JobRegistry::new(&[send_push_notification]);
registry.set_context(executor);

Ok(registry.runner(pool).set_keep_alive(false).run().await?)
}
Expand All @@ -32,13 +27,17 @@ pub async fn start_job_runner(pool: &sqlx::PgPool) -> Result<JobRunnerHandle, Jo
name = "send_push_notification",
channel_name = "send_push_notification"
)]
async fn send_push_notification(mut current_job: CurrentJob) -> Result<(), JobError> {
async fn send_push_notification(
mut current_job: CurrentJob,
executor: Executor,
) -> Result<(), JobError> {
JobExecutor::builder(&mut current_job)
.build()
.expect("couldn't build JobExecutor")
.execute(|data| async move {
let data: SendPushNotificationData = data.expect("no BatchBroadcastingData available");
send_push_notification::execute(data).await
let data: SendPushNotificationData =
data.expect("no SendPushNotificationData available");
send_push_notification::execute(data, executor).await
})
.await?;
Ok(())
Expand Down
10 changes: 7 additions & 3 deletions core/notifications/src/job/send_push_notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use tracing::instrument;
use std::collections::HashMap;

use super::error::JobError;
use crate::notification_event::NotificationEventPayload;
use crate::{executor::Executor, notification_event::NotificationEventPayload};

#[derive(Debug, Serialize, Deserialize)]
pub(super) struct SendPushNotificationData {
Expand All @@ -22,7 +22,11 @@ impl From<NotificationEventPayload> for SendPushNotificationData {
}
}

#[instrument(name = "job.send_push_notification", err)]
pub async fn execute(data: SendPushNotificationData) -> Result<SendPushNotificationData, JobError> {
#[instrument(name = "job.send_push_notification", skip(executor), err)]
pub async fn execute(
data: SendPushNotificationData,
executor: Executor,
) -> Result<SendPushNotificationData, JobError> {
executor.notify(&data.payload).await?;
Ok(data)
}
39 changes: 39 additions & 0 deletions core/notifications/src/notification_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,45 @@ pub enum NotificationEventPayload {
DocumentsReviewPending(DocumentsReviewPending),
}

impl NotificationEvent for NotificationEventPayload {
fn user_id(&self) -> &GaloyUserId {
match self {
NotificationEventPayload::CircleGrew(event) => event.user_id(),
NotificationEventPayload::CircleThresholdReached(event) => event.user_id(),
NotificationEventPayload::DocumentsSubmitted(event) => event.user_id(),
NotificationEventPayload::DocumentsApproved(event) => event.user_id(),
NotificationEventPayload::DocumentsRejected(event) => event.user_id(),
NotificationEventPayload::DocumentsReviewPending(event) => event.user_id(),
}
}

fn deep_link(&self) -> DeepLink {
match self {
NotificationEventPayload::CircleGrew(event) => event.deep_link(),
NotificationEventPayload::CircleThresholdReached(event) => event.deep_link(),
NotificationEventPayload::DocumentsSubmitted(event) => event.deep_link(),
NotificationEventPayload::DocumentsApproved(event) => event.deep_link(),
NotificationEventPayload::DocumentsRejected(event) => event.deep_link(),
NotificationEventPayload::DocumentsReviewPending(event) => event.deep_link(),
}
}

fn to_localized_msg(&self, locale: GaloyLocale) -> LocalizedMessage {
match self {
NotificationEventPayload::CircleGrew(event) => event.to_localized_msg(locale),
NotificationEventPayload::CircleThresholdReached(event) => {
event.to_localized_msg(locale)
}
NotificationEventPayload::DocumentsSubmitted(event) => event.to_localized_msg(locale),
NotificationEventPayload::DocumentsApproved(event) => event.to_localized_msg(locale),
NotificationEventPayload::DocumentsRejected(event) => event.to_localized_msg(locale),
NotificationEventPayload::DocumentsReviewPending(event) => {
event.to_localized_msg(locale)
}
}
}
}

#[derive(Debug, Serialize, Deserialize)]
pub struct CircleGrew {
pub user_id: GaloyUserId,
Expand Down

0 comments on commit 0b2dff5

Please sign in to comment.