Skip to content

Commit

Permalink
Add AppService trait and use it to implement the HTTP service
Browse files Browse the repository at this point in the history
Add an `AppService` trait to allow consumers to define custom services
that will be run as tokio tasks. This is similar (I believe) to
Laraval's `Provider` concept.

Use the `AppService` trait to add an HTTP server using Axum instead of
putting http/axum specific methods directly on the `App` trait
  • Loading branch information
spencewenski committed Apr 29, 2024
1 parent 92f0101 commit 299e72d
Show file tree
Hide file tree
Showing 11 changed files with 362 additions and 224 deletions.
23 changes: 16 additions & 7 deletions examples/minimal/src/app.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use aide::axum::ApiRouter;
use async_trait::async_trait;
use migration::Migrator;
use roadster::app::App as RoadsterApp;
use roadster::app_context::AppContext;
use roadster::config::app_config::AppConfig;
use roadster::controller::default_routes;
use roadster::service::http::http_service_builder::HttpServiceBuilder;
use roadster::service::AppService;
use roadster::worker::app_worker::AppWorker;
use roadster::worker::registry::WorkerRegistry;
use std::vec;

use crate::app_state::AppState;
use crate::cli::AppCli;
Expand All @@ -24,10 +24,6 @@ impl RoadsterApp for App {
type Cli = AppCli;
type M = Migrator;

fn router(config: &AppConfig) -> ApiRouter<Self::State> {
default_routes(BASE, config).merge(controller::routes(BASE))
}

async fn workers(
registry: &mut WorkerRegistry<Self>,
_context: &AppContext,
Expand All @@ -36,4 +32,17 @@ impl RoadsterApp for App {
registry.register_app_worker(ExampleWorker::build(state));
Ok(())
}

async fn services(
context: &AppContext,
state: &Self::State,
) -> anyhow::Result<Vec<Box<dyn AppService<Self>>>> {
let http_service = Box::new(
HttpServiceBuilder::<Self>::new(BASE, context)
.router(controller::routes(BASE))
.build(context, state)?,
);

Ok(vec![http_service])
}
}
187 changes: 44 additions & 143 deletions src/app.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
use std::future;
use std::future::Future;
use std::sync::Arc;

#[cfg(feature = "open-api")]
use aide::axum::ApiRouter;
#[cfg(feature = "open-api")]
use aide::openapi::OpenApi;
#[cfg(feature = "open-api")]
use aide::transform::TransformOpenApi;
use crate::app_context::AppContext;
#[cfg(feature = "cli")]
use crate::cli::{RoadsterCli, RunCommand, RunRoadsterCommand};
use crate::config::app_config::AppConfig;
#[cfg(not(feature = "cli"))]
use crate::config::environment::Environment;
#[cfg(feature = "sidekiq")]
use crate::config::worker::StaleCleanUpBehavior;
use crate::service::AppService;
use crate::tracing::init_tracing;
#[cfg(feature = "sidekiq")]
use crate::worker::registry::WorkerRegistry;
#[cfg(feature = "sidekiq")]
use anyhow::anyhow;
use async_trait::async_trait;
#[cfg(feature = "open-api")]
use axum::Extension;
use axum::Router;
#[cfg(feature = "cli")]
use clap::{Args, Command, FromArgMatches};
#[cfg(feature = "sidekiq")]
use itertools::Itertools;
#[cfg(feature = "sidekiq")]
use num_traits::ToPrimitive;
Expand All @@ -25,28 +25,15 @@ use sea_orm::{ConnectOptions, Database};
use sea_orm_migration::MigratorTrait;
#[cfg(feature = "sidekiq")]
use sidekiq::{periodic, Processor, ProcessorConfig};
use std::future;
use std::future::Future;
use std::sync::Arc;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
#[cfg(feature = "sidekiq")]
use tracing::debug;
use tracing::{error, info, instrument};

use crate::app_context::AppContext;
#[cfg(feature = "cli")]
use crate::cli::{RoadsterCli, RunCommand, RunRoadsterCommand};
use crate::config::app_config::AppConfig;
#[cfg(not(feature = "cli"))]
use crate::config::environment::Environment;
#[cfg(feature = "sidekiq")]
use crate::config::worker::StaleCleanUpBehavior;
use crate::controller::middleware::default::default_middleware;
use crate::controller::middleware::Middleware;
use crate::initializer::default::default_initializers;
use crate::initializer::Initializer;
use crate::tracing::init_tracing;
#[cfg(feature = "sidekiq")]
use crate::worker::registry::WorkerRegistry;

// todo: this method is getting unweildy, we should break it up
pub async fn start<A>(
// This parameter is (currently) not used when no features are enabled.
Expand Down Expand Up @@ -144,16 +131,6 @@ where
(redis_enqueue, redis_fetch)
};

let router = A::router(&config);
#[cfg(feature = "open-api")]
let (router, api) = {
let mut api = OpenApi::default();
let router = router.finish_api_with(&mut api, A::api_docs(&config));
// Arc is very important here or we will face massive memory and performance issues
let api = Arc::new(api);
let router = router.layer(Extension(api.clone()));
(router, api)
};
let context = AppContext::new(
config,
#[cfg(feature = "db-sql")]
Expand All @@ -162,14 +139,11 @@ where
redis_enqueue.clone(),
#[cfg(feature = "sidekiq")]
redis_fetch.clone(),
#[cfg(feature = "open-api")]
api,
)
.await?;

let context = Arc::new(context);
let state = A::context_to_state(context.clone()).await?;
let router = router.with_state::<()>(state.clone());
let state = Arc::new(state);

#[cfg(feature = "cli")]
Expand All @@ -182,58 +156,23 @@ where
}
}

