Skip to content

Commit

Permalink
fix(agent): wait for stdout/stderr to be sent
Browse files Browse the repository at this point in the history
fix(agent): add option on process_utils functions

Signed-off-by: Mateo Fernandez <[email protected]>
  • Loading branch information
mfernd committed May 30, 2024
1 parent 8b115e8 commit 3914d4a
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 5 deletions.
6 changes: 4 additions & 2 deletions src/agent/src/agents/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ mod process_utils {
pub async fn send_stdout_to_tx(
stdout: ChildStdout,
tx: mpsc::Sender<AgentOutput>,
stage: Option<Stage>,
) -> JoinHandle<()> {
tokio::spawn(async move {
let reader = BufReader::new(stdout);
Expand All @@ -99,7 +100,7 @@ mod process_utils {
while let Ok(Some(line)) = reader_lines.next_line().await {
let _ = tx
.send(AgentOutput {
stage: Stage::Running,
stage: stage.unwrap_or(Stage::Running),
stdout: Some(line),
stderr: None,
exit_code: None,
Expand All @@ -113,6 +114,7 @@ mod process_utils {
pub async fn send_stderr_to_tx(
stderr: ChildStderr,
tx: mpsc::Sender<AgentOutput>,
stage: Option<Stage>,
) -> JoinHandle<()> {
tokio::spawn(async move {
let reader = BufReader::new(stderr);
Expand All @@ -121,7 +123,7 @@ mod process_utils {
while let Ok(Some(line)) = reader_lines.next_line().await {
let _ = tx
.send(AgentOutput {
stage: Stage::Running,
stage: stage.unwrap_or(Stage::Running),
stdout: Some(line),
stderr: None,
exit_code: None,
Expand Down
14 changes: 11 additions & 3 deletions src/agent/src/agents/rust.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::{Agent, AgentOutput};
use crate::agent::execute_response::Stage;
use crate::agents::process_utils;
use crate::{workload, AgentError, AgentResult};
use async_trait::async_trait;
Expand Down Expand Up @@ -117,7 +118,10 @@ impl Agent for RustAgent {

let (tx, rx) = mpsc::channel(10);
tokio::spawn(async move {
let _ = process_utils::send_stderr_to_tx(child.stderr.take().unwrap(), tx.clone()).await.await;
let stderr = child.stderr.take().unwrap();
let _ = process_utils::send_stderr_to_tx(stderr, tx.clone(), Some(Stage::Building))
.await
.await;
let build_result = process_utils::send_exit_status_to_tx(child, tx, false).await;
// if error in build, short-circuit the execution
if build_result.is_err() {
Expand Down Expand Up @@ -173,12 +177,16 @@ impl Agent for RustAgent {
let tx_stderr = tx;

tokio::spawn(async move {
let _ = process_utils::send_stdout_to_tx(child_stdout, tx_stdout.clone()).await.await;
let _ = process_utils::send_stdout_to_tx(child_stdout, tx_stdout.clone(), None)
.await
.await;
let _ = process_utils::send_exit_status_to_tx(child, tx_stdout, true).await;
});

tokio::spawn(async move {
process_utils::send_stderr_to_tx(child_stderr, tx_stderr).await;
let _ = process_utils::send_stderr_to_tx(child_stderr, tx_stderr, None)
.await
.await;
});

Ok(rx)
Expand Down

0 comments on commit 3914d4a

Please sign in to comment.