Skip to content

Commit

Permalink
Pass config and context by reference in all public APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
spencewenski committed May 12, 2024
1 parent 04fda90 commit 5f3dbbd
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 36 deletions.
8 changes: 4 additions & 4 deletions examples/minimal/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@ impl RoadsterApp for App {

async fn services(
registry: &mut ServiceRegistry<Self>,
context: AppContext<Self::State>,
context: &AppContext<Self::State>,
) -> anyhow::Result<()> {
registry
.register_builder(HttpService::builder(BASE, &context).router(controller::routes(BASE)))
.register_builder(HttpService::builder(BASE, context).router(controller::routes(BASE)))
.await?;

registry
.register_builder(
SidekiqWorkerService::builder(context.clone())
SidekiqWorkerService::builder(context)
.await?
.register_app_worker(ExampleWorker::build(&context))?,
.register_app_worker(ExampleWorker::build(context))?,
)
.await?;

Expand Down
64 changes: 40 additions & 24 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ where
}
}

let mut service_registry = ServiceRegistry::new(context.clone());
A::services(&mut service_registry, context.clone()).await?;
let mut service_registry = ServiceRegistry::new(&context);
A::services(&mut service_registry, &context).await?;

#[cfg(feature = "cli")]
for (_name, service) in service_registry.services.iter() {
Expand Down Expand Up @@ -171,29 +171,45 @@ where
let cancel_token = cancel_token.clone();
join_set.spawn(Box::pin(async move {
info!(service=%name, "Running service");
service.run(context, cancel_token).await
service.run(&context, cancel_token).await
}));
}

// Task to clean up resources when gracefully shutting down.
join_set.spawn(cancel_on_error(
cancel_token.clone(),
context.clone(),
graceful_shutdown(
token_shutdown_signal(cancel_token.clone()),
A::graceful_shutdown(context.clone()),
context.clone(),
),
));
{
let context = context.clone();
let cancel_token = cancel_token.clone();
let app_graceful_shutdown = {
let context = context.clone();
Box::pin(async move { A::graceful_shutdown(&context).await })
};
join_set.spawn(Box::pin(async move {
cancel_on_error(
cancel_token.clone(),
&context,
graceful_shutdown(
token_shutdown_signal(cancel_token.clone()),
app_graceful_shutdown,
context.clone(),
),
)
.await
}));
}
// Task to listen for the signal to gracefully shutdown, and trigger other tasks to stop.
let graceful_shutdown_signal = graceful_shutdown_signal(
cancel_token.clone(),
A::graceful_shutdown_signal(context.clone()),
);
join_set.spawn(cancel_token_on_signal_received(
graceful_shutdown_signal,
cancel_token.clone(),
));
{
let context = context.clone();
let app_graceful_shutdown_signal = {
let context = context.clone();
Box::pin(async move { A::graceful_shutdown_signal(&context).await })
};
let graceful_shutdown_signal =
graceful_shutdown_signal(cancel_token.clone(), app_graceful_shutdown_signal);
join_set.spawn(cancel_token_on_signal_received(
graceful_shutdown_signal,
cancel_token.clone(),
));
}

// Wait for all the tasks to complete.
while let Some(result) = join_set.join_next().await {
Expand Down Expand Up @@ -259,22 +275,22 @@ pub trait App: Send + Sync {
/// Provide the services to run in the app.
async fn services(
_registry: &mut ServiceRegistry<Self>,
_context: AppContext<Self::State>,
_context: &AppContext<Self::State>,
) -> anyhow::Result<()> {
Ok(())
}

/// Override to provide a custom shutdown signal. Roadster provides some default shutdown
/// signals, but it may be desirable to provide a custom signal in order to, e.g., shutdown the
/// server when a particular API is called.
async fn graceful_shutdown_signal(_context: AppContext<Self::State>) {
async fn graceful_shutdown_signal(_context: &AppContext<Self::State>) {
let _output: () = future::pending().await;
}

/// Override to provide custom graceful shutdown logic to clean up any resources created by
/// the app. Roadster will take care of cleaning up the resources it created.
#[instrument(skip_all)]
async fn graceful_shutdown(_context: AppContext<Self::State>) -> anyhow::Result<()> {
async fn graceful_shutdown(_context: &AppContext<Self::State>) -> anyhow::Result<()> {
Ok(())
}
}
Expand Down Expand Up @@ -334,7 +350,7 @@ async fn token_shutdown_signal(cancellation_token: CancellationToken) {

async fn cancel_on_error<T, F, S>(
cancellation_token: CancellationToken,
context: AppContext<S>,
context: &AppContext<S>,
f: F,
) -> anyhow::Result<T>
where
Expand Down
2 changes: 1 addition & 1 deletion src/app_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub struct AppContext<T = ()> {
}

impl<T> AppContext<T> {
pub fn new(
pub(crate) fn new(
config: AppConfig,
#[cfg(feature = "db-sql")] db: DatabaseConnection,
#[cfg(feature = "sidekiq")] redis_enqueue: sidekiq::RedisPool,
Expand Down
2 changes: 1 addition & 1 deletion src/service/http/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl<A: App> AppService<A> for HttpService {

async fn run(
&self,
app_context: AppContext<A::State>,
app_context: &AppContext<A::State>,
cancel_token: CancellationToken,
) -> anyhow::Result<()> {
let server_addr = app_context.config().service.http.custom.address.url();
Expand Down
2 changes: 1 addition & 1 deletion src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub trait AppService<A: App>: Send + Sync {
/// the service.
async fn run(
&self,
app_context: AppContext<A::State>,
app_context: &AppContext<A::State>,
cancel_token: CancellationToken,
) -> anyhow::Result<()>;
}
Expand Down
4 changes: 2 additions & 2 deletions src/service/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ where
}

impl<A: App> ServiceRegistry<A> {
pub(crate) fn new(context: AppContext<A::State>) -> Self {
pub(crate) fn new(context: &AppContext<A::State>) -> Self {
Self {
context,
context: context.clone(),
services: Default::default(),
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/service/worker/sidekiq/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl<A: App> AppService<A> for SidekiqWorkerService {

async fn run(
&self,
_app_context: AppContext<A::State>,
_app_context: &AppContext<A::State>,
cancel_token: CancellationToken,
) -> anyhow::Result<()> {
let processor = self.processor.clone();
Expand Down Expand Up @@ -80,11 +80,11 @@ impl<A: App> AppService<A> for SidekiqWorkerService {

impl SidekiqWorkerService {
pub async fn builder<A>(
context: AppContext<A::State>,
context: &AppContext<A::State>,
) -> anyhow::Result<SidekiqWorkerServiceBuilder<A>>
where
A: App + 'static,
{
SidekiqWorkerServiceBuilder::with_default_processor(&context, None).await
SidekiqWorkerServiceBuilder::with_default_processor(context, None).await
}
}

0 comments on commit 5f3dbbd

Please sign in to comment.