let services = A::services(&context, &state).await?;

#[cfg(feature = "cli")]
for service in services.iter() {
if service
.handle_cli(&roadster_cli, &app_cli, &context, &state)
.await?
{
return Ok(());
}
}

#[cfg(feature = "db-sql")]
if context.config.database.auto_migrate {
A::M::up(&context.db, None).await?;
}

let initializers = default_initializers()
.into_iter()
.chain(A::initializers(&context))
.filter(|initializer| initializer.enabled(&context, &state))
.unique_by(|initializer| initializer.name())
.sorted_by(|a, b| Ord::cmp(&a.priority(&context, &state), &b.priority(&context, &state)))
.collect_vec();

let router = initializers
.iter()
.try_fold(router, |router, initializer| {
initializer.after_router(router, &context, &state)
})?;

let router = initializers
.iter()
.try_fold(router, |router, initializer| {
initializer.before_middleware(router, &context, &state)
})?;

// Install middleware, both the default middleware and any provided by the consumer.
info!("Installing middleware. Note: the order of installation is the inverse of the order middleware will run when handling a request.");
let router = default_middleware()
.into_iter()
.chain(A::middleware(&context, &state).into_iter())
.filter(|middleware| middleware.enabled(&context, &state))
.unique_by(|middleware| middleware.name())
.sorted_by(|a, b| Ord::cmp(&a.priority(&context, &state), &b.priority(&context, &state)))
// Reverse due to how Axum's `Router#layer` method adds middleware.
.rev()
.try_fold(router, |router, middleware| {
info!("Installing middleware: `{}`", middleware.name());
middleware.install(router, &context, &state)
})?;

let router = initializers
.iter()
.try_fold(router, |router, initializer| {
initializer.after_middleware(router, &context, &state)
})?;

let router = initializers
.iter()
.try_fold(router, |router, initializer| {
initializer.before_serve(router, &context, &state)
})?;

