From 11a5e260ab44f93db576f88bbed4f9d9cd7fc534 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mat=C3=A9o=20Fernandez?= Date: Thu, 25 Apr 2024 21:14:27 +0200 Subject: [PATCH] fix: move agent.proto in root folder and use better error handling in grpc service MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Matéo Fernandez --- {src/proto => proto}/agent.proto | 0 src/agent/build.rs | 2 +- src/agent/src/workload/service.rs | 58 +++++++++++++------------------ 3 files changed, 26 insertions(+), 34 deletions(-) rename {src/proto => proto}/agent.proto (100%) diff --git a/src/proto/agent.proto b/proto/agent.proto similarity index 100% rename from src/proto/agent.proto rename to proto/agent.proto diff --git a/src/agent/build.rs b/src/agent/build.rs index 2dcec82..d370e28 100644 --- a/src/agent/build.rs +++ b/src/agent/build.rs @@ -1,4 +1,4 @@ fn main() -> Result<(), Box> { - tonic_build::compile_protos("../proto/agent.proto")?; + tonic_build::compile_protos("../../proto/agent.proto")?; Ok(()) } diff --git a/src/agent/src/workload/service.rs b/src/agent/src/workload/service.rs index 3de5455..53c8a1e 100644 --- a/src/agent/src/workload/service.rs +++ b/src/agent/src/workload/service.rs @@ -1,15 +1,11 @@ -use std::{process::exit, sync::Arc}; - +use super::runner::Runner; use crate::agent::{self, ExecuteRequest, ExecuteResponse, SignalRequest}; - use agent::workload_runner_server::WorkloadRunner; - +use std::sync::Arc; use tokio::sync::Mutex; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response}; -use super::runner::Runner; - type Result = std::result::Result, tonic::Status>; pub struct WorkloadRunnerService { @@ -18,7 +14,7 @@ pub struct WorkloadRunnerService { impl WorkloadRunnerService { pub fn new(runner: Runner) -> Self { - WorkloadRunnerService { + Self { runner: Arc::new(Mutex::new(runner)), } } @@ -32,36 +28,32 @@ impl WorkloadRunner for WorkloadRunnerService { let (tx, rx) = tokio::sync::mpsc::channel(4); // We assume there's only one request at a time - let runner = match self.runner.try_lock() { - Ok(runner) => runner, - Err(_) => { - return Err(tonic::Status::unavailable("Runner is busy")); - } - }; - - let res = match runner.run() { - Ok(res) => res, - Err(err) => { - return Err(tonic::Status::internal(err.to_string())); - } - }; - - let _ = tx - .send(Ok(ExecuteResponse { - stdout: res.stdout, - stderr: res.stderr, - exit_code: res.exit_code, - })) - .await - .map_err(|e| { - println!("Failed to send response: {:?}", e); - tonic::Status::internal("Failed to send response") - })?; + let runner = self + .runner + .try_lock() + .map_err(|e| tonic::Status::unavailable(format!("Runner is busy: {:?}", e)))?; + + let res = runner + .run() + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + tx.send(Ok(ExecuteResponse { + stdout: res.stdout, + stderr: res.stderr, + exit_code: res.exit_code, + })) + .await + .map_err(|e| { + println!("Failed to send response: {:?}", e); + tonic::Status::internal("Failed to send response") + })?; Ok(Response::new(ReceiverStream::new(rx))) } async fn signal(&self, _: Request) -> Result<()> { - unreachable!(); + // should apply the given signal on the runner process + // example: kill the process if running + todo!(); } }