Skip to content

Commit

Permalink
feat(agent/grpc): add gRPC server (#27)
Browse files Browse the repository at this point in the history
* feat(agent): add and compile workload proto service

Signed-off-by: Martin Moreira de Jesus <[email protected]>

* refactor(agent/workload): add mod.rs

Signed-off-by: Martin Moreira de Jesus <[email protected]>

* feat(agent/grpc): add service skeleton

Signed-off-by: Martin Moreira de Jesus <[email protected]>

* feat(agent/grpc): run agent on request and send result

Signed-off-by: Martin Moreira de Jesus <[email protected]>

* fix(agent/grpc): add server config in file

Signed-off-by: Martin Moreira de Jesus <[email protected]>

* fix(agent/grpc): typo

Signed-off-by: Martin Moreira de Jesus <[email protected]>

* feat(agent/grpc): add ungraceful shutdown

Signed-off-by: Martin Moreira de Jesus <[email protected]>

* nitpick: move agent.proto at root proto folder

Signed-off-by: Matéo Fernandez <[email protected]>

* refactor(grpc): use map_err instead of match

Signed-off-by: Matéo Fernandez <[email protected]>

---------

Signed-off-by: Martin Moreira de Jesus <[email protected]>
Signed-off-by: Matéo Fernandez <[email protected]>
Co-authored-by: Matéo Fernandez <[email protected]>
  • Loading branch information
mmoreiradj and mfernd authored Apr 25, 2024
1 parent 2338841 commit decb5bd
Show file tree
Hide file tree
Showing 10 changed files with 166 additions and 15 deletions.
31 changes: 31 additions & 0 deletions proto/agent.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
syntax = "proto3";
package cloudlet.agent;
import "google/protobuf/empty.proto";

message ExecuteRequest {}

message ExecuteResponse {
enum Stage {
PENDING = 0;
BUILDING = 1;
RUNNING = 2;
DONE = 3;
FAILED = 4;
DEBUG = 5;
}

string stdout = 1;
string stderr = 2;
int32 exit_code = 3;
}

message SignalRequest {
enum Signal {
KILL = 0;
}
}

service WorkloadRunner {
rpc Execute(ExecuteRequest) returns (stream ExecuteResponse) {}
rpc Signal(SignalRequest) returns (google.protobuf.Empty) {}
}
10 changes: 9 additions & 1 deletion src/agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,17 @@ path = "src/lib.rs"

[dependencies]
clap = { version = "4.5.4", features = ["derive"] }
prost = "0.12.4"
rand = "0.8.5"
serde = { version = "1.0.197", features = ["derive"] }
tokio = { version = "1.37.0", features = ["full"] }
tokio-stream = "0.1.15"
toml = "0.8.12"
tonic = "0.11"

[build-dependencies]
tonic-build = "0.11"

[features]
debug-agent = []
debug-agent = []

4 changes: 4 additions & 0 deletions src/agent/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("../../proto/agent.proto")?;
Ok(())
}
4 changes: 4 additions & 0 deletions src/agent/examples/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ workload-name = "fibonacci"
language = "rust"
action = "prepare-and-run"

[server]
address = "localhost"
port = 50051

[build]
source-code-path = "CHANGEME/cloudlet/src/agent/examples/main.rs"
release = true
20 changes: 16 additions & 4 deletions src/agent/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use agents::AgentOutput;
use std::fmt;

mod agents;
pub mod workload {
pub mod config;
pub mod runner;
}
pub mod workload;

#[derive(Debug)]
pub enum AgentError {
Expand All @@ -13,4 +11,18 @@ pub enum AgentError {
BuildFailed(AgentOutput),
}

impl fmt::Display for AgentError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
AgentError::OpenConfigFileError(e) => write!(f, "Failed to open config file: {}", e),
AgentError::ParseConfigError(e) => write!(f, "Failed to parse config file: {}", e),
AgentError::BuildFailed(output) => write!(f, "Build failed: {:?}", output),
}
}
}

pub type AgentResult<T> = Result<T, AgentError>;

pub mod agent {
tonic::include_proto!("cloudlet.agent");
}
28 changes: 24 additions & 4 deletions src/agent/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,38 @@
use agent::workload::{config::Config, runner::Runner};
use agent::{
agent::workload_runner_server::WorkloadRunnerServer,
workload::{config::Config, runner::Runner, service::WorkloadRunnerService},
};
use clap::Parser;
use std::path::PathBuf;
use std::{net::ToSocketAddrs, path::PathBuf};
use tonic::transport::Server;

