diff --git a/src/actors.rs b/src/actors.rs index 10f6da8..18a18c3 100644 --- a/src/actors.rs +++ b/src/actors.rs @@ -7,6 +7,9 @@ pub use gift_unwrapper::GiftUnwrapper; pub mod event_enqueuer; pub use event_enqueuer::{EventEnqueuer, PubsubPort}; +pub mod supervisor; +pub use supervisor::Supervisor; + pub mod utilities; #[cfg(test)] pub use utilities::TestActor; diff --git a/src/actors/gift_unwrapper.rs b/src/actors/gift_unwrapper.rs index 35ea68c..f5c5871 100644 --- a/src/actors/gift_unwrapper.rs +++ b/src/actors/gift_unwrapper.rs @@ -1,6 +1,6 @@ use crate::actors::messages::GiftUnwrapperMessage; use crate::domain_objects::ReportRequest; -use anyhow::Result; +use anyhow::{anyhow, Result}; use nostr_sdk::prelude::*; use ractor::{Actor, ActorProcessingErr, ActorRef, OutputPort}; use tracing::{error, info}; diff --git a/src/actors/messages.rs b/src/actors/messages.rs index 91db6fe..4dd1368 100644 --- a/src/actors/messages.rs +++ b/src/actors/messages.rs @@ -3,6 +3,11 @@ use crate::domain_objects::*; use nostr_sdk::prelude::Event; use std::fmt::Debug; +#[derive(Debug)] +pub enum SupervisorMessage { + Publish(ModeratedReport), +} + #[derive(Debug)] pub enum RelayEventDispatcherMessage { Connect, diff --git a/src/actors/supervisor.rs b/src/actors/supervisor.rs new file mode 100644 index 0000000..a8020c4 --- /dev/null +++ b/src/actors/supervisor.rs @@ -0,0 +1,121 @@ +use crate::actors::PubsubPort; +use crate::actors::{ + messages::{GiftUnwrapperMessage, RelayEventDispatcherMessage, SupervisorMessage}, + EventEnqueuer, GiftUnwrapper, NostrPort, RelayEventDispatcher, +}; +use anyhow::Result; +use nostr_sdk::prelude::*; +use ractor::{cast, Actor, ActorProcessingErr, ActorRef, SupervisionEvent}; +use tracing::error; + +pub struct Supervisor { + _phantom: std::marker::PhantomData<(T, U)>, +} +impl Default for Supervisor +where + T: NostrPort, + U: PubsubPort, +{ + fn default() -> Self { + Self { + _phantom: std::marker::PhantomData, + } + } +} + +#[ractor::async_trait] +impl Actor for Supervisor +where + T: NostrPort, + U: PubsubPort, +{ + type Msg = SupervisorMessage; + type State = ActorRef; + type Arguments = (T, U, Keys); + + async fn pre_start( + &self, + myself: ActorRef, + (nostr_subscriber, google_publisher, reportinator_keys): (T, U, Keys), + ) -> Result { + // Spawn actors and wire them together + let (event_dispatcher, _event_dispatcher_handle) = Actor::spawn_linked( + Some("event_dispatcher".to_string()), + RelayEventDispatcher::default(), + nostr_subscriber, + myself.get_cell(), + ) + .await?; + + let (gift_unwrapper, _gift_unwrapper_handle) = Actor::spawn_linked( + Some("gift_unwrapper".to_string()), + GiftUnwrapper, + reportinator_keys.clone(), + myself.get_cell(), + ) + .await?; + + cast!( + event_dispatcher, + RelayEventDispatcherMessage::SubscribeToEventReceived(Box::new(gift_unwrapper.clone())) + )?; + + let (event_enqueuer, _event_enqueuer_handle) = Actor::spawn_linked( + Some("event_enqueuer".to_string()), + EventEnqueuer::default(), + google_publisher, + myself.get_cell(), + ) + .await?; + + cast!( + gift_unwrapper, + GiftUnwrapperMessage::SubscribeToEventUnwrapped(Box::new(event_enqueuer)) + )?; + + // Connect as the last message once everything is wired up + cast!(event_dispatcher, RelayEventDispatcherMessage::Connect)?; + + Ok(event_dispatcher) + } + + async fn handle( + &self, + _myself: ActorRef, + message: Self::Msg, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + match message { + SupervisorMessage::Publish(report) => { + cast!(state, RelayEventDispatcherMessage::Publish(report))?; + } + } + Ok(()) + } + + // For the moment we just log the errors and exit the whole system + async fn handle_supervisor_evt( + &self, + myself: ActorRef, + message: SupervisionEvent, + _state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + match message { + SupervisionEvent::ActorTerminated(who, _state, maybe_msg) => { + if let Some(msg) = maybe_msg { + error!("Actor terminated: {:?}, reason: {}", who, msg); + } else { + error!("Actor terminated: {:?}", who); + } + myself.stop(None) + } + SupervisionEvent::ActorPanicked(dead_actor, panic_msg) => { + error!("Actor panicked: {:?}, panic: {}", dead_actor, panic_msg); + } + SupervisionEvent::ActorStarted(_actor) => {} + SupervisionEvent::ProcessGroupChanged(_group) => {} + } + + Ok(()) + } +} diff --git a/src/adapters/http_server.rs b/src/adapters/http_server.rs index 21f3560..075ad13 100644 --- a/src/adapters/http_server.rs +++ b/src/adapters/http_server.rs @@ -1,7 +1,7 @@ mod app_errors; mod router; mod slack_interactions_route; -use crate::actors::messages::RelayEventDispatcherMessage; +use crate::actors::messages::SupervisorMessage; use anyhow::{Context, Result}; use axum::Router; use handlebars::Handlebars; @@ -18,14 +18,14 @@ use tracing::info; #[derive(Clone)] pub struct WebAppState { hb: Arc>, - event_dispatcher: ActorRef, + event_dispatcher: ActorRef, } pub struct HttpServer; impl HttpServer { pub async fn run( cancellation_token: CancellationToken, - event_dispatcher: ActorRef, + event_dispatcher: ActorRef, ) -> Result<()> { let router = create_router(event_dispatcher)?; diff --git a/src/adapters/http_server/router.rs b/src/adapters/http_server/router.rs index c40a42d..45d0dd4 100644 --- a/src/adapters/http_server/router.rs +++ b/src/adapters/http_server/router.rs @@ -1,6 +1,6 @@ use super::slack_interactions_route::slack_interactions_route; use super::WebAppState; -use crate::actors::messages::RelayEventDispatcherMessage; +use crate::actors::messages::SupervisorMessage; use anyhow::Result; use axum::{extract::State, http::HeaderMap, response::Html}; use axum::{response::IntoResponse, routing::get, Router}; @@ -15,8 +15,8 @@ use tower_http::LatencyUnit; use tower_http::{timeout::TimeoutLayer, trace::DefaultOnFailure}; use tracing::Level; -pub fn create_router(event_dispatcher: ActorRef) -> Result { - let web_app_state = create_web_app_state(event_dispatcher)?; +pub fn create_router(message_dispatcher: ActorRef) -> Result { + let web_app_state = create_web_app_state(message_dispatcher)?; let tracing_layer = TraceLayer::new_for_http() .make_span_with(DefaultMakeSpan::new().level(Level::INFO)) .on_response( @@ -35,9 +35,7 @@ pub fn create_router(event_dispatcher: ActorRef) -> .with_state(web_app_state)) } -fn create_web_app_state( - event_dispatcher: ActorRef, -) -> Result { +fn create_web_app_state(message_dispatcher: ActorRef) -> Result { let templates_dir = env::var("TEMPLATES_DIR").unwrap_or_else(|_| "/app/templates".to_string()); let mut hb = Handlebars::new(); @@ -46,7 +44,7 @@ fn create_web_app_state( Ok(WebAppState { hb: Arc::new(hb), - event_dispatcher, + event_dispatcher: message_dispatcher, }) } diff --git a/src/adapters/http_server/slack_interactions_route.rs b/src/adapters/http_server/slack_interactions_route.rs index f1087b5..f0a1713 100644 --- a/src/adapters/http_server/slack_interactions_route.rs +++ b/src/adapters/http_server/slack_interactions_route.rs @@ -1,6 +1,6 @@ use super::app_errors::AppError; use super::WebAppState; -use crate::actors::messages::RelayEventDispatcherMessage; +use crate::actors::messages::SupervisorMessage; use crate::domain_objects::{ModeratedReport, ModerationCategory, ReportRequest}; use anyhow::{anyhow, Context, Result}; use axum::{extract::State, routing::post, Extension, Router}; @@ -48,7 +48,8 @@ fn prepare_listener_environment( async fn slack_interaction_handler( State(WebAppState { - event_dispatcher, .. + event_dispatcher: message_dispatcher, + .. }): State, Extension(event): Extension, ) -> Result<(), AppError> { @@ -71,8 +72,8 @@ async fn slack_interaction_handler( maybe_moderated_report.map(|moderated_report| { cast!( - event_dispatcher, - RelayEventDispatcherMessage::Publish(moderated_report) + message_dispatcher, + SupervisorMessage::Publish(moderated_report) ) }); @@ -201,7 +202,7 @@ mod tests { #[tokio::test] async fn test_fails_with_empty_request() { let (test_actor_ref, _receiver_actor_handle) = - TestActor::::spawn_default() + TestActor::::spawn_default() .await .unwrap(); let state = WebAppState { diff --git a/src/lib.rs b/src/lib.rs index 4dc1759..5c34c14 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ pub mod actors; +pub mod adapters; pub mod domain_objects; pub mod service_manager; diff --git a/src/main.rs b/src/main.rs index 24f98ce..9bd97a3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,16 +3,12 @@ mod adapters; mod domain_objects; mod service_manager; -use crate::actors::{ - messages::{GiftUnwrapperMessage, RelayEventDispatcherMessage}, - EventEnqueuer, GiftUnwrapper, NostrPort, RelayEventDispatcher, -}; +use crate::actors::Supervisor; use crate::adapters::{GooglePublisher, HttpServer, NostrSubscriber}; use crate::service_manager::ServiceManager; -use actors::PubsubPort; +use actors::{NostrPort, PubsubPort}; use anyhow::{Context, Result}; use nostr_sdk::prelude::*; -use ractor::cast; use std::env; use tracing::info; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; @@ -81,33 +77,14 @@ async fn start_server( let mut manager = ServiceManager::new(); // Spawn actors and wire them together - let event_dispatcher = manager - .spawn_actor(RelayEventDispatcher::default(), nostr_subscriber) + let supervisor = manager + .spawn_actor( + Supervisor::default(), + (nostr_subscriber, google_publisher, reportinator_keys), + ) .await?; - let gift_unwrapper = manager - .spawn_blocking_actor(GiftUnwrapper, reportinator_keys.clone()) - .await?; - - cast!( - event_dispatcher, - RelayEventDispatcherMessage::SubscribeToEventReceived(Box::new(gift_unwrapper.clone())) - )?; - - let event_enqueuer = manager - .spawn_actor(EventEnqueuer::default(), google_publisher) - .await?; - - cast!( - gift_unwrapper, - GiftUnwrapperMessage::SubscribeToEventUnwrapped(Box::new(event_enqueuer)) - )?; - - // Connect as the last message once everything is wired up - cast!(event_dispatcher, RelayEventDispatcherMessage::Connect)?; - - manager - .spawn_service(|cancellation_token| HttpServer::run(cancellation_token, event_dispatcher)); + manager.spawn_service(|cancellation_token| HttpServer::run(cancellation_token, supervisor)); manager .listen_stop_signals() diff --git a/src/service_manager.rs b/src/service_manager.rs index 15233bf..235dc26 100644 --- a/src/service_manager.rs +++ b/src/service_manager.rs @@ -42,7 +42,13 @@ impl ServiceManager { let name = Some(simplify_type_name(std::any::type_name::())); let (actor_ref, actor_handle) = Actor::spawn(name, actor, args).await?; self.tracker.reopen(); - self.tracker.spawn(actor_handle); + let token = self.token.clone(); + self.tracker.spawn(async move { + if let Err(e) = actor_handle.await { + error!("Actor failed: {}", e); + } + token.cancel() + }); self.tracker.close(); self.actors.push(actor_ref.get_cell()); @@ -54,6 +60,7 @@ impl ServiceManager { Ok(actor_ref) } + #[allow(unused)] pub async fn spawn_blocking_actor( &mut self, actor: A, @@ -86,7 +93,7 @@ impl ServiceManager { } // Spawn through a function that receives a cancellation token - #[allow(dead_code)] + #[allow(unused)] pub fn spawn_service(&self, task: F) -> JoinHandle<()> where F: FnOnce(CancellationToken) -> Fut + Send + 'static,