diff --git a/core/notifications/src/app/error.rs b/core/notifications/src/app/error.rs index d25710a8ff3..a51e51c0275 100644 --- a/core/notifications/src/app/error.rs +++ b/core/notifications/src/app/error.rs @@ -13,4 +13,6 @@ pub enum ApplicationError { JobError(#[from] JobError), #[error("{0}")] ExecutorError(#[from] ExecutorError), + #[error("{0}")] + Sqlx(#[from] sqlx::Error), } diff --git a/core/notifications/src/app/mod.rs b/core/notifications/src/app/mod.rs index 035b7850f9f..f466c8f493d 100644 --- a/core/notifications/src/app/mod.rs +++ b/core/notifications/src/app/mod.rs @@ -19,7 +19,7 @@ pub struct NotificationsApp { _config: AppConfig, settings: UserNotificationSettingsRepo, executor: Executor, - _pool: Pool, + pool: Pool, _runner: Arc, } @@ -30,7 +30,7 @@ impl NotificationsApp { let runner = job::start_job_runner(&pool).await?; Ok(Self { _config: config, - _pool: pool, + pool, executor, settings, _runner: Arc::new(runner), @@ -154,6 +154,9 @@ impl NotificationsApp { &self, event: T, ) -> Result<(), ApplicationError> { + let mut tx = self.pool.begin().await?; + job::spawn_send_push_notification(&mut tx).await?; + tx.commit().await?; self.executor.notify(event).await?; Ok(()) } diff --git a/core/notifications/src/job/mod.rs b/core/notifications/src/job/mod.rs index 03a75d9109f..422c32794ae 100644 --- a/core/notifications/src/job/mod.rs +++ b/core/notifications/src/job/mod.rs @@ -1,9 +1,14 @@ +mod send_push_notification; + pub mod error; use sqlxmq::{job, CurrentJob, JobBuilder, JobRegistry, JobRunnerHandle}; +use tracing::instrument; use error::JobError; +use send_push_notification::SendPushNotificationData; + pub async fn start_job_runner(pool: &sqlx::PgPool) -> Result { let mut registry = JobRegistry::new(&[ // sync_all_wallets, @@ -20,3 +25,38 @@ pub async fn start_job_runner(pool: &sqlx::PgPool) -> Result Result<(), JobError> { + // JobExecutor::builder(&mut current_job) + // .build() + // .expect("couldn't build JobExecutor") + // .execute(|data| async move { + // let data: BatchBroadcastingData = data.expect("no BatchBroadcastingData available"); + // batch_broadcasting::execute(data, blockchain_cfg, batches).await + // }) + // .await?; + Ok(()) +} + +#[instrument(name = "job.spawn_send_push_notification", skip_all, fields(error, error.level, error.message), err)] +pub async fn spawn_send_push_notification( + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + // data: impl Into, +) -> Result<(), JobError> { + // let data = data.into(); + if let Err(e) = send_push_notification + .builder() + // .set_json(&data) + // .expect("Couldn't set json") + .spawn(&mut **tx) + .await + { + tracing::insert_error_fields(tracing::Level::WARN, &e); + return Err(e.into()); + } + Ok(()) +} diff --git a/core/notifications/src/job/send_push_notification.rs b/core/notifications/src/job/send_push_notification.rs new file mode 100644 index 00000000000..030358eacbc --- /dev/null +++ b/core/notifications/src/job/send_push_notification.rs @@ -0,0 +1,4 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize)] +pub(super) struct SendPushNotificationData {} diff --git a/lib/tracing-rs/src/lib.rs b/lib/tracing-rs/src/lib.rs index 9850b1111d3..0b94e063944 100644 --- a/lib/tracing-rs/src/lib.rs +++ b/lib/tracing-rs/src/lib.rs @@ -67,6 +67,12 @@ fn telemetry_resource(config: &TracingConfig) -> Resource { ])) } +pub fn insert_error_fields(level: tracing::Level, error: impl std::fmt::Display) { + Span::current().record("error", &tracing::field::display("true")); + Span::current().record("error.level", &tracing::field::display(level)); + Span::current().record("error.message", &tracing::field::display(error)); +} + pub fn extract_tracing_data() -> HashMap { let mut tracing_data = HashMap::new(); let propagator = TraceContextPropagator::new();