Skip to content

Commit

Permalink
feat!(gadget-sdk): add an Error type for executor module (#420)
Browse files Browse the repository at this point in the history
* feat!(gadget-sdk): add an Error type for executor module

We really shouldn't be using `Box<dyn Error>` anywhere in the SDK.

* chore: clippy
  • Loading branch information
Serial-ATA authored Oct 29, 2024
1 parent 81e1f0e commit 881e643
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 77 deletions.
6 changes: 6 additions & 0 deletions sdk/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ pub enum Error {
#[error("Job runner error: {0}")]
Runner(#[from] crate::runners::RunnerError),

#[error("Executor error: {0}")]
Executor(#[from] crate::executor::process::Error),

#[error("Docker error: {0}")]
Docker(#[from] bollard::errors::Error),

Expand All @@ -44,6 +47,9 @@ pub enum Error {
#[cfg(any(feature = "std", feature = "wasm"))]
Subxt(#[from] subxt::Error),

#[error("{0}")]
Json(#[from] serde_json::Error),

#[cfg(feature = "std")]
#[error("Events watcher error: {0}")]
EventsWatcher(#[from] crate::events_watcher::error::Error),
Expand Down
2 changes: 1 addition & 1 deletion sdk/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub async fn run_executor(instructions: &str) {
let mut ended = Vec::new();
for (service, process) in &mut manager.children {
println!("LOG : Process {}", service.clone());
let status = process.status().unwrap();
let status = process.status();
println!("\tSTATUS: {:?}", status);
let output = process.read_until_default_timeout().await;
println!("\n{} read result:\n\t {:?}\n", service, output);
Expand Down
15 changes: 15 additions & 0 deletions sdk/src/executor/process/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use thiserror::Error;

#[derive(Debug, Error)]
pub enum Error {
#[error("A process exited unexpectedly")]
UnexpectedExit,
#[error("Process {0} doesn't exist")]
ProcessNotFound(sysinfo::Pid),
#[error("Failed to focus on {0}, it does not exist")]
ServiceNotFound(String),
#[error("Expected {0} and found {1} running instead - process termination aborted")]
ProcessMismatch(String, String),
#[error("Failed to kill process, errno: {0}")]
KillFailed(nix::errno::Errno),
}
70 changes: 26 additions & 44 deletions sdk/src/executor/process/manager.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use super::error::Error;
use crate::executor::process::types::{GadgetProcess, ProcessOutput, Status};
use crate::executor::process::utils::*;
use crate::executor::OS_COMMAND;
use crate::{craft_child_process, run_command};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::error::Error;
use sysinfo::{Pid, System};
use sysinfo::System;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;

Expand All @@ -26,9 +26,11 @@ impl GadgetProcessManager {

/// Load the state of previously running processes to recover gadget-executor
#[allow(dead_code)]
pub(crate) async fn new_from_saved(file: &str) -> Result<GadgetProcessManager, Box<dyn Error>> {
let file = std::fs::File::open(file).unwrap();
let mut new_manager: GadgetProcessManager = serde_json::from_reader(file).unwrap();
pub(crate) async fn new_from_saved(
file: &str,
) -> Result<GadgetProcessManager, crate::error::Error> {
let file = std::fs::File::open(file)?;
let mut new_manager: GadgetProcessManager = serde_json::from_reader(file)?;

// Restarts processes that were previously running
new_manager.restart_dead().await?;
Expand All @@ -38,7 +40,7 @@ impl GadgetProcessManager {

/// Store the state of the current processes
#[allow(dead_code)]
pub(crate) async fn save_state(&self) -> Result<String, Box<dyn Error>> {
pub(crate) async fn save_state(&self) -> Result<String, crate::error::Error> {
let serialized_data = serde_json::to_string(self)?;
let mut file = File::create("./savestate.json").await?;
file.write_all(serialized_data.clone().as_bytes()).await?;
Expand All @@ -47,26 +49,19 @@ impl GadgetProcessManager {

/// Runs the given command and stores it using the identifier as the key. Returns the identifier used
#[allow(unused_results)]
pub async fn run(
&mut self,
identifier: String,
command: &str,
) -> Result<String, Box<dyn Error>> {
pub async fn run(&mut self, identifier: String, command: &str) -> Result<String, Error> {
let gadget_process = run_command!(command)?;
self.children.insert(identifier.clone(), gadget_process);
Ok(identifier)
}

/// Focuses on the given service until its stream is exhausted, meaning that the process ran to completion. Returns a
/// ProcessOutput with its output (if there is any).
pub async fn focus_service_to_completion(
&mut self,
service: String,
) -> Result<String, Box<dyn Error>> {
pub async fn focus_service_to_completion(&mut self, service: String) -> Result<String, Error> {
let process = self
.children
.get_mut(&service)
.ok_or(format!("Failed to focus on {service}, it does not exist"))?;
.ok_or(Error::ServiceNotFound(service))?;
let mut output_stream = String::new();
loop {
match process.read_until_default_timeout().await {
Expand Down Expand Up @@ -95,25 +90,25 @@ impl GadgetProcessManager {
&mut self,
service: String,
specified_output: String,
) -> Result<ProcessOutput, Box<dyn Error>> {
) -> Result<ProcessOutput, Error> {
let process = self
.children
.get_mut(&service)
.ok_or(format!("Failed to focus on {service}, it does not exist"))?;
.ok_or(Error::ServiceNotFound(service))?;
Ok(process.read_until_receiving_string(specified_output).await)
}

/// Removes processes that are no longer running from the manager. Returns a Vector of the names of processes removed
#[allow(dead_code)]
pub(crate) async fn remove_dead(&mut self) -> Result<Vec<String>, Box<dyn Error>> {
pub(crate) async fn remove_dead(&mut self) -> Result<Vec<String>, Error> {
let dead_processes = Vec::new();
let mut to_remove = Vec::new();
let s = System::new_all();

// Find dead processes and gather them for return & removal
for (key, value) in self.children.iter() {
let current_pid = value.pid;
if let Some(process) = s.process(Pid::from_u32(current_pid)) {
if let Some(process) = s.process(current_pid) {
if process.name() == value.process_name {
// Still running
continue;
Expand All @@ -133,36 +128,23 @@ impl GadgetProcessManager {
/// Finds all dead processes that still exist in map and starts them again. This function
/// is used to restart all processes after loading a Manager from a file.
#[allow(unused_results)]
pub(crate) async fn restart_dead(&mut self) -> Result<(), Box<dyn Error>> {
pub(crate) async fn restart_dead(&mut self) -> Result<(), Error> {
let mut restarted_processes = Vec::new();
let mut to_remove = Vec::new();
// Find dead processes and restart them
for (key, value) in self.children.iter_mut() {
match value.status() {
Ok(status) => {
match status {
Status::Active | Status::Sleeping => {
// TODO: Metrics + Logs for these living processes
// Check if this process is still running what is expected
// If it is still correctly running, we just move along
continue;
}
Status::Inactive | Status::Dead => {
// Dead, should be restarted
}
Status::Unknown(code) => {
println!("LOG : {} yielded {}", key.clone(), code);
}
}
Status::Active | Status::Sleeping => {
// TODO: Metrics + Logs for these living processes
// Check if this process is still running what is expected
// If it is still correctly running, we just move along
continue;
}
Status::Inactive | Status::Dead => {
// Dead, should be restarted
}
Err(err) => {
// TODO: Log error
// Error generally means it died and no longer exists - restart it
println!(
"LOG : {} yielded {} while attempting to restart dead processes",
key.clone(),
err
);
Status::Unknown(code) => {
println!("LOG : {} yielded {}", key.clone(), code);
}
}
restarted_processes.push((key.clone(), value.restart_process().await?));
Expand Down
2 changes: 2 additions & 0 deletions sdk/src/executor/process/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub mod error;
pub mod manager;
pub(crate) mod types;
pub(crate) mod utils;
pub use error::Error;
77 changes: 47 additions & 30 deletions sdk/src/executor/process/types.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use super::error::Error;
use crate::executor::process::utils::*;
use crate::executor::OS_COMMAND;
use crate::{craft_child_process, run_command};
use failure::format_err;
use nix::libc::pid_t;
use nix::sys::signal;
use nix::sys::signal::Signal;
use serde::{Deserialize, Serialize};
use std::error::Error;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::ffi::OsString;
use std::time::Duration;
use sysinfo::ProcessStatus::{
Expand All @@ -27,26 +26,42 @@ pub struct GadgetProcess {
/// The name of the process itself
pub process_name: OsString,
/// Process ID
pub pid: u32,
#[serde(serialize_with = "serialize_pid", deserialize_with = "deserialize_pid")]
pub pid: Pid,
/// History of output from process for reviewing/tracking progress
pub output: Vec<String>,
/// Stream for output from child process
#[serde(skip_serializing, skip_deserializing)]
pub stream: Option<Lines<BufReader<tokio::process::ChildStdout>>>,
}

fn serialize_pid<S>(pid: &Pid, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_u32(pid.as_u32())
}

fn deserialize_pid<'de, D>(deserializer: D) -> Result<Pid, D::Error>
where
D: Deserializer<'de>,
{
let value = u32::deserialize(deserializer)?;
Ok(Pid::from_u32(value))
}

impl GadgetProcess {
pub fn new(
command: String,
pid: Option<u32>,
pid: u32,
output: Vec<String>,
stream: Lines<BufReader<tokio::process::ChildStdout>>,
) -> Result<GadgetProcess, Box<dyn Error>> {
) -> Result<GadgetProcess, Error> {
let s = System::new_all();
let pid = pid.ok_or("No PID found")?;
let pid = Pid::from_u32(pid);
let process_name = s
.process(Pid::from_u32(pid))
.ok_or(format!("Process {pid} doesn't exist"))?
.process(pid)
.ok_or(Error::ProcessNotFound(pid))?
.name()
.to_os_string();
Ok(GadgetProcess {
Expand Down Expand Up @@ -194,10 +209,10 @@ impl GadgetProcess {
}

/// Restart a GadgetProcess, killing the previously running process if it exists. Returns the new GadgetProcess
pub(crate) async fn restart_process(&mut self) -> Result<GadgetProcess, Box<dyn Error>> {
pub(crate) async fn restart_process(&mut self) -> Result<GadgetProcess, Error> {
// Kill current process running this command
let s = System::new_all();
match s.process(Pid::from_u32(self.pid)) {
match s.process(self.pid) {
Some(process) => {
if process.name() == self.process_name {
self.kill()?;
Expand All @@ -216,43 +231,45 @@ impl GadgetProcess {
}

/// Checks the status of this GadgetProcess
pub(crate) fn status(&self) -> Result<Status, Box<dyn Error>> {
pub(crate) fn status(&self) -> Status {
let s = System::new_all();
match s.process(Pid::from_u32(self.pid)) {
Some(process) => Ok(Status::from(process.status())),
match s.process(self.pid) {
Some(process) => Status::from(process.status()),
None => {
// If it isn't found, then the process died
Ok(Status::Dead)
Status::Dead
}
}
}

/// Gets process name by PID
#[allow(dead_code)]
pub(crate) fn get_name(&self) -> Result<OsString, Box<dyn Error>> {
pub(crate) fn get_name(&self) -> Result<OsString, Error> {
let s = System::new_all();
let name = s
.process(Pid::from_u32(self.pid))
.ok_or(format!("Process {} doesn't exist", self.pid))?
.process(self.pid)
.ok_or(Error::ProcessNotFound(self.pid))?
.name();
Ok(name.into())
}

/// Terminates the process depicted by this GadgetProcess - will fail if the PID is now being reused
pub(crate) fn kill(&self) -> Result<(), Box<dyn Error>> {
pub(crate) fn kill(&self) -> Result<(), Error> {
let running_process = self.get_name()?;
if running_process == self.process_name {
Ok(signal::kill(
nix::unistd::Pid::from_raw(self.pid as pid_t),
Signal::SIGTERM,
)?)
} else {
Err(Box::from(format_err!(
"Expected {} and found {} running instead - process termination aborted",
self.process_name.to_string_lossy(),
running_process.to_string_lossy()
)))
if running_process != self.process_name {
return Err(Error::ProcessMismatch(
self.process_name.to_string_lossy().into_owned(),
running_process.to_string_lossy().into_owned(),
));
}

signal::kill(
nix::unistd::Pid::from_raw(self.pid.as_u32() as pid_t),
Signal::SIGTERM,
)
.map_err(Error::KillFailed)?;

Ok(())
}
}

Expand Down
4 changes: 2 additions & 2 deletions sdk/src/executor/process/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ macro_rules! run_command {
($cmd:expr) => {{
// Spawn child running process
let child: Child = craft_child_process!($cmd);
let pid = child.id().clone();
let pid = child.id().ok_or(Error::UnexpectedExit)?;
let stream = create_stream(child);
GadgetProcess::new(
$cmd.to_string(),
Expand All @@ -70,7 +70,7 @@ macro_rules! run_command {
($cmd:expr, $($args:expr),*) => {{
// Spawn child running process
let child = craft_child_process!($cmd,$($args),*);
let pid = child.id().clone();
let pid = child.id().ok_or(Error::UnexpectedExit)?;
let stream = create_stream(child);
GadgetProcess::new(
$cmd.to_string(),
Expand Down

0 comments on commit 881e643

Please sign in to comment.