#[derive(Debug, Parser)]
struct Args {
#[clap(short, long, default_value = "/etc/cloudlet/agent/config.toml")]
config: PathBuf,
}

fn main() {
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();

let config = Config::from_file(&args.config).unwrap();

let bind_address = format!("{}:{}", config.server.address, config.server.port)
.to_socket_addrs()
.unwrap()
.next()
.unwrap();

let runner = Runner::new(config);

runner.run().unwrap();
let server = WorkloadRunnerService::new(runner);

Server::builder()
.add_service(WorkloadRunnerServer::new(server))
.serve(bind_address)
.await
.unwrap();

Ok(())
}
9 changes: 9 additions & 0 deletions src/agent/src/workload/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@ use crate::{agents::Language, AgentError, AgentResult};
use serde::Deserialize;
use std::path::PathBuf;

#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct Server {
pub address: String,
pub port: u16,
}

/// Generic agent configuration.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "kebab-case")]
Expand All @@ -12,6 +19,8 @@ pub struct Config {
pub language: Language,
/// Action to perform.
pub action: Action,
/// Server configuration.
pub server: Server,
/// Rest of the configuration as a string.
#[serde(skip)]
pub config_string: String,
Expand Down
3 changes: 3 additions & 0 deletions src/agent/src/workload/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod config;
pub mod runner;
pub mod service;
14 changes: 8 additions & 6 deletions src/agent/src/workload/runner.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
use super::config::{Action, Config};
use crate::{
agents::{rust, Agent, Language},
agents::{rust, Agent, AgentOutput, Language},
workload::config::Action,
AgentResult,
};

#[cfg(feature = "debug-agent")]
use crate::agents::debug;

use super::config::Config;

/// Runner for a workload.
/// Will execute the workload based on the inner agent (language).
pub struct Runner {
config: Config,
agent: Box<dyn Agent>,
agent: Box<dyn Agent + Sync + Send>,
}

impl Runner {
pub fn new(config: Config) -> Self {
let agent: Box<dyn Agent> = match config.language {
let agent: Box<dyn Agent + Sync + Send> = match config.language {
Language::Rust => Box::new(rust::RustAgent::from(config.clone())),
#[cfg(feature = "debug-agent")]
Language::Debug => Box::new(debug::DebugAgent::from(config.clone())),
Expand All @@ -25,7 +27,7 @@ impl Runner {
Runner { config, agent }
}

pub fn run(&self) -> AgentResult<()> {
pub fn run(&self) -> AgentResult<AgentOutput> {
let result = match self.config.action {
Action::Prepare => self.agent.prepare()?,
Action::Run => self.agent.run()?,
Expand All @@ -38,6 +40,6 @@ impl Runner {

println!("Result: {:?}", result);

Ok(())
Ok(result)
}
}
58 changes: 58 additions & 0 deletions src/agent/src/workload/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use super::runner::Runner;
use crate::agent::{self, ExecuteRequest, ExecuteResponse, SignalRequest};
use agent::workload_runner_server::WorkloadRunner;
use std::{process, sync::Arc};
use tokio::sync::Mutex;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response};

type Result<T> = std::result::Result<Response<T>, tonic::Status>;

pub struct WorkloadRunnerService {
runner: Arc<Mutex<Runner>>,
}

impl WorkloadRunnerService {
pub fn new(runner: Runner) -> Self {
WorkloadRunnerService {
runner: Arc::new(Mutex::new(runner)),
}
}
}

#[tonic::async_trait]
impl WorkloadRunner for WorkloadRunnerService {
type ExecuteStream = ReceiverStream<std::result::Result<ExecuteResponse, tonic::Status>>;

async fn execute(&self, _: Request<ExecuteRequest>) -> Result<Self::ExecuteStream> {
let (tx, rx) = tokio::sync::mpsc::channel(4);

// We assume there's only one request at a time
let runner = self
.runner
.try_lock()
.map_err(|e| tonic::Status::unavailable(format!("Runner is busy: {:?}", e)))?;

let res = runner
.run()
.map_err(|e| tonic::Status::internal(e.to_string()))?;

let _ = tx
.send(Ok(ExecuteResponse {
stdout: res.stdout,
stderr: res.stderr,
exit_code: res.exit_code,
}))
.await
.map_err(|e| {
println!("Failed to send response: {:?}", e);
tonic::Status::internal("Failed to send response")
})?;

Ok(Response::new(ReceiverStream::new(rx)))
}

async fn signal(&self, _: Request<SignalRequest>) -> Result<()> {
process::exit(0);
}
}

0 comments on commit decb5bd

Please sign in to comment.