Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(notifications): send push notification in job #3980

Merged
merged 7 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions core/notifications/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ path = "src/bin/write_sdl.rs"
fail-on-warnings = []

[dependencies]
job-executor = { path = "../../lib/job-executor-rs" }
tracing = { path = "../../lib/tracing-rs" }
es-entity = { path = "../../lib/es-entity-rs" }

Expand All @@ -34,6 +35,7 @@ thiserror = { workspace = true }
chrono = { workspace = true }
futures = { workspace = true }
sqlx = { workspace = true }
sqlxmq = { workspace = true }
mongodb = { workspace = true }
rand = { workspace = true }
uuid = { workspace = true }
Expand Down
3 changes: 3 additions & 0 deletions core/notifications/locales/en.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,6 @@ circle_threshold_reached.inner.body: "You have welcomed %{threshold} people to B

circle_threshold_reached.outer.title: "Outer Circle gains 💪"
circle_threshold_reached.outer.body: "Your Outer Circle reached %{threshold} people. You are driving Bitcoin adoption!"

documents_submitted.title: "Documents Received"
documents_submitted.body: "The documents for your verification are being processed."
20 changes: 20 additions & 0 deletions core/notifications/proto/notifications.proto
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ message NotificationEvent {
oneof data {
CircleGrew circle_grew = 1;
CircleThresholdReached circle_threshold_reached = 2;
DocumentsSubmitted documents_submitted = 3;
DocumentsApproved documents_approved = 4;
DocumentsRejected documents_rejected = 5;
DocumentsReviewPending documents_review_pending = 6;
}
}

Expand All @@ -159,3 +163,19 @@ message CircleThresholdReached {
CircleTimeFrame time_frame = 3;
uint32 threshold = 4;
}

message DocumentsSubmitted {
string user_id = 1;
}

message DocumentsApproved {
string user_id = 1;
}

message DocumentsRejected {
string user_id = 1;
}

message DocumentsReviewPending {
string user_id = 1;
}
11 changes: 9 additions & 2 deletions core/notifications/src/app/error.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
use thiserror::Error;

use crate::{executor::error::*, user_notification_settings::error::*};
use crate::{
executor::error::ExecutorError, job::error::JobError,
user_notification_settings::error::UserNotificationSettingsError,
};

