diff --git a/src/agent/src/agents/rust.rs b/src/agent/src/agents/rust.rs index 180188a..b8c70ec 100644 --- a/src/agent/src/agents/rust.rs +++ b/src/agent/src/agents/rust.rs @@ -82,8 +82,6 @@ impl Agent for RustAgent { child_processes: &'a Arc>>, ) -> Pin> + Send + '_>> { Box::pin(async { - let code = std::fs::read_to_string(&self.rust_config.build.source_code_path).unwrap(); - let function_dir = format!( "/tmp/{}", Alphanumeric.sample_string(&mut rand::thread_rng(), 16) @@ -93,60 +91,61 @@ impl Agent for RustAgent { create_dir_all(format!("{}/src", &function_dir)).expect("Unable to create directory"); - std::fs::write( - format!("{}/src/main.rs", &function_dir), - &self.workload_config.code, - ) - .expect("Unable to write main.rs file"); + std::fs::write( + format!("{}/src/main.rs", &function_dir), + &self.workload_config.code, + ) + .expect("Unable to write main.rs file"); - let cargo_toml = format!( - r#" + let cargo_toml = format!( + r#" [package] name = "{}" version = "0.1.0" edition = "2018" "#, - self.workload_config.workload_name - ); + self.workload_config.workload_name + ); - std::fs::write(format!("{}/Cargo.toml", &function_dir), cargo_toml) - .expect("Unable to write Cargo.toml file"); + std::fs::write(format!("{}/Cargo.toml", &function_dir), cargo_toml) + .expect("Unable to write Cargo.toml file"); let result = self.build(&function_dir, child_processes).await?; - if result.exit_code != 0 { - println!("Build failed: {:?}", result); - return Err(AgentError::BuildFailed(AgentOutput { - exit_code: result.exit_code, - stdout: result.stdout, - stderr: result.stderr, - })); - } + if result.exit_code != 0 { + println!("Build failed: {:?}", result); + return Err(AgentError::BuildFailed(AgentOutput { + exit_code: result.exit_code, + stdout: result.stdout, + stderr: result.stderr, + })); + } + + // Copy the binary to /tmp, we could imagine a more complex scenario where we would put this in an artifact repository (like S3) + let binary_path = match self.rust_config.build.release { + true => format!( + "{}/target/release/{}", + &function_dir, self.workload_config.workload_name + ), + false => format!( + "{}/target/debug/{}", + &function_dir, self.workload_config.workload_name + ), + }; + + std::fs::copy( + binary_path, + format!("/tmp/{}", self.workload_config.workload_name), + ) + .expect("Unable to copy binary"); + + std::fs::remove_dir_all(&function_dir).expect("Unable to remove directory"); - // Copy the binary to /tmp, we could imagine a more complex scenario where we would put this in an artifact repository (like S3) - let binary_path = match self.rust_config.build.release { - true => format!( - "{}/target/release/{}", - &function_dir, self.workload_config.workload_name - ), - false => format!( - "{}/target/debug/{}", - &function_dir, self.workload_config.workload_name - ), - }; - - std::fs::copy( - binary_path, - format!("/tmp/{}", self.workload_config.workload_name), - ) - .expect("Unable to copy binary"); - - std::fs::remove_dir_all(&function_dir).expect("Unable to remove directory"); - - Ok(AgentOutput { - exit_code: result.exit_code, - stdout: "Build successful".to_string(), - stderr: "".to_string(), + Ok(AgentOutput { + exit_code: result.exit_code, + stdout: "Build successful".to_string(), + stderr: "".to_string(), + }) }) } @@ -159,17 +158,22 @@ impl Agent for RustAgent { .spawn() .expect("Failed to run function"); - let agent_output = AgentOutput { - exit_code: output.status.code().unwrap(), - stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(), - stderr: std::str::from_utf8(&output.stderr).unwrap().to_string(), - }; + child_processes.lock().await.insert(child.id()); + + let output = child.wait_with_output().expect("Failed to wait on child"); - if !output.status.success() { - println!("Run failed: {:?}", agent_output); - return Err(AgentError::BuildFailed(agent_output)); - } + let agent_output = AgentOutput { + exit_code: output.status.code().unwrap(), + stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(), + stderr: std::str::from_utf8(&output.stderr).unwrap().to_string(), + }; - Ok(agent_output) + if !output.status.success() { + println!("Run failed: {:?}", agent_output); + return Err(AgentError::BuildFailed(agent_output)); + } + + Ok(agent_output) + }) } } diff --git a/src/agent/src/main.rs b/src/agent/src/main.rs index 780c36a..93437f2 100644 --- a/src/agent/src/main.rs +++ b/src/agent/src/main.rs @@ -2,16 +2,9 @@ use agent::{ agent::workload_runner_server::WorkloadRunnerServer, workload::service::WorkloadRunnerService, }; use clap::Parser; -use once_cell::sync::Lazy; -use std::collections::HashSet; -use std::sync::Arc; -use std::{net::ToSocketAddrs, path::PathBuf}; -use tokio::sync::Mutex; +use std::net::ToSocketAddrs; use tonic::transport::Server; -static CHILD_PROCESSES: Lazy>>> = - Lazy::new(|| Arc::new(Mutex::new(HashSet::new()))); - #[derive(Debug, Parser)] struct Args { #[clap(long, env, default_value = "0.0.0.0")] diff --git a/src/agent/src/workload/runner.rs b/src/agent/src/workload/runner.rs index b6f461c..7e6ce94 100644 --- a/src/agent/src/workload/runner.rs +++ b/src/agent/src/workload/runner.rs @@ -36,9 +36,12 @@ impl Runner { } } - pub fn new_from_execute_request(execute_request: ExecuteRequest) -> Result { + pub fn new_from_execute_request( + execute_request: ExecuteRequest, + child_processes: Arc>>, + ) -> Result { let config = Config::new_from_execute_request(execute_request)?; - Ok(Self::new(config)) + Ok(Self::new(config, child_processes)) } pub async fn run(&self) -> AgentResult { diff --git a/src/agent/src/workload/service.rs b/src/agent/src/workload/service.rs index 38e6e66..d28d991 100644 --- a/src/agent/src/workload/service.rs +++ b/src/agent/src/workload/service.rs @@ -1,6 +1,7 @@ use super::runner::Runner; use crate::agent::{self, execute_response::Stage, ExecuteRequest, ExecuteResponse, SignalRequest}; use agent::workload_runner_server::WorkloadRunner; +use once_cell::sync::Lazy; use std::collections::HashSet; use std::{process, sync::Arc}; use tokio::sync::Mutex; @@ -9,6 +10,9 @@ use tonic::{Request, Response}; type Result = std::result::Result, tonic::Status>; +static CHILD_PROCESSES: Lazy>>> = + Lazy::new(|| Arc::new(Mutex::new(HashSet::new()))); + pub struct WorkloadRunnerService; #[tonic::async_trait] @@ -20,7 +24,7 @@ impl WorkloadRunner for WorkloadRunnerService { let execute_request = req.into_inner(); - let runner = Runner::new_from_execute_request(execute_request) + let runner = Runner::new_from_execute_request(execute_request, CHILD_PROCESSES.clone()) .map_err(|e| tonic::Status::internal(e.to_string()))?; let res = runner @@ -45,7 +49,7 @@ impl WorkloadRunner for WorkloadRunnerService { } async fn signal(&self, _: Request) -> Result<()> { - let child_processes = self.child_processes.lock().await; + let child_processes = CHILD_PROCESSES.lock().await; for &child_id in child_processes.iter() { nix::sys::signal::kill(