Skip to content

Commit

Permalink
feat(agent): add channels for streaming logs of build and run stage
Browse files Browse the repository at this point in the history
fix(proto): use optional field for non-used output and exit_code

Signed-off-by: Mateo Fernandez <[email protected]>
  • Loading branch information
mfernd committed May 29, 2024
1 parent 1b67058 commit b4ca402
Show file tree
Hide file tree
Showing 7 changed files with 277 additions and 154 deletions.
8 changes: 4 additions & 4 deletions proto/agent.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ message ExecuteResponse {
DEBUG = 5;
}

string stdout = 1;
string stderr = 2;
Stage stage = 3;
int32 exit_code = 4;
Stage stage = 1;
optional string stdout = 2;
optional string stderr = 3;
optional int32 exit_code = 4;
}

message SignalRequest {
Expand Down
56 changes: 42 additions & 14 deletions src/agent/src/agents/debug.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use super::AgentOutput;
use crate::agent::execute_response::Stage;
use crate::agents::Agent;
use crate::{workload, AgentResult};
use async_trait::async_trait;
use std::collections::HashSet;
use std::fs::create_dir_all;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::sync::mpsc::{self, Receiver};
use tokio::sync::Mutex;

pub struct DebugAgent {
Expand All @@ -20,7 +22,7 @@ impl From<workload::config::Config> for DebugAgent {

#[async_trait]
impl Agent for DebugAgent {
async fn prepare(&self, _: Arc<Mutex<HashSet<u32>>>) -> AgentResult<AgentOutput> {
async fn prepare(&self, _: Arc<Mutex<HashSet<u32>>>) -> AgentResult<Receiver<AgentOutput>> {
let dir = format!("/tmp/{}", self.workload_config.workload_name);

println!("Function directory: {}", dir);
Expand All @@ -37,25 +39,51 @@ impl Agent for DebugAgent {
)
.expect("Unable to write debug.txt file");

Ok(AgentOutput {
exit_code: 0,
stdout: "Build successfully!".into(),
stderr: String::default(),
})
let (tx, rx) = mpsc::channel(1);
tokio::spawn(async move {
let _ = tx
.send(AgentOutput {
stage: Stage::Building,
stdout: Some("Build successfully!".into()),
stderr: None,
exit_code: None,
})
.await;
});

Ok(rx)
}

async fn run(&self, _: Arc<Mutex<HashSet<u32>>>) -> AgentResult<AgentOutput> {
async fn run(&self, _: Arc<Mutex<HashSet<u32>>>) -> AgentResult<Receiver<AgentOutput>> {
let dir = format!("/tmp/{}", self.workload_config.workload_name);

let content = std::fs::read_to_string(format!("{}/debug.txt", &dir))
.expect("Unable to read debug.txt file");
let content = std::fs::read_to_string(format!("{}/debug.txt", &dir));

std::fs::remove_dir_all(dir).expect("Unable to remove directory");

Ok(AgentOutput {
exit_code: 0,
stdout: content,
stderr: String::default(),
})
let (tx, rx) = mpsc::channel(1);
tokio::spawn(async move {
if let Ok(content) = content {
let _ = tx
.send(AgentOutput {
stage: Stage::Done,
stdout: Some(content),
stderr: None,
exit_code: Some(0),
})
.await;
}

let _ = tx
.send(AgentOutput {
stage: Stage::Failed,
stdout: None,
stderr: Some("unable to read debug.txt".into()),
exit_code: Some(1),
})
.await;
});

Ok(rx)
}
}
122 changes: 115 additions & 7 deletions src/agent/src/agents/mod.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,46 @@
use crate::{AgentError, AgentResult};
use crate::{
agent::{execute_response::Stage, ExecuteResponse},
AgentError, AgentResult,
};
use async_trait::async_trait;
use serde::Deserialize;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::{mpsc, Mutex};

#[cfg(feature = "debug-agent")]
pub mod debug;
pub mod rust;

#[derive(Debug, Clone)]
pub struct AgentOutput {
pub exit_code: i32,
pub stdout: String,
pub stderr: String,
pub stage: Stage,
pub stdout: Option<String>,
pub stderr: Option<String>,
pub exit_code: Option<i32>,
}

impl From<AgentOutput> for ExecuteResponse {
fn from(value: AgentOutput) -> Self {
Self {
stage: value.stage as i32,
stdout: value.stdout,
stderr: value.stderr,
exit_code: value.exit_code,
}
}
}

#[async_trait]
pub trait Agent {
async fn prepare(&self, child_processes: Arc<Mutex<HashSet<u32>>>) -> AgentResult<AgentOutput>;
async fn run(&self, child_processes: Arc<Mutex<HashSet<u32>>>) -> AgentResult<AgentOutput>;
async fn prepare(
&self,
child_processes: Arc<Mutex<HashSet<u32>>>,
) -> AgentResult<mpsc::Receiver<AgentOutput>>;
async fn run(
&self,
child_processes: Arc<Mutex<HashSet<u32>>>,
) -> AgentResult<mpsc::Receiver<AgentOutput>>;
}

#[derive(Debug, Clone, Deserialize)]
Expand Down Expand Up @@ -55,3 +76,90 @@ impl TryFrom<&str> for Language {
}
}
}

mod process_utils {
use super::AgentOutput;
use crate::agent::execute_response::Stage;
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::{ChildStderr, ChildStdout},
sync::mpsc,
};

/// Spawn a tokio thread and send each line of `stdout`` to the `tx` given as a parameter.
pub async fn send_stdout_to_tx(stdout: ChildStdout, tx: mpsc::Sender<AgentOutput>) {
tokio::spawn(async move {
let reader = BufReader::new(stdout);
let mut reader_lines = reader.lines();

while let Ok(Some(line)) = reader_lines.next_line().await {
println!("Got line from stdout: {:?}", line);

let _ = tx
.send(AgentOutput {
stage: Stage::Running,
stdout: Some(line),
stderr: None,
exit_code: None,
})
.await;
}
});
}

/// Same as [`send_stdout_to_tx`].
pub async fn send_stderr_to_tx(stderr: ChildStderr, tx: mpsc::Sender<AgentOutput>) {
tokio::spawn(async move {
let reader = BufReader::new(stderr);
let mut reader_lines = reader.lines();

while let Ok(Some(line)) = reader_lines.next_line().await {
println!("Got line from stderr: {:?}", line);

let _ = tx
.send(AgentOutput {
stage: Stage::Running,
stdout: Some(line),
stderr: None,
exit_code: None,
})
.await;
}
});
}

/// Function to wait for the `child` to finish and send the result to the `tx` given as a parameter.
pub async fn send_exit_status_to_tx(
child: &mut tokio::process::Child,
tx: mpsc::Sender<AgentOutput>,
) -> Result<(), ()> {
let exit_status = child.wait().await;

match exit_status {
Ok(code) => {
let _ = tx
.send(AgentOutput {
stage: Stage::Done,
stdout: None,
stderr: None,
exit_code: code.code(),
})
.await;

Ok(())
}
Err(e) => {
let _ = tx
.send(AgentOutput {
stage: Stage::Failed,
stdout: None,
stderr: Some(e.to_string()),
exit_code: None,
})
.await;

Err(())
}
}
}
}
Loading

0 comments on commit b4ca402

Please sign in to comment.