Skip to content

Commit

Permalink
Initial supervisor just for logging FTM
Browse files Browse the repository at this point in the history
  • Loading branch information
dcadenas committed Apr 1, 2024
1 parent aa95abd commit 5b34183
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 49 deletions.
3 changes: 3 additions & 0 deletions src/actors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ pub use gift_unwrapper::GiftUnwrapper;
pub mod event_enqueuer;
pub use event_enqueuer::{EventEnqueuer, PubsubPort};

pub mod supervisor;
pub use supervisor::Supervisor;

pub mod utilities;
#[cfg(test)]
pub use utilities::TestActor;
Expand Down
2 changes: 1 addition & 1 deletion src/actors/gift_unwrapper.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::actors::messages::GiftUnwrapperMessage;
use crate::domain_objects::ReportRequest;
use anyhow::Result;
use anyhow::{anyhow, Result};
use nostr_sdk::prelude::*;
use ractor::{Actor, ActorProcessingErr, ActorRef, OutputPort};
use tracing::{error, info};
Expand Down
5 changes: 5 additions & 0 deletions src/actors/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ use crate::domain_objects::*;
use nostr_sdk::prelude::Event;
use std::fmt::Debug;

#[derive(Debug)]
pub enum SupervisorMessage {
Publish(ModeratedReport),
}

#[derive(Debug)]
pub enum RelayEventDispatcherMessage {
Connect,
Expand Down
121 changes: 121 additions & 0 deletions src/actors/supervisor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
use crate::actors::PubsubPort;
use crate::actors::{
messages::{GiftUnwrapperMessage, RelayEventDispatcherMessage, SupervisorMessage},
EventEnqueuer, GiftUnwrapper, NostrPort, RelayEventDispatcher,
};
use anyhow::Result;
use nostr_sdk::prelude::*;
use ractor::{cast, Actor, ActorProcessingErr, ActorRef, SupervisionEvent};
use tracing::error;

pub struct Supervisor<T, U> {
_phantom: std::marker::PhantomData<(T, U)>,
}
impl<T, U> Default for Supervisor<T, U>
where
T: NostrPort,
U: PubsubPort,
{
fn default() -> Self {
Self {
_phantom: std::marker::PhantomData,
}
}
}

#[ractor::async_trait]
impl<T, U> Actor for Supervisor<T, U>
where
T: NostrPort,
U: PubsubPort,
{
type Msg = SupervisorMessage;
type State = ActorRef<RelayEventDispatcherMessage>;
type Arguments = (T, U, Keys);

async fn pre_start(
&self,
myself: ActorRef<Self::Msg>,
(nostr_subscriber, google_publisher, reportinator_keys): (T, U, Keys),
) -> Result<Self::State, ActorProcessingErr> {
// Spawn actors and wire them together
let (event_dispatcher, _event_dispatcher_handle) = Actor::spawn_linked(
Some("event_dispatcher".to_string()),
RelayEventDispatcher::default(),
nostr_subscriber,
myself.get_cell(),
)
.await?;

let (gift_unwrapper, _gift_unwrapper_handle) = Actor::spawn_linked(
Some("gift_unwrapper".to_string()),
GiftUnwrapper,
reportinator_keys.clone(),
myself.get_cell(),
)
.await?;

cast!(
event_dispatcher,
RelayEventDispatcherMessage::SubscribeToEventReceived(Box::new(gift_unwrapper.clone()))
)?;

let (event_enqueuer, _event_enqueuer_handle) = Actor::spawn_linked(
Some("event_enqueuer".to_string()),
EventEnqueuer::default(),
google_publisher,
myself.get_cell(),
)
.await?;

cast!(
gift_unwrapper,
GiftUnwrapperMessage::SubscribeToEventUnwrapped(Box::new(event_enqueuer))
)?;

// Connect as the last message once everything is wired up
cast!(event_dispatcher, RelayEventDispatcherMessage::Connect)?;

Ok(event_dispatcher)
}

async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
SupervisorMessage::Publish(report) => {
cast!(state, RelayEventDispatcherMessage::Publish(report))?;
}
}
Ok(())
}

