From b4ca4024faaa30f7e522aa7dfb331ace24a5a264 Mon Sep 17 00:00:00 2001 From: Mateo Fernandez Date: Wed, 29 May 2024 21:54:30 +0200 Subject: [PATCH 1/7] feat(agent): add channels for streaming logs of build and run stage fix(proto): use optional field for non-used output and exit_code Signed-off-by: Mateo Fernandez --- proto/agent.proto | 8 +- src/agent/src/agents/debug.rs | 56 ++++++--- src/agent/src/agents/mod.rs | 122 +++++++++++++++++-- src/agent/src/agents/rust.rs | 190 +++++++++++++++--------------- src/agent/src/lib.rs | 7 +- src/agent/src/workload/runner.rs | 17 ++- src/agent/src/workload/service.rs | 31 ++--- 7 files changed, 277 insertions(+), 154 deletions(-) diff --git a/proto/agent.proto b/proto/agent.proto index 84b3798..42c98c8 100644 --- a/proto/agent.proto +++ b/proto/agent.proto @@ -26,10 +26,10 @@ message ExecuteResponse { DEBUG = 5; } - string stdout = 1; - string stderr = 2; - Stage stage = 3; - int32 exit_code = 4; + Stage stage = 1; + optional string stdout = 2; + optional string stderr = 3; + optional int32 exit_code = 4; } message SignalRequest { diff --git a/src/agent/src/agents/debug.rs b/src/agent/src/agents/debug.rs index dbbf241..ac03121 100644 --- a/src/agent/src/agents/debug.rs +++ b/src/agent/src/agents/debug.rs @@ -1,4 +1,5 @@ use super::AgentOutput; +use crate::agent::execute_response::Stage; use crate::agents::Agent; use crate::{workload, AgentResult}; use async_trait::async_trait; @@ -6,6 +7,7 @@ use std::collections::HashSet; use std::fs::create_dir_all; use std::sync::Arc; use std::time::SystemTime; +use tokio::sync::mpsc::{self, Receiver}; use tokio::sync::Mutex; pub struct DebugAgent { @@ -20,7 +22,7 @@ impl From for DebugAgent { #[async_trait] impl Agent for DebugAgent { - async fn prepare(&self, _: Arc>>) -> AgentResult { + async fn prepare(&self, _: Arc>>) -> AgentResult> { let dir = format!("/tmp/{}", self.workload_config.workload_name); println!("Function directory: {}", dir); @@ -37,25 +39,51 @@ impl Agent for DebugAgent { ) .expect("Unable to write debug.txt file"); - Ok(AgentOutput { - exit_code: 0, - stdout: "Build successfully!".into(), - stderr: String::default(), - }) + let (tx, rx) = mpsc::channel(1); + tokio::spawn(async move { + let _ = tx + .send(AgentOutput { + stage: Stage::Building, + stdout: Some("Build successfully!".into()), + stderr: None, + exit_code: None, + }) + .await; + }); + + Ok(rx) } - async fn run(&self, _: Arc>>) -> AgentResult { + async fn run(&self, _: Arc>>) -> AgentResult> { let dir = format!("/tmp/{}", self.workload_config.workload_name); - let content = std::fs::read_to_string(format!("{}/debug.txt", &dir)) - .expect("Unable to read debug.txt file"); + let content = std::fs::read_to_string(format!("{}/debug.txt", &dir)); std::fs::remove_dir_all(dir).expect("Unable to remove directory"); - Ok(AgentOutput { - exit_code: 0, - stdout: content, - stderr: String::default(), - }) + let (tx, rx) = mpsc::channel(1); + tokio::spawn(async move { + if let Ok(content) = content { + let _ = tx + .send(AgentOutput { + stage: Stage::Done, + stdout: Some(content), + stderr: None, + exit_code: Some(0), + }) + .await; + } + + let _ = tx + .send(AgentOutput { + stage: Stage::Failed, + stdout: None, + stderr: Some("unable to read debug.txt".into()), + exit_code: Some(1), + }) + .await; + }); + + Ok(rx) } } diff --git a/src/agent/src/agents/mod.rs b/src/agent/src/agents/mod.rs index 9d4032c..d4db775 100644 --- a/src/agent/src/agents/mod.rs +++ b/src/agent/src/agents/mod.rs @@ -1,9 +1,12 @@ -use crate::{AgentError, AgentResult}; +use crate::{ + agent::{execute_response::Stage, ExecuteResponse}, + AgentError, AgentResult, +}; use async_trait::async_trait; use serde::Deserialize; use std::collections::HashSet; use std::sync::Arc; -use tokio::sync::Mutex; +use tokio::sync::{mpsc, Mutex}; #[cfg(feature = "debug-agent")] pub mod debug; @@ -11,15 +14,33 @@ pub mod rust; #[derive(Debug, Clone)] pub struct AgentOutput { - pub exit_code: i32, - pub stdout: String, - pub stderr: String, + pub stage: Stage, + pub stdout: Option, + pub stderr: Option, + pub exit_code: Option, +} + +impl From for ExecuteResponse { + fn from(value: AgentOutput) -> Self { + Self { + stage: value.stage as i32, + stdout: value.stdout, + stderr: value.stderr, + exit_code: value.exit_code, + } + } } #[async_trait] pub trait Agent { - async fn prepare(&self, child_processes: Arc>>) -> AgentResult; - async fn run(&self, child_processes: Arc>>) -> AgentResult; + async fn prepare( + &self, + child_processes: Arc>>, + ) -> AgentResult>; + async fn run( + &self, + child_processes: Arc>>, + ) -> AgentResult>; } #[derive(Debug, Clone, Deserialize)] @@ -55,3 +76,90 @@ impl TryFrom<&str> for Language { } } } + +mod process_utils { + use super::AgentOutput; + use crate::agent::execute_response::Stage; + use tokio::{ + io::{AsyncBufReadExt, BufReader}, + process::{ChildStderr, ChildStdout}, + sync::mpsc, + }; + + /// Spawn a tokio thread and send each line of `stdout`` to the `tx` given as a parameter. + pub async fn send_stdout_to_tx(stdout: ChildStdout, tx: mpsc::Sender) { + tokio::spawn(async move { + let reader = BufReader::new(stdout); + let mut reader_lines = reader.lines(); + + while let Ok(Some(line)) = reader_lines.next_line().await { + println!("Got line from stdout: {:?}", line); + + let _ = tx + .send(AgentOutput { + stage: Stage::Running, + stdout: Some(line), + stderr: None, + exit_code: None, + }) + .await; + } + }); + } + + /// Same as [`send_stdout_to_tx`]. + pub async fn send_stderr_to_tx(stderr: ChildStderr, tx: mpsc::Sender) { + tokio::spawn(async move { + let reader = BufReader::new(stderr); + let mut reader_lines = reader.lines(); + + while let Ok(Some(line)) = reader_lines.next_line().await { + println!("Got line from stderr: {:?}", line); + + let _ = tx + .send(AgentOutput { + stage: Stage::Running, + stdout: Some(line), + stderr: None, + exit_code: None, + }) + .await; + } + }); + } + + /// Function to wait for the `child` to finish and send the result to the `tx` given as a parameter. + pub async fn send_exit_status_to_tx( + child: &mut tokio::process::Child, + tx: mpsc::Sender, + ) -> Result<(), ()> { + let exit_status = child.wait().await; + + match exit_status { + Ok(code) => { + let _ = tx + .send(AgentOutput { + stage: Stage::Done, + stdout: None, + stderr: None, + exit_code: code.code(), + }) + .await; + + Ok(()) + } + Err(e) => { + let _ = tx + .send(AgentOutput { + stage: Stage::Failed, + stdout: None, + stderr: Some(e.to_string()), + exit_code: None, + }) + .await; + + Err(()) + } + } + } +} diff --git a/src/agent/src/agents/rust.rs b/src/agent/src/agents/rust.rs index 207276e..78db2f9 100644 --- a/src/agent/src/agents/rust.rs +++ b/src/agent/src/agents/rust.rs @@ -1,13 +1,19 @@ use super::{Agent, AgentOutput}; +use crate::agents::process_utils; use crate::{workload, AgentError, AgentResult}; use async_trait::async_trait; use rand::distributions::{Alphanumeric, DistString}; use serde::Deserialize; use std::collections::HashSet; use std::fs::create_dir_all; +use std::process::Stdio; use std::sync::Arc; -use tokio::process::Command; -use tokio::sync::Mutex; +use tokio::process::{Child, Command}; +use tokio::sync::{ + broadcast, + mpsc::{self, Receiver}, + Mutex, +}; #[derive(Deserialize)] #[serde(rename_all = "kebab-case")] @@ -23,72 +29,57 @@ struct RustAgentConfig { pub struct RustAgent { workload_config: workload::config::Config, rust_config: RustAgentConfig, + build_notifier: broadcast::Sender>, +} + +// TODO should change with a TryFrom +impl From for RustAgent { + fn from(workload_config: workload::config::Config) -> Self { + let rust_config: RustAgentConfig = toml::from_str(&workload_config.config_string).unwrap(); + + Self { + workload_config, + rust_config, + build_notifier: broadcast::channel::>(1).0, + } + } } impl RustAgent { - async fn build( + async fn get_build_child_process( &self, function_dir: &str, child_processes: Arc>>, - ) -> AgentResult { - if self.rust_config.build.release { - let child = Command::new("cargo") + ) -> Child { + let mut command = Command::new("cargo"); + let command = if self.rust_config.build.release { + command + .stderr(Stdio::piped()) .arg("build") - .arg("--release") .current_dir(function_dir) - .spawn() - .expect("Failed to build function"); - - { - child_processes.lock().await.insert(child.id().unwrap()); - } - - let output = child - .wait_with_output() - .await - .expect("Failed to wait on child"); - - Ok(AgentOutput { - exit_code: output.status.code().unwrap(), - stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(), - stderr: std::str::from_utf8(&output.stderr).unwrap().to_string(), - }) + .arg("--release") } else { - let child = Command::new("cargo") + command + .stderr(Stdio::piped()) .arg("build") .current_dir(function_dir) - .spawn() - .expect("Failed to build function"); - - let output = child - .wait_with_output() - .await - .expect("Failed to wait on child"); - - Ok(AgentOutput { - exit_code: output.status.code().unwrap(), - stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(), - stderr: std::str::from_utf8(&output.stderr).unwrap().to_string(), - }) - } - } -} - -// TODO should change with a TryFrom -impl From for RustAgent { - fn from(workload_config: workload::config::Config) -> Self { - let rust_config: RustAgentConfig = toml::from_str(&workload_config.config_string).unwrap(); + }; + let child = command.spawn().expect("Failed to start build"); - Self { - workload_config, - rust_config, + { + child_processes.lock().await.insert(child.id().unwrap()); } + + child } } #[async_trait] impl Agent for RustAgent { - async fn prepare(&self, child_processes: Arc>>) -> AgentResult { + async fn prepare( + &self, + child_processes: Arc>>, + ) -> AgentResult> { let function_dir = format!( "/tmp/{}", Alphanumeric.sample_string(&mut rand::thread_rng(), 16) @@ -117,46 +108,52 @@ impl Agent for RustAgent { std::fs::write(format!("{}/Cargo.toml", &function_dir), cargo_toml) .expect("Unable to write Cargo.toml file"); - let result = self.build(&function_dir, child_processes).await?; + let mut child = self + .get_build_child_process(&function_dir, child_processes) + .await; + let workload_name = self.workload_config.workload_name.clone(); + let is_release = self.rust_config.build.release; + let tx_build_notifier = self.build_notifier.clone(); - if result.exit_code != 0 { - println!("Build failed: {:?}", result); - return Err(AgentError::BuildFailed(AgentOutput { - exit_code: result.exit_code, - stdout: result.stdout, - stderr: result.stderr, - })); - } + let (tx, rx) = mpsc::channel(1); + tokio::spawn(async move { + process_utils::send_stderr_to_tx(child.stderr.take().unwrap(), tx.clone()).await; + let build_result = process_utils::send_exit_status_to_tx(&mut child, tx).await; - // Copy the binary to /tmp, we could imagine a more complex scenario where we would put this in an artifact repository (like S3) - let binary_path = match self.rust_config.build.release { - true => format!( - "{}/target/release/{}", - &function_dir, self.workload_config.workload_name - ), - false => format!( - "{}/target/debug/{}", - &function_dir, self.workload_config.workload_name - ), - }; + // Once finished: copy the binary to /tmp + // We could imagine a more complex scenario where we would put this in an artifact repository (like S3) + let binary_path = match is_release { + true => format!("{}/target/release/{}", &function_dir, workload_name), + false => format!("{}/target/debug/{}", &function_dir, workload_name), + }; - std::fs::copy( - binary_path, - format!("/tmp/{}", self.workload_config.workload_name), - ) - .expect("Unable to copy binary"); + std::fs::copy(binary_path, format!("/tmp/{}", workload_name)) + .expect("Unable to copy binary"); + + std::fs::remove_dir_all(&function_dir).expect("Unable to remove directory"); - std::fs::remove_dir_all(&function_dir).expect("Unable to remove directory"); + // notify when build is done + let _ = tx_build_notifier.send(build_result); + }); - Ok(AgentOutput { - exit_code: result.exit_code, - stdout: "Build successful".to_string(), - stderr: "".to_string(), - }) + Ok(rx) } - async fn run(&self, child_processes: Arc>>) -> AgentResult { - let child = Command::new(format!("/tmp/{}", self.workload_config.workload_name)) + async fn run( + &self, + child_processes: Arc>>, + ) -> AgentResult> { + // wait for build to finish + self.build_notifier + .subscribe() + .recv() + .await + .map_err(|_| AgentError::BuildNotifier)? + .map_err(|_| AgentError::BuildFailed)?; + + let mut child = Command::new(format!("/tmp/{}", self.workload_config.workload_name)) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) .spawn() .expect("Failed to run function"); @@ -164,22 +161,21 @@ impl Agent for RustAgent { child_processes.lock().await.insert(child.id().unwrap()); } - let output = child - .wait_with_output() - .await - .expect("Failed to wait on child"); + let (tx, rx) = mpsc::channel(1); + let child_stdout = child.stdout.take().unwrap(); + let tx_stdout = tx.clone(); + let child_stderr = child.stderr.take().unwrap(); + let tx_stderr = tx; - let agent_output = AgentOutput { - exit_code: output.status.code().unwrap(), - stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(), - stderr: std::str::from_utf8(&output.stderr).unwrap().to_string(), - }; + tokio::spawn(async move { + process_utils::send_stdout_to_tx(child_stdout, tx_stdout.clone()).await; + let _ = process_utils::send_exit_status_to_tx(&mut child, tx_stdout).await; + }); - if !output.status.success() { - println!("Run failed: {:?}", agent_output); - return Err(AgentError::BuildFailed(agent_output)); - } + tokio::spawn(async move { + process_utils::send_stderr_to_tx(child_stderr, tx_stderr).await; + }); - Ok(agent_output) + Ok(rx) } } diff --git a/src/agent/src/lib.rs b/src/agent/src/lib.rs index 0c43daf..76281e3 100644 --- a/src/agent/src/lib.rs +++ b/src/agent/src/lib.rs @@ -1,4 +1,3 @@ -use agents::AgentOutput; use std::fmt; mod agents; @@ -9,7 +8,8 @@ pub enum AgentError { OpenConfigFileError(std::io::Error), ParseConfigError(toml::de::Error), InvalidLanguage(String), - BuildFailed(AgentOutput), + BuildNotifier, + BuildFailed, } impl fmt::Display for AgentError { @@ -17,8 +17,9 @@ impl fmt::Display for AgentError { match self { AgentError::OpenConfigFileError(e) => write!(f, "Failed to open config file: {}", e), AgentError::ParseConfigError(e) => write!(f, "Failed to parse config file: {}", e), - AgentError::BuildFailed(output) => write!(f, "Build failed: {:?}", output), AgentError::InvalidLanguage(e) => write!(f, "Invalid language: {}", e), + AgentError::BuildNotifier => write!(f, "Could not get notification from build notifier"), + AgentError::BuildFailed => write!(f, "Build has failed"), } } } diff --git a/src/agent/src/workload/runner.rs b/src/agent/src/workload/runner.rs index 1cbeb61..c51d2d7 100644 --- a/src/agent/src/workload/runner.rs +++ b/src/agent/src/workload/runner.rs @@ -7,7 +7,7 @@ use crate::{ }; use std::collections::HashSet; use std::sync::Arc; -use tokio::sync::Mutex; +use tokio::sync::{mpsc::Receiver, Mutex}; #[cfg(feature = "debug-agent")] use crate::agents::debug; @@ -28,7 +28,7 @@ impl Runner { Language::Debug => Box::new(debug::DebugAgent::from(config.clone())), }; - Runner { + Self { config, agent, child_processes, @@ -43,8 +43,8 @@ impl Runner { Ok(Self::new(config, child_processes)) } - pub async fn run(&self) -> AgentResult { - let result = match self.config.action { + pub async fn run(&self) -> AgentResult> { + let rx = match self.config.action { Action::Prepare => { self.agent .prepare(Arc::clone(&self.child_processes)) @@ -52,17 +52,16 @@ impl Runner { } Action::Run => self.agent.run(Arc::clone(&self.child_processes)).await?, Action::PrepareAndRun => { - let res = self + // should merge with run rx? + let _ = self .agent .prepare(Arc::clone(&self.child_processes)) .await?; - println!("Prepare result {:?}", res); + self.agent.run(Arc::clone(&self.child_processes)).await? } }; - println!("Result: {:?}", result); - - Ok(result) + Ok(rx) } } diff --git a/src/agent/src/workload/service.rs b/src/agent/src/workload/service.rs index 996d256..1dcaaa1 100644 --- a/src/agent/src/workload/service.rs +++ b/src/agent/src/workload/service.rs @@ -1,10 +1,10 @@ use super::runner::Runner; -use crate::agent::{self, execute_response::Stage, ExecuteRequest, ExecuteResponse, SignalRequest}; +use crate::agent::{self, ExecuteRequest, ExecuteResponse, SignalRequest}; use agent::workload_runner_server::WorkloadRunner; use once_cell::sync::Lazy; use std::collections::HashSet; use std::{process, sync::Arc}; -use tokio::sync::Mutex; +use tokio::sync::{mpsc, Mutex}; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response}; @@ -20,30 +20,21 @@ impl WorkloadRunner for WorkloadRunnerService { type ExecuteStream = ReceiverStream>; async fn execute(&self, req: Request) -> Result { - let (tx, rx) = tokio::sync::mpsc::channel(4); - - let execute_request = req.into_inner(); - - let runner = Runner::new_from_execute_request(execute_request, CHILD_PROCESSES.clone()) + let runner = Runner::new_from_execute_request(req.into_inner(), CHILD_PROCESSES.clone()) .map_err(|e| tonic::Status::internal(e.to_string()))?; - let res = runner + let mut run_rx = runner .run() .await .map_err(|e| tonic::Status::internal(e.to_string()))?; - let _ = tx - .send(Ok(ExecuteResponse { - stage: Stage::Done as i32, - stdout: res.stdout, - stderr: res.stderr, - exit_code: res.exit_code, - })) - .await - .map_err(|e| { - println!("Failed to send response: {:?}", e); - tonic::Status::internal("Failed to send response") - })?; + let (tx, rx) = mpsc::channel(1); + tokio::spawn(async move { + while let Some(agent_output) = run_rx.recv().await { + println!("Sending to the gRPC client: {:?}", agent_output); + let _ = tx.send(Ok(agent_output.into())).await; + } + }); Ok(Response::new(ReceiverStream::new(rx))) } From e78718835faaf25e06ae98545fd603bbacc83c0a Mon Sep 17 00:00:00 2001 From: Mateo Fernandez Date: Thu, 30 May 2024 03:11:59 +0200 Subject: [PATCH 2/7] feat(agent): merge 'prepare' and 'run' streams fix(agent): prevent 'run' if 'prepare' failed Signed-off-by: Mateo Fernandez --- src/agent/src/agents/mod.rs | 61 ++++++++++++++++++++----------- src/agent/src/agents/rust.rs | 43 ++++++++++++---------- src/agent/src/workload/runner.rs | 25 +++++++++++-- src/agent/src/workload/service.rs | 6 +-- 4 files changed, 88 insertions(+), 47 deletions(-) diff --git a/src/agent/src/agents/mod.rs b/src/agent/src/agents/mod.rs index d4db775..65afd21 100644 --- a/src/agent/src/agents/mod.rs +++ b/src/agent/src/agents/mod.rs @@ -84,17 +84,19 @@ mod process_utils { io::{AsyncBufReadExt, BufReader}, process::{ChildStderr, ChildStdout}, sync::mpsc, + task::JoinHandle, }; /// Spawn a tokio thread and send each line of `stdout`` to the `tx` given as a parameter. - pub async fn send_stdout_to_tx(stdout: ChildStdout, tx: mpsc::Sender) { + pub async fn send_stdout_to_tx( + stdout: ChildStdout, + tx: mpsc::Sender, + ) -> JoinHandle<()> { tokio::spawn(async move { let reader = BufReader::new(stdout); let mut reader_lines = reader.lines(); while let Ok(Some(line)) = reader_lines.next_line().await { - println!("Got line from stdout: {:?}", line); - let _ = tx .send(AgentOutput { stage: Stage::Running, @@ -104,18 +106,19 @@ mod process_utils { }) .await; } - }); + }) } /// Same as [`send_stdout_to_tx`]. - pub async fn send_stderr_to_tx(stderr: ChildStderr, tx: mpsc::Sender) { + pub async fn send_stderr_to_tx( + stderr: ChildStderr, + tx: mpsc::Sender, + ) -> JoinHandle<()> { tokio::spawn(async move { let reader = BufReader::new(stderr); let mut reader_lines = reader.lines(); while let Ok(Some(line)) = reader_lines.next_line().await { - println!("Got line from stderr: {:?}", line); - let _ = tx .send(AgentOutput { stage: Stage::Running, @@ -125,28 +128,44 @@ mod process_utils { }) .await; } - }); + }) } /// Function to wait for the `child` to finish and send the result to the `tx` given as a parameter. pub async fn send_exit_status_to_tx( - child: &mut tokio::process::Child, + mut child: tokio::process::Child, tx: mpsc::Sender, + send_done: bool, ) -> Result<(), ()> { - let exit_status = child.wait().await; + let exit_status = child.wait().await.map(|status| status.code()); match exit_status { - Ok(code) => { - let _ = tx - .send(AgentOutput { - stage: Stage::Done, - stdout: None, - stderr: None, - exit_code: code.code(), - }) - .await; - - Ok(()) + Ok(exit_code) => { + if exit_code != Some(0_i32) { + let _ = tx + .send(AgentOutput { + stage: Stage::Failed, + stdout: None, + stderr: None, + exit_code, + }) + .await; + + Err(()) + } else { + if send_done { + let _ = tx + .send(AgentOutput { + stage: Stage::Done, + stdout: None, + stderr: None, + exit_code, + }) + .await; + } + + Ok(()) + } } Err(e) => { let _ = tx diff --git a/src/agent/src/agents/rust.rs b/src/agent/src/agents/rust.rs index 78db2f9..75bbf51 100644 --- a/src/agent/src/agents/rust.rs +++ b/src/agent/src/agents/rust.rs @@ -115,25 +115,29 @@ impl Agent for RustAgent { let is_release = self.rust_config.build.release; let tx_build_notifier = self.build_notifier.clone(); - let (tx, rx) = mpsc::channel(1); + let (tx, rx) = mpsc::channel(10); tokio::spawn(async move { - process_utils::send_stderr_to_tx(child.stderr.take().unwrap(), tx.clone()).await; - let build_result = process_utils::send_exit_status_to_tx(&mut child, tx).await; - - // Once finished: copy the binary to /tmp - // We could imagine a more complex scenario where we would put this in an artifact repository (like S3) - let binary_path = match is_release { - true => format!("{}/target/release/{}", &function_dir, workload_name), - false => format!("{}/target/debug/{}", &function_dir, workload_name), - }; - - std::fs::copy(binary_path, format!("/tmp/{}", workload_name)) - .expect("Unable to copy binary"); + let _ = process_utils::send_stderr_to_tx(child.stderr.take().unwrap(), tx.clone()).await.await; + let build_result = process_utils::send_exit_status_to_tx(child, tx, false).await; + // if error in build, short-circuit the execution + if build_result.is_err() { + let _ = tx_build_notifier.send(Err(())); + } else { + // Once finished: copy the binary to /tmp + // We could imagine a more complex scenario where we would put this in an artifact repository (like S3) + let binary_path = match is_release { + true => format!("{}/target/release/{}", &function_dir, workload_name), + false => format!("{}/target/debug/{}", &function_dir, workload_name), + }; + + std::fs::copy(binary_path, format!("/tmp/{}", workload_name)) + .expect("Unable to copy binary"); + + // notify when build is done + let _ = tx_build_notifier.send(build_result); + } std::fs::remove_dir_all(&function_dir).expect("Unable to remove directory"); - - // notify when build is done - let _ = tx_build_notifier.send(build_result); }); Ok(rx) @@ -151,6 +155,7 @@ impl Agent for RustAgent { .map_err(|_| AgentError::BuildNotifier)? .map_err(|_| AgentError::BuildFailed)?; + println!("Starting run()"); let mut child = Command::new(format!("/tmp/{}", self.workload_config.workload_name)) .stdout(Stdio::piped()) .stderr(Stdio::piped()) @@ -161,15 +166,15 @@ impl Agent for RustAgent { child_processes.lock().await.insert(child.id().unwrap()); } - let (tx, rx) = mpsc::channel(1); + let (tx, rx) = mpsc::channel(10); let child_stdout = child.stdout.take().unwrap(); let tx_stdout = tx.clone(); let child_stderr = child.stderr.take().unwrap(); let tx_stderr = tx; tokio::spawn(async move { - process_utils::send_stdout_to_tx(child_stdout, tx_stdout.clone()).await; - let _ = process_utils::send_exit_status_to_tx(&mut child, tx_stdout).await; + let _ = process_utils::send_stdout_to_tx(child_stdout, tx_stdout.clone()).await.await; + let _ = process_utils::send_exit_status_to_tx(child, tx_stdout, true).await; }); tokio::spawn(async move { diff --git a/src/agent/src/workload/runner.rs b/src/agent/src/workload/runner.rs index c51d2d7..2f1bd5f 100644 --- a/src/agent/src/workload/runner.rs +++ b/src/agent/src/workload/runner.rs @@ -43,7 +43,7 @@ impl Runner { Ok(Self::new(config, child_processes)) } - pub async fn run(&self) -> AgentResult> { + pub async fn run(self) -> AgentResult> { let rx = match self.config.action { Action::Prepare => { self.agent @@ -52,13 +52,30 @@ impl Runner { } Action::Run => self.agent.run(Arc::clone(&self.child_processes)).await?, Action::PrepareAndRun => { - // should merge with run rx? - let _ = self + let (tx1, rx) = tokio::sync::mpsc::channel::(10); + let tx2 = tx1.clone(); + + // Merges the two receivers given as parameters and returns only one + let mut rx_prepare = self .agent .prepare(Arc::clone(&self.child_processes)) .await?; + tokio::spawn(async move { + while let Some(output) = rx_prepare.recv().await { + let _ = tx1.send(output).await; + } + }); + + tokio::spawn(async move { + let rx_run = self.agent.run(Arc::clone(&self.child_processes)).await; + if let Ok(mut rx_run) = rx_run { + while let Some(output) = rx_run.recv().await { + let _ = tx2.clone().send(output).await; + } + } + }); - self.agent.run(Arc::clone(&self.child_processes)).await? + rx } }; diff --git a/src/agent/src/workload/service.rs b/src/agent/src/workload/service.rs index 1dcaaa1..bcce6ac 100644 --- a/src/agent/src/workload/service.rs +++ b/src/agent/src/workload/service.rs @@ -23,14 +23,14 @@ impl WorkloadRunner for WorkloadRunnerService { let runner = Runner::new_from_execute_request(req.into_inner(), CHILD_PROCESSES.clone()) .map_err(|e| tonic::Status::internal(e.to_string()))?; - let mut run_rx = runner + let mut runner_rx = runner .run() .await .map_err(|e| tonic::Status::internal(e.to_string()))?; - let (tx, rx) = mpsc::channel(1); + let (tx, rx) = mpsc::channel(10); tokio::spawn(async move { - while let Some(agent_output) = run_rx.recv().await { + while let Some(agent_output) = runner_rx.recv().await { println!("Sending to the gRPC client: {:?}", agent_output); let _ = tx.send(Ok(agent_output.into())).await; } From 8b115e8f04b6d67c0af8ef13e59b6228b16624e9 Mon Sep 17 00:00:00 2001 From: Mateo Fernandez Date: Thu, 30 May 2024 03:37:17 +0200 Subject: [PATCH 3/7] feat(proto): update proto to keep the 'stage' field Signed-off-by: Mateo Fernandez --- proto/vmm.proto | 8 ++++---- src/api/src/service.rs | 33 +++++++++++++++++++++++++++++---- src/vmm/src/grpc/server.rs | 1 + 3 files changed, 34 insertions(+), 8 deletions(-) diff --git a/proto/vmm.proto b/proto/vmm.proto index 8244446..34ec802 100644 --- a/proto/vmm.proto +++ b/proto/vmm.proto @@ -26,9 +26,10 @@ message ExecuteResponse { DEBUG = 5; } - string stdout = 1; - string stderr = 2; - int32 exit_code = 3; + Stage stage = 1; + optional string stdout = 2; + optional string stderr = 3; + optional int32 exit_code = 4; } service VmmService { @@ -41,7 +42,6 @@ message RunVmmRequest { Language language = 2; string code = 3; LogLevel log_level = 4; - } message RunVmmResponse { diff --git a/src/api/src/service.rs b/src/api/src/service.rs index 243c287..9d4a2a4 100644 --- a/src/api/src/service.rs +++ b/src/api/src/service.rs @@ -1,5 +1,5 @@ use crate::client::{ - vmmorchestrator::{ExecuteResponse, RunVmmRequest, ShutdownVmRequest, ShutdownVmResponse}, + vmmorchestrator::{execute_response::Stage, ExecuteResponse, RunVmmRequest, ShutdownVmRequest, ShutdownVmResponse}, VmmClient, }; use actix_web::{post, web, HttpRequest, HttpResponse, Responder}; @@ -49,14 +49,39 @@ pub async fn run(req_body: web::Json) -> impl Responder { #[derive(Debug, Serialize)] pub struct ExecuteJsonResponse { - pub stdout: String, - pub stderr: String, - pub exit_code: i32, + pub stage: StageJson, + pub stdout: Option, + pub stderr: Option, + pub exit_code: Option, +} + +#[derive(Debug, Serialize)] +pub enum StageJson { + Pending, + Building, + Running, + Done, + Failed, + Debug, +} + +impl From for StageJson { + fn from(value: Stage) -> Self { + match value { + Stage::Pending => StageJson::Pending, + Stage::Building => StageJson::Building, + Stage::Running => StageJson::Running, + Stage::Done => StageJson::Done, + Stage::Failed => StageJson::Failed, + Stage::Debug => StageJson::Debug, + } + } } impl From for ExecuteJsonResponse { fn from(value: ExecuteResponse) -> Self { Self { + stage: Stage::from_i32(value.stage).unwrap().into(), stdout: value.stdout, stderr: value.stderr, exit_code: value.exit_code, diff --git a/src/vmm/src/grpc/server.rs b/src/vmm/src/grpc/server.rs index 898a60f..9bac2ad 100644 --- a/src/vmm/src/grpc/server.rs +++ b/src/vmm/src/grpc/server.rs @@ -250,6 +250,7 @@ impl VmmServiceTrait for VmmService { // Process each message as it arrives while let Some(response) = response_stream.message().await? { let vmm_response = vmmorchestrator::ExecuteResponse { + stage: response.stage, stdout: response.stdout, stderr: response.stderr, exit_code: response.exit_code, From 3914d4af3c29da37f7068c97c5c57698da298d2b Mon Sep 17 00:00:00 2001 From: Mateo Fernandez Date: Thu, 30 May 2024 11:59:11 +0200 Subject: [PATCH 4/7] fix(agent): wait for stdout/stderr to be sent fix(agent): add option on process_utils functions Signed-off-by: Mateo Fernandez --- src/agent/src/agents/mod.rs | 6 ++++-- src/agent/src/agents/rust.rs | 14 +++++++++++--- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/agent/src/agents/mod.rs b/src/agent/src/agents/mod.rs index 65afd21..ff5212a 100644 --- a/src/agent/src/agents/mod.rs +++ b/src/agent/src/agents/mod.rs @@ -91,6 +91,7 @@ mod process_utils { pub async fn send_stdout_to_tx( stdout: ChildStdout, tx: mpsc::Sender, + stage: Option, ) -> JoinHandle<()> { tokio::spawn(async move { let reader = BufReader::new(stdout); @@ -99,7 +100,7 @@ mod process_utils { while let Ok(Some(line)) = reader_lines.next_line().await { let _ = tx .send(AgentOutput { - stage: Stage::Running, + stage: stage.unwrap_or(Stage::Running), stdout: Some(line), stderr: None, exit_code: None, @@ -113,6 +114,7 @@ mod process_utils { pub async fn send_stderr_to_tx( stderr: ChildStderr, tx: mpsc::Sender, + stage: Option, ) -> JoinHandle<()> { tokio::spawn(async move { let reader = BufReader::new(stderr); @@ -121,7 +123,7 @@ mod process_utils { while let Ok(Some(line)) = reader_lines.next_line().await { let _ = tx .send(AgentOutput { - stage: Stage::Running, + stage: stage.unwrap_or(Stage::Running), stdout: Some(line), stderr: None, exit_code: None, diff --git a/src/agent/src/agents/rust.rs b/src/agent/src/agents/rust.rs index 75bbf51..90aed45 100644 --- a/src/agent/src/agents/rust.rs +++ b/src/agent/src/agents/rust.rs @@ -1,4 +1,5 @@ use super::{Agent, AgentOutput}; +use crate::agent::execute_response::Stage; use crate::agents::process_utils; use crate::{workload, AgentError, AgentResult}; use async_trait::async_trait; @@ -117,7 +118,10 @@ impl Agent for RustAgent { let (tx, rx) = mpsc::channel(10); tokio::spawn(async move { - let _ = process_utils::send_stderr_to_tx(child.stderr.take().unwrap(), tx.clone()).await.await; + let stderr = child.stderr.take().unwrap(); + let _ = process_utils::send_stderr_to_tx(stderr, tx.clone(), Some(Stage::Building)) + .await + .await; let build_result = process_utils::send_exit_status_to_tx(child, tx, false).await; // if error in build, short-circuit the execution if build_result.is_err() { @@ -173,12 +177,16 @@ impl Agent for RustAgent { let tx_stderr = tx; tokio::spawn(async move { - let _ = process_utils::send_stdout_to_tx(child_stdout, tx_stdout.clone()).await.await; + let _ = process_utils::send_stdout_to_tx(child_stdout, tx_stdout.clone(), None) + .await + .await; let _ = process_utils::send_exit_status_to_tx(child, tx_stdout, true).await; }); tokio::spawn(async move { - process_utils::send_stderr_to_tx(child_stderr, tx_stderr).await; + let _ = process_utils::send_stderr_to_tx(child_stderr, tx_stderr, None) + .await + .await; }); Ok(rx) From aa805153d307e71f2bc190e5d0da9ece5c9cae96 Mon Sep 17 00:00:00 2001 From: Mateo Fernandez Date: Thu, 30 May 2024 11:59:25 +0200 Subject: [PATCH 5/7] fix(vmm): process message in a tokio thread to not block the current flow Signed-off-by: Mateo Fernandez --- src/vmm/src/grpc/server.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/vmm/src/grpc/server.rs b/src/vmm/src/grpc/server.rs index 9bac2ad..50a4e24 100644 --- a/src/vmm/src/grpc/server.rs +++ b/src/vmm/src/grpc/server.rs @@ -248,15 +248,17 @@ impl VmmServiceTrait for VmmService { let mut response_stream = client.execute(agent_request).await?; // Process each message as it arrives - while let Some(response) = response_stream.message().await? { - let vmm_response = vmmorchestrator::ExecuteResponse { - stage: response.stage, - stdout: response.stdout, - stderr: response.stderr, - exit_code: response.exit_code, - }; - tx.send(Ok(vmm_response)).await.unwrap(); - } + tokio::spawn(async move { + while let Ok(Some(response)) = response_stream.message().await { + let vmm_response = vmmorchestrator::ExecuteResponse { + stage: response.stage, + stdout: response.stdout, + stderr: response.stderr, + exit_code: response.exit_code, + }; + let _ = tx.send(Ok(vmm_response)).await; + } + }); } Err(e) => { error!("ERROR {:?}", e); From abe0f3de7842b7a347a2bedbc968a3caccc44fc3 Mon Sep 17 00:00:00 2001 From: Mateo Fernandez Date: Thu, 30 May 2024 14:27:21 +0200 Subject: [PATCH 6/7] fix(lint): to pass CI, cargo fmt Signed-off-by: Mateo Fernandez --- src/agent/src/lib.rs | 4 +++- src/api/src/service.rs | 5 ++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/agent/src/lib.rs b/src/agent/src/lib.rs index 76281e3..48a4a5c 100644 --- a/src/agent/src/lib.rs +++ b/src/agent/src/lib.rs @@ -18,7 +18,9 @@ impl fmt::Display for AgentError { AgentError::OpenConfigFileError(e) => write!(f, "Failed to open config file: {}", e), AgentError::ParseConfigError(e) => write!(f, "Failed to parse config file: {}", e), AgentError::InvalidLanguage(e) => write!(f, "Invalid language: {}", e), - AgentError::BuildNotifier => write!(f, "Could not get notification from build notifier"), + AgentError::BuildNotifier => { + write!(f, "Could not get notification from build notifier") + } AgentError::BuildFailed => write!(f, "Build has failed"), } } diff --git a/src/api/src/service.rs b/src/api/src/service.rs index 9d4a2a4..4db3fd8 100644 --- a/src/api/src/service.rs +++ b/src/api/src/service.rs @@ -1,5 +1,8 @@ use crate::client::{ - vmmorchestrator::{execute_response::Stage, ExecuteResponse, RunVmmRequest, ShutdownVmRequest, ShutdownVmResponse}, + vmmorchestrator::{ + execute_response::Stage, ExecuteResponse, RunVmmRequest, ShutdownVmRequest, + ShutdownVmResponse, + }, VmmClient, }; use actix_web::{post, web, HttpRequest, HttpResponse, Responder}; From 64b43e64975d836429f5b5bdb612a256d2962871 Mon Sep 17 00:00:00 2001 From: Mateo Fernandez Date: Thu, 30 May 2024 17:35:27 +0200 Subject: [PATCH 7/7] doc(example): add println with loop count Signed-off-by: Mateo Fernandez --- src/cli/examples/main.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/cli/examples/main.rs b/src/cli/examples/main.rs index 772f207..06d6a9a 100644 --- a/src/cli/examples/main.rs +++ b/src/cli/examples/main.rs @@ -17,5 +17,10 @@ fn fibonacci(n: u32) -> u64 { fn main() { let n = 10; - println!("Fibonacci number at position {} is: {}", n, fibonacci(n)); + + for i in 0..=10 { + println!("🚀 Loop {}", i); + println!("Fibonacci number at position {} is: {}", n, fibonacci(n)); + std::thread::sleep(std::time::Duration::from_secs(1)); + } }