From ce1917c61ee41a62ad0e510c30167585a8efa515 Mon Sep 17 00:00:00 2001 From: discord9 Date: Tue, 14 May 2024 14:27:18 +0800 Subject: [PATCH 1/5] feat: flow worker --- src/flow/src/adapter/worker.rs | 510 +++++++++++++++++++++++++++++++++ 1 file changed, 510 insertions(+) create mode 100644 src/flow/src/adapter/worker.rs diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs new file mode 100644 index 000000000000..a9d779054e4a --- /dev/null +++ b/src/flow/src/adapter/worker.rs @@ -0,0 +1,510 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! For single-thread flow worker + +use std::collections::{BTreeMap, VecDeque}; +use std::sync::Arc; + +use hydroflow::scheduled::graph::Hydroflow; +use snafu::ResultExt; +use tokio::sync::{broadcast, mpsc, Mutex}; + +use crate::adapter::error::{Error, EvalSnafu}; +use crate::adapter::FlowId; +use crate::compute::{Context, DataflowState, ErrCollector}; +use crate::expr::error::InternalSnafu; +use crate::expr::GlobalId; +use crate::plan::TypedPlan; +use crate::repr::{self, DiffRow}; + +pub type SharedBuf = Arc>>; + +/// Create both worker(`!Send`) and worker handle(`Send + Sync`) +pub fn create_worker<'a>() -> (WorkerHandle, Worker<'a>) { + let (itc_client, itc_server) = create_inter_thread_call(); + let worker_handle = WorkerHandle { + itc_client: Mutex::new(itc_client), + }; + let worker = Worker { + task_states: BTreeMap::new(), + itc_server: Arc::new(Mutex::new(itc_server)), + }; + (worker_handle, worker) +} + +/// ActiveDataflowState is a wrapper around `Hydroflow` and `DataflowState` + +pub(crate) struct ActiveDataflowState<'subgraph> { + df: Hydroflow<'subgraph>, + state: DataflowState, + err_collector: ErrCollector, +} + +impl std::fmt::Debug for ActiveDataflowState<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ActiveDataflowState") + .field("df", &"") + .field("state", &self.state) + .field("err_collector", &self.err_collector) + .finish() + } +} + +impl Default for ActiveDataflowState<'_> { + fn default() -> Self { + ActiveDataflowState { + df: Hydroflow::new(), + state: DataflowState::default(), + err_collector: ErrCollector::default(), + } + } +} + +impl<'subgraph> ActiveDataflowState<'subgraph> { + /// Create a new render context, assigned with given global id + pub fn new_ctx<'ctx>(&'ctx mut self, global_id: GlobalId) -> Context<'ctx, 'subgraph> + where + 'subgraph: 'ctx, + { + Context { + id: global_id, + df: &mut self.df, + compute_state: &mut self.state, + err_collector: self.err_collector.clone(), + input_collection: Default::default(), + local_scope: Default::default(), + } + } + + pub fn set_current_ts(&mut self, ts: repr::Timestamp) { + self.state.set_current_ts(ts); + } + + /// Run all available subgraph + /// + /// return true if any subgraph actually executed + pub fn run_available(&mut self) -> bool { + self.state.run_available_with_schedule(&mut self.df) + } +} + +#[derive(Debug)] +pub struct WorkerHandle { + 
    itc_client: Mutex<InterThreadCallClient>,
+}
+
+impl WorkerHandle {
+    /// create task, return task id
+    ///
+    #[allow(clippy::too_many_arguments)]
+    pub async fn create_flow(
+        &self,
+        task_id: FlowId,
+        plan: TypedPlan,
+        sink_id: GlobalId,
+        sink_sender: mpsc::UnboundedSender<DiffRow>,
+        source_ids: &[GlobalId],
+        src_recvs: Vec<broadcast::Receiver<DiffRow>>,
+        expire_when: Option<repr::Duration>,
+        create_if_not_exist: bool,
+        err_collector: ErrCollector,
+    ) -> Result<Option<FlowId>, Error> {
+        let req = Request::Create {
+            task_id,
+            plan,
+            sink_id,
+            sink_sender,
+            source_ids: source_ids.to_vec(),
+            src_recvs,
+            expire_when,
+            create_if_not_exist,
+            err_collector,
+        };
+
+        let ret = self.itc_client.lock().await.call_blocking(req).await?;
+        if let Response::Create {
+            result: task_create_result,
+        } = ret
+        {
+            task_create_result
+        } else {
+            InternalSnafu {
+                reason: format!(
+                    "Flow Node/Worker itc failed, expect Response::Create, found {ret:?}"
+                ),
+            }
+            .fail()
+            .with_context(|_| EvalSnafu {})
+        }
+    }
+
+    /// remove task, return task id
+    pub async fn remove_flow(&self, task_id: FlowId) -> Result<bool, Error> {
+        let req = Request::Remove { task_id };
+        let ret = self.itc_client.lock().await.call_blocking(req).await?;
+        if let Response::Remove { result } = ret {
+            Ok(result)
+        } else {
+            InternalSnafu {
+                reason: format!("Flow Node/Worker failed, expect Response::Remove, found {ret:?}"),
+            }
+            .fail()
+            .with_context(|_| EvalSnafu {})
+        }
+    }
+
+    /// trigger running the worker, will not block, and will run the worker parallelly
+    ///
+    /// will set the current timestamp to `now` for all dataflows before running them
+    pub async fn run_available(&self, now: repr::Timestamp) {
+        self.itc_client
+            .lock()
+            .await
+            .call_non_blocking(Request::RunAvail { now })
+            .await;
+    }
+
+    pub async fn contains_flow(&self, task_id: FlowId) -> Result<bool, Error> {
+        let req = Request::ContainTask { task_id };
+        let ret = self
+            .itc_client
+            .lock()
+            .await
+            .call_blocking(req)
+            .await
+            .unwrap();
+        if let Response::ContainTask {
+            result: task_contain_result,
+        } = ret
+        {
+            Ok(task_contain_result)
+        } else {
+            InternalSnafu {
+                reason: format!(
+                    "Flow Node/Worker itc failed, expect Response::ContainTask, found {ret:?}"
+                ),
+            }
+            .fail()
+            .with_context(|_| EvalSnafu {})
+        }
+    }
+
+    /// shutdown the worker
+    pub async fn shutdown(&self) {
+        self.itc_client
+            .lock()
+            .await
+            .call_non_blocking(Request::Shutdown)
+            .await;
+    }
+}
+
+/// The actual worker that does the work and contain active state
+#[derive(Debug)]
+pub struct Worker<'subgraph> {
+    /// Task states
+    pub(crate) task_states: BTreeMap<FlowId, ActiveDataflowState<'subgraph>>,
+    itc_server: Arc<Mutex<InterThreadCallServer>>,
+}
+
+impl<'s> Worker<'s> {
+    #[allow(clippy::too_many_arguments)]
+    pub fn create_flow(
+        &mut self,
+        task_id: FlowId,
+        plan: TypedPlan,
+        sink_id: GlobalId,
+        sink_sender: mpsc::UnboundedSender<DiffRow>,
+        source_ids: &[GlobalId],
+        src_recvs: Vec<broadcast::Receiver<DiffRow>>,
+        // TODO(discord9): set expire duration for all arrangement and compare to sys timestamp instead
+        expire_when: Option<repr::Duration>,
+        create_if_not_exist: bool,
+        err_collector: ErrCollector,
+    ) -> Result<Option<FlowId>, Error> {
+        let _ = expire_when;
+        if create_if_not_exist {
+            // check if the task already exists
+            if self.task_states.contains_key(&task_id) {
+                return Ok(None);
+            }
+        }
+
+        let mut cur_task_state = ActiveDataflowState::<'s> {
+            err_collector,
+            ..Default::default()
+        };
+
+        {
+            let mut ctx = cur_task_state.new_ctx(sink_id);
+            for (source_id, src_recv) in source_ids.iter().zip(src_recvs) {
+                let bundle = ctx.render_source(src_recv)?;
+                ctx.insert_global(*source_id, bundle);
+            }
+
+            let rendered = ctx.render_plan(plan.plan)?;
+
ctx.render_unbounded_sink(rendered, sink_sender); + } + self.task_states.insert(task_id, cur_task_state); + Ok(Some(task_id)) + } + + /// remove task, return true if a task is removed + pub fn remove_flow(&mut self, task_id: FlowId) -> bool { + self.task_states.remove(&task_id).is_some() + } + + /// Run the worker, blocking, until shutdown signal is received + pub fn run(&mut self) { + loop { + let (req_id, req) = self.itc_server.blocking_lock().blocking_recv().unwrap(); + + let ret = self.handle_req(req_id, req); + match ret { + Ok(Some((id, resp))) => { + self.itc_server.blocking_lock().resp(id, resp); + } + Ok(None) => continue, + Err(()) => { + break; + } + } + } + } + + /// run with tick acquired from tick manager(usually means system time) + /// TODO(discord9): better tick management + pub fn run_tick(&mut self, now: repr::Timestamp) { + for (_task_id, task_state) in self.task_states.iter_mut() { + task_state.set_current_ts(now); + task_state.run_available(); + } + } + /// handle request, return response if any, Err if receive shutdown signal + fn handle_req(&mut self, req_id: usize, req: Request) -> Result, ()> { + let ret = match req { + Request::Create { + task_id, + plan, + sink_id, + sink_sender, + source_ids, + src_recvs, + expire_when, + create_if_not_exist, + err_collector, + } => { + let task_create_result = self.create_flow( + task_id, + plan, + sink_id, + sink_sender, + &source_ids, + src_recvs, + expire_when, + create_if_not_exist, + err_collector, + ); + Some(( + req_id, + Response::Create { + result: task_create_result, + }, + )) + } + Request::Remove { task_id } => { + let ret = self.remove_flow(task_id); + Some((req_id, Response::Remove { result: ret })) + } + Request::RunAvail { now } => { + self.run_tick(now); + None + } + Request::ContainTask { task_id } => { + let ret = self.task_states.contains_key(&task_id); + Some((req_id, Response::ContainTask { result: ret })) + } + Request::Shutdown => return Err(()), + }; + Ok(ret) + } +} + +#[derive(Debug)] +enum Request { + Create { + task_id: FlowId, + plan: TypedPlan, + sink_id: GlobalId, + sink_sender: mpsc::UnboundedSender, + source_ids: Vec, + src_recvs: Vec>, + expire_when: Option, + create_if_not_exist: bool, + err_collector: ErrCollector, + }, + Remove { + task_id: FlowId, + }, + /// Trigger the worker to run, useful after input buffer is full + RunAvail { + now: repr::Timestamp, + }, + ContainTask { + task_id: FlowId, + }, + Shutdown, +} + +#[derive(Debug)] +enum Response { + Create { + result: Result, Error>, + // TODO(discord9): add flow err_collector + }, + Remove { + result: bool, + }, + ContainTask { + result: bool, + }, +} + +fn create_inter_thread_call() -> (InterThreadCallClient, InterThreadCallServer) { + let (arg_send, arg_recv) = mpsc::unbounded_channel(); + let (ret_send, ret_recv) = mpsc::unbounded_channel(); + let client = InterThreadCallClient { + call_id: Arc::new(Mutex::new(0)), + arg_sender: arg_send, + ret_recv, + }; + let server = InterThreadCallServer { + arg_recv, + ret_sender: ret_send, + }; + (client, server) +} + +#[derive(Debug)] +struct InterThreadCallClient { + call_id: Arc>, + arg_sender: mpsc::UnboundedSender<(usize, Request)>, + ret_recv: mpsc::UnboundedReceiver<(usize, Response)>, +} + +impl InterThreadCallClient { + /// call without expecting responses or blocking + async fn call_non_blocking(&self, req: Request) { + let call_id = { + let mut call_id = self.call_id.lock().await; + *call_id += 1; + *call_id + }; + self.arg_sender.send((call_id, req)).unwrap(); + } + /// call 
blocking, and return the result + async fn call_blocking(&mut self, req: Request) -> Result { + let call_id = { + let mut call_id = self.call_id.lock().await; + *call_id += 1; + *call_id + }; + self.arg_sender.send((call_id, req)).unwrap(); + // TODO(discord9): better inter thread call impl + let (ret_call_id, ret) = self.ret_recv.recv().await.unwrap(); + if ret_call_id != call_id { + return InternalSnafu { + reason: "call id mismatch, worker/worker handler should be in sync", + } + .fail() + .with_context(|_| EvalSnafu {}); + } + Ok(ret) + } +} + +#[derive(Debug)] +struct InterThreadCallServer { + pub arg_recv: mpsc::UnboundedReceiver<(usize, Request)>, + pub ret_sender: mpsc::UnboundedSender<(usize, Response)>, +} + +impl InterThreadCallServer { + pub async fn recv(&mut self) -> Option<(usize, Request)> { + self.arg_recv.recv().await + } + + pub fn blocking_recv(&mut self) -> Option<(usize, Request)> { + self.arg_recv.blocking_recv() + } + + /// Send response back to the client + pub fn resp(&self, call_id: usize, resp: Response) { + self.ret_sender.send((call_id, resp)).unwrap(); + } +} + +#[cfg(test)] +mod test { + use tokio::sync::oneshot; + + use super::*; + use crate::adapter::FlowTickManager; + use crate::expr::Id; + use crate::plan::Plan; + use crate::repr::{RelationType, Row}; + #[tokio::test] + pub async fn test_simple_get_with_worker_and_handle() { + let flow_tick = FlowTickManager::new(); + let (tx, rx) = oneshot::channel(); + let worker_thread_handle = std::thread::spawn(move || { + let (handle, mut worker) = create_worker(); + tx.send(handle).unwrap(); + worker.run(); + }); + let handle = rx.await.unwrap(); + let src_ids = vec![GlobalId::User(1)]; + let (tx, rx) = broadcast::channel::(1024); + let (sink_tx, mut sink_rx) = mpsc::unbounded_channel::(); + let (task_id, plan) = ( + 1, + TypedPlan { + plan: Plan::Get { + id: Id::Global(GlobalId::User(1)), + }, + typ: RelationType::new(vec![]), + }, + ); + handle + .create_flow( + task_id, + plan, + GlobalId::User(1), + sink_tx, + &src_ids, + vec![rx], + None, + true, + ErrCollector::default(), + ) + .await + .unwrap(); + tx.send((Row::empty(), 0, 0)).unwrap(); + handle.run_available(flow_tick.tick()).await; + assert_eq!(sink_rx.recv().await.unwrap().0, Row::empty()); + handle.shutdown().await; + worker_thread_handle.join().unwrap(); + } +} From 519ce7d4863f58fb567ec85ef3fb79dcd812a8b4 Mon Sep 17 00:00:00 2001 From: discord9 Date: Tue, 14 May 2024 14:38:39 +0800 Subject: [PATCH 2/5] chore: fix after cherry pick --- src/flow/src/adapter.rs | 9 +++++++++ src/flow/src/adapter/worker.rs | 4 +--- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 9eb68a02c5f4..aef545438918 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -19,3 +19,12 @@ pub(crate) mod error; pub(crate) mod node_context; pub(crate) use node_context::FlownodeContext; + +mod worker; + +pub const PER_REQ_MAX_ROW_CNT: usize = 8192; + +// TODO: refactor common types for flow to a separate module +/// FlowId is a unique identifier for a flow task +pub type FlowId = u64; +pub type TableName = [String; 3]; diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs index a9d779054e4a..ca5f06a98dba 100644 --- a/src/flow/src/adapter/worker.rs +++ b/src/flow/src/adapter/worker.rs @@ -461,13 +461,11 @@ mod test { use tokio::sync::oneshot; use super::*; - use crate::adapter::FlowTickManager; use crate::expr::Id; use crate::plan::Plan; use crate::repr::{RelationType, Row}; 
#[tokio::test] pub async fn test_simple_get_with_worker_and_handle() { - let flow_tick = FlowTickManager::new(); let (tx, rx) = oneshot::channel(); let worker_thread_handle = std::thread::spawn(move || { let (handle, mut worker) = create_worker(); @@ -502,7 +500,7 @@ mod test { .await .unwrap(); tx.send((Row::empty(), 0, 0)).unwrap(); - handle.run_available(flow_tick.tick()).await; + handle.run_available(0).await; assert_eq!(sink_rx.recv().await.unwrap().0, Row::empty()); handle.shutdown().await; worker_thread_handle.join().unwrap(); From 63a54d1c03bb72d722ddee447f63c5e5b772a0ed Mon Sep 17 00:00:00 2001 From: discord9 Date: Tue, 14 May 2024 17:56:19 +0800 Subject: [PATCH 3/5] refactor: error handling --- Cargo.lock | 13 ++ src/flow/Cargo.toml | 1 + src/flow/src/adapter.rs | 7 +- src/flow/src/adapter/error.rs | 29 +++- src/flow/src/adapter/worker.rs | 251 ++++++++++++++++----------------- 5 files changed, 165 insertions(+), 136 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dcb3b9de8bbd..f636849c0ff3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3528,6 +3528,18 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" +[[package]] +name = "enum-as-inner" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ffccbb6966c05b32ef8fbac435df276c4ae4d3dc55a8cd0eb9745e6c12f546a" +dependencies = [ + "heck 0.4.1", + "proc-macro2", + "quote", + "syn 2.0.61", +] + [[package]] name = "enum-iterator" version = "1.5.0" @@ -3816,6 +3828,7 @@ dependencies = [ "datafusion-common 37.0.0", "datafusion-expr 37.0.0", "datatypes", + "enum-as-inner", "enum_dispatch", "futures", "greptime-proto", diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml index 510c92d6e8cf..8f9c2ba8af97 100644 --- a/src/flow/Cargo.toml +++ b/src/flow/Cargo.toml @@ -27,6 +27,7 @@ futures = "0.3" # it is the same with upstream repo async-trait.workspace = true common-meta.workspace = true +enum-as-inner = "0.6.0" greptime-proto.workspace = true hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" } itertools.workspace = true diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index aef545438918..8179ca5807f9 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -18,13 +18,8 @@ pub(crate) mod error; pub(crate) mod node_context; -pub(crate) use node_context::FlownodeContext; +pub(crate) use node_context::{FlowId, FlownodeContext, TableName}; mod worker; pub const PER_REQ_MAX_ROW_CNT: usize = 8192; - -// TODO: refactor common types for flow to a separate module -/// FlowId is a unique identifier for a flow task -pub type FlowId = u64; -pub type TableName = [String; 3]; diff --git a/src/flow/src/adapter/error.rs b/src/flow/src/adapter/error.rs index 3cc74b900d41..2406dc5ea79d 100644 --- a/src/flow/src/adapter/error.rs +++ b/src/flow/src/adapter/error.rs @@ -24,6 +24,7 @@ use datatypes::value::Value; use servers::define_into_tonic_status; use snafu::{Location, Snafu}; +use crate::adapter::FlowId; use crate::expr::EvalError; /// This error is used to represent all possible errors that can occur in the flow module. 
@@ -39,7 +40,11 @@ pub enum Error { }, #[snafu(display("Internal error"))] - Internal { location: Location, reason: String }, + Internal { + reason: String, + #[snafu(implicit)] + location: Location, + }, /// TODO(discord9): add detailed location of column #[snafu(display("Failed to eval stream"))] @@ -71,6 +76,20 @@ pub enum Error { location: Location, }, + #[snafu(display("Flow not found, id={id}"))] + FlowNotFound { + id: FlowId, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Flow already exist, id={id}"))] + FlowAlreadyExist { + id: FlowId, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to join task"))] JoinTask { #[snafu(source)] @@ -168,10 +187,12 @@ impl ErrorExt for Error { Self::Eval { .. } | &Self::JoinTask { .. } | &Self::Datafusion { .. } => { StatusCode::Internal } - &Self::TableAlreadyExist { .. } => StatusCode::TableAlreadyExists, - Self::TableNotFound { .. } | Self::TableNotFoundMeta { .. } => { - StatusCode::TableNotFound + &Self::TableAlreadyExist { .. } | Self::FlowAlreadyExist { .. } => { + StatusCode::TableAlreadyExists } + Self::TableNotFound { .. } + | Self::TableNotFoundMeta { .. } + | Self::FlowNotFound { .. } => StatusCode::TableNotFound, Self::InvalidQueryPlan { .. } | Self::InvalidQuerySubstrait { .. } | Self::InvalidQueryProst { .. } diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs index ca5f06a98dba..8e7a78146783 100644 --- a/src/flow/src/adapter/worker.rs +++ b/src/flow/src/adapter/worker.rs @@ -15,16 +15,17 @@ //! For single-thread flow worker use std::collections::{BTreeMap, VecDeque}; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +use enum_as_inner::EnumAsInner; use hydroflow::scheduled::graph::Hydroflow; -use snafu::ResultExt; +use snafu::{ensure, ResultExt}; use tokio::sync::{broadcast, mpsc, Mutex}; -use crate::adapter::error::{Error, EvalSnafu}; +use crate::adapter::error::{Error, EvalSnafu, FlowAlreadyExistSnafu, InternalSnafu}; use crate::adapter::FlowId; use crate::compute::{Context, DataflowState, ErrCollector}; -use crate::expr::error::InternalSnafu; use crate::expr::GlobalId; use crate::plan::TypedPlan; use crate::repr::{self, DiffRow}; @@ -45,7 +46,6 @@ pub fn create_worker<'a>() -> (WorkerHandle, Worker<'a>) { } /// ActiveDataflowState is a wrapper around `Hydroflow` and `DataflowState` - pub(crate) struct ActiveDataflowState<'subgraph> { df: Hydroflow<'subgraph>, state: DataflowState, @@ -109,50 +109,35 @@ impl WorkerHandle { /// create task, return task id /// #[allow(clippy::too_many_arguments)] - pub async fn create_flow( - &self, - task_id: FlowId, - plan: TypedPlan, - sink_id: GlobalId, - sink_sender: mpsc::UnboundedSender, - source_ids: &[GlobalId], - src_recvs: Vec>, - expire_when: Option, - create_if_not_exist: bool, - err_collector: ErrCollector, - ) -> Result, Error> { - let req = Request::Create { - task_id, - plan, - sink_id, - sink_sender, - source_ids: source_ids.to_vec(), - src_recvs, - expire_when, - create_if_not_exist, - err_collector, - }; + pub async fn create_flow(&self, create_reqs: Request) -> Result, Error> { + if !matches!(create_reqs, Request::Create { .. 
}) { + return InternalSnafu { + reason: format!( + "Flow Node/Worker itc failed, expect Request::Create, found {create_reqs:?}" + ), + } + .fail(); + } - let ret = self.itc_client.lock().await.call_blocking(req).await?; - if let Response::Create { - result: task_create_result, - } = ret - { - task_create_result - } else { + let ret = self + .itc_client + .lock() + .await + .call_blocking(create_reqs) + .await?; + ret.into_create().map_err(|ret| { InternalSnafu { reason: format!( "Flow Node/Worker itc failed, expect Response::Create, found {ret:?}" ), } - .fail() - .with_context(|_| EvalSnafu {}) - } + .build() + })? } /// remove task, return task id - pub async fn remove_flow(&self, task_id: FlowId) -> Result { - let req = Request::Remove { task_id }; + pub async fn remove_flow(&self, flow_id: FlowId) -> Result { + let req = Request::Remove { flow_id }; let ret = self.itc_client.lock().await.call_blocking(req).await?; if let Response::Remove { result } = ret { Ok(result) @@ -161,30 +146,23 @@ impl WorkerHandle { reason: format!("Flow Node/Worker failed, expect Response::Remove, found {ret:?}"), } .fail() - .with_context(|_| EvalSnafu {}) } } /// trigger running the worker, will not block, and will run the worker parallelly /// /// will set the current timestamp to `now` for all dataflows before running them - pub async fn run_available(&self, now: repr::Timestamp) { + pub async fn run_available(&self, now: repr::Timestamp) -> Result<(), Error> { self.itc_client .lock() .await .call_non_blocking(Request::RunAvail { now }) - .await; + .await } - pub async fn contains_flow(&self, task_id: FlowId) -> Result { - let req = Request::ContainTask { task_id }; - let ret = self - .itc_client - .lock() - .await - .call_blocking(req) - .await - .unwrap(); + pub async fn contains_flow(&self, flow_id: FlowId) -> Result { + let req = Request::ContainTask { flow_id }; + let ret = self.itc_client.lock().await.call_blocking(req).await?; if let Response::ContainTask { result: task_contain_result, } = ret @@ -197,17 +175,16 @@ impl WorkerHandle { ), } .fail() - .with_context(|_| EvalSnafu {}) } } /// shutdown the worker - pub async fn shutdown(&self) { + pub async fn shutdown(&self) -> Result<(), Error> { self.itc_client .lock() .await .call_non_blocking(Request::Shutdown) - .await; + .await } } @@ -223,7 +200,7 @@ impl<'s> Worker<'s> { #[allow(clippy::too_many_arguments)] pub fn create_flow( &mut self, - task_id: FlowId, + flow_id: FlowId, plan: TypedPlan, sink_id: GlobalId, sink_sender: mpsc::UnboundedSender, @@ -235,12 +212,12 @@ impl<'s> Worker<'s> { err_collector: ErrCollector, ) -> Result, Error> { let _ = expire_when; - if create_if_not_exist { - // check if the task already exists - if self.task_states.contains_key(&task_id) { - return Ok(None); - } - } + let already_exist = self.task_states.contains_key(&flow_id); + match (already_exist, create_if_not_exist) { + (true, true) => return Ok(None), + (true, false) => FlowAlreadyExistSnafu { id: flow_id }.fail()?, + (false, _) => (), + }; let mut cur_task_state = ActiveDataflowState::<'s> { err_collector, @@ -257,24 +234,37 @@ impl<'s> Worker<'s> { let rendered = ctx.render_plan(plan.plan)?; ctx.render_unbounded_sink(rendered, sink_sender); } - self.task_states.insert(task_id, cur_task_state); - Ok(Some(task_id)) + self.task_states.insert(flow_id, cur_task_state); + Ok(Some(flow_id)) } /// remove task, return true if a task is removed - pub fn remove_flow(&mut self, task_id: FlowId) -> bool { - self.task_states.remove(&task_id).is_some() + pub fn 
remove_flow(&mut self, flow_id: FlowId) -> bool { + self.task_states.remove(&flow_id).is_some() } /// Run the worker, blocking, until shutdown signal is received pub fn run(&mut self) { loop { - let (req_id, req) = self.itc_server.blocking_lock().blocking_recv().unwrap(); + let (req_id, req) = if let Some(ret) = self.itc_server.blocking_lock().blocking_recv() { + ret + } else { + common_telemetry::error!( + "Worker's itc server has been closed unexpectedly, shutting down worker now." + ); + break; + }; let ret = self.handle_req(req_id, req); match ret { Ok(Some((id, resp))) => { - self.itc_server.blocking_lock().resp(id, resp); + if let Err(err) = self.itc_server.blocking_lock().resp(id, resp) { + common_telemetry::error!( + "Worker's itc server has been closed unexpectedly, shutting down worker: {}", + err + ); + break; + }; } Ok(None) => continue, Err(()) => { @@ -287,16 +277,18 @@ impl<'s> Worker<'s> { /// run with tick acquired from tick manager(usually means system time) /// TODO(discord9): better tick management pub fn run_tick(&mut self, now: repr::Timestamp) { - for (_task_id, task_state) in self.task_states.iter_mut() { + for (_flow_id, task_state) in self.task_states.iter_mut() { task_state.set_current_ts(now); task_state.run_available(); } } /// handle request, return response if any, Err if receive shutdown signal + /// + /// return `Err(())` if receive shutdown request fn handle_req(&mut self, req_id: usize, req: Request) -> Result, ()> { let ret = match req { Request::Create { - task_id, + flow_id, plan, sink_id, sink_sender, @@ -307,7 +299,7 @@ impl<'s> Worker<'s> { err_collector, } => { let task_create_result = self.create_flow( - task_id, + flow_id, plan, sink_id, sink_sender, @@ -324,16 +316,16 @@ impl<'s> Worker<'s> { }, )) } - Request::Remove { task_id } => { - let ret = self.remove_flow(task_id); + Request::Remove { flow_id } => { + let ret = self.remove_flow(flow_id); Some((req_id, Response::Remove { result: ret })) } Request::RunAvail { now } => { self.run_tick(now); None } - Request::ContainTask { task_id } => { - let ret = self.task_states.contains_key(&task_id); + Request::ContainTask { flow_id } => { + let ret = self.task_states.contains_key(&flow_id); Some((req_id, Response::ContainTask { result: ret })) } Request::Shutdown => return Err(()), @@ -342,10 +334,10 @@ impl<'s> Worker<'s> { } } -#[derive(Debug)] -enum Request { +#[derive(Debug, EnumAsInner)] +pub enum Request { Create { - task_id: FlowId, + flow_id: FlowId, plan: TypedPlan, sink_id: GlobalId, sink_sender: mpsc::UnboundedSender, @@ -356,19 +348,19 @@ enum Request { err_collector: ErrCollector, }, Remove { - task_id: FlowId, + flow_id: FlowId, }, /// Trigger the worker to run, useful after input buffer is full RunAvail { now: repr::Timestamp, }, ContainTask { - task_id: FlowId, + flow_id: FlowId, }, Shutdown, } -#[derive(Debug)] +#[derive(Debug, EnumAsInner)] enum Response { Create { result: Result, Error>, @@ -386,7 +378,7 @@ fn create_inter_thread_call() -> (InterThreadCallClient, InterThreadCallServer) let (arg_send, arg_recv) = mpsc::unbounded_channel(); let (ret_send, ret_recv) = mpsc::unbounded_channel(); let client = InterThreadCallClient { - call_id: Arc::new(Mutex::new(0)), + call_id: AtomicUsize::new(0), arg_sender: arg_send, ret_recv, }; @@ -396,49 +388,49 @@ fn create_inter_thread_call() -> (InterThreadCallClient, InterThreadCallServer) }; (client, server) } - +type ReqId = usize; #[derive(Debug)] struct InterThreadCallClient { - call_id: Arc>, - arg_sender: mpsc::UnboundedSender<(usize, 
Request)>, - ret_recv: mpsc::UnboundedReceiver<(usize, Response)>, + call_id: AtomicUsize, + arg_sender: mpsc::UnboundedSender<(ReqId, Request)>, + ret_recv: mpsc::UnboundedReceiver<(ReqId, Response)>, } impl InterThreadCallClient { /// call without expecting responses or blocking - async fn call_non_blocking(&self, req: Request) { - let call_id = { - let mut call_id = self.call_id.lock().await; - *call_id += 1; - *call_id - }; - self.arg_sender.send((call_id, req)).unwrap(); + async fn call_non_blocking(&self, req: Request) -> Result<(), Error> { + // TODO(discord9): relax memory order later + let call_id = self.call_id.fetch_add(1, Ordering::SeqCst); + self.arg_sender + .send((call_id, req)) + .map_err(from_send_error) } /// call blocking, and return the result async fn call_blocking(&mut self, req: Request) -> Result { - let call_id = { - let mut call_id = self.call_id.lock().await; - *call_id += 1; - *call_id - }; - self.arg_sender.send((call_id, req)).unwrap(); - // TODO(discord9): better inter thread call impl - let (ret_call_id, ret) = self.ret_recv.recv().await.unwrap(); - if ret_call_id != call_id { - return InternalSnafu { + // TODO(discord9): relax memory order later + let call_id = self.call_id.fetch_add(1, Ordering::SeqCst); + self.arg_sender + .send((call_id, req)) + .map_err(from_send_error)?; + // TODO(discord9): better inter thread call impl, i.e. support multiple client(also consider if it's necessary) + // since one node manger might manage multiple worker, but one worker should only belong to one node manager + let (ret_call_id, ret) = self.ret_recv.recv().await.ok_or_else(||InternalSnafu { + reason: "InterThreadCallClient call_blocking failed, ret_recv has been closed and there are no remaining messages in the channel's buffer", + }.build())?; + ensure!( + ret_call_id == call_id, + InternalSnafu { reason: "call id mismatch, worker/worker handler should be in sync", } - .fail() - .with_context(|_| EvalSnafu {}); - } + ); Ok(ret) } } #[derive(Debug)] struct InterThreadCallServer { - pub arg_recv: mpsc::UnboundedReceiver<(usize, Request)>, - pub ret_sender: mpsc::UnboundedSender<(usize, Response)>, + pub arg_recv: mpsc::UnboundedReceiver<(ReqId, Request)>, + pub ret_sender: mpsc::UnboundedSender<(ReqId, Response)>, } impl InterThreadCallServer { @@ -451,9 +443,18 @@ impl InterThreadCallServer { } /// Send response back to the client - pub fn resp(&self, call_id: usize, resp: Response) { - self.ret_sender.send((call_id, resp)).unwrap(); + pub fn resp(&self, call_id: usize, resp: Response) -> Result<(), Error> { + self.ret_sender + .send((call_id, resp)) + .map_err(from_send_error) + } +} + +fn from_send_error(err: mpsc::error::SendError) -> Error { + InternalSnafu { + reason: format!("InterThreadCallServer resp failed: {}", err), } + .build() } #[cfg(test)] @@ -476,7 +477,7 @@ mod test { let src_ids = vec![GlobalId::User(1)]; let (tx, rx) = broadcast::channel::(1024); let (sink_tx, mut sink_rx) = mpsc::unbounded_channel::(); - let (task_id, plan) = ( + let (flow_id, plan) = ( 1, TypedPlan { plan: Plan::Get { @@ -485,24 +486,22 @@ mod test { typ: RelationType::new(vec![]), }, ); - handle - .create_flow( - task_id, - plan, - GlobalId::User(1), - sink_tx, - &src_ids, - vec![rx], - None, - true, - ErrCollector::default(), - ) - .await - .unwrap(); + let create_reqs = Request::Create { + flow_id, + plan, + sink_id: GlobalId::User(1), + sink_sender: sink_tx, + source_ids: src_ids, + src_recvs: vec![rx], + expire_when: None, + create_if_not_exist: true, + err_collector: 
ErrCollector::default(), + }; + handle.create_flow(create_reqs).await.unwrap(); tx.send((Row::empty(), 0, 0)).unwrap(); - handle.run_available(0).await; + handle.run_available(0).await.unwrap(); assert_eq!(sink_rx.recv().await.unwrap().0, Row::empty()); - handle.shutdown().await; + handle.shutdown().await.unwrap(); worker_thread_handle.join().unwrap(); } } From c453d29854d80f267016e25849f2b0c12a584ac8 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Tue, 14 May 2024 15:09:04 +0000 Subject: [PATCH 4/5] refactor: error handling Signed-off-by: Zhenchi --- src/flow/src/adapter/worker.rs | 53 +++++++++++++++++----------------- 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs index 8e7a78146783..b1cea2956ab3 100644 --- a/src/flow/src/adapter/worker.rs +++ b/src/flow/src/adapter/worker.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use enum_as_inner::EnumAsInner; use hydroflow::scheduled::graph::Hydroflow; -use snafu::{ensure, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use tokio::sync::{broadcast, mpsc, Mutex}; use crate::adapter::error::{Error, EvalSnafu, FlowAlreadyExistSnafu, InternalSnafu}; @@ -32,6 +32,8 @@ use crate::repr::{self, DiffRow}; pub type SharedBuf = Arc>>; +type ReqId = usize; + /// Create both worker(`!Send`) and worker handle(`Send + Sync`) pub fn create_worker<'a>() -> (WorkerHandle, Worker<'a>) { let (itc_client, itc_server) = create_inter_thread_call(); @@ -107,17 +109,15 @@ pub struct WorkerHandle { impl WorkerHandle { /// create task, return task id - /// - #[allow(clippy::too_many_arguments)] pub async fn create_flow(&self, create_reqs: Request) -> Result, Error> { - if !matches!(create_reqs, Request::Create { .. }) { - return InternalSnafu { + ensure!( + matches!(create_reqs, Request::Create { .. 
}), + InternalSnafu { reason: format!( "Flow Node/Worker itc failed, expect Request::Create, found {create_reqs:?}" ), } - .fail(); - } + ); let ret = self .itc_client @@ -139,14 +139,13 @@ impl WorkerHandle { pub async fn remove_flow(&self, flow_id: FlowId) -> Result { let req = Request::Remove { flow_id }; let ret = self.itc_client.lock().await.call_blocking(req).await?; - if let Response::Remove { result } = ret { - Ok(result) - } else { + + ret.into_remove().map_err(|ret| { InternalSnafu { reason: format!("Flow Node/Worker failed, expect Response::Remove, found {ret:?}"), } - .fail() - } + .build() + }) } /// trigger running the worker, will not block, and will run the worker parallelly @@ -163,19 +162,15 @@ impl WorkerHandle { pub async fn contains_flow(&self, flow_id: FlowId) -> Result { let req = Request::ContainTask { flow_id }; let ret = self.itc_client.lock().await.call_blocking(req).await?; - if let Response::ContainTask { - result: task_contain_result, - } = ret - { - Ok(task_contain_result) - } else { + + ret.into_contain_task().map_err(|ret| { InternalSnafu { reason: format!( "Flow Node/Worker itc failed, expect Response::ContainTask, found {ret:?}" ), } - .fail() - } + .build() + }) } /// shutdown the worker @@ -285,7 +280,7 @@ impl<'s> Worker<'s> { /// handle request, return response if any, Err if receive shutdown signal /// /// return `Err(())` if receive shutdown request - fn handle_req(&mut self, req_id: usize, req: Request) -> Result, ()> { + fn handle_req(&mut self, req_id: ReqId, req: Request) -> Result, ()> { let ret = match req { Request::Create { flow_id, @@ -388,7 +383,7 @@ fn create_inter_thread_call() -> (InterThreadCallClient, InterThreadCallServer) }; (client, server) } -type ReqId = usize; + #[derive(Debug)] struct InterThreadCallClient { call_id: AtomicUsize, @@ -405,6 +400,7 @@ impl InterThreadCallClient { .send((call_id, req)) .map_err(from_send_error) } + /// call blocking, and return the result async fn call_blocking(&mut self, req: Request) -> Result { // TODO(discord9): relax memory order later @@ -412,11 +408,15 @@ impl InterThreadCallClient { self.arg_sender .send((call_id, req)) .map_err(from_send_error)?; + // TODO(discord9): better inter thread call impl, i.e. 
support multiple client(also consider if it's necessary) // since one node manger might manage multiple worker, but one worker should only belong to one node manager - let (ret_call_id, ret) = self.ret_recv.recv().await.ok_or_else(||InternalSnafu { - reason: "InterThreadCallClient call_blocking failed, ret_recv has been closed and there are no remaining messages in the channel's buffer", - }.build())?; + let (ret_call_id, ret) = self + .ret_recv + .recv() + .await + .context(InternalSnafu { reason: "InterThreadCallClient call_blocking failed, ret_recv has been closed and there are no remaining messages in the channel's buffer" })?; + ensure!( ret_call_id == call_id, InternalSnafu { @@ -443,7 +443,7 @@ impl InterThreadCallServer { } /// Send response back to the client - pub fn resp(&self, call_id: usize, resp: Response) -> Result<(), Error> { + pub fn resp(&self, call_id: ReqId, resp: Response) -> Result<(), Error> { self.ret_sender .send((call_id, resp)) .map_err(from_send_error) @@ -465,6 +465,7 @@ mod test { use crate::expr::Id; use crate::plan::Plan; use crate::repr::{RelationType, Row}; + #[tokio::test] pub async fn test_simple_get_with_worker_and_handle() { let (tx, rx) = oneshot::channel(); From 0bb580837cb0c54c42431961d5d7671bdba6047a Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Tue, 14 May 2024 15:49:03 +0000 Subject: [PATCH 5/5] chore: merge origin/main Signed-off-by: Zhenchi --- src/flow/src/adapter/worker.rs | 2 +- src/flow/src/compute/render.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs index b1cea2956ab3..42da2e3d111d 100644 --- a/src/flow/src/adapter/worker.rs +++ b/src/flow/src/adapter/worker.rs @@ -226,7 +226,7 @@ impl<'s> Worker<'s> { ctx.insert_global(*source_id, bundle); } - let rendered = ctx.render_plan(plan.plan)?; + let rendered = ctx.render_plan(plan)?; ctx.render_unbounded_sink(rendered, sink_sender); } self.task_states.insert(flow_id, cur_task_state); diff --git a/src/flow/src/compute/render.rs b/src/flow/src/compute/render.rs index c1c4b37cbb4e..0476c8a6e5ac 100644 --- a/src/flow/src/compute/render.rs +++ b/src/flow/src/compute/render.rs @@ -113,11 +113,11 @@ impl<'referred, 'df> Context<'referred, 'df> { reduce_plan, } => self.render_reduce(input, key_val_plan, reduce_plan), Plan::Join { .. } => NotImplementedSnafu { - reason: "Join is still WIP".to_string(), + reason: "Join is still WIP", } .fail(), Plan::Union { .. } => NotImplementedSnafu { - reason: "Union is still WIP".to_string(), + reason: "Union is still WIP", } .fail(), }
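
Reviewer note (not part of the patches above): this series splits flow execution into a `!Send` `Worker` that owns the Hydroflow dataflow state and a `Send + Sync` `WorkerHandle` that talks to it over the inter-thread call channel. Below is a minimal usage sketch of that split, modeled on `test_simple_get_with_worker_and_handle` and assuming the post-PATCH-3 API (`Request::Create`, fallible handle methods). The helper name `spawn_flow_worker` and the std-mpsc handoff are illustrative only and are not defined anywhere in this series.

```rust
// Sketch only: assumes it lives inside src/flow/src/adapter/worker.rs (e.g. as a
// test helper), so `create_worker` and `WorkerHandle` are in scope.
use std::thread;

/// Spawn the `!Send` Worker on its own OS thread and hand the
/// `Send + Sync` WorkerHandle back to the caller.
/// `spawn_flow_worker` is a hypothetical helper, not part of this patch series.
fn spawn_flow_worker() -> (WorkerHandle, thread::JoinHandle<()>) {
    let (tx, rx) = std::sync::mpsc::channel();
    let join = thread::spawn(move || {
        let (handle, mut worker) = create_worker();
        // Hand the handle back before entering the blocking event loop.
        tx.send(handle).unwrap();
        // Blocks until Request::Shutdown arrives or the itc channel closes.
        worker.run();
    });
    (rx.recv().unwrap(), join)
}
```

A manager task would then drive it roughly the way the test does: `handle.create_flow(Request::Create { .. }).await?` to register a dataflow, `handle.run_available(now).await?` on each tick, and finally `handle.shutdown().await?` before joining the worker thread.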