Skip to content

Commit

Permalink
refactor: use async_trait instead of Pin<Box<Future<>>>
Browse files Browse the repository at this point in the history
Signed-off-by: Mateo Fernandez <[email protected]>
  • Loading branch information
mfernd committed May 3, 2024
1 parent dd8822b commit 795e553
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 149 deletions.
5 changes: 3 additions & 2 deletions src/agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@ name = "agent"
path = "src/lib.rs"

[dependencies]
async-trait = "0.1.80"
clap = { version = "4.5.4", features = ["derive", "env"] }
nix = { version = "0.28.0", features = ["signal"] }
once_cell = "1.19.0"
prost = "0.12.4"
rand = "0.8.5"
serde = { version = "1.0.197", features = ["derive"] }
tokio = { version = "1.37.0", features = ["full"] }
tokio-stream = "0.1.15"
toml = "0.8.12"
tonic = "0.11"
nix = { version = "0.28.0", features = ["signal"] }
once_cell = "1.19.0"

[build-dependencies]
tonic-build = "0.11"
Expand Down
74 changes: 32 additions & 42 deletions src/agent/src/agents/debug.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use super::AgentOutput;
use crate::agents::Agent;
use crate::{workload, AgentResult};
use async_trait::async_trait;
use std::collections::HashSet;
use std::fs::create_dir_all;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::sync::Mutex;
Expand All @@ -19,53 +18,44 @@ impl From<workload::config::Config> for DebugAgent {
}
}

