diff --git a/core/src/lib.rs b/core/src/lib.rs index 5dd9ec2f2..8c55ec89f 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -8,6 +8,7 @@ pub mod state; pub mod supervisor; pub mod timing; pub mod types; +pub mod util; /// This is a stand-in for the “never” type until RFC 1216 is stabilized. /// Because it is not constructable, the compiler enforces that a function diff --git a/core/src/messages/agent.rs b/core/src/messages/agent.rs index 40b26d421..a5e16a2c7 100644 --- a/core/src/messages/agent.rs +++ b/core/src/messages/agent.rs @@ -58,25 +58,25 @@ pub enum DroneLogMessageKind { #[derive(Serialize, Deserialize, Debug, TypedMessage)] #[typed_message(subject = "backend.#backend_id.log")] -pub struct DroneLogMessage { +pub struct BackendLogMessage { pub backend_id: BackendId, pub kind: DroneLogMessageKind, pub text: String, } -impl DroneLogMessage { +impl BackendLogMessage { #[cfg(feature = "bollard")] pub fn from_log_message( backend_id: &BackendId, log_message: &LogOutput, - ) -> Option { + ) -> Option { match log_message { - bollard::container::LogOutput::StdErr { message } => Some(DroneLogMessage { + bollard::container::LogOutput::StdErr { message } => Some(BackendLogMessage { backend_id: backend_id.clone(), kind: DroneLogMessageKind::Stderr, text: std::str::from_utf8(message).ok()?.to_string(), }), - bollard::container::LogOutput::StdOut { message } => Some(DroneLogMessage { + bollard::container::LogOutput::StdOut { message } => Some(BackendLogMessage { backend_id: backend_id.clone(), kind: DroneLogMessageKind::Stdout, text: std::str::from_utf8(message).ok()?.to_string(), @@ -101,7 +101,7 @@ impl DroneLogMessage { } } -impl JetStreamable for DroneLogMessage { +impl JetStreamable for BackendLogMessage { fn stream_name() -> &'static str { "backend_logs" } diff --git a/core/src/messages/mod.rs b/core/src/messages/mod.rs index f93aa7fbb..4535fe0a3 100644 --- a/core/src/messages/mod.rs +++ b/core/src/messages/mod.rs @@ -1,4 +1,4 @@ -use crate::nats::JetStreamable; +use crate::{nats::JetStreamable, util::LogAndIgnoreError}; use anyhow::anyhow; pub mod agent; pub mod cert; @@ -29,10 +29,18 @@ async fn add_jetstream_stream( pub async fn initialize_jetstreams( jetstream: &async_nats::jetstream::Context, ) -> anyhow::Result<()> { - let _ = add_jetstream_stream::(jetstream).await; - let _ = add_jetstream_stream::(jetstream).await; - let _ = add_jetstream_stream::(jetstream).await; - let _ = add_jetstream_stream::(jetstream).await; + add_jetstream_stream::(jetstream) + .await + .log_and_ignore_error(); + add_jetstream_stream::(jetstream) + .await + .log_and_ignore_error(); + add_jetstream_stream::(jetstream) + .await + .log_and_ignore_error(); + add_jetstream_stream::(jetstream) + .await + .log_and_ignore_error(); Ok(()) } diff --git a/core/src/util.rs b/core/src/util.rs new file mode 100644 index 000000000..8ccc40066 --- /dev/null +++ b/core/src/util.rs @@ -0,0 +1,13 @@ +use std::fmt::Debug; + +pub trait LogAndIgnoreError { + fn log_and_ignore_error(self); +} + +impl LogAndIgnoreError for Result { + fn log_and_ignore_error(self) { + if let Err(error) = self { + tracing::error!(?error); + } + } +} diff --git a/dev/tests/agent.rs b/dev/tests/agent.rs index 800743828..aff0e9c58 100644 --- a/dev/tests/agent.rs +++ b/dev/tests/agent.rs @@ -4,8 +4,8 @@ use plane_controller::{drone_state::monitor_drone_state, run::update_backend_sta use plane_core::{ messages::{ agent::{ - BackendState, BackendStatsMessage, DroneLogMessage, DroneLogMessageKind, SpawnRequest, - TerminationRequest, + BackendLogMessage, BackendState, BackendStatsMessage, DroneLogMessageKind, + SpawnRequest, TerminationRequest, }, drone_state::{DroneStatusMessage, UpdateBackendStateMessage}, scheduler::DrainDrone, @@ -308,7 +308,7 @@ async fn invalid_container_fails() { let (ctx, mut sub) = do_spawn_request(&mut req).await; let log_subscription = ctx .nats_connection - .subscribe(DroneLogMessage::subscribe_subject(&req.backend_id)) + .subscribe(BackendLogMessage::subscribe_subject(&req.backend_id)) .await .unwrap() .timeout(Duration::from_secs(10)); @@ -354,7 +354,7 @@ async fn check_that_logs_work() { let (ctx, mut status_sub) = do_spawn_request(&mut req).await; let mut log_subscription = Box::pin( ctx.nats_connection - .subscribe(DroneLogMessage::subscribe_subject(&req.backend_id)) + .subscribe(BackendLogMessage::subscribe_subject(&req.backend_id)) .await .unwrap() .timeout(Duration::from_secs(10)), diff --git a/drone/src/agent/backend.rs b/drone/src/agent/backend.rs index 2a42dcb95..ffbca0079 100644 --- a/drone/src/agent/backend.rs +++ b/drone/src/agent/backend.rs @@ -2,7 +2,7 @@ use crate::agent::engine::Engine; use futures::Future; use plane_core::{ logging::LogError, - messages::agent::DroneLogMessage, + messages::agent::BackendLogMessage, messages::{ agent::DroneLogMessageKind, dns::{DnsRecordType, SetDnsRecord}, @@ -32,7 +32,7 @@ pub struct BackendMonitor { _stats_loop: AbortOnDrop<()>, _dns_loop: AbortOnDrop>, _backend_id: BackendId, - meta_log_tx: Sender, + meta_log_tx: Sender, } impl BackendMonitor { @@ -62,8 +62,8 @@ impl BackendMonitor { &mut self, text: String, kind: DroneLogMessageKind, - ) -> impl Future>> + '_ { - self.meta_log_tx.send(DroneLogMessage { + ) -> impl Future>> + '_ { + self.meta_log_tx.send(BackendLogMessage { backend_id: self._backend_id.clone(), kind, text, @@ -100,7 +100,7 @@ impl BackendMonitor { backend_id: &BackendId, engine: &E, nc: &TypedNats, - meta_log_rx: ReceiverStream, + meta_log_rx: ReceiverStream, ) -> JoinHandle<()> { let mut stream = engine.log_stream(backend_id).merge(meta_log_rx); let nc = nc.clone(); diff --git a/drone/src/agent/engine.rs b/drone/src/agent/engine.rs index dcfdb7d09..0c09e4e1f 100644 --- a/drone/src/agent/engine.rs +++ b/drone/src/agent/engine.rs @@ -2,7 +2,7 @@ use anyhow::Result; use async_trait::async_trait; use futures::Stream; use plane_core::{ - messages::agent::{BackendStatsMessage, DroneLogMessage, SpawnRequest}, + messages::agent::{BackendLogMessage, BackendStatsMessage, SpawnRequest}, types::{BackendId, ClusterName, DroneId}, }; use std::{net::SocketAddr, pin::Pin}; @@ -46,7 +46,7 @@ pub trait Engine: Send + Sync + 'static { fn log_stream( &self, backend: &BackendId, - ) -> Pin + Send>>; + ) -> Pin + Send>>; fn stats_stream( &self, diff --git a/drone/src/agent/engines/docker/mod.rs b/drone/src/agent/engines/docker/mod.rs index 18b6c92ee..f5a29df6b 100644 --- a/drone/src/agent/engines/docker/mod.rs +++ b/drone/src/agent/engines/docker/mod.rs @@ -25,7 +25,7 @@ use bollard::{ }; use plane_core::{ messages::agent::{ - BackendStatsMessage, DockerExecutableConfig, DockerPullPolicy, DroneLogMessage, + BackendLogMessage, BackendStatsMessage, DockerExecutableConfig, DockerPullPolicy, SpawnRequest, }, timing::Timer, @@ -403,13 +403,13 @@ impl Engine for DockerInterface { fn log_stream( &self, backend: &BackendId, - ) -> Pin + Send>> { + ) -> Pin + Send>> { let stream = self.get_logs(&backend.to_resource_name()); let backend = backend.clone(); let stream = stream.filter_map(move |v| { v.ok() .as_ref() - .and_then(|d| DroneLogMessage::from_log_message(&backend, d)) + .and_then(|d| BackendLogMessage::from_log_message(&backend, d)) }); Box::pin(stream) }