Skip to content

Commit

Permalink
feat(agent): merge 'prepare' and 'run' streams
Browse files Browse the repository at this point in the history
fix(agent): prevent 'run' if 'prepare' failed

Signed-off-by: Mateo Fernandez <[email protected]>
  • Loading branch information
mfernd committed May 30, 2024
1 parent b4ca402 commit e787188
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 47 deletions.
61 changes: 40 additions & 21 deletions src/agent/src/agents/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,19 @@ mod process_utils {
io::{AsyncBufReadExt, BufReader},
process::{ChildStderr, ChildStdout},
sync::mpsc,
task::JoinHandle,
};

/// Spawn a tokio thread and send each line of `stdout`` to the `tx` given as a parameter.
pub async fn send_stdout_to_tx(stdout: ChildStdout, tx: mpsc::Sender<AgentOutput>) {
pub async fn send_stdout_to_tx(
stdout: ChildStdout,
tx: mpsc::Sender<AgentOutput>,
) -> JoinHandle<()> {
tokio::spawn(async move {
let reader = BufReader::new(stdout);
let mut reader_lines = reader.lines();

while let Ok(Some(line)) = reader_lines.next_line().await {
println!("Got line from stdout: {:?}", line);

let _ = tx
.send(AgentOutput {
stage: Stage::Running,
Expand All @@ -104,18 +106,19 @@ mod process_utils {
})
.await;
}
});
})
}

/// Same as [`send_stdout_to_tx`].
pub async fn send_stderr_to_tx(stderr: ChildStderr, tx: mpsc::Sender<AgentOutput>) {
pub async fn send_stderr_to_tx(
stderr: ChildStderr,
tx: mpsc::Sender<AgentOutput>,
) -> JoinHandle<()> {
tokio::spawn(async move {
let reader = BufReader::new(stderr);
let mut reader_lines = reader.lines();

while let Ok(Some(line)) = reader_lines.next_line().await {
println!("Got line from stderr: {:?}", line);

let _ = tx
.send(AgentOutput {
stage: Stage::Running,
Expand All @@ -125,28 +128,44 @@ mod process_utils {
})
.await;
}
});
})
}