// For the moment we just log the errors and exit the whole system
async fn handle_supervisor_evt(
&self,
myself: ActorRef<Self::Msg>,
message: SupervisionEvent,
_state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
SupervisionEvent::ActorTerminated(who, _state, maybe_msg) => {
if let Some(msg) = maybe_msg {
error!("Actor terminated: {:?}, reason: {}", who, msg);
} else {
error!("Actor terminated: {:?}", who);
}
myself.stop(None)
}
SupervisionEvent::ActorPanicked(dead_actor, panic_msg) => {
error!("Actor panicked: {:?}, panic: {}", dead_actor, panic_msg);
}
SupervisionEvent::ActorStarted(_actor) => {}
SupervisionEvent::ProcessGroupChanged(_group) => {}
}

Ok(())
}
}
6 changes: 3 additions & 3 deletions src/adapters/http_server.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod app_errors;
mod router;
mod slack_interactions_route;
use crate::actors::messages::RelayEventDispatcherMessage;
use crate::actors::messages::SupervisorMessage;
use anyhow::{Context, Result};
use axum::Router;
use handlebars::Handlebars;
Expand All @@ -18,14 +18,14 @@ use tracing::info;
#[derive(Clone)]
pub struct WebAppState {
hb: Arc<Handlebars<'static>>,
event_dispatcher: ActorRef<RelayEventDispatcherMessage>,
event_dispatcher: ActorRef<SupervisorMessage>,
}

