diff --git a/Cargo.lock b/Cargo.lock index 8f5fc83..b0afae6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -256,6 +256,8 @@ dependencies = [ "http", "http-body", "http-body-util", + "hyper", + "hyper-util", "itoa", "matchit", "memchr", @@ -264,10 +266,15 @@ dependencies = [ "pin-project-lite", "rustversion", "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", "sync_wrapper 1.0.1", + "tokio", "tower", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -288,6 +295,7 @@ dependencies = [ "sync_wrapper 0.1.2", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -1510,6 +1518,7 @@ name = "nos-followers" version = "0.1.0" dependencies = [ "anyhow", + "axum", "cached", "chrono", "config", @@ -1526,6 +1535,7 @@ dependencies = [ "time", "tokio", "tokio-util", + "tower-http", "tracing", "tracing-subscriber", ] @@ -2494,6 +2504,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af99884400da37c88f5e9146b7f1fd0fbcae8f6eec4e9da38b67d05486f814a6" +dependencies = [ + "itoa", + "serde", +] + [[package]] name = "serde_spanned" version = "0.6.7" @@ -2916,6 +2936,24 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower-http" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" +dependencies = [ + "bitflags", + "bytes", + "http", + "http-body", + "http-body-util", + "pin-project-lite", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower-layer" version = "0.3.2" diff --git a/Cargo.toml b/Cargo.toml index 7bde762..542e54c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] anyhow = "1.0.86" +axum = "0.7.5" cached = { version = "0.53.1", features = ["async"] } chrono = { version = "0.4.38", features = ["serde"] } config = "0.14.0" @@ -21,5 +22,6 @@ thiserror = "1.0.63" time = "0.3.36" tokio = { version = "1.39.2", features = ["full"] } tokio-util = { version = "0.7.11", features = ["rt"] } +tower-http = { version = "0.5.2", features = ["timeout", "trace"] } tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } diff --git a/compose.yaml b/compose.yaml index 01a208a..8689508 100644 --- a/compose.yaml +++ b/compose.yaml @@ -20,6 +20,8 @@ services: depends_on: db: condition: service_healthy + ports: + - "3000:3000" restart: always attach: true diff --git a/src/http_server.rs b/src/http_server.rs new file mode 100644 index 0000000..c159ceb --- /dev/null +++ b/src/http_server.rs @@ -0,0 +1,51 @@ +mod router; +use anyhow::{Context, Result}; +use axum::Router; +use router::create_router; +use std::net::SocketAddr; +use std::time::Duration; +use tokio::time::timeout; +use tokio_util::sync::CancellationToken; +use tracing::info; + +pub struct HttpServer; +impl HttpServer { + pub async fn run(cancellation_token: CancellationToken) -> Result<()> { + let router = create_router()?; + + start_http_server(router, cancellation_token).await + } +} + +async fn start_http_server(router: Router, cancellation_token: CancellationToken) -> Result<()> { + let addr = SocketAddr::from(([0, 0, 0, 0], 3000)); + let listener = tokio::net::TcpListener::bind(addr).await?; + let token_clone = cancellation_token.clone(); + let server_future = tokio::spawn(async { + axum::serve(listener, router) + .with_graceful_shutdown(shutdown_hook(token_clone)) + .await + .context("Failed to start HTTP server") + }); + + await_shutdown(cancellation_token, server_future).await; + + Ok(()) +} + +async fn await_shutdown( + cancellation_token: CancellationToken, + server_future: tokio::task::JoinHandle>, +) { + cancellation_token.cancelled().await; + info!("Shutdown signal received."); + match timeout(Duration::from_secs(5), server_future).await { + Ok(_) => info!("HTTP service exited successfully."), + Err(e) => info!("HTTP service exited after timeout: {}", e), + } +} + +async fn shutdown_hook(cancellation_token: CancellationToken) { + cancellation_token.cancelled().await; + info!("Exiting the process"); +} diff --git a/src/http_server/router.rs b/src/http_server/router.rs new file mode 100644 index 0000000..1bab63c --- /dev/null +++ b/src/http_server/router.rs @@ -0,0 +1,38 @@ +use anyhow::Result; +use axum::{http::HeaderMap, response::Html}; +use axum::{response::IntoResponse, routing::get, Router}; +use std::time::Duration; +use tower_http::trace::{DefaultMakeSpan, DefaultOnResponse, TraceLayer}; +use tower_http::LatencyUnit; +use tower_http::{timeout::TimeoutLayer, trace::DefaultOnFailure}; +use tracing::Level; + +pub fn create_router() -> Result { + let tracing_layer = TraceLayer::new_for_http() + .make_span_with(DefaultMakeSpan::new().level(Level::INFO)) + .on_response( + DefaultOnResponse::new() + .level(Level::INFO) + .latency_unit(LatencyUnit::Millis), + ) + .on_failure(DefaultOnFailure::new().level(Level::ERROR)); + + Ok(Router::new() + .route("/", get(serve_root_page)) + .layer(tracing_layer) + .layer(TimeoutLayer::new(Duration::from_secs(1)))) +} + +async fn serve_root_page(_headers: HeaderMap) -> impl IntoResponse { + let body = " + + + Nos + + +

Healthy

+ + "; + + Html(body) +} diff --git a/src/main.rs b/src/main.rs index 8fdcf35..8aa2057 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,6 +4,7 @@ mod fetch_friendly_id; mod follow_change_handler; mod follows_differ; mod google_publisher; +mod http_server; mod migrations; mod relay_subscriber; mod repo; @@ -14,6 +15,7 @@ use crate::config::Config; use crate::domain::follow_change::FollowChange; use follow_change_handler::FollowChangeHandler; use follows_differ::FollowsDiffer; +use http_server::HttpServer; use migrations::apply_migrations; use neo4rs::Graph; use nostr_sdk::prelude::*; @@ -89,14 +91,21 @@ async fn main() -> Result<()> { .since(five_minutes_ago) .kind(Kind::ContactList)]; - start_nostr_subscription( + let nostr_sub = start_nostr_subscription( shared_nostr_client, - &[relay], + [relay].into(), filters, event_sender, cancellation_token.clone(), - ) - .await?; + ); + + let http_server = HttpServer::run(cancellation_token.clone()); + + tokio::select! { + _ = nostr_sub => info!("Nostr subscription ended"), + _ = http_server => info!("HTTP server ended"), + _ = cancellation_token.cancelled() => info!("Cancellation token cancelled"), + } info!("Finished Nostr subscription"); diff --git a/src/relay_subscriber.rs b/src/relay_subscriber.rs index 49bd9b3..7547af6 100644 --- a/src/relay_subscriber.rs +++ b/src/relay_subscriber.rs @@ -21,7 +21,7 @@ pub fn create_client() -> Client { pub async fn start_nostr_subscription( nostr_client: Client, - relays: &[String], + relays: Vec, filters: Vec, event_tx: Sender>, cancellation_token: CancellationToken,