/// Function to wait for the `child` to finish and send the result to the `tx` given as a parameter.
pub async fn send_exit_status_to_tx(
child: &mut tokio::process::Child,
mut child: tokio::process::Child,
tx: mpsc::Sender<AgentOutput>,
send_done: bool,
) -> Result<(), ()> {
let exit_status = child.wait().await;
let exit_status = child.wait().await.map(|status| status.code());

match exit_status {
Ok(code) => {
let _ = tx
.send(AgentOutput {
stage: Stage::Done,
stdout: None,
stderr: None,
exit_code: code.code(),
})
.await;

Ok(())
Ok(exit_code) => {
if exit_code != Some(0_i32) {
let _ = tx
.send(AgentOutput {
stage: Stage::Failed,
stdout: None,
stderr: None,
exit_code,
})
.await;

Err(())
} else {
if send_done {
let _ = tx
.send(AgentOutput {
stage: Stage::Done,
stdout: None,
stderr: None,
exit_code,
})
.await;
}

Ok(())
}
}
Err(e) => {
let _ = tx
Expand Down
43 changes: 24 additions & 19 deletions src/agent/src/agents/rust.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,25 +115,29 @@ impl Agent for RustAgent {
let is_release = self.rust_config.build.release;
let tx_build_notifier = self.build_notifier.clone();

let (tx, rx) = mpsc::channel(1);
let (tx, rx) = mpsc::channel(10);
tokio::spawn(async move {
process_utils::send_stderr_to_tx(child.stderr.take().unwrap(), tx.clone()).await;
let build_result = process_utils::send_exit_status_to_tx(&mut child, tx).await;

// Once finished: copy the binary to /tmp
// We could imagine a more complex scenario where we would put this in an artifact repository (like S3)
let binary_path = match is_release {
true => format!("{}/target/release/{}", &function_dir, workload_name),
false => format!("{}/target/debug/{}", &function_dir, workload_name),
};

std::fs::copy(binary_path, format!("/tmp/{}", workload_name))
.expect("Unable to copy binary");
let _ = process_utils::send_stderr_to_tx(child.stderr.take().unwrap(), tx.clone()).await.await;
let build_result = process_utils::send_exit_status_to_tx(child, tx, false).await;
// if error in build, short-circuit the execution
if build_result.is_err() {
let _ = tx_build_notifier.send(Err(()));
} else {
// Once finished: copy the binary to /tmp
// We could imagine a more complex scenario where we would put this in an artifact repository (like S3)
let binary_path = match is_release {
true => format!("{}/target/release/{}", &function_dir, workload_name),
false => format!("{}/target/debug/{}", &function_dir, workload_name),
};

std::fs::copy(binary_path, format!("/tmp/{}", workload_name))
.expect("Unable to copy binary");

// notify when build is done
let _ = tx_build_notifier.send(build_result);
}

std::fs::remove_dir_all(&function_dir).expect("Unable to remove directory");

// notify when build is done
let _ = tx_build_notifier.send(build_result);
});

Ok(rx)
Expand All @@ -151,6 +155,7 @@ impl Agent for RustAgent {
.map_err(|_| AgentError::BuildNotifier)?
.map_err(|_| AgentError::BuildFailed)?;

println!("Starting run()");
let mut child = Command::new(format!("/tmp/{}", self.workload_config.workload_name))
.stdout(Stdio::piped())
.stderr(Stdio::piped())
Expand All @@ -161,15 +166,15 @@ impl Agent for RustAgent {
child_processes.lock().await.insert(child.id().unwrap());
}

let (tx, rx) = mpsc::channel(1);
let (tx, rx) = mpsc::channel(10);
let child_stdout = child.stdout.take().unwrap();
let tx_stdout = tx.clone();
let child_stderr = child.stderr.take().unwrap();
let tx_stderr = tx;

tokio::spawn(async move {
process_utils::send_stdout_to_tx(child_stdout, tx_stdout.clone()).await;
let _ = process_utils::send_exit_status_to_tx(&mut child, tx_stdout).await;
let _ = process_utils::send_stdout_to_tx(child_stdout, tx_stdout.clone()).await.await;
let _ = process_utils::send_exit_status_to_tx(child, tx_stdout, true).await;
});

tokio::spawn(async move {
Expand Down
25 changes: 21 additions & 4 deletions src/agent/src/workload/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl Runner {
Ok(Self::new(config, child_processes))
}

pub async fn run(&self) -> AgentResult<Receiver<AgentOutput>> {
pub async fn run(self) -> AgentResult<Receiver<AgentOutput>> {
let rx = match self.config.action {
Action::Prepare => {
self.agent
Expand All @@ -52,13 +52,30 @@ impl Runner {
}
Action::Run => self.agent.run(Arc::clone(&self.child_processes)).await?,
Action::PrepareAndRun => {
// should merge with run rx?
let _ = self
let (tx1, rx) = tokio::sync::mpsc::channel::<AgentOutput>(10);
let tx2 = tx1.clone();

// Merges the two receivers given as parameters and returns only one
let mut rx_prepare = self
.agent
.prepare(Arc::clone(&self.child_processes))
.await?;
tokio::spawn(async move {
while let Some(output) = rx_prepare.recv().await {
let _ = tx1.send(output).await;
}
});

tokio::spawn(async move {
let rx_run = self.agent.run(Arc::clone(&self.child_processes)).await;
if let Ok(mut rx_run) = rx_run {
while let Some(output) = rx_run.recv().await {
let _ = tx2.clone().send(output).await;
}
}
});

self.agent.run(Arc::clone(&self.child_processes)).await?
rx
}
};

Expand Down
6 changes: 3 additions & 3 deletions src/agent/src/workload/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ impl WorkloadRunner for WorkloadRunnerService {
let runner = Runner::new_from_execute_request(req.into_inner(), CHILD_PROCESSES.clone())
.map_err(|e| tonic::Status::internal(e.to_string()))?;

let mut run_rx = runner
let mut runner_rx = runner
.run()
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?;

let (tx, rx) = mpsc::channel(1);
let (tx, rx) = mpsc::channel(10);
tokio::spawn(async move {
while let Some(agent_output) = run_rx.recv().await {
while let Some(agent_output) = runner_rx.recv().await {
println!("Sending to the gRPC client: {:?}", agent_output);
let _ = tx.send(Ok(agent_output.into())).await;
}
Expand Down

0 comments on commit e787188

Please sign in to comment.