#[async_trait]
impl Agent for DebugAgent {
fn prepare(
&self,
_: &Arc<Mutex<HashSet<u32>>>,
) -> Pin<Box<dyn Future<Output = AgentResult<AgentOutput>> + Send + '_>> {
Box::pin(async {
let dir = format!("/tmp/{}", self.workload_config.workload_name);

println!("Function directory: {}", dir);

create_dir_all(&dir).expect("Unable to create directory");

std::fs::write(
format!("{}/debug.txt", &dir),
format!(
"Debug agent for {} - written at {:?}",
self.workload_config.workload_name,
SystemTime::now(),
),
)
.expect("Unable to write debug.txt file");

Ok(AgentOutput {
exit_code: 0,
stdout: "Build successfully!".into(),
stderr: String::default(),
})
async fn prepare(&self, _: Arc<Mutex<HashSet<u32>>>) -> AgentResult<AgentOutput> {
let dir = format!("/tmp/{}", self.workload_config.workload_name);

println!("Function directory: {}", dir);

create_dir_all(&dir).expect("Unable to create directory");

std::fs::write(
format!("{}/debug.txt", &dir),
format!(
"Debug agent for {} - written at {:?}",
self.workload_config.workload_name,
SystemTime::now(),
),
)
.expect("Unable to write debug.txt file");

Ok(AgentOutput {
exit_code: 0,
stdout: "Build successfully!".into(),
stderr: String::default(),
})
}

fn run(
&self,
_: &Arc<Mutex<HashSet<u32>>>,
) -> Pin<Box<dyn Future<Output = AgentResult<AgentOutput>> + Send + '_>> {
Box::pin(async {
let dir = format!("/tmp/{}", self.workload_config.workload_name);
async fn run(&self, _: Arc<Mutex<HashSet<u32>>>) -> AgentResult<AgentOutput> {
let dir = format!("/tmp/{}", self.workload_config.workload_name);

let content = std::fs::read_to_string(format!("{}/debug.txt", &dir))
.expect("Unable to read debug.txt file");
let content = std::fs::read_to_string(format!("{}/debug.txt", &dir))
.expect("Unable to read debug.txt file");

std::fs::remove_dir_all(dir).expect("Unable to remove directory");
std::fs::remove_dir_all(dir).expect("Unable to remove directory");

Ok(AgentOutput {
exit_code: 0,
stdout: content,
stderr: String::default(),
})
Ok(AgentOutput {
exit_code: 0,
stdout: content,
stderr: String::default(),
})
}
}
14 changes: 4 additions & 10 deletions src/agent/src/agents/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use crate::{AgentError, AgentResult};
use async_trait::async_trait;
use serde::Deserialize;
use std::collections::HashSet;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::Mutex;

Expand All @@ -17,15 +16,10 @@ pub struct AgentOutput {
pub stderr: String,
}

#[async_trait]
pub trait Agent {
fn prepare<'a>(
&'a self,
child_processes: &'a Arc<Mutex<HashSet<u32>>>,
) -> Pin<Box<dyn Future<Output = AgentResult<AgentOutput>> + Send + '_>>;
fn run<'a>(
&'a self,
child_processes: &'a Arc<Mutex<HashSet<u32>>>,
) -> Pin<Box<dyn Future<Output = AgentResult<AgentOutput>> + Send + '_>>;
async fn prepare(&self, child_processes: Arc<Mutex<HashSet<u32>>>) -> AgentResult<AgentOutput>;
async fn run(&self, child_processes: Arc<Mutex<HashSet<u32>>>) -> AgentResult<AgentOutput>;
}

#[derive(Debug, Clone, Deserialize)]
Expand Down
172 changes: 83 additions & 89 deletions src/agent/src/agents/rust.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use super::{Agent, AgentOutput};
use crate::{workload, AgentError, AgentResult};
use async_trait::async_trait;
use rand::distributions::{Alphanumeric, DistString};
use serde::Deserialize;
use std::collections::HashSet;
use std::fs::create_dir_all;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::process::Command;
use tokio::sync::Mutex;
Expand All @@ -29,8 +28,8 @@ pub struct RustAgent {
impl RustAgent {
async fn build(
&self,
function_dir: &String,
child_processes: &Arc<Mutex<HashSet<u32>>>,
function_dir: &str,
child_processes: Arc<Mutex<HashSet<u32>>>,
) -> AgentResult<AgentOutput> {
if self.rust_config.build.release {
let child = Command::new("cargo")
Expand All @@ -40,7 +39,9 @@ impl RustAgent {
.spawn()
.expect("Failed to build function");

child_processes.lock().await.insert(child.id().unwrap());
{
child_processes.lock().await.insert(child.id().unwrap());
}

let output = child
.wait_with_output()
Expand Down Expand Up @@ -85,107 +86,100 @@ impl From<workload::config::Config> for RustAgent {
}
}

#[async_trait]
impl Agent for RustAgent {
fn prepare<'a>(
&'a self,
child_processes: &'a Arc<Mutex<HashSet<u32>>>,
) -> Pin<Box<dyn Future<Output = AgentResult<AgentOutput>> + Send + '_>> {
Box::pin(async {
let function_dir = format!(
"/tmp/{}",
Alphanumeric.sample_string(&mut rand::thread_rng(), 16)
);

println!("Function directory: {}", function_dir);

create_dir_all(format!("{}/src", &function_dir)).expect("Unable to create directory");

std::fs::write(
format!("{}/src/main.rs", &function_dir),
&self.workload_config.code,
)
.expect("Unable to write main.rs file");

let cargo_toml = format!(
r#"
async fn prepare(&self, child_processes: Arc<Mutex<HashSet<u32>>>) -> AgentResult<AgentOutput> {
let function_dir = format!(
"/tmp/{}",
Alphanumeric.sample_string(&mut rand::thread_rng(), 16)
);

println!("Function directory: {}", function_dir);

create_dir_all(format!("{}/src", &function_dir)).expect("Unable to create directory");

std::fs::write(
format!("{}/src/main.rs", &function_dir),
&self.workload_config.code,
)
.expect("Unable to write main.rs file");

let cargo_toml = format!(
r#"
[package]
name = "{}"
version = "0.1.0"
edition = "2018"
edition = "2021"
"#,
self.workload_config.workload_name
);
self.workload_config.workload_name
);

std::fs::write(format!("{}/Cargo.toml", &function_dir), cargo_toml)
.expect("Unable to write Cargo.toml file");
std::fs::write(format!("{}/Cargo.toml", &function_dir), cargo_toml)
.expect("Unable to write Cargo.toml file");

let result = self.build(&function_dir, child_processes).await?;
let result = self.build(&function_dir, child_processes).await?;

if result.exit_code != 0 {
println!("Build failed: {:?}", result);
return Err(AgentError::BuildFailed(AgentOutput {
exit_code: result.exit_code,
stdout: result.stdout,
stderr: result.stderr,
}));
}

// Copy the binary to /tmp, we could imagine a more complex scenario where we would put this in an artifact repository (like S3)
let binary_path = match self.rust_config.build.release {
true => format!(
"{}/target/release/{}",
&function_dir, self.workload_config.workload_name
),
false => format!(
"{}/target/debug/{}",
&function_dir, self.workload_config.workload_name
),
};

std::fs::copy(
binary_path,
format!("/tmp/{}", self.workload_config.workload_name),
)
.expect("Unable to copy binary");

std::fs::remove_dir_all(&function_dir).expect("Unable to remove directory");

Ok(AgentOutput {
if result.exit_code != 0 {
println!("Build failed: {:?}", result);
return Err(AgentError::BuildFailed(AgentOutput {
exit_code: result.exit_code,
stdout: "Build successful".to_string(),
stderr: "".to_string(),
})
stdout: result.stdout,
stderr: result.stderr,
}));
}

// Copy the binary to /tmp, we could imagine a more complex scenario where we would put this in an artifact repository (like S3)
let binary_path = match self.rust_config.build.release {
true => format!(
"{}/target/release/{}",
&function_dir, self.workload_config.workload_name
),
false => format!(
"{}/target/debug/{}",
&function_dir, self.workload_config.workload_name
),
};

std::fs::copy(
binary_path,
format!("/tmp/{}", self.workload_config.workload_name),
)
.expect("Unable to copy binary");

std::fs::remove_dir_all(&function_dir).expect("Unable to remove directory");

Ok(AgentOutput {
exit_code: result.exit_code,
stdout: "Build successful".to_string(),
stderr: "".to_string(),
})
}

fn run<'a>(
&'a self,
child_processes: &'a Arc<Mutex<HashSet<u32>>>,
) -> Pin<Box<dyn Future<Output = AgentResult<AgentOutput>> + Send + '_>> {
Box::pin(async {
let child = Command::new(format!("/tmp/{}", self.workload_config.workload_name))
.spawn()
.expect("Failed to run function");
async fn run(&self, child_processes: Arc<Mutex<HashSet<u32>>>) -> AgentResult<AgentOutput> {
let child = Command::new(format!("/tmp/{}", self.workload_config.workload_name))
.spawn()
.expect("Failed to run function");

{
child_processes.lock().await.insert(child.id().unwrap());
}

let output = child
.wait_with_output()
.await
.expect("Failed to wait on child");
let output = child
.wait_with_output()
.await
.expect("Failed to wait on child");

let agent_output = AgentOutput {
exit_code: output.status.code().unwrap(),
stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(),
stderr: std::str::from_utf8(&output.stderr).unwrap().to_string(),
};
let agent_output = AgentOutput {
exit_code: output.status.code().unwrap(),
stdout: std::str::from_utf8(&output.stdout).unwrap().to_string(),
stderr: std::str::from_utf8(&output.stderr).unwrap().to_string(),
};

if !output.status.success() {
println!("Run failed: {:?}", agent_output);
return Err(AgentError::BuildFailed(agent_output));
}
if !output.status.success() {
println!("Run failed: {:?}", agent_output);
return Err(AgentError::BuildFailed(agent_output));
}

Ok(agent_output)
})
Ok(agent_output)
}
}
Loading

0 comments on commit 795e553

Please sign in to comment.