Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(agent/grpc): add gRPC server #27

Merged
merged 9 commits into from
Apr 25, 2024
Merged
31 changes: 31 additions & 0 deletions proto/agent.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
syntax = "proto3";
package cloudlet.agent;
import "google/protobuf/empty.proto";

message ExecuteRequest {}

message ExecuteResponse {
enum Stage {
PENDING = 0;
BUILDING = 1;
RUNNING = 2;
DONE = 3;
FAILED = 4;
DEBUG = 5;
}

string stdout = 1;
string stderr = 2;
int32 exit_code = 3;
}

message SignalRequest {
enum Signal {
KILL = 0;
}
}

service WorkloadRunner {
rpc Execute(ExecuteRequest) returns (stream ExecuteResponse) {}
rpc Signal(SignalRequest) returns (google.protobuf.Empty) {}
}
10 changes: 9 additions & 1 deletion src/agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,17 @@ path = "src/lib.rs"

[dependencies]
clap = { version = "4.5.4", features = ["derive"] }
prost = "0.12.4"
rand = "0.8.5"
serde = { version = "1.0.197", features = ["derive"] }
tokio = { version = "1.37.0", features = ["full"] }
tokio-stream = "0.1.15"
toml = "0.8.12"
tonic = "0.11"

[build-dependencies]
tonic-build = "0.11"

[features]
debug-agent = []
debug-agent = []

4 changes: 4 additions & 0 deletions src/agent/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("../../proto/agent.proto")?;
Ok(())
}
4 changes: 4 additions & 0 deletions src/agent/examples/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ workload-name = "fibonacci"
language = "rust"
action = "prepare-and-run"

[server]
address = "localhost"
port = 50051

[build]
source-code-path = "CHANGEME/cloudlet/src/agent/examples/main.rs"
release = true
20 changes: 16 additions & 4 deletions src/agent/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use agents::AgentOutput;
use std::fmt;

mod agents;
pub mod workload {
pub mod config;
pub mod runner;
}
pub mod workload;

#[derive(Debug)]
pub enum AgentError {
Expand All @@ -13,4 +11,18 @@ pub enum AgentError {
BuildFailed(AgentOutput),
}

impl fmt::Display for AgentError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
AgentError::OpenConfigFileError(e) => write!(f, "Failed to open config file: {}", e),
AgentError::ParseConfigError(e) => write!(f, "Failed to parse config file: {}", e),
AgentError::BuildFailed(output) => write!(f, "Build failed: {:?}", output),
}
}
}

pub type AgentResult<T> = Result<T, AgentError>;

pub mod agent {
tonic::include_proto!("cloudlet.agent");
}
28 changes: 24 additions & 4 deletions src/agent/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,38 @@
use agent::workload::{config::Config, runner::Runner};
use agent::{
agent::workload_runner_server::WorkloadRunnerServer,
workload::{config::Config, runner::Runner, service::WorkloadRunnerService},
};
use clap::Parser;
use std::path::PathBuf;
use std::{net::ToSocketAddrs, path::PathBuf};
use tonic::transport::Server;

#[derive(Debug, Parser)]
struct Args {
#[clap(short, long, default_value = "/etc/cloudlet/agent/config.toml")]
config: PathBuf,
}

fn main() {
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();

let config = Config::from_file(&args.config).unwrap();

let bind_address = format!("{}:{}", config.server.address, config.server.port)
.to_socket_addrs()
.unwrap()
.next()
.unwrap();

let runner = Runner::new(config);

runner.run().unwrap();
let server = WorkloadRunnerService::new(runner);

Server::builder()
.add_service(WorkloadRunnerServer::new(server))
.serve(bind_address)
.await
.unwrap();

Ok(())
}
9 changes: 9 additions & 0 deletions src/agent/src/workload/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@ use crate::{agents::Language, AgentError, AgentResult};
use serde::Deserialize;
use std::path::PathBuf;

#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct Server {
pub address: String,
pub port: u16,
}

/// Generic agent configuration.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "kebab-case")]
Expand All @@ -12,6 +19,8 @@ pub struct Config {
pub language: Language,
/// Action to perform.
pub action: Action,
/// Server configuration.
pub server: Server,
/// Rest of the configuration as a string.
#[serde(skip)]
pub config_string: String,
Expand Down
3 changes: 3 additions & 0 deletions src/agent/src/workload/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod config;
pub mod runner;
pub mod service;
14 changes: 8 additions & 6 deletions src/agent/src/workload/runner.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
use super::config::{Action, Config};
use crate::{
agents::{rust, Agent, Language},
agents::{rust, Agent, AgentOutput, Language},
workload::config::Action,
AgentResult,
};

#[cfg(feature = "debug-agent")]
use crate::agents::debug;

use super::config::Config;

/// Runner for a workload.
/// Will execute the workload based on the inner agent (language).
pub struct Runner {
config: Config,
agent: Box<dyn Agent>,
agent: Box<dyn Agent + Sync + Send>,
}

impl Runner {
pub fn new(config: Config) -> Self {
let agent: Box<dyn Agent> = match config.language {
let agent: Box<dyn Agent + Sync + Send> = match config.language {
Language::Rust => Box::new(rust::RustAgent::from(config.clone())),
#[cfg(feature = "debug-agent")]
Language::Debug => Box::new(debug::DebugAgent::from(config.clone())),
Expand All @@ -25,7 +27,7 @@ impl Runner {
Runner { config, agent }
}

pub fn run(&self) -> AgentResult<()> {
pub fn run(&self) -> AgentResult<AgentOutput> {
mfernd marked this conversation as resolved.
Show resolved Hide resolved
let result = match self.config.action {
Action::Prepare => self.agent.prepare()?,
Action::Run => self.agent.run()?,
Expand All @@ -38,6 +40,6 @@ impl Runner {

println!("Result: {:?}", result);

Ok(())
Ok(result)
}
}
58 changes: 58 additions & 0 deletions src/agent/src/workload/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use super::runner::Runner;
use crate::agent::{self, ExecuteRequest, ExecuteResponse, SignalRequest};
use agent::workload_runner_server::WorkloadRunner;
use std::{process, sync::Arc};
use tokio::sync::Mutex;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response};

type Result<T> = std::result::Result<Response<T>, tonic::Status>;

pub struct WorkloadRunnerService {
runner: Arc<Mutex<Runner>>,
}

impl WorkloadRunnerService {
pub fn new(runner: Runner) -> Self {
WorkloadRunnerService {
runner: Arc::new(Mutex::new(runner)),
}
}
}

#[tonic::async_trait]
impl WorkloadRunner for WorkloadRunnerService {
type ExecuteStream = ReceiverStream<std::result::Result<ExecuteResponse, tonic::Status>>;

async fn execute(&self, _: Request<ExecuteRequest>) -> Result<Self::ExecuteStream> {
let (tx, rx) = tokio::sync::mpsc::channel(4);

// We assume there's only one request at a time
let runner = self
.runner
.try_lock()
.map_err(|e| tonic::Status::unavailable(format!("Runner is busy: {:?}", e)))?;

let res = runner
.run()
.map_err(|e| tonic::Status::internal(e.to_string()))?;

let _ = tx
.send(Ok(ExecuteResponse {
stdout: res.stdout,
stderr: res.stderr,
exit_code: res.exit_code,
}))
.await
.map_err(|e| {
println!("Failed to send response: {:?}", e);
tonic::Status::internal("Failed to send response")
})?;

Ok(Response::new(ReceiverStream::new(rx)))
}

async fn signal(&self, _: Request<SignalRequest>) -> Result<()> {
process::exit(0);
}
}