From decb5bd2cdb8b516475ddaaf5e93e6e7618be1fd Mon Sep 17 00:00:00 2001 From: Martin Moreira de Jesus Date: Thu, 25 Apr 2024 21:52:47 +0200 Subject: [PATCH] feat(agent/grpc): add gRPC server (#27) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(agent): add and compile workload proto service Signed-off-by: Martin Moreira de Jesus * refactor(agent/workload): add mod.rs Signed-off-by: Martin Moreira de Jesus * feat(agent/grpc): add service skeleton Signed-off-by: Martin Moreira de Jesus * feat(agent/grpc): run agent on request and send result Signed-off-by: Martin Moreira de Jesus * fix(agent/grpc): add server config in file Signed-off-by: Martin Moreira de Jesus * fix(agent/grpc): typo Signed-off-by: Martin Moreira de Jesus * feat(agent/grpc): add ungraceful shutdown Signed-off-by: Martin Moreira de Jesus * nitpick: move agent.proto at root proto folder Signed-off-by: Matéo Fernandez * refactor(grpc): use map_err instead of match Signed-off-by: Matéo Fernandez --------- Signed-off-by: Martin Moreira de Jesus Signed-off-by: Matéo Fernandez Co-authored-by: Matéo Fernandez --- proto/agent.proto | 31 +++++++++++++++++ src/agent/Cargo.toml | 10 +++++- src/agent/build.rs | 4 +++ src/agent/examples/config.toml | 4 +++ src/agent/src/lib.rs | 20 ++++++++--- src/agent/src/main.rs | 28 ++++++++++++--- src/agent/src/workload/config.rs | 9 +++++ src/agent/src/workload/mod.rs | 3 ++ src/agent/src/workload/runner.rs | 14 ++++---- src/agent/src/workload/service.rs | 58 +++++++++++++++++++++++++++++++ 10 files changed, 166 insertions(+), 15 deletions(-) create mode 100644 proto/agent.proto create mode 100644 src/agent/build.rs create mode 100644 src/agent/src/workload/mod.rs create mode 100644 src/agent/src/workload/service.rs diff --git a/proto/agent.proto b/proto/agent.proto new file mode 100644 index 0000000..5b6f521 --- /dev/null +++ b/proto/agent.proto @@ -0,0 +1,31 @@ +syntax = "proto3"; +package cloudlet.agent; +import "google/protobuf/empty.proto"; + +message ExecuteRequest {} + +message ExecuteResponse { + enum Stage { + PENDING = 0; + BUILDING = 1; + RUNNING = 2; + DONE = 3; + FAILED = 4; + DEBUG = 5; + } + + string stdout = 1; + string stderr = 2; + int32 exit_code = 3; +} + +message SignalRequest { + enum Signal { + KILL = 0; + } +} + +service WorkloadRunner { + rpc Execute(ExecuteRequest) returns (stream ExecuteResponse) {} + rpc Signal(SignalRequest) returns (google.protobuf.Empty) {} +} diff --git a/src/agent/Cargo.toml b/src/agent/Cargo.toml index 9e11866..18c2cf0 100644 --- a/src/agent/Cargo.toml +++ b/src/agent/Cargo.toml @@ -9,9 +9,17 @@ path = "src/lib.rs" [dependencies] clap = { version = "4.5.4", features = ["derive"] } +prost = "0.12.4" rand = "0.8.5" serde = { version = "1.0.197", features = ["derive"] } +tokio = { version = "1.37.0", features = ["full"] } +tokio-stream = "0.1.15" toml = "0.8.12" +tonic = "0.11" + +[build-dependencies] +tonic-build = "0.11" [features] -debug-agent = [] \ No newline at end of file +debug-agent = [] + diff --git a/src/agent/build.rs b/src/agent/build.rs new file mode 100644 index 0000000..d370e28 --- /dev/null +++ b/src/agent/build.rs @@ -0,0 +1,4 @@ +fn main() -> Result<(), Box> { + tonic_build::compile_protos("../../proto/agent.proto")?; + Ok(()) +} diff --git a/src/agent/examples/config.toml b/src/agent/examples/config.toml index 6b681cb..231a186 100644 --- a/src/agent/examples/config.toml +++ b/src/agent/examples/config.toml @@ -2,6 +2,10 @@ workload-name = "fibonacci" language = "rust" action = "prepare-and-run" +[server] +address = "localhost" +port = 50051 + [build] source-code-path = "CHANGEME/cloudlet/src/agent/examples/main.rs" release = true diff --git a/src/agent/src/lib.rs b/src/agent/src/lib.rs index 2a70017..9332611 100644 --- a/src/agent/src/lib.rs +++ b/src/agent/src/lib.rs @@ -1,10 +1,8 @@ use agents::AgentOutput; +use std::fmt; mod agents; -pub mod workload { - pub mod config; - pub mod runner; -} +pub mod workload; #[derive(Debug)] pub enum AgentError { @@ -13,4 +11,18 @@ pub enum AgentError { BuildFailed(AgentOutput), } +impl fmt::Display for AgentError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + AgentError::OpenConfigFileError(e) => write!(f, "Failed to open config file: {}", e), + AgentError::ParseConfigError(e) => write!(f, "Failed to parse config file: {}", e), + AgentError::BuildFailed(output) => write!(f, "Build failed: {:?}", output), + } + } +} + pub type AgentResult = Result; + +pub mod agent { + tonic::include_proto!("cloudlet.agent"); +} diff --git a/src/agent/src/main.rs b/src/agent/src/main.rs index 675a8fe..b9673c9 100644 --- a/src/agent/src/main.rs +++ b/src/agent/src/main.rs @@ -1,6 +1,10 @@ -use agent::workload::{config::Config, runner::Runner}; +use agent::{ + agent::workload_runner_server::WorkloadRunnerServer, + workload::{config::Config, runner::Runner, service::WorkloadRunnerService}, +}; use clap::Parser; -use std::path::PathBuf; +use std::{net::ToSocketAddrs, path::PathBuf}; +use tonic::transport::Server; #[derive(Debug, Parser)] struct Args { @@ -8,11 +12,27 @@ struct Args { config: PathBuf, } -fn main() { +#[tokio::main] +async fn main() -> Result<(), Box> { let args = Args::parse(); let config = Config::from_file(&args.config).unwrap(); + + let bind_address = format!("{}:{}", config.server.address, config.server.port) + .to_socket_addrs() + .unwrap() + .next() + .unwrap(); + let runner = Runner::new(config); - runner.run().unwrap(); + let server = WorkloadRunnerService::new(runner); + + Server::builder() + .add_service(WorkloadRunnerServer::new(server)) + .serve(bind_address) + .await + .unwrap(); + + Ok(()) } diff --git a/src/agent/src/workload/config.rs b/src/agent/src/workload/config.rs index cf13934..b21f71c 100644 --- a/src/agent/src/workload/config.rs +++ b/src/agent/src/workload/config.rs @@ -2,6 +2,13 @@ use crate::{agents::Language, AgentError, AgentResult}; use serde::Deserialize; use std::path::PathBuf; +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub struct Server { + pub address: String, + pub port: u16, +} + /// Generic agent configuration. #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "kebab-case")] @@ -12,6 +19,8 @@ pub struct Config { pub language: Language, /// Action to perform. pub action: Action, + /// Server configuration. + pub server: Server, /// Rest of the configuration as a string. #[serde(skip)] pub config_string: String, diff --git a/src/agent/src/workload/mod.rs b/src/agent/src/workload/mod.rs new file mode 100644 index 0000000..3bfc4f8 --- /dev/null +++ b/src/agent/src/workload/mod.rs @@ -0,0 +1,3 @@ +pub mod config; +pub mod runner; +pub mod service; diff --git a/src/agent/src/workload/runner.rs b/src/agent/src/workload/runner.rs index cb07d80..c921ccd 100644 --- a/src/agent/src/workload/runner.rs +++ b/src/agent/src/workload/runner.rs @@ -1,22 +1,24 @@ -use super::config::{Action, Config}; use crate::{ - agents::{rust, Agent, Language}, + agents::{rust, Agent, AgentOutput, Language}, + workload::config::Action, AgentResult, }; #[cfg(feature = "debug-agent")] use crate::agents::debug; +use super::config::Config; + /// Runner for a workload. /// Will execute the workload based on the inner agent (language). pub struct Runner { config: Config, - agent: Box, + agent: Box, } impl Runner { pub fn new(config: Config) -> Self { - let agent: Box = match config.language { + let agent: Box = match config.language { Language::Rust => Box::new(rust::RustAgent::from(config.clone())), #[cfg(feature = "debug-agent")] Language::Debug => Box::new(debug::DebugAgent::from(config.clone())), @@ -25,7 +27,7 @@ impl Runner { Runner { config, agent } } - pub fn run(&self) -> AgentResult<()> { + pub fn run(&self) -> AgentResult { let result = match self.config.action { Action::Prepare => self.agent.prepare()?, Action::Run => self.agent.run()?, @@ -38,6 +40,6 @@ impl Runner { println!("Result: {:?}", result); - Ok(()) + Ok(result) } } diff --git a/src/agent/src/workload/service.rs b/src/agent/src/workload/service.rs new file mode 100644 index 0000000..dca0e13 --- /dev/null +++ b/src/agent/src/workload/service.rs @@ -0,0 +1,58 @@ +use super::runner::Runner; +use crate::agent::{self, ExecuteRequest, ExecuteResponse, SignalRequest}; +use agent::workload_runner_server::WorkloadRunner; +use std::{process, sync::Arc}; +use tokio::sync::Mutex; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{Request, Response}; + +type Result = std::result::Result, tonic::Status>; + +pub struct WorkloadRunnerService { + runner: Arc>, +} + +impl WorkloadRunnerService { + pub fn new(runner: Runner) -> Self { + WorkloadRunnerService { + runner: Arc::new(Mutex::new(runner)), + } + } +} + +#[tonic::async_trait] +impl WorkloadRunner for WorkloadRunnerService { + type ExecuteStream = ReceiverStream>; + + async fn execute(&self, _: Request) -> Result { + let (tx, rx) = tokio::sync::mpsc::channel(4); + + // We assume there's only one request at a time + let runner = self + .runner + .try_lock() + .map_err(|e| tonic::Status::unavailable(format!("Runner is busy: {:?}", e)))?; + + let res = runner + .run() + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + let _ = tx + .send(Ok(ExecuteResponse { + stdout: res.stdout, + stderr: res.stderr, + exit_code: res.exit_code, + })) + .await + .map_err(|e| { + println!("Failed to send response: {:?}", e); + tonic::Status::internal("Failed to send response") + })?; + + Ok(Response::new(ReceiverStream::new(rx))) + } + + async fn signal(&self, _: Request) -> Result<()> { + process::exit(0); + } +}