Skip to content

Commit

Permalink
feat: continuous logs (prepare and run) (#49)
Browse files Browse the repository at this point in the history
* feat(agent): add channels for streaming logs of build and run stage

fix(proto): use optional field for non-used output and exit_code

Signed-off-by: Mateo Fernandez <[email protected]>

* feat(agent): merge 'prepare' and 'run' streams

fix(agent): prevent 'run' if 'prepare' failed

Signed-off-by: Mateo Fernandez <[email protected]>

* feat(proto): update proto to keep the 'stage' field

Signed-off-by: Mateo Fernandez <[email protected]>

* fix(agent): wait for stdout/stderr to be sent

fix(agent): add option on process_utils functions

Signed-off-by: Mateo Fernandez <[email protected]>

* fix(vmm): process message in a tokio thread to not block the current flow

Signed-off-by: Mateo Fernandez <[email protected]>

* fix(lint): to pass CI, cargo fmt

Signed-off-by: Mateo Fernandez <[email protected]>

* doc(example): add println with loop count

Signed-off-by: Mateo Fernandez <[email protected]>

---------

Signed-off-by: Mateo Fernandez <[email protected]>
  • Loading branch information
mfernd authored May 30, 2024
1 parent 1b67058 commit 025a997
Show file tree
Hide file tree
Showing 11 changed files with 386 additions and 174 deletions.
8 changes: 4 additions & 4 deletions proto/agent.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ message ExecuteResponse {
DEBUG = 5;
}

string stdout = 1;
string stderr = 2;
Stage stage = 3;
int32 exit_code = 4;
Stage stage = 1;
optional string stdout = 2;
optional string stderr = 3;
optional int32 exit_code = 4;
}

message SignalRequest {
Expand Down
8 changes: 4 additions & 4 deletions proto/vmm.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ message ExecuteResponse {
DEBUG = 5;
}

string stdout = 1;
string stderr = 2;
int32 exit_code = 3;
Stage stage = 1;
optional string stdout = 2;
optional string stderr = 3;
optional int32 exit_code = 4;
}