pub struct HttpServer;
impl HttpServer {
pub async fn run(
cancellation_token: CancellationToken,
event_dispatcher: ActorRef<RelayEventDispatcherMessage>,
event_dispatcher: ActorRef<SupervisorMessage>,
) -> Result<()> {
let router = create_router(event_dispatcher)?;

Expand Down
12 changes: 5 additions & 7 deletions src/adapters/http_server/router.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::slack_interactions_route::slack_interactions_route;
use super::WebAppState;
use crate::actors::messages::RelayEventDispatcherMessage;
use crate::actors::messages::SupervisorMessage;
use anyhow::Result;
use axum::{extract::State, http::HeaderMap, response::Html};
use axum::{response::IntoResponse, routing::get, Router};
Expand All @@ -15,8 +15,8 @@ use tower_http::LatencyUnit;
use tower_http::{timeout::TimeoutLayer, trace::DefaultOnFailure};
use tracing::Level;

pub fn create_router(event_dispatcher: ActorRef<RelayEventDispatcherMessage>) -> Result<Router> {
let web_app_state = create_web_app_state(event_dispatcher)?;
pub fn create_router(message_dispatcher: ActorRef<SupervisorMessage>) -> Result<Router> {
let web_app_state = create_web_app_state(message_dispatcher)?;
let tracing_layer = TraceLayer::new_for_http()
.make_span_with(DefaultMakeSpan::new().level(Level::INFO))
.on_response(
Expand All @@ -35,9 +35,7 @@ pub fn create_router(event_dispatcher: ActorRef<RelayEventDispatcherMessage>) ->
.with_state(web_app_state))
}

fn create_web_app_state(
event_dispatcher: ActorRef<RelayEventDispatcherMessage>,
) -> Result<WebAppState> {
fn create_web_app_state(message_dispatcher: ActorRef<SupervisorMessage>) -> Result<WebAppState> {
let templates_dir = env::var("TEMPLATES_DIR").unwrap_or_else(|_| "/app/templates".to_string());
let mut hb = Handlebars::new();

Expand All @@ -46,7 +44,7 @@ fn create_web_app_state(

Ok(WebAppState {
hb: Arc::new(hb),
event_dispatcher,
event_dispatcher: message_dispatcher,
})
}

Expand Down
11 changes: 6 additions & 5 deletions src/adapters/http_server/slack_interactions_route.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::app_errors::AppError;
use super::WebAppState;
use crate::actors::messages::RelayEventDispatcherMessage;
use crate::actors::messages::SupervisorMessage;
use crate::domain_objects::{ModeratedReport, ModerationCategory, ReportRequest};
use anyhow::{anyhow, Context, Result};
use axum::{extract::State, routing::post, Extension, Router};
Expand Down Expand Up @@ -48,7 +48,8 @@ fn prepare_listener_environment(

async fn slack_interaction_handler(
State(WebAppState {
event_dispatcher, ..
event_dispatcher: message_dispatcher,
..
}): State<WebAppState>,
Extension(event): Extension<SlackInteractionEvent>,
) -> Result<(), AppError> {
Expand All @@ -71,8 +72,8 @@ async fn slack_interaction_handler(

maybe_moderated_report.map(|moderated_report| {
cast!(
event_dispatcher,
RelayEventDispatcherMessage::Publish(moderated_report)
message_dispatcher,
SupervisorMessage::Publish(moderated_report)
)
});

Expand Down Expand Up @@ -201,7 +202,7 @@ mod tests {
#[tokio::test]
async fn test_fails_with_empty_request() {
let (test_actor_ref, _receiver_actor_handle) =
TestActor::<RelayEventDispatcherMessage>::spawn_default()
TestActor::<SupervisorMessage>::spawn_default()
.await
.unwrap();
let state = WebAppState {
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod actors;
pub mod adapters;
pub mod domain_objects;
pub mod service_manager;
39 changes: 8 additions & 31 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,12 @@ mod adapters;
mod domain_objects;
mod service_manager;

use crate::actors::{
messages::{GiftUnwrapperMessage, RelayEventDispatcherMessage},
EventEnqueuer, GiftUnwrapper, NostrPort, RelayEventDispatcher,
};
use crate::actors::Supervisor;
use crate::adapters::{GooglePublisher, HttpServer, NostrSubscriber};
use crate::service_manager::ServiceManager;
use actors::PubsubPort;
use actors::{NostrPort, PubsubPort};
use anyhow::{Context, Result};
use nostr_sdk::prelude::*;
use ractor::cast;
use std::env;
use tracing::info;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
Expand Down Expand Up @@ -81,33 +77,14 @@ async fn start_server(
let mut manager = ServiceManager::new();

// Spawn actors and wire them together
let event_dispatcher = manager
.spawn_actor(RelayEventDispatcher::default(), nostr_subscriber)
let supervisor = manager
.spawn_actor(
Supervisor::default(),
(nostr_subscriber, google_publisher, reportinator_keys),
)
.await?;

let gift_unwrapper = manager
.spawn_blocking_actor(GiftUnwrapper, reportinator_keys.clone())
.await?;

cast!(
event_dispatcher,
RelayEventDispatcherMessage::SubscribeToEventReceived(Box::new(gift_unwrapper.clone()))
)?;

let event_enqueuer = manager
.spawn_actor(EventEnqueuer::default(), google_publisher)
.await?;

cast!(
gift_unwrapper,
GiftUnwrapperMessage::SubscribeToEventUnwrapped(Box::new(event_enqueuer))
)?;

// Connect as the last message once everything is wired up
cast!(event_dispatcher, RelayEventDispatcherMessage::Connect)?;

manager
.spawn_service(|cancellation_token| HttpServer::run(cancellation_token, event_dispatcher));
manager.spawn_service(|cancellation_token| HttpServer::run(cancellation_token, supervisor));

manager
.listen_stop_signals()
Expand Down
11 changes: 9 additions & 2 deletions src/service_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,13 @@ impl ServiceManager {
let name = Some(simplify_type_name(std::any::type_name::<A>()));
let (actor_ref, actor_handle) = Actor::spawn(name, actor, args).await?;
self.tracker.reopen();
self.tracker.spawn(actor_handle);
let token = self.token.clone();
self.tracker.spawn(async move {
if let Err(e) = actor_handle.await {
error!("Actor failed: {}", e);
}
token.cancel()
});
self.tracker.close();

self.actors.push(actor_ref.get_cell());
Expand All @@ -54,6 +60,7 @@ impl ServiceManager {
Ok(actor_ref)
}

#[allow(unused)]
pub async fn spawn_blocking_actor<A>(
&mut self,
actor: A,
Expand Down Expand Up @@ -86,7 +93,7 @@ impl ServiceManager {
}

// Spawn through a function that receives a cancellation token
#[allow(dead_code)]
#[allow(unused)]
pub fn spawn_service<F, Fut>(&self, task: F) -> JoinHandle<()>
where
F: FnOnce(CancellationToken) -> Fut + Send + 'static,
Expand Down

0 comments on commit 5b34183

Please sign in to comment.