Skip to content

Commit

Permalink
refactor: error handling
Browse files Browse the repository at this point in the history
Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc committed May 14, 2024
1 parent 63a54d1 commit c453d29
Showing 1 changed file with 27 additions and 26 deletions.
53 changes: 27 additions & 26 deletions src/flow/src/adapter/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::sync::Arc;

use enum_as_inner::EnumAsInner;
use hydroflow::scheduled::graph::Hydroflow;
use snafu::{ensure, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::{broadcast, mpsc, Mutex};

use crate::adapter::error::{Error, EvalSnafu, FlowAlreadyExistSnafu, InternalSnafu};
Expand All @@ -32,6 +32,8 @@ use crate::repr::{self, DiffRow};

pub type SharedBuf = Arc<Mutex<VecDeque<DiffRow>>>;

type ReqId = usize;

/// Create both worker(`!Send`) and worker handle(`Send + Sync`)
pub fn create_worker<'a>() -> (WorkerHandle, Worker<'a>) {
let (itc_client, itc_server) = create_inter_thread_call();
Expand Down Expand Up @@ -107,17 +109,15 @@ pub struct WorkerHandle {

impl WorkerHandle {
/// create task, return task id
///
#[allow(clippy::too_many_arguments)]
pub async fn create_flow(&self, create_reqs: Request) -> Result<Option<FlowId>, Error> {
if !matches!(create_reqs, Request::Create { .. }) {
return InternalSnafu {
ensure!(
matches!(create_reqs, Request::Create { .. }),
InternalSnafu {
reason: format!(
"Flow Node/Worker itc failed, expect Request::Create, found {create_reqs:?}"
),
}
.fail();
}
);

let ret = self
.itc_client
Expand All @@ -139,14 +139,13 @@ impl WorkerHandle {
pub async fn remove_flow(&self, flow_id: FlowId) -> Result<bool, Error> {
let req = Request::Remove { flow_id };
let ret = self.itc_client.lock().await.call_blocking(req).await?;
if let Response::Remove { result } = ret {
Ok(result)
} else {

ret.into_remove().map_err(|ret| {
InternalSnafu {
reason: format!("Flow Node/Worker failed, expect Response::Remove, found {ret:?}"),
}
.fail()
}
.build()
})
}

/// trigger running the worker, will not block, and will run the worker parallelly
Expand All @@ -163,19 +162,15 @@ impl WorkerHandle {
pub async fn contains_flow(&self, flow_id: FlowId) -> Result<bool, Error> {
let req = Request::ContainTask { flow_id };
let ret = self.itc_client.lock().await.call_blocking(req).await?;
if let Response::ContainTask {
result: task_contain_result,
} = ret
{
Ok(task_contain_result)
} else {

ret.into_contain_task().map_err(|ret| {
InternalSnafu {
reason: format!(
"Flow Node/Worker itc failed, expect Response::ContainTask, found {ret:?}"
),
}
.fail()
}
.build()
})
}

/// shutdown the worker
Expand Down Expand Up @@ -285,7 +280,7 @@ impl<'s> Worker<'s> {
/// handle request, return response if any, Err if receive shutdown signal
///
/// return `Err(())` if receive shutdown request
fn handle_req(&mut self, req_id: usize, req: Request) -> Result<Option<(usize, Response)>, ()> {
fn handle_req(&mut self, req_id: ReqId, req: Request) -> Result<Option<(ReqId, Response)>, ()> {
let ret = match req {
Request::Create {
flow_id,
Expand Down Expand Up @@ -388,7 +383,7 @@ fn create_inter_thread_call() -> (InterThreadCallClient, InterThreadCallServer)
};
(client, server)
}
type ReqId = usize;

#[derive(Debug)]
struct InterThreadCallClient {
call_id: AtomicUsize,
Expand All @@ -405,18 +400,23 @@ impl InterThreadCallClient {
.send((call_id, req))
.map_err(from_send_error)
}

/// call blocking, and return the result
async fn call_blocking(&mut self, req: Request) -> Result<Response, Error> {
// TODO(discord9): relax memory order later
let call_id = self.call_id.fetch_add(1, Ordering::SeqCst);
self.arg_sender
.send((call_id, req))
.map_err(from_send_error)?;

// TODO(discord9): better inter thread call impl, i.e. support multiple client(also consider if it's necessary)
// since one node manger might manage multiple worker, but one worker should only belong to one node manager
let (ret_call_id, ret) = self.ret_recv.recv().await.ok_or_else(||InternalSnafu {
reason: "InterThreadCallClient call_blocking failed, ret_recv has been closed and there are no remaining messages in the channel's buffer",
}.build())?;
let (ret_call_id, ret) = self
.ret_recv
.recv()
.await
.context(InternalSnafu { reason: "InterThreadCallClient call_blocking failed, ret_recv has been closed and there are no remaining messages in the channel's buffer" })?;

ensure!(
ret_call_id == call_id,
InternalSnafu {
Expand All @@ -443,7 +443,7 @@ impl InterThreadCallServer {
}

/// Send response back to the client
pub fn resp(&self, call_id: usize, resp: Response) -> Result<(), Error> {
pub fn resp(&self, call_id: ReqId, resp: Response) -> Result<(), Error> {
self.ret_sender
.send((call_id, resp))
.map_err(from_send_error)
Expand All @@ -465,6 +465,7 @@ mod test {
use crate::expr::Id;
use crate::plan::Plan;
use crate::repr::{RelationType, Row};

#[tokio::test]
pub async fn test_simple_get_with_worker_and_handle() {
let (tx, rx) = oneshot::channel();
Expand Down

0 comments on commit c453d29

Please sign in to comment.