Skip to content

Commit

Permalink
Custom state as member of AppContext
Browse files Browse the repository at this point in the history
Add the app's custom state as a member of the `AppContext` this has
several benefits:

- Only need to pass the `AppContext` as a parameter to methods, instead
  of separate methods for the `AppContext` and the custom state. This
  reduces boilerplate and simplfies many method defs.
- We can ensure the custom state can be cheaply cloned by wrapping in an
  `Arc` inside the `AppContext`.
- We no longer require the consumer to implement an `Into<AppContext>`
  (or similar) trait.
  • Loading branch information
spencewenski committed May 12, 2024
1 parent b70596f commit 04fda90
Show file tree
Hide file tree
Showing 36 changed files with 253 additions and 276 deletions.
17 changes: 8 additions & 9 deletions examples/minimal/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use roadster::service::http::service::HttpService;
use roadster::service::registry::ServiceRegistry;
use roadster::service::worker::sidekiq::app_worker::AppWorker;
use roadster::service::worker::sidekiq::service::SidekiqWorkerService;
use std::sync::Arc;

use crate::app_state::AppState;
use crate::cli::AppCli;
Expand All @@ -24,23 +23,23 @@ impl RoadsterApp for App {
type Cli = AppCli;
type M = Migrator;

async fn with_state(_context: &AppContext) -> anyhow::Result<Self::State> {
Ok(())
}

async fn services(
registry: &mut ServiceRegistry<Self>,
context: Arc<AppContext>,
state: Arc<Self::State>,
context: AppContext<Self::State>,
) -> anyhow::Result<()> {
registry
.register_builder(
HttpService::builder(BASE, &context, state.as_ref())
.router(controller::routes(BASE)),
)
.register_builder(HttpService::builder(BASE, &context).router(controller::routes(BASE)))
.await?;

registry
.register_builder(
SidekiqWorkerService::builder(context.clone(), state.clone())
SidekiqWorkerService::builder(context.clone())
.await?
.register_app_worker(ExampleWorker::build(&state))?,
.register_app_worker(ExampleWorker::build(&context))?,
)
.await?;

Expand Down
27 changes: 1 addition & 26 deletions examples/minimal/src/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,4 @@
// to implement the required traits used to convert it to/from `AppState`.
#![allow(clippy::disallowed_types)]

use std::sync::Arc;

use roadster::app_context::AppContext;

#[derive(Debug, Clone)]
pub struct AppState {
context: Arc<AppContext>,
}

impl AppState {
pub fn new(ctx: Arc<AppContext>) -> Self {
Self { context: ctx }
}
}

impl From<Arc<AppContext>> for AppState {
fn from(value: Arc<AppContext>) -> Self {
AppState::new(value)
}
}

impl From<AppState> for Arc<AppContext> {
fn from(value: AppState) -> Self {
value.context
}
}
pub type AppState = ();
17 changes: 14 additions & 3 deletions examples/minimal/src/cli/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use async_trait::async_trait;
use clap::{Parser, Subcommand};
use roadster::app_context::AppContext;

use roadster::cli::RunCommand;

Expand All @@ -18,9 +19,14 @@ pub struct AppCli {
#[async_trait]
impl RunCommand<App> for AppCli {
#[allow(clippy::disallowed_types)]
async fn run(&self, app: &App, cli: &AppCli, state: &AppState) -> anyhow::Result<bool> {
async fn run(
&self,
app: &App,
cli: &AppCli,
context: &AppContext<AppState>,
) -> anyhow::Result<bool> {
if let Some(command) = self.command.as_ref() {
command.run(app, cli, state).await
command.run(app, cli, context).await
} else {
Ok(false)
}
Expand All @@ -35,7 +41,12 @@ pub enum AppCommand {}

#[async_trait]
impl RunCommand<App> for AppCommand {
async fn run(&self, _app: &App, _cli: &AppCli, _state: &AppState) -> anyhow::Result<bool> {
async fn run(
&self,
_app: &App,
_cli: &AppCli,
_context: &AppContext<AppState>,
) -> anyhow::Result<bool> {
Ok(false)
}
}
7 changes: 5 additions & 2 deletions examples/minimal/src/controller/example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use aide::axum::ApiRouter;
use aide::transform::TransformOperation;
use axum::extract::State;
use axum::Json;
use roadster::app_context::AppContext;
use roadster::controller::build_path;
use roadster::service::worker::sidekiq::app_worker::AppWorker;
use roadster::view::app_error::AppError;
Expand All @@ -15,7 +16,7 @@ use tracing::instrument;
const BASE: &str = "/example";
const TAG: &str = "Example";

pub fn routes(parent: &str) -> ApiRouter<AppState> {
pub fn routes(parent: &str) -> ApiRouter<AppContext<AppState>> {
let root = build_path(parent, BASE);

ApiRouter::new().api_route(&root, get_with(example_get, example_get_docs))
Expand All @@ -26,7 +27,9 @@ pub fn routes(parent: &str) -> ApiRouter<AppState> {
pub struct ExampleResponse {}

#[instrument(skip_all)]
async fn example_get(State(state): State<AppState>) -> Result<Json<ExampleResponse>, AppError> {
async fn example_get(
State(state): State<AppContext<AppState>>,
) -> Result<Json<ExampleResponse>, AppError> {
ExampleWorker::enqueue(&state, "Example".to_string()).await?;
Ok(Json(ExampleResponse {}))
}
Expand Down
3 changes: 2 additions & 1 deletion examples/minimal/src/controller/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::app_state::AppState;
use aide::axum::ApiRouter;
use roadster::app_context::AppContext;

pub mod example;

pub fn routes(parent: &str) -> ApiRouter<AppState> {
pub fn routes(parent: &str) -> ApiRouter<AppContext<AppState>> {
ApiRouter::new().merge(example::routes(parent))
}
3 changes: 2 additions & 1 deletion examples/minimal/src/worker/example.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::app::App;
use crate::app_state::AppState;
use async_trait::async_trait;
use roadster::app_context::AppContext;
use roadster::service::worker::sidekiq::app_worker::AppWorker;
use sidekiq::Worker;
use tracing::{info, instrument};
Expand All @@ -18,7 +19,7 @@ impl Worker<String> for ExampleWorker {

#[async_trait]
impl AppWorker<App, String> for ExampleWorker {
fn build(_state: &AppState) -> Self {
fn build(_context: &AppContext<AppState>) -> Self {
Self {}
}
}
54 changes: 22 additions & 32 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use sea_orm::{ConnectOptions, Database};
use sea_orm_migration::MigratorTrait;
use std::future;
use std::future::Future;
use std::sync::Arc;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, instrument, warn};
Expand Down Expand Up @@ -117,38 +116,36 @@ where
(redis_enqueue, redis_fetch)
};

let context = AppContext::new(
let context = AppContext::<()>::new(
config,
#[cfg(feature = "db-sql")]
db,
#[cfg(feature = "sidekiq")]
redis_enqueue.clone(),
#[cfg(feature = "sidekiq")]
redis_fetch.clone(),
)
.await?;
)?;

let context = Arc::new(context);
let state = A::context_to_state(context.clone()).await?;
let state = Arc::new(state);
let state = A::with_state(&context).await?;
let context = context.with_custom(state);

#[cfg(feature = "cli")]
{
if roadster_cli.run(&app, &roadster_cli, &context).await? {
return Ok(());
}
if app_cli.run(&app, &app_cli, &state).await? {
if app_cli.run(&app, &app_cli, &context).await? {
return Ok(());
}
}

let mut service_registry = ServiceRegistry::new(context.clone(), state.clone());
A::services(&mut service_registry, context.clone(), state.clone()).await?;
let mut service_registry = ServiceRegistry::new(context.clone());
A::services(&mut service_registry, context.clone()).await?;

#[cfg(feature = "cli")]
for (_name, service) in service_registry.services.iter() {
if service
.handle_cli(&roadster_cli, &app_cli, &context, &state)
.handle_cli(&roadster_cli, &app_cli, &context)
.await?
{
return Ok(());
Expand All @@ -171,11 +168,10 @@ where
// Spawn tasks for the app's services
for (name, service) in service_registry.services {
let context = context.clone();
let state = state.clone();
let cancel_token = cancel_token.clone();
join_set.spawn(Box::pin(async move {
info!(service=%name, "Running service");
service.run(context, state, cancel_token).await
service.run(context, cancel_token).await
}));
}

Expand All @@ -185,15 +181,14 @@ where
context.clone(),
graceful_shutdown(
token_shutdown_signal(cancel_token.clone()),
A::graceful_shutdown(context.clone(), state.clone()),
#[cfg(feature = "db-sql")]
A::graceful_shutdown(context.clone()),
context.clone(),
),
));
// Task to listen for the signal to gracefully shutdown, and trigger other tasks to stop.
let graceful_shutdown_signal = graceful_shutdown_signal(
cancel_token.clone(),
A::graceful_shutdown_signal(context.clone(), state.clone()),
A::graceful_shutdown_signal(context.clone()),
);
join_set.spawn(cancel_token_on_signal_received(
graceful_shutdown_signal,
Expand Down Expand Up @@ -223,7 +218,8 @@ where

#[async_trait]
pub trait App: Send + Sync {
type State: From<Arc<AppContext>> + Into<Arc<AppContext>> + Clone + Send + Sync + 'static;
// Todo: Are clone, etc necessary if we store it inside an Arc?
type State: Clone + Send + Sync + 'static;
#[cfg(feature = "cli")]
type Cli: clap::Args + RunCommand<Self>;
#[cfg(feature = "db-sql")]
Expand Down Expand Up @@ -258,34 +254,27 @@ pub trait App: Send + Sync {
/// method is provided in case there's any additional work that needs to be done that the
/// consumer can't put in a [`From<AppContext>`] implementation. For example, any
/// configuration that needs to happen in an async method.
async fn context_to_state(context: Arc<AppContext>) -> anyhow::Result<Self::State> {
let state = Self::State::from(context);
Ok(state)
}
async fn with_state(context: &AppContext) -> anyhow::Result<Self::State>;

/// Provide the services to run in the app.
async fn services(
_registry: &mut ServiceRegistry<Self>,
_context: Arc<AppContext>,
_state: Arc<Self::State>,
_context: AppContext<Self::State>,
) -> anyhow::Result<()> {
Ok(())
}

/// Override to provide a custom shutdown signal. Roadster provides some default shutdown
/// signals, but it may be desirable to provide a custom signal in order to, e.g., shutdown the
/// server when a particular API is called.
async fn graceful_shutdown_signal(_context: Arc<AppContext>, _state: Arc<Self::State>) {
async fn graceful_shutdown_signal(_context: AppContext<Self::State>) {
let _output: () = future::pending().await;
}

/// Override to provide custom graceful shutdown logic to clean up any resources created by
/// the app. Roadster will take care of cleaning up the resources it created.
#[instrument(skip_all)]
async fn graceful_shutdown(
_context: Arc<AppContext>,
_state: Arc<Self::State>,
) -> anyhow::Result<()> {
async fn graceful_shutdown(_context: AppContext<Self::State>) -> anyhow::Result<()> {
Ok(())
}
}
Expand Down Expand Up @@ -343,9 +332,9 @@ async fn token_shutdown_signal(cancellation_token: CancellationToken) {
cancellation_token.cancelled().await
}

async fn cancel_on_error<T, F>(
async fn cancel_on_error<T, F, S>(
cancellation_token: CancellationToken,
context: Arc<AppContext>,
context: AppContext<S>,
f: F,
) -> anyhow::Result<T>
where
Expand All @@ -359,10 +348,11 @@ where
}

#[instrument(skip_all)]
async fn graceful_shutdown<F1, F2>(
async fn graceful_shutdown<F1, F2, S>(
shutdown_signal: F1,
app_graceful_shutdown: F2,
#[cfg(feature = "db-sql")] context: Arc<AppContext>,
// This parameter is (currently) not used when no features are enabled.
#[allow(unused_variables)] context: AppContext<S>,
) -> anyhow::Result<()>
where
F1: Future<Output = ()> + Send + 'static,
Expand Down
25 changes: 19 additions & 6 deletions src/app_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,19 @@ use sea_orm::DatabaseConnection;

use crate::config::app_config::AppConfig;

#[derive(Debug, Clone)]
pub struct AppContext {
#[derive(Clone)]
pub struct AppContext<T = ()> {
inner: Arc<AppContextInner>,
custom: Arc<T>,
}

impl AppContext {
pub async fn new(
impl<T> AppContext<T> {
pub fn new(
config: AppConfig,
#[cfg(feature = "db-sql")] db: DatabaseConnection,
#[cfg(feature = "sidekiq")] redis_enqueue: sidekiq::RedisPool,
#[cfg(feature = "sidekiq")] redis_fetch: Option<sidekiq::RedisPool>,
) -> anyhow::Result<Self> {
) -> anyhow::Result<AppContext<()>> {
let inner = AppContextInner {
config,
#[cfg(feature = "db-sql")]
Expand All @@ -26,11 +27,19 @@ impl AppContext {
#[cfg(feature = "sidekiq")]
redis_fetch,
};
Ok(Self {
Ok(AppContext {
inner: Arc::new(inner),
custom: Arc::new(()),
})
}

pub fn with_custom<NewT>(self, custom: NewT) -> AppContext<NewT> {
AppContext {
inner: self.inner,
custom: Arc::new(custom),
}
}

pub fn config(&self) -> &AppConfig {
&self.inner.config
}
Expand All @@ -49,6 +58,10 @@ impl AppContext {
pub fn redis_fetch(&self) -> Option<&sidekiq::RedisPool> {
self.inner.redis_fetch.as_ref()
}

pub fn custom(&self) -> &T {
&self.custom
}
}

#[derive(Debug)]
Expand Down
Loading

0 comments on commit 04fda90

Please sign in to comment.