Skip to content

Commit

Permalink
chore(notifications): boilerplate to start job registry
Browse files Browse the repository at this point in the history
  • Loading branch information
bodymindarts committed Feb 12, 2024
1 parent bd54ff8 commit 64087d7
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 3 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions core/notifications/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ path = "src/bin/write_sdl.rs"
fail-on-warnings = []

[dependencies]
job-executor = { path = "../../lib/job-executor-rs" }
tracing = { path = "../../lib/tracing-rs" }
es-entity = { path = "../../lib/es-entity-rs" }

Expand All @@ -34,6 +35,7 @@ thiserror = { workspace = true }
chrono = { workspace = true }
futures = { workspace = true }
sqlx = { workspace = true }
sqlxmq = { workspace = true }
mongodb = { workspace = true }
rand = { workspace = true }
uuid = { workspace = true }
Expand Down
9 changes: 7 additions & 2 deletions core/notifications/src/app/error.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
use thiserror::Error;

use crate::{executor::error::*, user_notification_settings::error::*};
use crate::{
executor::error::ExecutorError, job::error::JobError,
user_notification_settings::error::UserNotificationSettingsError,
};

#[derive(Error, Debug)]
pub enum ApplicationError {
#[error("{0}")]
UserNotificationSettingsError(#[from] UserNotificationSettingsError),
#[error("{0}")]
Novu(#[from] ExecutorError),
JobError(#[from] JobError),
#[error("{0}")]
ExecutorError(#[from] ExecutorError),
}
10 changes: 9 additions & 1 deletion core/notifications/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@ mod config;
pub mod error;

use sqlx::{Pool, Postgres};
use sqlxmq::JobRunnerHandle;
use tracing::instrument;

use crate::{executor::*, notification_event::*, primitives::*, user_notification_settings::*};
use std::sync::Arc;

use crate::{
executor::*, job, notification_event::*, primitives::*, user_notification_settings::*,
};

pub use config::*;
use error::*;
Expand All @@ -15,17 +20,20 @@ pub struct NotificationsApp {
settings: UserNotificationSettingsRepo,
executor: Executor,
_pool: Pool<Postgres>,
_runner: Arc<JobRunnerHandle>,
}

impl NotificationsApp {
pub async fn init(pool: Pool<Postgres>, config: AppConfig) -> Result<Self, ApplicationError> {
let settings = UserNotificationSettingsRepo::new(&pool);
let executor = Executor::init(config.executor.clone(), settings.clone()).await?;
let runner = job::start_job_runner(&pool).await?;
Ok(Self {
_config: config,
_pool: pool,
executor,
settings,
_runner: Arc::new(runner),
})
}

Expand Down
7 changes: 7 additions & 0 deletions core/notifications/src/job/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use thiserror::Error;

#[derive(Error, Debug)]
pub enum JobError {
#[error("JobError - Sqlx: {0}")]
Sqlx(#[from] sqlx::Error),
}
22 changes: 22 additions & 0 deletions core/notifications/src/job/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
pub mod error;

use sqlxmq::{job, CurrentJob, JobBuilder, JobRegistry, JobRunnerHandle};

use error::JobError;

pub async fn start_job_runner(pool: &sqlx::PgPool) -> Result<JobRunnerHandle, JobError> {
let mut registry = JobRegistry::new(&[
// sync_all_wallets,
// sync_wallet,
// process_all_payout_queues,
// schedule_process_payout_queue,
// process_payout_queue,
// batch_wallet_accounting,
// batch_signing,
// batch_broadcasting,
// respawn_all_outbox_handlers,
// populate_outbox,
]);

Ok(registry.runner(pool).set_keep_alive(false).run().await?)
}
1 change: 1 addition & 0 deletions core/notifications/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ rust_i18n::i18n!("locales", fallback = "en");

mod app;
mod data_import;
mod job;
mod messages;

pub mod cli;
Expand Down

0 comments on commit 64087d7

Please sign in to comment.