Skip to content

Commit

Permalink
chore(notifications): more job::send_push_notification boilerplate
Browse files Browse the repository at this point in the history
  • Loading branch information
bodymindarts committed Feb 12, 2024
1 parent 177c1df commit 83e5c8e
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 2 deletions.
2 changes: 2 additions & 0 deletions core/notifications/src/app/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ pub enum ApplicationError {
JobError(#[from] JobError),
#[error("{0}")]
ExecutorError(#[from] ExecutorError),
#[error("{0}")]
Sqlx(#[from] sqlx::Error),
}
7 changes: 5 additions & 2 deletions core/notifications/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub struct NotificationsApp {
_config: AppConfig,
settings: UserNotificationSettingsRepo,
executor: Executor,
_pool: Pool<Postgres>,
pool: Pool<Postgres>,
_runner: Arc<JobRunnerHandle>,
}

Expand All @@ -30,7 +30,7 @@ impl NotificationsApp {
let runner = job::start_job_runner(&pool).await?;
Ok(Self {
_config: config,
_pool: pool,
pool,
executor,
settings,
_runner: Arc::new(runner),
Expand Down Expand Up @@ -154,6 +154,9 @@ impl NotificationsApp {
&self,
event: T,
) -> Result<(), ApplicationError> {
let mut tx = self.pool.begin().await?;
job::spawn_send_push_notification(&mut tx).await?;
tx.commit().await?;
self.executor.notify(event).await?;
Ok(())
}
Expand Down
40 changes: 40 additions & 0 deletions core/notifications/src/job/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
mod send_push_notification;

pub mod error;

use sqlxmq::{job, CurrentJob, JobBuilder, JobRegistry, JobRunnerHandle};
use tracing::instrument;

use error::JobError;

use send_push_notification::SendPushNotificationData;

pub async fn start_job_runner(pool: &sqlx::PgPool) -> Result<JobRunnerHandle, JobError> {
let mut registry = JobRegistry::new(&[
// sync_all_wallets,
Expand All @@ -20,3 +25,38 @@ pub async fn start_job_runner(pool: &sqlx::PgPool) -> Result<JobRunnerHandle, Jo

Ok(registry.runner(pool).set_keep_alive(false).run().await?)
}

#[job(
name = "send_push_notification",
channel_name = "send_push_notification"
)]
async fn send_push_notification(mut current_job: CurrentJob) -> Result<(), JobError> {
// JobExecutor::builder(&mut current_job)
// .build()
// .expect("couldn't build JobExecutor")
// .execute(|data| async move {
// let data: BatchBroadcastingData = data.expect("no BatchBroadcastingData available");
// batch_broadcasting::execute(data, blockchain_cfg, batches).await
// })
// .await?;
Ok(())
}

#[instrument(name = "job.spawn_send_push_notification", skip_all, fields(error, error.level, error.message), err)]
pub async fn spawn_send_push_notification(
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
// data: impl Into<SendPushNotificationData>,
) -> Result<(), JobError> {
// let data = data.into();
if let Err(e) = send_push_notification
.builder()
// .set_json(&data)
// .expect("Couldn't set json")
.spawn(&mut **tx)
.await
{
tracing::insert_error_fields(tracing::Level::WARN, &e);
return Err(e.into());
}
Ok(())
}
4 changes: 4 additions & 0 deletions core/notifications/src/job/send_push_notification.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
pub(super) struct SendPushNotificationData {}
6 changes: 6 additions & 0 deletions lib/tracing-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ fn telemetry_resource(config: &TracingConfig) -> Resource {
]))
}

pub fn insert_error_fields(level: tracing::Level, error: impl std::fmt::Display) {
Span::current().record("error", &tracing::field::display("true"));
Span::current().record("error.level", &tracing::field::display(level));
Span::current().record("error.message", &tracing::field::display(error));
}

pub fn extract_tracing_data() -> HashMap<String, String> {
let mut tracing_data = HashMap::new();
let propagator = TraceContextPropagator::new();
Expand Down

0 comments on commit 83e5c8e

Please sign in to comment.