From 881e6437067e9cb0ea937597c09adc5a39757cfd Mon Sep 17 00:00:00 2001 From: Alex <69764315+Serial-ATA@users.noreply.github.com> Date: Tue, 29 Oct 2024 13:46:37 -0400 Subject: [PATCH] feat!(gadget-sdk): add an Error type for executor module (#420) * feat!(gadget-sdk): add an Error type for executor module We really shouldn't be using `Box` anywhere in the SDK. * chore: clippy --- sdk/src/error.rs | 6 +++ sdk/src/executor/mod.rs | 2 +- sdk/src/executor/process/error.rs | 15 ++++++ sdk/src/executor/process/manager.rs | 70 ++++++++++---------------- sdk/src/executor/process/mod.rs | 2 + sdk/src/executor/process/types.rs | 77 ++++++++++++++++++----------- sdk/src/executor/process/utils.rs | 4 +- 7 files changed, 99 insertions(+), 77 deletions(-) create mode 100644 sdk/src/executor/process/error.rs diff --git a/sdk/src/error.rs b/sdk/src/error.rs index 867f6eac..a2143961 100644 --- a/sdk/src/error.rs +++ b/sdk/src/error.rs @@ -26,6 +26,9 @@ pub enum Error { #[error("Job runner error: {0}")] Runner(#[from] crate::runners::RunnerError), + #[error("Executor error: {0}")] + Executor(#[from] crate::executor::process::Error), + #[error("Docker error: {0}")] Docker(#[from] bollard::errors::Error), @@ -44,6 +47,9 @@ pub enum Error { #[cfg(any(feature = "std", feature = "wasm"))] Subxt(#[from] subxt::Error), + #[error("{0}")] + Json(#[from] serde_json::Error), + #[cfg(feature = "std")] #[error("Events watcher error: {0}")] EventsWatcher(#[from] crate::events_watcher::error::Error), diff --git a/sdk/src/executor/mod.rs b/sdk/src/executor/mod.rs index e7c86d32..47acdb9e 100644 --- a/sdk/src/executor/mod.rs +++ b/sdk/src/executor/mod.rs @@ -42,7 +42,7 @@ pub async fn run_executor(instructions: &str) { let mut ended = Vec::new(); for (service, process) in &mut manager.children { println!("LOG : Process {}", service.clone()); - let status = process.status().unwrap(); + let status = process.status(); println!("\tSTATUS: {:?}", status); let output = process.read_until_default_timeout().await; println!("\n{} read result:\n\t {:?}\n", service, output); diff --git a/sdk/src/executor/process/error.rs b/sdk/src/executor/process/error.rs new file mode 100644 index 00000000..d1c6cb04 --- /dev/null +++ b/sdk/src/executor/process/error.rs @@ -0,0 +1,15 @@ +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum Error { + #[error("A process exited unexpectedly")] + UnexpectedExit, + #[error("Process {0} doesn't exist")] + ProcessNotFound(sysinfo::Pid), + #[error("Failed to focus on {0}, it does not exist")] + ServiceNotFound(String), + #[error("Expected {0} and found {1} running instead - process termination aborted")] + ProcessMismatch(String, String), + #[error("Failed to kill process, errno: {0}")] + KillFailed(nix::errno::Errno), +} diff --git a/sdk/src/executor/process/manager.rs b/sdk/src/executor/process/manager.rs index e81b3004..0fbbc08f 100644 --- a/sdk/src/executor/process/manager.rs +++ b/sdk/src/executor/process/manager.rs @@ -1,11 +1,11 @@ +use super::error::Error; use crate::executor::process::types::{GadgetProcess, ProcessOutput, Status}; use crate::executor::process::utils::*; use crate::executor::OS_COMMAND; use crate::{craft_child_process, run_command}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use std::error::Error; -use sysinfo::{Pid, System}; +use sysinfo::System; use tokio::fs::File; use tokio::io::AsyncWriteExt; @@ -26,9 +26,11 @@ impl GadgetProcessManager { /// Load the state of previously running processes to recover gadget-executor #[allow(dead_code)] - pub(crate) async fn new_from_saved(file: &str) -> Result> { - let file = std::fs::File::open(file).unwrap(); - let mut new_manager: GadgetProcessManager = serde_json::from_reader(file).unwrap(); + pub(crate) async fn new_from_saved( + file: &str, + ) -> Result { + let file = std::fs::File::open(file)?; + let mut new_manager: GadgetProcessManager = serde_json::from_reader(file)?; // Restarts processes that were previously running new_manager.restart_dead().await?; @@ -38,7 +40,7 @@ impl GadgetProcessManager { /// Store the state of the current processes #[allow(dead_code)] - pub(crate) async fn save_state(&self) -> Result> { + pub(crate) async fn save_state(&self) -> Result { let serialized_data = serde_json::to_string(self)?; let mut file = File::create("./savestate.json").await?; file.write_all(serialized_data.clone().as_bytes()).await?; @@ -47,11 +49,7 @@ impl GadgetProcessManager { /// Runs the given command and stores it using the identifier as the key. Returns the identifier used #[allow(unused_results)] - pub async fn run( - &mut self, - identifier: String, - command: &str, - ) -> Result> { + pub async fn run(&mut self, identifier: String, command: &str) -> Result { let gadget_process = run_command!(command)?; self.children.insert(identifier.clone(), gadget_process); Ok(identifier) @@ -59,14 +57,11 @@ impl GadgetProcessManager { /// Focuses on the given service until its stream is exhausted, meaning that the process ran to completion. Returns a /// ProcessOutput with its output (if there is any). - pub async fn focus_service_to_completion( - &mut self, - service: String, - ) -> Result> { + pub async fn focus_service_to_completion(&mut self, service: String) -> Result { let process = self .children .get_mut(&service) - .ok_or(format!("Failed to focus on {service}, it does not exist"))?; + .ok_or(Error::ServiceNotFound(service))?; let mut output_stream = String::new(); loop { match process.read_until_default_timeout().await { @@ -95,17 +90,17 @@ impl GadgetProcessManager { &mut self, service: String, specified_output: String, - ) -> Result> { + ) -> Result { let process = self .children .get_mut(&service) - .ok_or(format!("Failed to focus on {service}, it does not exist"))?; + .ok_or(Error::ServiceNotFound(service))?; Ok(process.read_until_receiving_string(specified_output).await) } /// Removes processes that are no longer running from the manager. Returns a Vector of the names of processes removed #[allow(dead_code)] - pub(crate) async fn remove_dead(&mut self) -> Result, Box> { + pub(crate) async fn remove_dead(&mut self) -> Result, Error> { let dead_processes = Vec::new(); let mut to_remove = Vec::new(); let s = System::new_all(); @@ -113,7 +108,7 @@ impl GadgetProcessManager { // Find dead processes and gather them for return & removal for (key, value) in self.children.iter() { let current_pid = value.pid; - if let Some(process) = s.process(Pid::from_u32(current_pid)) { + if let Some(process) = s.process(current_pid) { if process.name() == value.process_name { // Still running continue; @@ -133,36 +128,23 @@ impl GadgetProcessManager { /// Finds all dead processes that still exist in map and starts them again. This function /// is used to restart all processes after loading a Manager from a file. #[allow(unused_results)] - pub(crate) async fn restart_dead(&mut self) -> Result<(), Box> { + pub(crate) async fn restart_dead(&mut self) -> Result<(), Error> { let mut restarted_processes = Vec::new(); let mut to_remove = Vec::new(); // Find dead processes and restart them for (key, value) in self.children.iter_mut() { match value.status() { - Ok(status) => { - match status { - Status::Active | Status::Sleeping => { - // TODO: Metrics + Logs for these living processes - // Check if this process is still running what is expected - // If it is still correctly running, we just move along - continue; - } - Status::Inactive | Status::Dead => { - // Dead, should be restarted - } - Status::Unknown(code) => { - println!("LOG : {} yielded {}", key.clone(), code); - } - } + Status::Active | Status::Sleeping => { + // TODO: Metrics + Logs for these living processes + // Check if this process is still running what is expected + // If it is still correctly running, we just move along + continue; + } + Status::Inactive | Status::Dead => { + // Dead, should be restarted } - Err(err) => { - // TODO: Log error - // Error generally means it died and no longer exists - restart it - println!( - "LOG : {} yielded {} while attempting to restart dead processes", - key.clone(), - err - ); + Status::Unknown(code) => { + println!("LOG : {} yielded {}", key.clone(), code); } } restarted_processes.push((key.clone(), value.restart_process().await?)); diff --git a/sdk/src/executor/process/mod.rs b/sdk/src/executor/process/mod.rs index 33361c4b..faddf6f6 100644 --- a/sdk/src/executor/process/mod.rs +++ b/sdk/src/executor/process/mod.rs @@ -1,3 +1,5 @@ +pub mod error; pub mod manager; pub(crate) mod types; pub(crate) mod utils; +pub use error::Error; diff --git a/sdk/src/executor/process/types.rs b/sdk/src/executor/process/types.rs index 753e84bf..c95867df 100644 --- a/sdk/src/executor/process/types.rs +++ b/sdk/src/executor/process/types.rs @@ -1,12 +1,11 @@ +use super::error::Error; use crate::executor::process::utils::*; use crate::executor::OS_COMMAND; use crate::{craft_child_process, run_command}; -use failure::format_err; use nix::libc::pid_t; use nix::sys::signal; use nix::sys::signal::Signal; -use serde::{Deserialize, Serialize}; -use std::error::Error; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::ffi::OsString; use std::time::Duration; use sysinfo::ProcessStatus::{ @@ -27,7 +26,8 @@ pub struct GadgetProcess { /// The name of the process itself pub process_name: OsString, /// Process ID - pub pid: u32, + #[serde(serialize_with = "serialize_pid", deserialize_with = "deserialize_pid")] + pub pid: Pid, /// History of output from process for reviewing/tracking progress pub output: Vec, /// Stream for output from child process @@ -35,18 +35,33 @@ pub struct GadgetProcess { pub stream: Option>>, } +fn serialize_pid(pid: &Pid, serializer: S) -> Result +where + S: Serializer, +{ + serializer.serialize_u32(pid.as_u32()) +} + +fn deserialize_pid<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + let value = u32::deserialize(deserializer)?; + Ok(Pid::from_u32(value)) +} + impl GadgetProcess { pub fn new( command: String, - pid: Option, + pid: u32, output: Vec, stream: Lines>, - ) -> Result> { + ) -> Result { let s = System::new_all(); - let pid = pid.ok_or("No PID found")?; + let pid = Pid::from_u32(pid); let process_name = s - .process(Pid::from_u32(pid)) - .ok_or(format!("Process {pid} doesn't exist"))? + .process(pid) + .ok_or(Error::ProcessNotFound(pid))? .name() .to_os_string(); Ok(GadgetProcess { @@ -194,10 +209,10 @@ impl GadgetProcess { } /// Restart a GadgetProcess, killing the previously running process if it exists. Returns the new GadgetProcess - pub(crate) async fn restart_process(&mut self) -> Result> { + pub(crate) async fn restart_process(&mut self) -> Result { // Kill current process running this command let s = System::new_all(); - match s.process(Pid::from_u32(self.pid)) { + match s.process(self.pid) { Some(process) => { if process.name() == self.process_name { self.kill()?; @@ -216,43 +231,45 @@ impl GadgetProcess { } /// Checks the status of this GadgetProcess - pub(crate) fn status(&self) -> Result> { + pub(crate) fn status(&self) -> Status { let s = System::new_all(); - match s.process(Pid::from_u32(self.pid)) { - Some(process) => Ok(Status::from(process.status())), + match s.process(self.pid) { + Some(process) => Status::from(process.status()), None => { // If it isn't found, then the process died - Ok(Status::Dead) + Status::Dead } } } /// Gets process name by PID #[allow(dead_code)] - pub(crate) fn get_name(&self) -> Result> { + pub(crate) fn get_name(&self) -> Result { let s = System::new_all(); let name = s - .process(Pid::from_u32(self.pid)) - .ok_or(format!("Process {} doesn't exist", self.pid))? + .process(self.pid) + .ok_or(Error::ProcessNotFound(self.pid))? .name(); Ok(name.into()) } /// Terminates the process depicted by this GadgetProcess - will fail if the PID is now being reused - pub(crate) fn kill(&self) -> Result<(), Box> { + pub(crate) fn kill(&self) -> Result<(), Error> { let running_process = self.get_name()?; - if running_process == self.process_name { - Ok(signal::kill( - nix::unistd::Pid::from_raw(self.pid as pid_t), - Signal::SIGTERM, - )?) - } else { - Err(Box::from(format_err!( - "Expected {} and found {} running instead - process termination aborted", - self.process_name.to_string_lossy(), - running_process.to_string_lossy() - ))) + if running_process != self.process_name { + return Err(Error::ProcessMismatch( + self.process_name.to_string_lossy().into_owned(), + running_process.to_string_lossy().into_owned(), + )); } + + signal::kill( + nix::unistd::Pid::from_raw(self.pid.as_u32() as pid_t), + Signal::SIGTERM, + ) + .map_err(Error::KillFailed)?; + + Ok(()) } } diff --git a/sdk/src/executor/process/utils.rs b/sdk/src/executor/process/utils.rs index eff3c358..ce8d9e90 100644 --- a/sdk/src/executor/process/utils.rs +++ b/sdk/src/executor/process/utils.rs @@ -58,7 +58,7 @@ macro_rules! run_command { ($cmd:expr) => {{ // Spawn child running process let child: Child = craft_child_process!($cmd); - let pid = child.id().clone(); + let pid = child.id().ok_or(Error::UnexpectedExit)?; let stream = create_stream(child); GadgetProcess::new( $cmd.to_string(), @@ -70,7 +70,7 @@ macro_rules! run_command { ($cmd:expr, $($args:expr),*) => {{ // Spawn child running process let child = craft_child_process!($cmd,$($args),*); - let pid = child.id().clone(); + let pid = child.id().ok_or(Error::UnexpectedExit)?; let stream = create_stream(child); GadgetProcess::new( $cmd.to_string(),