Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: continuous logs (prepare and run) #49

Merged
merged 7 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions proto/agent.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ message ExecuteResponse {
DEBUG = 5;
}

string stdout = 1;
string stderr = 2;
Stage stage = 3;
int32 exit_code = 4;
Stage stage = 1;
optional string stdout = 2;
optional string stderr = 3;
optional int32 exit_code = 4;
}

message SignalRequest {
Expand Down
8 changes: 4 additions & 4 deletions proto/vmm.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ message ExecuteResponse {
DEBUG = 5;
}

string stdout = 1;
string stderr = 2;
int32 exit_code = 3;
Stage stage = 1;
optional string stdout = 2;
optional string stderr = 3;
optional int32 exit_code = 4;
}

service VmmService {
Expand All @@ -41,7 +42,6 @@ message RunVmmRequest {
Language language = 2;
string code = 3;
LogLevel log_level = 4;

}

message RunVmmResponse {
Expand Down
56 changes: 42 additions & 14 deletions src/agent/src/agents/debug.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use super::AgentOutput;
use crate::agent::execute_response::Stage;
use crate::agents::Agent;
use crate::{workload, AgentResult};
use async_trait::async_trait;
use std::collections::HashSet;
use std::fs::create_dir_all;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::sync::mpsc::{self, Receiver};
use tokio::sync::Mutex;

pub struct DebugAgent {
Expand All @@ -20,7 +22,7 @@ impl From<workload::config::Config> for DebugAgent {

#[async_trait]
impl Agent for DebugAgent {
async fn prepare(&self, _: Arc<Mutex<HashSet<u32>>>) -> AgentResult<AgentOutput> {
async fn prepare(&self, _: Arc<Mutex<HashSet<u32>>>) -> AgentResult<Receiver<AgentOutput>> {
let dir = format!("/tmp/{}", self.workload_config.workload_name);

println!("Function directory: {}", dir);
Expand All @@ -37,25 +39,51 @@ impl Agent for DebugAgent {
)
.expect("Unable to write debug.txt file");

Ok(AgentOutput {
exit_code: 0,
stdout: "Build successfully!".into(),
stderr: String::default(),
})
let (tx, rx) = mpsc::channel(1);
tokio::spawn(async move {
let _ = tx
.send(AgentOutput {
stage: Stage::Building,
stdout: Some("Build successfully!".into()),
stderr: None,
exit_code: None,
})
.await;
});

Ok(rx)
}

async fn run(&self, _: Arc<Mutex<HashSet<u32>>>) -> AgentResult<AgentOutput> {
async fn run(&self, _: Arc<Mutex<HashSet<u32>>>) -> AgentResult<Receiver<AgentOutput>> {
let dir = format!("/tmp/{}", self.workload_config.workload_name);

let content = std::fs::read_to_string(format!("{}/debug.txt", &dir))
.expect("Unable to read debug.txt file");
let content = std::fs::read_to_string(format!("{}/debug.txt", &dir));

std::fs::remove_dir_all(dir).expect("Unable to remove directory");

Ok(AgentOutput {
exit_code: 0,
stdout: content,
stderr: String::default(),
})
let (tx, rx) = mpsc::channel(1);
tokio::spawn(async move {
if let Ok(content) = content {
let _ = tx
.send(AgentOutput {
stage: Stage::Done,
stdout: Some(content),
stderr: None,
exit_code: Some(0),
})
.await;
}

let _ = tx
.send(AgentOutput {
stage: Stage::Failed,
stdout: None,
stderr: Some("unable to read debug.txt".into()),
exit_code: Some(1),
})
.await;
});

Ok(rx)
}
}
143 changes: 136 additions & 7 deletions src/agent/src/agents/mod.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,46 @@
use crate::{AgentError, AgentResult};
use crate::{
agent::{execute_response::Stage, ExecuteResponse},
AgentError, AgentResult,
};
use async_trait::async_trait;
use serde::Deserialize;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::{mpsc, Mutex};

#[cfg(feature = "debug-agent")]
pub mod debug;
pub mod rust;

#[derive(Debug, Clone)]
pub struct AgentOutput {
pub exit_code: i32,
pub stdout: String,
pub stderr: String,
pub stage: Stage,
pub stdout: Option<String>,
pub stderr: Option<String>,
pub exit_code: Option<i32>,
}

impl From<AgentOutput> for ExecuteResponse {
fn from(value: AgentOutput) -> Self {
Self {
stage: value.stage as i32,
stdout: value.stdout,
stderr: value.stderr,
exit_code: value.exit_code,
}
}
}

#[async_trait]
pub trait Agent {
async fn prepare(&self, child_processes: Arc<Mutex<HashSet<u32>>>) -> AgentResult<AgentOutput>;
async fn run(&self, child_processes: Arc<Mutex<HashSet<u32>>>) -> AgentResult<AgentOutput>;
async fn prepare(
&self,
child_processes: Arc<Mutex<HashSet<u32>>>,
) -> AgentResult<mpsc::Receiver<AgentOutput>>;
async fn run(
&self,
child_processes: Arc<Mutex<HashSet<u32>>>,
) -> AgentResult<mpsc::Receiver<AgentOutput>>;
}

#[derive(Debug, Clone, Deserialize)]
Expand Down Expand Up @@ -55,3 +76,111 @@ impl TryFrom<&str> for Language {
}
}
}

mod process_utils {
use super::AgentOutput;
use crate::agent::execute_response::Stage;
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::{ChildStderr, ChildStdout},
sync::mpsc,
task::JoinHandle,
};

/// Spawn a tokio thread and send each line of `stdout`` to the `tx` given as a parameter.
pub async fn send_stdout_to_tx(
stdout: ChildStdout,
tx: mpsc::Sender<AgentOutput>,
stage: Option<Stage>,
) -> JoinHandle<()> {
tokio::spawn(async move {
let reader = BufReader::new(stdout);
let mut reader_lines = reader.lines();

while let Ok(Some(line)) = reader_lines.next_line().await {
let _ = tx
.send(AgentOutput {
stage: stage.unwrap_or(Stage::Running),
stdout: Some(line),
stderr: None,
exit_code: None,
})
.await;
}
})
}

/// Same as [`send_stdout_to_tx`].
pub async fn send_stderr_to_tx(
stderr: ChildStderr,
tx: mpsc::Sender<AgentOutput>,
stage: Option<Stage>,
) -> JoinHandle<()> {
tokio::spawn(async move {
let reader = BufReader::new(stderr);
let mut reader_lines = reader.lines();

while let Ok(Some(line)) = reader_lines.next_line().await {
let _ = tx
.send(AgentOutput {
stage: stage.unwrap_or(Stage::Running),
stdout: Some(line),
stderr: None,
exit_code: None,
})
.await;
}
})
}

/// Function to wait for the `child` to finish and send the result to the `tx` given as a parameter.
pub async fn send_exit_status_to_tx(
mut child: tokio::process::Child,
tx: mpsc::Sender<AgentOutput>,
send_done: bool,
) -> Result<(), ()> {
let exit_status = child.wait().await.map(|status| status.code());

match exit_status {
Ok(exit_code) => {
if exit_code != Some(0_i32) {
let _ = tx
.send(AgentOutput {
stage: Stage::Failed,
stdout: None,
stderr: None,
exit_code,
})
.await;

Err(())
} else {
if send_done {
let _ = tx
.send(AgentOutput {
stage: Stage::Done,
stdout: None,
stderr: None,
exit_code,
})
.await;
}

Ok(())
}
}
Err(e) => {
let _ = tx
.send(AgentOutput {
stage: Stage::Failed,
stdout: None,
stderr: Some(e.to_string()),
exit_code: None,
})
.await;

Err(())
}
}
}
}
Loading