diff --git a/proto/agent.proto b/proto/agent.proto index 84b3798..42c98c8 100644 --- a/proto/agent.proto +++ b/proto/agent.proto @@ -26,10 +26,10 @@ message ExecuteResponse { DEBUG = 5; } - string stdout = 1; - string stderr = 2; - Stage stage = 3; - int32 exit_code = 4; + Stage stage = 1; + optional string stdout = 2; + optional string stderr = 3; + optional int32 exit_code = 4; } message SignalRequest { diff --git a/proto/vmm.proto b/proto/vmm.proto index 8244446..34ec802 100644 --- a/proto/vmm.proto +++ b/proto/vmm.proto @@ -26,9 +26,10 @@ message ExecuteResponse { DEBUG = 5; } - string stdout = 1; - string stderr = 2; - int32 exit_code = 3; + Stage stage = 1; + optional string stdout = 2; + optional string stderr = 3; + optional int32 exit_code = 4; } service VmmService { @@ -41,7 +42,6 @@ message RunVmmRequest { Language language = 2; string code = 3; LogLevel log_level = 4; - } message RunVmmResponse { diff --git a/src/agent/src/agents/debug.rs b/src/agent/src/agents/debug.rs index dbbf241..ac03121 100644 --- a/src/agent/src/agents/debug.rs +++ b/src/agent/src/agents/debug.rs @@ -1,4 +1,5 @@ use super::AgentOutput; +use crate::agent::execute_response::Stage; use crate::agents::Agent; use crate::{workload, AgentResult}; use async_trait::async_trait; @@ -6,6 +7,7 @@ use std::collections::HashSet; use std::fs::create_dir_all; use std::sync::Arc; use std::time::SystemTime; +use tokio::sync::mpsc::{self, Receiver}; use tokio::sync::Mutex; pub struct DebugAgent { @@ -20,7 +22,7 @@ impl From for DebugAgent { #[async_trait] impl Agent for DebugAgent { - async fn prepare(&self, _: Arc>>) -> AgentResult { + async fn prepare(&self, _: Arc>>) -> AgentResult> { let dir = format!("/tmp/{}", self.workload_config.workload_name); println!("Function directory: {}", dir); @@ -37,25 +39,51 @@ impl Agent for DebugAgent { ) .expect("Unable to write debug.txt file"); - Ok(AgentOutput { - exit_code: 0, - stdout: "Build successfully!".into(), - stderr: String::default(), - }) + let 
(tx, rx) = mpsc::channel(1); + tokio::spawn(async move { + let _ = tx + .send(AgentOutput { + stage: Stage::Building, + stdout: Some("Build successfully!".into()), + stderr: None, + exit_code: None, + }) + .await; + }); + + Ok(rx) } - async fn run(&self, _: Arc>>) -> AgentResult { + async fn run(&self, _: Arc>>) -> AgentResult> { let dir = format!("/tmp/{}", self.workload_config.workload_name); - let content = std::fs::read_to_string(format!("{}/debug.txt", &dir)) - .expect("Unable to read debug.txt file"); + let content = std::fs::read_to_string(format!("{}/debug.txt", &dir)); std::fs::remove_dir_all(dir).expect("Unable to remove directory"); - Ok(AgentOutput { - exit_code: 0, - stdout: content, - stderr: String::default(), - }) + let (tx, rx) = mpsc::channel(1); + tokio::spawn(async move { + if let Ok(content) = content { + let _ = tx + .send(AgentOutput { + stage: Stage::Done, + stdout: Some(content), + stderr: None, + exit_code: Some(0), + }) + .await; + return; // success already reported: do not fall through to the failure message + } + let _ = tx + .send(AgentOutput { + stage: Stage::Failed, + stdout: None, + stderr: Some("unable to read debug.txt".into()), + exit_code: Some(1), + }) + .await; + }); + + Ok(rx) } } diff --git a/src/agent/src/agents/mod.rs b/src/agent/src/agents/mod.rs index 9d4032c..ff5212a 100644 --- a/src/agent/src/agents/mod.rs +++ b/src/agent/src/agents/mod.rs @@ -1,9 +1,12 @@ -use crate::{AgentError, AgentResult}; +use crate::{ + agent::{execute_response::Stage, ExecuteResponse}, + AgentError, AgentResult, +}; use async_trait::async_trait; use serde::Deserialize; use std::collections::HashSet; use std::sync::Arc; -use tokio::sync::Mutex; +use tokio::sync::{mpsc, Mutex}; #[cfg(feature = "debug-agent")] pub mod debug; @@ -11,15 +14,33 @@ pub mod rust; #[derive(Debug, Clone)] pub struct AgentOutput { - pub exit_code: i32, - pub stdout: String, - pub stderr: String, + pub stage: Stage, + pub stdout: Option, + pub stderr: Option, + pub exit_code: Option, +} + +impl From for ExecuteResponse { + fn from(value: AgentOutput) 
-> Self { + Self { + stage: value.stage as i32, + stdout: value.stdout, + stderr: value.stderr, + exit_code: value.exit_code, + } + } } #[async_trait] pub trait Agent { - async fn prepare(&self, child_processes: Arc>>) -> AgentResult; - async fn run(&self, child_processes: Arc>>) -> AgentResult; + async fn prepare( + &self, + child_processes: Arc>>, + ) -> AgentResult>; + async fn run( + &self, + child_processes: Arc>>, + ) -> AgentResult>; } #[derive(Debug, Clone, Deserialize)] @@ -55,3 +76,111 @@ impl TryFrom<&str> for Language { } } } + +mod process_utils { + use super::AgentOutput; + use crate::agent::execute_response::Stage; + use tokio::{ + io::{AsyncBufReadExt, BufReader}, + process::{ChildStderr, ChildStdout}, + sync::mpsc, + task::JoinHandle, + }; + + /// Spawn a tokio task and send each line of `stdout` to the `tx` given as a parameter. + pub async fn send_stdout_to_tx( + stdout: ChildStdout, + tx: mpsc::Sender, + stage: Option, + ) -> JoinHandle<()> { + tokio::spawn(async move { + let reader = BufReader::new(stdout); + let mut reader_lines = reader.lines(); + + while let Ok(Some(line)) = reader_lines.next_line().await { + let _ = tx + .send(AgentOutput { + stage: stage.unwrap_or(Stage::Running), + stdout: Some(line), + stderr: None, + exit_code: None, + }) + .await; + } + }) + } + + /// Same as [`send_stdout_to_tx`], but reads `stderr` and sends each line in the `stderr` field. + pub async fn send_stderr_to_tx( + stderr: ChildStderr, + tx: mpsc::Sender, + stage: Option, + ) -> JoinHandle<()> { + tokio::spawn(async move { + let reader = BufReader::new(stderr); + let mut reader_lines = reader.lines(); + + while let Ok(Some(line)) = reader_lines.next_line().await { + let _ = tx + .send(AgentOutput { + stage: stage.unwrap_or(Stage::Running), + stdout: None, + stderr: Some(line), + exit_code: None, + }) + .await; + } + }) + } + + /// Function to wait for the `child` to finish and send the result to the `tx` given as a parameter. 
+ pub async fn send_exit_status_to_tx( + mut child: tokio::process::Child, + tx: mpsc::Sender, + send_done: bool, + ) -> Result<(), ()> { + let exit_status = child.wait().await.map(|status| status.code()); + + match exit_status { + Ok(exit_code) => { + if exit_code != Some(0_i32) { + let _ = tx + .send(AgentOutput { + stage: Stage::Failed, + stdout: None, + stderr: None, + exit_code, + }) + .await; + + Err(()) + } else { + if send_done { + let _ = tx + .send(AgentOutput { + stage: Stage::Done, + stdout: None, + stderr: None, + exit_code, + }) + .await; + } + + Ok(()) + } + } + Err(e) => { + let _ = tx + .send(AgentOutput { + stage: Stage::Failed, + stdout: None, + stderr: Some(e.to_string()), + exit_code: None, + }) + .await; + + Err(()) + } + } + } +} diff --git a/src/agent/src/agents/rust.rs b/src/agent/src/agents/rust.rs index 207276e..90aed45 100644 --- a/src/agent/src/agents/rust.rs +++ b/src/agent/src/agents/rust.rs @@ -1,13 +1,20 @@ use super::{Agent, AgentOutput}; +use crate::agent::execute_response::Stage; +use crate::agents::process_utils; use crate::{workload, AgentError, AgentResult}; use async_trait::async_trait; use rand::distributions::{Alphanumeric, DistString}; use serde::Deserialize; use std::collections::HashSet; use std::fs::create_dir_all; +use std::process::Stdio; use std::sync::Arc; -use tokio::process::Command; -use tokio::sync::Mutex; +use tokio::process::{Child, Command}; +use tokio::sync::{ + broadcast, + mpsc::{self, Receiver}, + Mutex, +}; #[derive(Deserialize)] #[serde(rename_all = "kebab-case")] @@ -23,72 +30,57 @@ struct RustAgentConfig { pub struct RustAgent { workload_config: workload::config::Config, rust_config: RustAgentConfig, + build_notifier: broadcast::Sender>, +} + +// TODO should change with a TryFrom +impl From for RustAgent { + fn from(workload_config: workload::config::Config) -> Self { + let rust_config: RustAgentConfig = toml::from_str(&workload_config.config_string).unwrap(); + + Self { + workload_config, + 
rust_config, + build_notifier: broadcast::channel::>(1).0, + } + } } impl RustAgent { - async fn build( + async fn get_build_child_process( &self, function_dir: &str, child_processes: Arc>>, - ) -> AgentResult { - if self.rust_config.build.release { - let child = Command::new("cargo") + ) -> Child { + let mut command = Command::new("cargo"); + let command = if self.rust_config.build.release { + command + .stderr(Stdio::piped()) .arg("build") - .arg("--release") .current_dir(function_dir) - .spawn() - .expect("Failed to build function"); - - { - child_processes.lock().await.insert(child.id().unwrap()); - } - - let output = child - .wait_with_output() - .await - .expect("Failed to wait on child"); - - Ok(AgentOutput { - exit_code: output.status.code().unwrap(), - stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(), - stderr: std::str::from_utf8(&output.stderr).unwrap().to_string(), - }) + .arg("--release") } else { - let child = Command::new("cargo") + command + .stderr(Stdio::piped()) .arg("build") .current_dir(function_dir) - .spawn() - .expect("Failed to build function"); - - let output = child - .wait_with_output() - .await - .expect("Failed to wait on child"); + }; + let child = command.spawn().expect("Failed to start build"); - Ok(AgentOutput { - exit_code: output.status.code().unwrap(), - stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(), - stderr: std::str::from_utf8(&output.stderr).unwrap().to_string(), - }) + { + child_processes.lock().await.insert(child.id().unwrap()); } - } -} - -// TODO should change with a TryFrom -impl From for RustAgent { - fn from(workload_config: workload::config::Config) -> Self { - let rust_config: RustAgentConfig = toml::from_str(&workload_config.config_string).unwrap(); - Self { - workload_config, - rust_config, - } + child } } #[async_trait] impl Agent for RustAgent { - async fn prepare(&self, child_processes: Arc>>) -> AgentResult { + async fn prepare( + &self, + child_processes: Arc>>, + ) -> 
AgentResult> { let function_dir = format!( "/tmp/{}", Alphanumeric.sample_string(&mut rand::thread_rng(), 16) @@ -117,46 +109,60 @@ impl Agent for RustAgent { std::fs::write(format!("{}/Cargo.toml", &function_dir), cargo_toml) .expect("Unable to write Cargo.toml file"); - let result = self.build(&function_dir, child_processes).await?; - - if result.exit_code != 0 { - println!("Build failed: {:?}", result); - return Err(AgentError::BuildFailed(AgentOutput { - exit_code: result.exit_code, - stdout: result.stdout, - stderr: result.stderr, - })); - } - - // Copy the binary to /tmp, we could imagine a more complex scenario where we would put this in an artifact repository (like S3) - let binary_path = match self.rust_config.build.release { - true => format!( - "{}/target/release/{}", - &function_dir, self.workload_config.workload_name - ), - false => format!( - "{}/target/debug/{}", - &function_dir, self.workload_config.workload_name - ), - }; - - std::fs::copy( - binary_path, - format!("/tmp/{}", self.workload_config.workload_name), - ) - .expect("Unable to copy binary"); + let mut child = self + .get_build_child_process(&function_dir, child_processes) + .await; + let workload_name = self.workload_config.workload_name.clone(); + let is_release = self.rust_config.build.release; + let tx_build_notifier = self.build_notifier.clone(); + + let (tx, rx) = mpsc::channel(10); + tokio::spawn(async move { + let stderr = child.stderr.take().unwrap(); + let _ = process_utils::send_stderr_to_tx(stderr, tx.clone(), Some(Stage::Building)) + .await + .await; + let build_result = process_utils::send_exit_status_to_tx(child, tx, false).await; + // if error in build, short-circuit the execution + if build_result.is_err() { + let _ = tx_build_notifier.send(Err(())); + } else { + // Once finished: copy the binary to /tmp + // We could imagine a more complex scenario where we would put this in an artifact repository (like S3) + let binary_path = match is_release { + true => 
format!("{}/target/release/{}", &function_dir, workload_name), + false => format!("{}/target/debug/{}", &function_dir, workload_name), + }; + + std::fs::copy(binary_path, format!("/tmp/{}", workload_name)) + .expect("Unable to copy binary"); + + // notify when build is done + let _ = tx_build_notifier.send(build_result); + } - std::fs::remove_dir_all(&function_dir).expect("Unable to remove directory"); + std::fs::remove_dir_all(&function_dir).expect("Unable to remove directory"); + }); - Ok(AgentOutput { - exit_code: result.exit_code, - stdout: "Build successful".to_string(), - stderr: "".to_string(), - }) + Ok(rx) } - async fn run(&self, child_processes: Arc>>) -> AgentResult { - let child = Command::new(format!("/tmp/{}", self.workload_config.workload_name)) + async fn run( + &self, + child_processes: Arc>>, + ) -> AgentResult> { + // wait for build to finish + self.build_notifier + .subscribe() + .recv() + .await + .map_err(|_| AgentError::BuildNotifier)? + .map_err(|_| AgentError::BuildFailed)?; + + println!("Starting run()"); + let mut child = Command::new(format!("/tmp/{}", self.workload_config.workload_name)) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) .spawn() .expect("Failed to run function"); @@ -164,22 +170,25 @@ impl Agent for RustAgent { child_processes.lock().await.insert(child.id().unwrap()); } - let output = child - .wait_with_output() - .await - .expect("Failed to wait on child"); + let (tx, rx) = mpsc::channel(10); + let child_stdout = child.stdout.take().unwrap(); + let tx_stdout = tx.clone(); + let child_stderr = child.stderr.take().unwrap(); + let tx_stderr = tx; - let agent_output = AgentOutput { - exit_code: output.status.code().unwrap(), - stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(), - stderr: std::str::from_utf8(&output.stderr).unwrap().to_string(), - }; + tokio::spawn(async move { + let _ = process_utils::send_stdout_to_tx(child_stdout, tx_stdout.clone(), None) + .await + .await; + let _ = 
process_utils::send_exit_status_to_tx(child, tx_stdout, true).await; + }); - if !output.status.success() { - println!("Run failed: {:?}", agent_output); - return Err(AgentError::BuildFailed(agent_output)); - } + tokio::spawn(async move { + let _ = process_utils::send_stderr_to_tx(child_stderr, tx_stderr, None) + .await + .await; + }); - Ok(agent_output) + Ok(rx) } } diff --git a/src/agent/src/lib.rs b/src/agent/src/lib.rs index 0c43daf..48a4a5c 100644 --- a/src/agent/src/lib.rs +++ b/src/agent/src/lib.rs @@ -1,4 +1,3 @@ -use agents::AgentOutput; use std::fmt; mod agents; @@ -9,7 +8,8 @@ pub enum AgentError { OpenConfigFileError(std::io::Error), ParseConfigError(toml::de::Error), InvalidLanguage(String), - BuildFailed(AgentOutput), + BuildNotifier, + BuildFailed, } impl fmt::Display for AgentError { @@ -17,8 +17,11 @@ impl fmt::Display for AgentError { match self { AgentError::OpenConfigFileError(e) => write!(f, "Failed to open config file: {}", e), AgentError::ParseConfigError(e) => write!(f, "Failed to parse config file: {}", e), - AgentError::BuildFailed(output) => write!(f, "Build failed: {:?}", output), AgentError::InvalidLanguage(e) => write!(f, "Invalid language: {}", e), + AgentError::BuildNotifier => { + write!(f, "Could not get notification from build notifier") + } + AgentError::BuildFailed => write!(f, "Build has failed"), } } } diff --git a/src/agent/src/workload/runner.rs b/src/agent/src/workload/runner.rs index 1cbeb61..2f1bd5f 100644 --- a/src/agent/src/workload/runner.rs +++ b/src/agent/src/workload/runner.rs @@ -7,7 +7,7 @@ use crate::{ }; use std::collections::HashSet; use std::sync::Arc; -use tokio::sync::Mutex; +use tokio::sync::{mpsc::Receiver, Mutex}; #[cfg(feature = "debug-agent")] use crate::agents::debug; @@ -28,7 +28,7 @@ impl Runner { Language::Debug => Box::new(debug::DebugAgent::from(config.clone())), }; - Runner { + Self { config, agent, child_processes, @@ -43,8 +43,8 @@ impl Runner { Ok(Self::new(config, child_processes)) } - pub 
async fn run(&self) -> AgentResult { - let result = match self.config.action { + pub async fn run(self) -> AgentResult> { + let rx = match self.config.action { Action::Prepare => { self.agent .prepare(Arc::clone(&self.child_processes)) @@ -52,17 +52,33 @@ impl Runner { } Action::Run => self.agent.run(Arc::clone(&self.child_processes)).await?, Action::PrepareAndRun => { - let res = self + let (tx1, rx) = tokio::sync::mpsc::channel::(10); + let tx2 = tx1.clone(); + + // Merges the two receivers given as parameters and returns only one + let mut rx_prepare = self .agent .prepare(Arc::clone(&self.child_processes)) .await?; - println!("Prepare result {:?}", res); - self.agent.run(Arc::clone(&self.child_processes)).await? + tokio::spawn(async move { + while let Some(output) = rx_prepare.recv().await { + let _ = tx1.send(output).await; + } + }); + + tokio::spawn(async move { + let rx_run = self.agent.run(Arc::clone(&self.child_processes)).await; + if let Ok(mut rx_run) = rx_run { + while let Some(output) = rx_run.recv().await { + let _ = tx2.clone().send(output).await; + } + } + }); + + rx } }; - println!("Result: {:?}", result); - - Ok(result) + Ok(rx) } } diff --git a/src/agent/src/workload/service.rs b/src/agent/src/workload/service.rs index 996d256..bcce6ac 100644 --- a/src/agent/src/workload/service.rs +++ b/src/agent/src/workload/service.rs @@ -1,10 +1,10 @@ use super::runner::Runner; -use crate::agent::{self, execute_response::Stage, ExecuteRequest, ExecuteResponse, SignalRequest}; +use crate::agent::{self, ExecuteRequest, ExecuteResponse, SignalRequest}; use agent::workload_runner_server::WorkloadRunner; use once_cell::sync::Lazy; use std::collections::HashSet; use std::{process, sync::Arc}; -use tokio::sync::Mutex; +use tokio::sync::{mpsc, Mutex}; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response}; @@ -20,30 +20,21 @@ impl WorkloadRunner for WorkloadRunnerService { type ExecuteStream = ReceiverStream>; async fn execute(&self, req: Request) 
-> Result { - let (tx, rx) = tokio::sync::mpsc::channel(4); - - let execute_request = req.into_inner(); - - let runner = Runner::new_from_execute_request(execute_request, CHILD_PROCESSES.clone()) + let runner = Runner::new_from_execute_request(req.into_inner(), CHILD_PROCESSES.clone()) .map_err(|e| tonic::Status::internal(e.to_string()))?; - let res = runner + let mut runner_rx = runner .run() .await .map_err(|e| tonic::Status::internal(e.to_string()))?; - let _ = tx - .send(Ok(ExecuteResponse { - stage: Stage::Done as i32, - stdout: res.stdout, - stderr: res.stderr, - exit_code: res.exit_code, - })) - .await - .map_err(|e| { - println!("Failed to send response: {:?}", e); - tonic::Status::internal("Failed to send response") - })?; + let (tx, rx) = mpsc::channel(10); + tokio::spawn(async move { + while let Some(agent_output) = runner_rx.recv().await { + println!("Sending to the gRPC client: {:?}", agent_output); + let _ = tx.send(Ok(agent_output.into())).await; + } + }); Ok(Response::new(ReceiverStream::new(rx))) } diff --git a/src/api/src/service.rs b/src/api/src/service.rs index 243c287..4db3fd8 100644 --- a/src/api/src/service.rs +++ b/src/api/src/service.rs @@ -1,5 +1,8 @@ use crate::client::{ - vmmorchestrator::{ExecuteResponse, RunVmmRequest, ShutdownVmRequest, ShutdownVmResponse}, + vmmorchestrator::{ + execute_response::Stage, ExecuteResponse, RunVmmRequest, ShutdownVmRequest, + ShutdownVmResponse, + }, VmmClient, }; use actix_web::{post, web, HttpRequest, HttpResponse, Responder}; @@ -49,14 +52,39 @@ pub async fn run(req_body: web::Json) -> impl Responder { #[derive(Debug, Serialize)] pub struct ExecuteJsonResponse { - pub stdout: String, - pub stderr: String, - pub exit_code: i32, + pub stage: StageJson, + pub stdout: Option, + pub stderr: Option, + pub exit_code: Option, +} + +#[derive(Debug, Serialize)] +pub enum StageJson { + Pending, + Building, + Running, + Done, + Failed, + Debug, +} + +impl From for StageJson { + fn from(value: Stage) -> Self { + 
match value { + Stage::Pending => StageJson::Pending, + Stage::Building => StageJson::Building, + Stage::Running => StageJson::Running, + Stage::Done => StageJson::Done, + Stage::Failed => StageJson::Failed, + Stage::Debug => StageJson::Debug, + } + } } impl From for ExecuteJsonResponse { fn from(value: ExecuteResponse) -> Self { Self { + stage: Stage::from_i32(value.stage).unwrap().into(), stdout: value.stdout, stderr: value.stderr, exit_code: value.exit_code, diff --git a/src/cli/examples/main.rs b/src/cli/examples/main.rs index 772f207..06d6a9a 100644 --- a/src/cli/examples/main.rs +++ b/src/cli/examples/main.rs @@ -17,5 +17,10 @@ fn fibonacci(n: u32) -> u64 { fn main() { let n = 10; - println!("Fibonacci number at position {} is: {}", n, fibonacci(n)); + + for i in 0..=10 { + println!("🚀 Loop {}", i); + println!("Fibonacci number at position {} is: {}", n, fibonacci(n)); + std::thread::sleep(std::time::Duration::from_secs(1)); + } } diff --git a/src/vmm/src/grpc/server.rs b/src/vmm/src/grpc/server.rs index 898a60f..50a4e24 100644 --- a/src/vmm/src/grpc/server.rs +++ b/src/vmm/src/grpc/server.rs @@ -248,14 +248,17 @@ impl VmmServiceTrait for VmmService { let mut response_stream = client.execute(agent_request).await?; // Process each message as it arrives - while let Some(response) = response_stream.message().await? { - let vmm_response = vmmorchestrator::ExecuteResponse { - stdout: response.stdout, - stderr: response.stderr, - exit_code: response.exit_code, - }; - tx.send(Ok(vmm_response)).await.unwrap(); - } + tokio::spawn(async move { + while let Ok(Some(response)) = response_stream.message().await { + let vmm_response = vmmorchestrator::ExecuteResponse { + stage: response.stage, + stdout: response.stdout, + stderr: response.stderr, + exit_code: response.exit_code, + }; + let _ = tx.send(Ok(vmm_response)).await; + } + }); } Err(e) => { error!("ERROR {:?}", e);