service VmmService {
Expand All @@ -41,7 +42,6 @@ message RunVmmRequest {
Language language = 2;
string code = 3;
LogLevel log_level = 4;

}

message RunVmmResponse {
Expand Down
56 changes: 42 additions & 14 deletions src/agent/src/agents/debug.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use super::AgentOutput;
use crate::agent::execute_response::Stage;
use crate::agents::Agent;
use crate::{workload, AgentResult};
use async_trait::async_trait;
use std::collections::HashSet;
use std::fs::create_dir_all;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::sync::mpsc::{self, Receiver};
use tokio::sync::Mutex;

pub struct DebugAgent {
Expand All @@ -20,7 +22,7 @@ impl From<workload::config::Config> for DebugAgent {

#[async_trait]
impl Agent for DebugAgent {
async fn prepare(&self, _: Arc<Mutex<HashSet<u32>>>) -> AgentResult<AgentOutput> {
async fn prepare(&self, _: Arc<Mutex<HashSet<u32>>>) -> AgentResult<Receiver<AgentOutput>> {
let dir = format!("/tmp/{}", self.workload_config.workload_name);

println!("Function directory: {}", dir);
Expand All @@ -37,25 +39,51 @@ impl Agent for DebugAgent {
)
.expect("Unable to write debug.txt file");

Ok(AgentOutput {
exit_code: 0,
stdout: "Build successfully!".into(),
stderr: String::default(),
})
let (tx, rx) = mpsc::channel(1);
tokio::spawn(async move {
let _ = tx
.send(AgentOutput {
stage: Stage::Building,
stdout: Some("Build successfully!".into()),
stderr: None,
exit_code: None,
})
.await;
});

Ok(rx)
}

async fn run(&self, _: Arc<Mutex<HashSet<u32>>>) -> AgentResult<AgentOutput> {
async fn run(&self, _: Arc<Mutex<HashSet<u32>>>) -> AgentResult<Receiver<AgentOutput>> {
let dir = format!("/tmp/{}", self.workload_config.workload_name);

let content = std::fs::read_to_string(format!("{}/debug.txt", &dir))
.expect("Unable to read debug.txt file");
let content = std::fs::read_to_string(format!("{}/debug.txt", &dir));

std::fs::remove_dir_all(dir).expect("Unable to remove directory");

Ok(AgentOutput {
exit_code: 0,
stdout: content,
stderr: String::default(),
})
let (tx, rx) = mpsc::channel(1);
tokio::spawn(async move {
if let Ok(content) = content {
let _ = tx
.send(AgentOutput {
stage: Stage::Done,
stdout: Some(content),
stderr: None,
exit_code: Some(0),
})
.await;
}

let _ = tx
.send(AgentOutput {
stage: Stage::Failed,
stdout: None,
stderr: Some("unable to read debug.txt".into()),
exit_code: Some(1),
})
.await;
});

Ok(rx)
}
}
143 changes: 136 additions & 7 deletions src/agent/src/agents/mod.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,46 @@
use crate::{AgentError, AgentResult};
use crate::{
agent::{execute_response::Stage, ExecuteResponse},
AgentError, AgentResult,
};
use async_trait::async_trait;
use serde::Deserialize;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::{mpsc, Mutex};

#[cfg(feature = "debug-agent")]
pub mod debug;
pub mod rust;

#[derive(Debug, Clone)]
pub struct AgentOutput {
pub exit_code: i32,
pub stdout: String,
pub stderr: String,
pub stage: Stage,
pub stdout: Option<String>,
pub stderr: Option<String>,
pub exit_code: Option<i32>,
}

impl From<AgentOutput> for ExecuteResponse {
fn from(value: AgentOutput) -> Self {
Self {
stage: value.stage as i32,
stdout: value.stdout,
stderr: value.stderr,
exit_code: value.exit_code,
}
}
}

#[async_trait]
pub trait Agent {
async fn prepare(&self, child_processes: Arc<Mutex<HashSet<u32>>>) -> AgentResult<AgentOutput>;
async fn run(&self, child_processes: Arc<Mutex<HashSet<u32>>>) -> AgentResult<AgentOutput>;
async fn prepare(
&self,
child_processes: Arc<Mutex<HashSet<u32>>>,
) -> AgentResult<mpsc::Receiver<AgentOutput>>;
async fn run(
&self,
child_processes: Arc<Mutex<HashSet<u32>>>,
) -> AgentResult<mpsc::Receiver<AgentOutput>>;
}

#[derive(Debug, Clone, Deserialize)]
Expand Down Expand Up @@ -55,3 +76,111 @@ impl TryFrom<&str> for Language {
}
}
}

mod process_utils {
use super::AgentOutput;
use crate::agent::execute_response::Stage;
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::{ChildStderr, ChildStdout},
sync::mpsc,
task::JoinHandle,
};

/// Spawn a tokio thread and send each line of `stdout`` to the `tx` given as a parameter.
pub async fn send_stdout_to_tx(
stdout: ChildStdout,
tx: mpsc::Sender<AgentOutput>,
stage: Option<Stage>,
) -> JoinHandle<()> {
tokio::spawn(async move {
let reader = BufReader::new(stdout);
let mut reader_lines = reader.lines();

while let Ok(Some(line)) = reader_lines.next_line().await {
let _ = tx
.send(AgentOutput {
stage: stage.unwrap_or(Stage::Running),
stdout: Some(line),
stderr: None,
exit_code: None,
})
.await;
}
})
}

/// Same as [`send_stdout_to_tx`].
pub async fn send_stderr_to_tx(
stderr: ChildStderr,
tx: mpsc::Sender<AgentOutput>,
stage: Option<Stage>,
) -> JoinHandle<()> {
tokio::spawn(async move {
let reader = BufReader::new(stderr);
let mut reader_lines = reader.lines();

while let Ok(Some(line)) = reader_lines.next_line().await {
let _ = tx
.send(AgentOutput {
stage: stage.unwrap_or(Stage::Running),
stdout: Some(line),
stderr: None,
exit_code: None,
})
.await;
}
})
}

/// Function to wait for the `child` to finish and send the result to the `tx` given as a parameter.
pub async fn send_exit_status_to_tx(
mut child: tokio::process::Child,
tx: mpsc::Sender<AgentOutput>,
send_done: bool,
) -> Result<(), ()> {
let exit_status = child.wait().await.map(|status| status.code());

match exit_status {
Ok(exit_code) => {
if exit_code != Some(0_i32) {
let _ = tx
.send(AgentOutput {
stage: Stage::Failed,
stdout: None,
stderr: None,
exit_code,
})
.await;

Err(())
} else {
if send_done {
let _ = tx
.send(AgentOutput {
stage: Stage::Done,
stdout: None,
stderr: None,
exit_code,
})
.await;
}

Ok(())
}
}
Err(e) => {
let _ = tx
.send(AgentOutput {
stage: Stage::Failed,
stdout: None,
stderr: Some(e.to_string()),
exit_code: None,
})
.await;

Err(())
}
}
}
}
Loading

0 comments on commit 025a997

Please sign in to comment.