#[derive(Error, Debug)]
pub enum ApplicationError {
#[error("{0}")]
UserNotificationSettingsError(#[from] UserNotificationSettingsError),
#[error("{0}")]
Novu(#[from] ExecutorError),
JobError(#[from] JobError),
#[error("{0}")]
ExecutorError(#[from] ExecutorError),
#[error("{0}")]
Sqlx(#[from] sqlx::Error),
}
32 changes: 17 additions & 15 deletions core/notifications/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@ mod config;
pub mod error;

use sqlx::{Pool, Postgres};
use sqlxmq::JobRunnerHandle;
use tracing::instrument;

use crate::{executor::*, notification_event::*, primitives::*, user_notification_settings::*};
use std::sync::Arc;

use crate::{
executor::*, job, notification_event::*, primitives::*, user_notification_settings::*,
};

pub use config::*;
use error::*;
Expand All @@ -13,19 +18,20 @@ use error::*;
pub struct NotificationsApp {
_config: AppConfig,
settings: UserNotificationSettingsRepo,
executor: Executor,
_pool: Pool<Postgres>,
pool: Pool<Postgres>,
_runner: Arc<JobRunnerHandle>,
}

impl NotificationsApp {
pub async fn init(pool: Pool<Postgres>, config: AppConfig) -> Result<Self, ApplicationError> {
let settings = UserNotificationSettingsRepo::new(&pool);
let executor = Executor::init(config.executor.clone(), settings.clone()).await?;
let runner = job::start_job_runner(&pool, executor).await?;
Ok(Self {
_config: config,
_pool: pool,
executor,
pool,
settings,
_runner: Arc::new(runner),
})
}

Expand Down Expand Up @@ -141,18 +147,14 @@ impl NotificationsApp {
Ok(user_settings)
}

#[instrument(name = "app.handle_circle_grew", skip(self), err)]
pub async fn handle_circle_grew(&self, event: CircleGrew) -> Result<(), ApplicationError> {
self.executor.notify(event).await?;
Ok(())
}

#[instrument(name = "app.handle_threshold_reached", skip(self), err)]
pub async fn handle_threshold_reached(
#[instrument(name = "app.handle_notification_event", skip(self), err)]
pub async fn handle_notification_event<T: NotificationEvent>(
&self,
event: CircleThresholdReached,
event: T,
) -> Result<(), ApplicationError> {
self.executor.notify(event).await?;
let mut tx = self.pool.begin().await?;
job::spawn_send_push_notification(&mut tx, event.into()).await?;
tx.commit().await?;
Ok(())
}
}
2 changes: 1 addition & 1 deletion core/notifications/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl Executor {
})
}

pub async fn notify<T: NotificationEventNew>(&self, event: T) -> Result<(), ExecutorError> {
pub async fn notify<T: NotificationEvent>(&self, event: &T) -> Result<(), ExecutorError> {
let settings = self.settings.find_for_user_id(event.user_id()).await?;
if !settings.should_send_notification(
UserNotificationChannel::Push,
Expand Down
52 changes: 50 additions & 2 deletions core/notifications/src/grpc/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ impl NotificationsService for Notifications {
.map(primitives::CircleType::from)
.map_err(|e| Status::invalid_argument(e.to_string()))?;
self.app
.handle_circle_grew(notification_event::CircleGrew {
.handle_notification_event(notification_event::CircleGrew {
user_id: GaloyUserId::from(user_id),
circle_type,
this_month_circle_size,
Expand All @@ -277,14 +277,62 @@ impl NotificationsService for Notifications {
.map(primitives::CircleTimeFrame::from)
.map_err(|e| Status::invalid_argument(e.to_string()))?;
self.app
.handle_threshold_reached(notification_event::CircleThresholdReached {
.handle_notification_event(notification_event::CircleThresholdReached {
user_id: GaloyUserId::from(user_id),
circle_type,
time_frame,
threshold,
})
.await?
}
Some(proto::NotificationEvent {
data:
Some(proto::notification_event::Data::DocumentsSubmitted(
proto::DocumentsSubmitted { user_id },
)),
}) => {
self.app
.handle_notification_event(notification_event::DocumentsSubmitted {
user_id: GaloyUserId::from(user_id),
})
.await?
}
Some(proto::NotificationEvent {
data:
Some(proto::notification_event::Data::DocumentsApproved(proto::DocumentsApproved {
user_id,
})),
}) => {
self.app
.handle_notification_event(notification_event::DocumentsApproved {
user_id: GaloyUserId::from(user_id),
})
.await?
}
Some(proto::NotificationEvent {
data:
Some(proto::notification_event::Data::DocumentsRejected(proto::DocumentsRejected {
user_id,
})),
}) => {
self.app
.handle_notification_event(notification_event::DocumentsRejected {
user_id: GaloyUserId::from(user_id),
})
.await?
}
Some(proto::NotificationEvent {
data:
Some(proto::notification_event::Data::DocumentsReviewPending(
proto::DocumentsReviewPending { user_id },
)),
}) => {
self.app
.handle_notification_event(notification_event::DocumentsReviewPending {
user_id: GaloyUserId::from(user_id),
})
.await?
}
_ => return Err(Status::invalid_argument("event is required")),
}

Expand Down
13 changes: 13 additions & 0 deletions core/notifications/src/job/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use thiserror::Error;

use crate::executor::error::ExecutorError;

#[derive(Error, Debug)]
pub enum JobError {
#[error("JobError - Sqlx: {0}")]
Sqlx(#[from] sqlx::Error),
#[error("JobError - ExecutorError: {0}")]
Executor(#[from] ExecutorError),
}

impl job_executor::JobExecutionError for JobError {}
63 changes: 63 additions & 0 deletions core/notifications/src/job/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
mod send_push_notification;

pub mod error;

use sqlxmq::{job, CurrentJob, JobRegistry, JobRunnerHandle};
use tracing::instrument;

use job_executor::JobExecutor;

use crate::executor::Executor;

use error::JobError;

use send_push_notification::SendPushNotificationData;

pub async fn start_job_runner(
pool: &sqlx::PgPool,
executor: Executor,
) -> Result<JobRunnerHandle, JobError> {
let mut registry = JobRegistry::new(&[send_push_notification]);
registry.set_context(executor);

Ok(registry.runner(pool).set_keep_alive(false).run().await?)
}

#[job(
name = "send_push_notification",
channel_name = "send_push_notification"
)]
async fn send_push_notification(
mut current_job: CurrentJob,
executor: Executor,
) -> Result<(), JobError> {
JobExecutor::builder(&mut current_job)
.build()
.expect("couldn't build JobExecutor")
.execute(|data| async move {
let data: SendPushNotificationData =
data.expect("no SendPushNotificationData available");
send_push_notification::execute(data, executor).await
})
.await?;
Ok(())
}

#[instrument(name = "job.spawn_send_push_notification", skip_all, fields(error, error.level, error.message), err)]
pub async fn spawn_send_push_notification(
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
data: impl Into<SendPushNotificationData>,
) -> Result<(), JobError> {
let data = data.into();
if let Err(e) = send_push_notification
.builder()
.set_json(&data)
.expect("Couldn't set json")
.spawn(&mut **tx)
.await
{
tracing::insert_error_fields(tracing::Level::WARN, &e);
return Err(e.into());
}
Ok(())
}
32 changes: 32 additions & 0 deletions core/notifications/src/job/send_push_notification.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use serde::{Deserialize, Serialize};
use tracing::instrument;

use std::collections::HashMap;

use super::error::JobError;
use crate::{executor::Executor, notification_event::NotificationEventPayload};

#[derive(Debug, Serialize, Deserialize)]
pub(super) struct SendPushNotificationData {
payload: NotificationEventPayload,
#[serde(flatten)]
pub(super) tracing_data: HashMap<String, String>,
}

impl From<NotificationEventPayload> for SendPushNotificationData {
fn from(payload: NotificationEventPayload) -> Self {
Self {
payload,
tracing_data: tracing::extract_tracing_data(),
}
}
}

#[instrument(name = "job.send_push_notification", skip(executor), err)]
pub async fn execute(
data: SendPushNotificationData,
executor: Executor,
) -> Result<SendPushNotificationData, JobError> {
executor.notify(&data.payload).await?;
Ok(data)
}
1 change: 1 addition & 0 deletions core/notifications/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ rust_i18n::i18n!("locales", fallback = "en");

mod app;
mod data_import;
mod job;
mod messages;

pub mod cli;
Expand Down
26 changes: 26 additions & 0 deletions core/notifications/src/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,32 @@ impl Messages {
.to_string();
LocalizedMessage { title, body }
}

pub fn documents_submitted(locale: &str, _event: &DocumentsSubmitted) -> LocalizedMessage {
let title = t!("documents_submitted.title", locale = locale).to_string();
let body = t!("documents_submitted.body", locale = locale).to_string();
LocalizedMessage { title, body }
}

pub fn documents_approved(locale: &str, _event: &DocumentsApproved) -> LocalizedMessage {
let title = t!("documents_approved.title", locale = locale).to_string();
let body = t!("documents_approved.body", locale = locale).to_string();
LocalizedMessage { title, body }
}
pub fn documents_rejected(locale: &str, _event: &DocumentsRejected) -> LocalizedMessage {
let title = t!("documents_rejected.title", locale = locale).to_string();
let body = t!("documents_rejected.body", locale = locale).to_string();
LocalizedMessage { title, body }
}

pub fn documents_review_pending(
locale: &str,
_event: &DocumentsReviewPending,
) -> LocalizedMessage {
let title = t!("documents_review_pending.title", locale = locale).to_string();
let body = t!("documents_review_pending.body", locale = locale).to_string();
LocalizedMessage { title, body }
}
}

#[cfg(test)]
Expand Down
Loading
Loading