Skip to content

Commit

Permalink
fix: move agent.proto in root folder and use better error handling in…
Browse files Browse the repository at this point in the history
… grpc service

Signed-off-by: Matéo Fernandez <[email protected]>
  • Loading branch information
mfernd committed Apr 25, 2024
1 parent edfdc2b commit 11a5e26
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 34 deletions.
File renamed without changes.
2 changes: 1 addition & 1 deletion src/agent/build.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("../proto/agent.proto")?;
tonic_build::compile_protos("../../proto/agent.proto")?;
Ok(())
}
58 changes: 25 additions & 33 deletions src/agent/src/workload/service.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
use std::{process::exit, sync::Arc};

use super::runner::Runner;
use crate::agent::{self, ExecuteRequest, ExecuteResponse, SignalRequest};

use agent::workload_runner_server::WorkloadRunner;

use std::sync::Arc;
use tokio::sync::Mutex;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response};

use super::runner::Runner;

type Result<T> = std::result::Result<Response<T>, tonic::Status>;

pub struct WorkloadRunnerService {
Expand All @@ -18,7 +14,7 @@ pub struct WorkloadRunnerService {

impl WorkloadRunnerService {
pub fn new(runner: Runner) -> Self {
WorkloadRunnerService {
Self {
runner: Arc::new(Mutex::new(runner)),
}
}
Expand All @@ -32,36 +28,32 @@ impl WorkloadRunner for WorkloadRunnerService {
let (tx, rx) = tokio::sync::mpsc::channel(4);

// We assume there's only one request at a time
let runner = match self.runner.try_lock() {
Ok(runner) => runner,
Err(_) => {
return Err(tonic::Status::unavailable("Runner is busy"));
}
};

let res = match runner.run() {
Ok(res) => res,
Err(err) => {
return Err(tonic::Status::internal(err.to_string()));
}
};

let _ = tx
.send(Ok(ExecuteResponse {
stdout: res.stdout,
stderr: res.stderr,
exit_code: res.exit_code,
}))
.await
.map_err(|e| {
println!("Failed to send response: {:?}", e);
tonic::Status::internal("Failed to send response")
})?;
let runner = self
.runner
.try_lock()
.map_err(|e| tonic::Status::unavailable(format!("Runner is busy: {:?}", e)))?;

let res = runner
.run()
.map_err(|e| tonic::Status::internal(e.to_string()))?;

tx.send(Ok(ExecuteResponse {
stdout: res.stdout,
stderr: res.stderr,
exit_code: res.exit_code,
}))
.await
.map_err(|e| {
println!("Failed to send response: {:?}", e);
tonic::Status::internal("Failed to send response")
})?;

Ok(Response::new(ReceiverStream::new(rx)))
}

async fn signal(&self, _: Request<SignalRequest>) -> Result<()> {
unreachable!();
// should apply the given signal on the runner process
// example: kill the process if running
todo!();
}
}

0 comments on commit 11a5e26

Please sign in to comment.