Skip to content

Commit

Permalink
chore(notifications): more job::send_push_notification boilerplate
Browse files Browse the repository at this point in the history
  • Loading branch information
bodymindarts committed Feb 12, 2024
1 parent 177c1df commit 87b3e2c
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 18 deletions.
2 changes: 2 additions & 0 deletions core/notifications/src/app/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ pub enum ApplicationError {
JobError(#[from] JobError),
#[error("{0}")]
ExecutorError(#[from] ExecutorError),
#[error("{0}")]
Sqlx(#[from] sqlx::Error),
}
9 changes: 6 additions & 3 deletions core/notifications/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub struct NotificationsApp {
_config: AppConfig,
settings: UserNotificationSettingsRepo,
executor: Executor,
_pool: Pool<Postgres>,
pool: Pool<Postgres>,
_runner: Arc<JobRunnerHandle>,
}

Expand All @@ -30,7 +30,7 @@ impl NotificationsApp {
let runner = job::start_job_runner(&pool).await?;
Ok(Self {
_config: config,
_pool: pool,
pool,
executor,
settings,
_runner: Arc::new(runner),
Expand Down Expand Up @@ -154,7 +154,10 @@ impl NotificationsApp {
&self,
event: T,
) -> Result<(), ApplicationError> {
self.executor.notify(event).await?;
let mut tx = self.pool.begin().await?;
job::spawn_send_push_notification(&mut tx, event.into()).await?;
tx.commit().await?;
// self.executor.notify(event).await?;
Ok(())
}
}
2 changes: 2 additions & 0 deletions core/notifications/src/job/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ pub enum JobError {
#[error("JobError - Sqlx: {0}")]
Sqlx(#[from] sqlx::Error),
}

impl job_executor::JobExecutionError for JobError {}
42 changes: 42 additions & 0 deletions core/notifications/src/job/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
mod send_push_notification;

pub mod error;

use sqlxmq::{job, CurrentJob, JobBuilder, JobRegistry, JobRunnerHandle};
use tracing::instrument;

use job_executor::JobExecutor;

use error::JobError;

use send_push_notification::SendPushNotificationData;

pub async fn start_job_runner(pool: &sqlx::PgPool) -> Result<JobRunnerHandle, JobError> {
let mut registry = JobRegistry::new(&[
// sync_all_wallets,
Expand All @@ -20,3 +27,38 @@ pub async fn start_job_runner(pool: &sqlx::PgPool) -> Result<JobRunnerHandle, Jo

Ok(registry.runner(pool).set_keep_alive(false).run().await?)
}

#[job(
name = "send_push_notification",
channel_name = "send_push_notification"
)]
async fn send_push_notification(mut current_job: CurrentJob) -> Result<(), JobError> {
JobExecutor::builder(&mut current_job)
.build()
.expect("couldn't build JobExecutor")
.execute(|data| async move {
let data: SendPushNotificationData = data.expect("no BatchBroadcastingData available");
send_push_notification::execute(data).await
})
.await?;
Ok(())
}

#[instrument(name = "job.spawn_send_push_notification", skip_all, fields(error, error.level, error.message), err)]
pub async fn spawn_send_push_notification(
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
data: impl Into<SendPushNotificationData>,
) -> Result<(), JobError> {
let data = data.into();
if let Err(e) = send_push_notification
.builder()
.set_json(&data)
.expect("Couldn't set json")
.spawn(&mut **tx)
.await
{
tracing::insert_error_fields(tracing::Level::WARN, &e);
return Err(e.into());
}
Ok(())
}
28 changes: 28 additions & 0 deletions core/notifications/src/job/send_push_notification.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use serde::{Deserialize, Serialize};
use tracing::instrument;

use std::collections::HashMap;

use super::error::JobError;
use crate::notification_event::NotificationEventPayload;

#[derive(Debug, Serialize, Deserialize)]
pub(super) struct SendPushNotificationData {
payload: NotificationEventPayload,
#[serde(flatten)]
pub(super) tracing_data: HashMap<String, String>,
}

impl From<NotificationEventPayload> for SendPushNotificationData {
fn from(payload: NotificationEventPayload) -> Self {
Self {
payload,
tracing_data: tracing::extract_tracing_data(),
}
}
}

#[instrument(name = "job.send_push_notification", err)]
pub async fn execute(data: SendPushNotificationData) -> Result<SendPushNotificationData, JobError> {
Ok(data)
}
69 changes: 56 additions & 13 deletions core/notifications/src/notification_event.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,36 @@
use serde::{Deserialize, Serialize};

use crate::{messages::*, primitives::*};

pub enum DeepLink {
None,
Circles,
}

#[derive(Debug)]
pub trait NotificationEvent: std::fmt::Debug + Into<NotificationEventPayload> {
fn user_id(&self) -> &GaloyUserId;
fn deep_link(&self) -> DeepLink;
fn to_localized_msg(&self, locale: GaloyLocale) -> LocalizedMessage;
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum NotificationEventPayload {
CircleGrew(CircleGrew),
CircleThresholdReached(CircleThresholdReached),
DocumentsSubmitted(DocumentsSubmitted),
DocumentsApproved(DocumentsApproved),
DocumentsRejected(DocumentsRejected),
DocumentsReviewPending(DocumentsReviewPending),
}

#[derive(Debug, Serialize, Deserialize)]
pub struct CircleGrew {
pub user_id: GaloyUserId,
pub circle_type: CircleType,
pub this_month_circle_size: u32,
pub all_time_circle_size: u32,
}

pub trait NotificationEvent: std::fmt::Debug {
fn user_id(&self) -> &GaloyUserId;
fn deep_link(&self) -> DeepLink;
fn to_localized_msg(&self, locale: GaloyLocale) -> LocalizedMessage;
}

impl NotificationEvent for CircleGrew {
fn user_id(&self) -> &GaloyUserId {
&self.user_id
Expand All @@ -32,8 +44,13 @@ impl NotificationEvent for CircleGrew {
Messages::circle_grew(locale.as_ref(), self)
}
}
impl From<CircleGrew> for NotificationEventPayload {
fn from(event: CircleGrew) -> Self {
NotificationEventPayload::CircleGrew(event)
}
}

#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize)]
pub struct CircleThresholdReached {
pub user_id: GaloyUserId,
pub circle_type: CircleType,
Expand All @@ -54,8 +71,13 @@ impl NotificationEvent for CircleThresholdReached {
Messages::circle_threshold_reached(locale.as_ref(), self)
}
}
impl From<CircleThresholdReached> for NotificationEventPayload {
fn from(event: CircleThresholdReached) -> Self {
NotificationEventPayload::CircleThresholdReached(event)
}
}

#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize)]
pub struct DocumentsSubmitted {
pub user_id: GaloyUserId,
}
Expand All @@ -73,8 +95,13 @@ impl NotificationEvent for DocumentsSubmitted {
Messages::documents_submitted(locale.as_ref(), self)
}
}
impl From<DocumentsSubmitted> for NotificationEventPayload {
fn from(event: DocumentsSubmitted) -> Self {
NotificationEventPayload::DocumentsSubmitted(event)
}
}

#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize)]
pub struct DocumentsApproved {
pub user_id: GaloyUserId,
}
Expand All @@ -92,8 +119,13 @@ impl NotificationEvent for DocumentsApproved {
Messages::documents_approved(locale.as_ref(), self)
}
}
impl From<DocumentsApproved> for NotificationEventPayload {
fn from(event: DocumentsApproved) -> Self {
NotificationEventPayload::DocumentsApproved(event)
}
}

#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize)]
pub struct DocumentsRejected {
pub user_id: GaloyUserId,
}
Expand All @@ -111,8 +143,13 @@ impl NotificationEvent for DocumentsRejected {
Messages::documents_rejected(locale.as_ref(), self)
}
}
impl From<DocumentsRejected> for NotificationEventPayload {
fn from(event: DocumentsRejected) -> Self {
NotificationEventPayload::DocumentsRejected(event)
}
}

#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize)]
pub struct DocumentsReviewPending {
pub user_id: GaloyUserId,
}
Expand All @@ -130,3 +167,9 @@ impl NotificationEvent for DocumentsReviewPending {
Messages::documents_review_pending(locale.as_ref(), self)
}
}

impl From<DocumentsReviewPending> for NotificationEventPayload {
fn from(event: DocumentsReviewPending) -> Self {
NotificationEventPayload::DocumentsReviewPending(event)
}
}
4 changes: 2 additions & 2 deletions core/notifications/src/primitives.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ pub enum UserNotificationCategory {
AdminNotification,
}

#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize)]
pub enum CircleType {
Inner,
Outer,
Expand All @@ -110,7 +110,7 @@ impl std::fmt::Display for CircleType {
}
}

#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize)]
pub enum CircleTimeFrame {
Month,
AllTime,
Expand Down
6 changes: 6 additions & 0 deletions lib/tracing-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ fn telemetry_resource(config: &TracingConfig) -> Resource {
]))
}

pub fn insert_error_fields(level: tracing::Level, error: impl std::fmt::Display) {
Span::current().record("error", &tracing::field::display("true"));
Span::current().record("error.level", &tracing::field::display(level));
Span::current().record("error.message", &tracing::field::display(error));
}

pub fn extract_tracing_data() -> HashMap<String, String> {
let mut tracing_data = HashMap::new();
let propagator = TraceContextPropagator::new();
Expand Down

0 comments on commit 87b3e2c

Please sign in to comment.