#[cfg(feature = "sidekiq")]
let (processor, sidekiq_cancellation_token, _sidekiq_cancellation_token_drop_guard) =
if redis_fetch.is_some() && context.config.worker.sidekiq.queues.is_empty() {
Expand Down Expand Up @@ -297,17 +236,17 @@ where

let cancel_token = CancellationToken::new();
let mut join_set = JoinSet::new();
// Task to serve the app.
join_set.spawn(cancel_on_error(
cancel_token.clone(),
context.clone(),
A::serve(
router,
token_shutdown_signal(cancel_token.clone()),
context.clone(),
state.clone(),
),
));

// Spawn tasks for the app's services
for service in services {
let context = context.clone();
let state = state.clone();
let cancel_token = cancel_token.clone();
join_set.spawn(Box::pin(async move {
service.run(context, state, cancel_token).await
}));
}

// Task to run the sidekiq processor
#[cfg(feature = "sidekiq")]
join_set.spawn(Box::pin(async {
Expand Down Expand Up @@ -403,31 +342,6 @@ pub trait App: Send + Sync {
Ok(state)
}

#[cfg(not(feature = "open-api"))]
fn router(_config: &AppConfig) -> Router<Self::State>;

#[cfg(feature = "open-api")]
fn router(_config: &AppConfig) -> ApiRouter<Self::State>;

#[cfg(feature = "open-api")]
fn api_docs(config: &AppConfig) -> impl Fn(TransformOpenApi) -> TransformOpenApi {
|api| {
api.title(&config.app.name)
.description(&format!("# {}", config.app.name))
}
}

fn middleware(
_context: &AppContext,
_state: &Self::State,
) -> Vec<Box<dyn Middleware<Self::State>>> {
Default::default()
}

fn initializers(_context: &AppContext) -> Vec<Box<dyn Initializer<Self::State>>> {
Default::default()
}

/// Worker queue names can either be provided here, or as config values. If provided here
/// the consumer is able to use string constants, which can be used when creating a worker
/// instance. This can reduce the risk of copy/paste errors and typos.
Expand All @@ -445,24 +359,11 @@ pub trait App: Send + Sync {
Ok(())
}

async fn serve<F>(
router: Router,
shutdown_signal: F,
context: Arc<AppContext>,
_state: Arc<Self::State>,
) -> anyhow::Result<()>
where
F: Future<Output = ()> + Send + 'static,
{
let server_addr = context.config.server.url();
info!("Server will start at {server_addr}");

let app_listener = tokio::net::TcpListener::bind(server_addr).await?;
axum::serve(app_listener, router)
.with_graceful_shutdown(shutdown_signal)
.await?;

Ok(())
async fn services(
_context: &AppContext,
_state: &Self::State,
) -> anyhow::Result<Vec<Box<dyn AppService<Self>>>> {
Ok(Default::default())
}

/// Override to provide a custom shutdown signal. Roadster provides some default shutdown
Expand Down
7 changes: 0 additions & 7 deletions src/app_context.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use std::sync::Arc;

#[cfg(feature = "open-api")]
use aide::openapi::OpenApi;
#[cfg(feature = "db-sql")]
use sea_orm::DatabaseConnection;

Expand All @@ -20,8 +18,6 @@ pub struct AppContext {
/// config is set to zero, in which case the [sidekiq::Processor] would also not be started.
#[cfg(feature = "sidekiq")]
pub redis_fetch: Option<sidekiq::RedisPool>,
#[cfg(feature = "open-api")]
pub api: Arc<OpenApi>,
// Prevent consumers from directly creating an AppContext
_private: (),
}
Expand All @@ -32,7 +28,6 @@ impl AppContext {
#[cfg(feature = "db-sql")] db: DatabaseConnection,
#[cfg(feature = "sidekiq")] redis_enqueue: sidekiq::RedisPool,
#[cfg(feature = "sidekiq")] redis_fetch: Option<sidekiq::RedisPool>,
#[cfg(feature = "open-api")] api: Arc<OpenApi>,
) -> anyhow::Result<Self> {
let context = Self {
config,
Expand All @@ -42,8 +37,6 @@ impl AppContext {
redis_enqueue,
#[cfg(feature = "sidekiq")]
redis_fetch,
#[cfg(feature = "open-api")]
api,
_private: (),
};
Ok(context)
Expand Down
27 changes: 0 additions & 27 deletions src/cli/list_routes.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,4 @@
use async_trait::async_trait;
use clap::Parser;
use tracing::info;

use crate::app::App;
use crate::app_context::AppContext;
use crate::cli::{RoadsterCli, RunRoadsterCommand};

#[derive(Debug, Parser)]
pub struct ListRoutesArgs {}

#[async_trait]
impl<A> RunRoadsterCommand<A> for ListRoutesArgs
where
A: App,
{
async fn run(
&self,
_app: &A,
_cli: &RoadsterCli,
context: &AppContext,
) -> anyhow::Result<bool> {
info!("API routes:");
context
.api
.as_ref()
.operations()
.for_each(|(path, method, _operation)| info!("[{method}]\t{path}"));
Ok(true)
}
}
12 changes: 10 additions & 2 deletions src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,17 @@ where
async fn run(&self, app: &A, cli: &RoadsterCli, context: &AppContext) -> anyhow::Result<bool> {
match self {
#[cfg(feature = "open-api")]
RoadsterSubCommand::ListRoutes(args) => args.run(app, cli, context).await,
RoadsterSubCommand::ListRoutes(_) => {
#[allow(unused_doc_comments)]
/// Implemented by [crate::service::http::http_service::HttpService]
Ok(false)
}
#[cfg(feature = "open-api")]
RoadsterSubCommand::OpenApi(args) => args.run(app, cli, context).await,
RoadsterSubCommand::OpenApi(_) => {
#[allow(unused_doc_comments)]
/// Implemented by [crate::service::http::http_service::HttpService]
Ok(false)
}
#[cfg(feature = "db-sql")]
RoadsterSubCommand::Migrate(args) => args.run(app, cli, context).await,
RoadsterSubCommand::PrintConfig(args) => args.run(app, cli, context).await,
Expand Down
Loading

0 comments on commit 299e72d

Please sign in to comment.