diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs
index ac3f35ad0b8a3..22da99e58a414 100644
--- a/src/compute/src/rpc/service/stream_service.rs
+++ b/src/compute/src/rpc/service/stream_service.rs
@@ -165,7 +165,7 @@ impl StreamService for StreamServiceImpl {
         }
 
         self.mgr
-            .send_barrier(&barrier, req.actor_ids_to_send, req.actor_ids_to_collect)
+            .send_barrier(barrier, req.actor_ids_to_send, req.actor_ids_to_collect)
            .await?;
 
         Ok(Response::new(InjectBarrierResponse {
diff --git a/src/stream/src/executor/actor.rs b/src/stream/src/executor/actor.rs
index 4008ac1bf3b6a..ffc29c2d25dac 100644
--- a/src/stream/src/executor/actor.rs
+++ b/src/stream/src/executor/actor.rs
@@ -219,7 +219,7 @@ where
                 .into()));
 
             // Collect barriers to local barrier manager
-            self.context.lock_barrier_manager().collect(id, &barrier);
+            self.context.barrier_manager().collect(id, &barrier);
 
             // Then stop this actor if asked
             if barrier.is_stop(id) {
diff --git a/src/stream/src/executor/chain.rs b/src/stream/src/executor/chain.rs
index a4e03cf295641..0228f826a4bac 100644
--- a/src/stream/src/executor/chain.rs
+++ b/src/stream/src/executor/chain.rs
@@ -125,7 +125,6 @@ impl Executor for ChainExecutor {
 #[cfg(test)]
 mod test {
     use std::default::Default;
-    use std::sync::Arc;
 
     use futures::StreamExt;
     use risingwave_common::array::stream_chunk::StreamChunkTestExt;
@@ -144,8 +143,7 @@ mod test {
     #[tokio::test]
     async fn test_basic() {
         let barrier_manager = LocalBarrierManager::for_test();
-        let progress =
-            CreateMviewProgress::for_test(Arc::new(parking_lot::Mutex::new(barrier_manager)));
+        let progress = CreateMviewProgress::for_test(barrier_manager);
         let actor_id = progress.actor_id();
 
         let schema = Schema::new(vec![Field::unnamed(DataType::Int64)]);
diff --git a/src/stream/src/executor/values.rs b/src/stream/src/executor/values.rs
index cc55d720083a6..1ce6f27ded573 100644
--- a/src/stream/src/executor/values.rs
+++ b/src/stream/src/executor/values.rs
@@ -155,7 +155,6 @@ impl Executor for ValuesExecutor {
 
 #[cfg(test)]
 mod tests {
-    use std::sync::Arc;
 
     use futures::StreamExt;
     use risingwave_common::array::{
@@ -174,8 +173,7 @@ mod tests {
     #[tokio::test]
     async fn test_values() {
         let barrier_manager = LocalBarrierManager::for_test();
-        let progress =
-            CreateMviewProgress::for_test(Arc::new(parking_lot::Mutex::new(barrier_manager)));
+        let progress = CreateMviewProgress::for_test(barrier_manager);
         let actor_id = progress.actor_id();
         let (tx, barrier_receiver) = unbounded_channel();
         let value = StructValue::new(vec![Some(1.into()), Some(2.into()), Some(3.into())]);
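Note: the test changes above work because, later in this diff, `LocalBarrierManager` becomes a cheaply cloneable handle whose methods take `&self`, so the `Arc<parking_lot::Mutex<_>>` wrapper disappears. A minimal sketch of that ownership shape, with toy `Event`/`Handle` types standing in for the real RisingWave ones:

```rust
use std::time::Duration;

use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};

#[derive(Debug)]
enum Event {
    Ping,
}

// Cheap to clone: just an mpsc sender. No `Arc<Mutex<_>>` is needed because
// every piece of mutable state lives inside the worker task spawned below.
#[derive(Clone)]
struct Handle {
    tx: UnboundedSender<Event>,
}

impl Handle {
    // `&self` is enough: sending on an UnboundedSender never requires `&mut`.
    fn ping(&self) {
        self.tx.send(Event::Ping).expect("worker alive");
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = unbounded_channel();
    let handle = Handle { tx };
    tokio::spawn(async move {
        while let Some(event) = rx.recv().await {
            println!("worker got {event:?}");
        }
    });
    handle.clone().ping(); // passing by value is now fine, as in the tests above
    handle.ping();
    tokio::time::sleep(Duration::from_millis(10)).await; // let the worker drain
}
```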
diff --git a/src/stream/src/from_proto/barrier_recv.rs b/src/stream/src/from_proto/barrier_recv.rs
index 4fd779105a319..8d834642147f7 100644
--- a/src/stream/src/from_proto/barrier_recv.rs
+++ b/src/stream/src/from_proto/barrier_recv.rs
@@ -37,7 +37,7 @@ impl ExecutorBuilder for BarrierRecvExecutorBuilder {
         let (sender, barrier_receiver) = unbounded_channel();
         stream
             .context
-            .lock_barrier_manager()
+            .barrier_manager()
             .register_sender(params.actor_context.id, sender);
 
         Ok(BarrierRecvExecutor::new(params.actor_context, params.info, barrier_receiver).boxed())
diff --git a/src/stream/src/from_proto/now.rs b/src/stream/src/from_proto/now.rs
index 5ed8145178d85..601c1ec3ad585 100644
--- a/src/stream/src/from_proto/now.rs
+++ b/src/stream/src/from_proto/now.rs
@@ -36,7 +36,7 @@ impl ExecutorBuilder for NowExecutorBuilder {
         let (sender, barrier_receiver) = unbounded_channel();
         stream
             .context
-            .lock_barrier_manager()
+            .barrier_manager()
             .register_sender(params.actor_context.id, sender);
 
         let state_table =
diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs
index 8234188e9a4fc..7927ed3aa1827 100644
--- a/src/stream/src/from_proto/source/trad_source.rs
+++ b/src/stream/src/from_proto/source/trad_source.rs
@@ -44,7 +44,7 @@ impl ExecutorBuilder for SourceExecutorBuilder {
         let (sender, barrier_receiver) = unbounded_channel();
         stream
             .context
-            .lock_barrier_manager()
+            .barrier_manager()
             .register_sender(params.actor_context.id, sender);
 
         let system_params = params.env.system_params_manager_ref().get_params();
diff --git a/src/stream/src/from_proto/values.rs b/src/stream/src/from_proto/values.rs
index 4518e575e8602..a45393e200a5e 100644
--- a/src/stream/src/from_proto/values.rs
+++ b/src/stream/src/from_proto/values.rs
@@ -39,7 +39,7 @@ impl ExecutorBuilder for ValuesExecutorBuilder {
         let (sender, barrier_receiver) = unbounded_channel();
         stream
             .context
-            .lock_barrier_manager()
+            .barrier_manager()
             .register_sender(params.actor_context.id, sender);
         let progress = stream
             .context
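Note: all four builders above share one pattern: create an unbounded channel, register the sender with the barrier manager under the actor's id, and keep the receiver inside the executor so injected barriers arrive as ordinary messages. A simplified, self-contained sketch of that registration pattern (types reduced to the essentials, not the actual builder API):

```rust
use std::collections::HashMap;

use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};

type ActorId = u32;

#[derive(Clone, Debug)]
struct Barrier {
    epoch: u64,
}

#[derive(Default)]
struct Registry {
    // One actor may register several senders; a broadcast pushes into each.
    senders: HashMap<ActorId, Vec<UnboundedSender<Barrier>>>,
}

impl Registry {
    fn register_sender(&mut self, actor_id: ActorId, sender: UnboundedSender<Barrier>) {
        self.senders.entry(actor_id).or_default().push(sender);
    }
}

fn main() {
    let mut registry = Registry::default();

    // What an executor builder does: keep the receiver, register the sender.
    let (sender, mut barrier_receiver): (_, UnboundedReceiver<Barrier>) = unbounded_channel();
    registry.register_sender(1, sender);

    // What barrier injection does: push into every sender for the actor.
    for tx in &registry.senders[&1] {
        tx.send(Barrier { epoch: 42 }).unwrap();
    }
    assert_eq!(barrier_receiver.try_recv().unwrap().epoch, 42);
}
```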
diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs
index 03b044f74ef74..0a8eede9e172a 100644
--- a/src/stream/src/task/barrier_manager.rs
+++ b/src/stream/src/task/barrier_manager.rs
@@ -13,18 +13,18 @@
 // limitations under the License.
 
 use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
 
 use anyhow::anyhow;
 use prometheus::HistogramTimer;
 use risingwave_pb::stream_plan::barrier::BarrierKind;
 use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;
-use tokio::sync::mpsc::UnboundedSender;
+use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
 use tokio::sync::oneshot;
 use tokio::sync::oneshot::Receiver;
 
 use self::managed_state::ManagedBarrierState;
 use crate::error::{StreamError, StreamResult};
-use crate::executor::*;
 use crate::task::ActorId;
 
 mod managed_state;
@@ -35,6 +35,11 @@ mod tests;
 pub use progress::CreateMviewProgress;
 use risingwave_storage::StateStoreImpl;
 
+use crate::executor::monitor::StreamingMetrics;
+use crate::executor::Barrier;
+use crate::task::barrier_manager::progress::BackfillState;
+use crate::task::barrier_manager::LocalBarrierEvent::{ReportActorCollected, ReportActorFailure};
+
 /// If enabled, all actors will be grouped in the same tracing span within one epoch.
 /// Note that this option will significantly increase the overhead of tracing.
 pub const ENABLE_BARRIER_AGGREGATION: bool = false;
@@ -49,29 +54,53 @@ pub struct CollectResult {
     pub kind: BarrierKind,
 }
 
-enum BarrierState {
-    /// `Local` mode should be only used for tests. In this mode, barriers are not managed or
-    /// collected, and there's no way to know whether or when a barrier is finished.
+enum LocalBarrierEvent {
+    RegisterSender {
+        actor_id: ActorId,
+        sender: UnboundedSender<Barrier>,
+    },
+    InjectBarrier {
+        barrier: Barrier,
+        actor_ids_to_send: Vec<ActorId>,
+        actor_ids_to_collect: Vec<ActorId>,
+        result_sender: oneshot::Sender<StreamResult<()>>,
+    },
+    Reset,
+    ReportActorCollected {
+        actor_id: ActorId,
+        barrier: Barrier,
+    },
+    ReportActorFailure {
+        actor_id: ActorId,
+        err: StreamError,
+    },
+    CollectEpoch {
+        epoch: u64,
+        result_sender: oneshot::Sender<StreamResult<CompleteReceiver>>,
+    },
+    ReportCreateProgress {
+        current_epoch: u64,
+        actor: ActorId,
+        state: BackfillState,
+    },
     #[cfg(test)]
-    Local,
-
-    /// In `Managed` mode, barriers are sent and collected according to the request from meta
-    /// service. When the barrier is finished, the caller can be notified about this.
-    Managed(ManagedBarrierState),
+    Flush(oneshot::Sender<()>),
 }
 
-/// [`LocalBarrierManager`] manages barrier control flow, used by local stream manager.
-/// Specifically, [`LocalBarrierManager`] serve barrier injection from meta server, send the
+/// [`LocalBarrierWorker`] manages barrier control flow, used by local stream manager.
+/// Specifically, [`LocalBarrierWorker`] serve barrier injection from meta server, send the
 /// barriers to and collect them from all actors, and finally report the progress.
-pub struct LocalBarrierManager {
+struct LocalBarrierWorker {
     /// Stores all streaming job source sender.
     senders: HashMap<ActorId, Vec<UnboundedSender<Barrier>>>,
 
     /// Current barrier collection state.
-    state: BarrierState,
+    state: ManagedBarrierState,
 
     /// Save collect `CompleteReceiver`.
     collect_complete_receiver: HashMap<u64, CompleteReceiver>,
+
+    streaming_metrics: Arc<StreamingMetrics>,
 }
 
 /// Information used after collection.
@@ -79,27 +108,77 @@ pub struct CompleteReceiver {
     /// Notify all actors of completion of collection.
     pub complete_receiver: Option<Receiver<CollectResult>>,
     /// `barrier_inflight_timer`'s metrics.
-    pub barrier_inflight_timer: Option<HistogramTimer>,
+    pub barrier_inflight_timer: HistogramTimer,
     /// The kind of barrier.
     pub kind: BarrierKind,
 }
 
-impl LocalBarrierManager {
-    fn with_state(state: BarrierState) -> Self {
+impl LocalBarrierWorker {
+    fn new(state_store: StateStoreImpl, streaming_metrics: Arc<StreamingMetrics>) -> Self {
         Self {
             senders: HashMap::new(),
-            state,
+            state: ManagedBarrierState::new(state_store),
             collect_complete_receiver: HashMap::default(),
+            streaming_metrics,
         }
     }
 
-    /// Create a [`LocalBarrierManager`] with managed mode.
-    pub fn new(state_store: StateStoreImpl) -> Self {
-        Self::with_state(BarrierState::Managed(ManagedBarrierState::new(state_store)))
+    async fn run(mut self, mut event_rx: UnboundedReceiver<LocalBarrierEvent>) {
+        while let Some(event) = event_rx.recv().await {
+            match event {
+                LocalBarrierEvent::RegisterSender { actor_id, sender } => {
+                    self.register_sender(actor_id, sender);
+                }
+                LocalBarrierEvent::InjectBarrier {
+                    barrier,
+                    actor_ids_to_send,
+                    actor_ids_to_collect,
+                    result_sender,
+                } => {
+                    let timer = self
+                        .streaming_metrics
+                        .barrier_inflight_latency
+                        .start_timer();
+                    let result =
+                        self.send_barrier(&barrier, actor_ids_to_send, actor_ids_to_collect, timer);
+                    let _ = result_sender.send(result).inspect_err(|e| {
+                        warn!(err=?e, "fail to send inject barrier result");
+                    });
+                }
+                LocalBarrierEvent::Reset => {
+                    self.reset();
+                }
+                ReportActorCollected { actor_id, barrier } => self.collect(actor_id, &barrier),
+                ReportActorFailure { actor_id, err } => {
+                    self.notify_failure(actor_id, err);
+                }
+                LocalBarrierEvent::CollectEpoch {
+                    epoch,
+                    result_sender,
+                } => {
+                    let result = self.remove_collect_rx(epoch);
+                    let _ = result_sender.send(result).inspect_err(|e| {
+                        warn!(err=?e.as_ref().map(|_|()), "fail to send collect epoch result");
+                    });
+                }
+                LocalBarrierEvent::ReportCreateProgress {
+                    current_epoch,
+                    actor,
+                    state,
+                } => {
+                    self.update_create_mview_progress(current_epoch, actor, state);
+                }
+                #[cfg(test)]
+                LocalBarrierEvent::Flush(sender) => sender.send(()).unwrap(),
+            }
+        }
     }
+}
 
+// event handler
+impl LocalBarrierWorker {
     /// Register sender for source actors, used to send barriers.
-    pub fn register_sender(&mut self, actor_id: ActorId, sender: UnboundedSender<Barrier>) {
+    fn register_sender(&mut self, actor_id: ActorId, sender: UnboundedSender<Barrier>) {
         tracing::debug!(
             target: "events::stream::barrier::manager",
             actor_id = actor_id,
@@ -108,28 +187,16 @@ impl LocalBarrierManager {
         self.senders.entry(actor_id).or_default().push(sender);
     }
 
-    /// Return all senders.
-    pub fn all_senders(&self) -> HashSet<ActorId> {
-        self.senders.keys().cloned().collect()
-    }
-
     /// Broadcast a barrier to all senders. Save a receiver which will get notified when this
     /// barrier is finished, in managed mode.
-    pub fn send_barrier(
+    fn send_barrier(
         &mut self,
         barrier: &Barrier,
         actor_ids_to_send: impl IntoIterator<Item = ActorId>,
         actor_ids_to_collect: impl IntoIterator<Item = ActorId>,
-        timer: Option<HistogramTimer>,
+        timer: HistogramTimer,
     ) -> StreamResult<()> {
-        let to_send = {
-            let to_send: HashSet<ActorId> = actor_ids_to_send.into_iter().collect();
-            match &self.state {
-                #[cfg(test)]
-                BarrierState::Local if to_send.is_empty() => self.senders.keys().cloned().collect(),
-                _ => to_send,
-            }
-        };
+        let to_send: HashSet<ActorId> = actor_ids_to_send.into_iter().collect();
         let to_collect: HashSet<ActorId> = actor_ids_to_collect.into_iter().collect();
         debug!(
             target: "events::stream::barrier::manager::send",
@@ -139,19 +206,11 @@ impl LocalBarrierManager {
             to_collect
         );
 
-        let rx = match &mut self.state {
-            #[cfg(test)]
-            BarrierState::Local => None,
-
-            BarrierState::Managed(state) => {
-                // There must be some actors to collect from.
-                assert!(!to_collect.is_empty());
+        // There must be some actors to collect from.
+        assert!(!to_collect.is_empty());
 
-                let (tx, rx) = oneshot::channel();
-                state.transform_to_issued(barrier, to_collect, tx)?;
-                Some(rx)
-            }
-        };
+        let (tx, rx) = oneshot::channel();
+        self.state.transform_to_issued(barrier, to_collect, tx)?;
 
         for actor_id in to_send {
             match self.senders.get(&actor_id) {
@@ -192,7 +251,7 @@ impl LocalBarrierManager {
         self.collect_complete_receiver.insert(
             barrier.epoch.prev,
             CompleteReceiver {
-                complete_receiver: rx,
+                complete_receiver: Some(rx),
                 barrier_inflight_timer: timer,
                 kind: barrier.kind,
             },
@@ -201,7 +260,7 @@ impl LocalBarrierManager {
     }
 
     /// Use `prev_epoch` to remove collect rx and return rx.
-    pub fn remove_collect_rx(&mut self, prev_epoch: u64) -> StreamResult<CompleteReceiver> {
+    fn remove_collect_rx(&mut self, prev_epoch: u64) -> StreamResult<CompleteReceiver> {
         // It's still possible that `collect_complete_receiver` does not contain the target epoch
         // when receiving collect_barrier request. Because `collect_complete_receiver` could
         // be cleared when CN is under recovering. We should return error rather than panic.
@@ -217,55 +276,116 @@ impl LocalBarrierManager {
     }
 
     /// Reset all internal states.
-    pub fn reset(&mut self) {
+    fn reset(&mut self) {
         self.senders.clear();
         self.collect_complete_receiver.clear();
 
-        match &mut self.state {
-            #[cfg(test)]
-            BarrierState::Local => {}
-
-            BarrierState::Managed(managed_state) => {
-                managed_state.clear_all_states();
-            }
-        }
+        self.state.clear_all_states();
     }
 
-    /// When a [`StreamConsumer`] (typically [`DispatchExecutor`]) get a barrier, it should report
+    /// When a [`crate::executor::StreamConsumer`] (typically [`crate::executor::DispatchExecutor`]) get a barrier, it should report
     /// and collect this barrier with its own `actor_id` using this function.
-    pub fn collect(&mut self, actor_id: ActorId, barrier: &Barrier) {
-        match &mut self.state {
-            #[cfg(test)]
-            BarrierState::Local => {}
-
-            BarrierState::Managed(managed_state) => {
-                managed_state.collect(actor_id, barrier);
-            }
-        }
+    fn collect(&mut self, actor_id: ActorId, barrier: &Barrier) {
+        self.state.collect(actor_id, barrier)
     }
 
     /// When a actor exit unexpectedly, it should report this event using this function, so meta
     /// will notice actor's exit while collecting.
-    pub fn notify_failure(&mut self, actor_id: ActorId, err: StreamError) {
-        match &mut self.state {
-            #[cfg(test)]
-            BarrierState::Local => {}
+    fn notify_failure(&mut self, actor_id: ActorId, err: StreamError) {
+        self.state.notify_failure(actor_id, err)
+    }
+}
 
-            BarrierState::Managed(managed_state) => {
-                managed_state.notify_failure(actor_id, err);
-            }
+#[derive(Clone)]
+pub struct LocalBarrierManager {
+    barrier_event_sender: UnboundedSender<LocalBarrierEvent>,
+}
+
+impl LocalBarrierManager {
+    /// Create a [`LocalBarrierWorker`] with managed mode.
+    pub fn new(state_store: StateStoreImpl, streaming_metrics: Arc<StreamingMetrics>) -> Self {
+        let (tx, rx) = unbounded_channel();
+        let worker = LocalBarrierWorker::new(state_store, streaming_metrics);
+        let _join_handle = tokio::spawn(worker.run(rx));
+        Self {
+            barrier_event_sender: tx,
         }
     }
+
+    fn send_event(&self, event: LocalBarrierEvent) {
+        self.barrier_event_sender
+            .send(event)
+            .expect("should be able to send event")
+    }
+}
+
+impl LocalBarrierManager {
+    /// Register sender for source actors, used to send barriers.
+    pub fn register_sender(&self, actor_id: ActorId, sender: UnboundedSender<Barrier>) {
+        self.send_event(LocalBarrierEvent::RegisterSender { actor_id, sender });
+    }
+
+    /// Broadcast a barrier to all senders. Save a receiver which will get notified when this
+    /// barrier is finished, in managed mode.
+    pub async fn send_barrier(
+        &self,
+        barrier: Barrier,
+        actor_ids_to_send: impl IntoIterator<Item = ActorId>,
+        actor_ids_to_collect: impl IntoIterator<Item = ActorId>,
+    ) -> StreamResult<()> {
+        let (tx, rx) = oneshot::channel();
+        self.send_event(LocalBarrierEvent::InjectBarrier {
+            barrier,
+            actor_ids_to_send: actor_ids_to_send.into_iter().collect(),
+            actor_ids_to_collect: actor_ids_to_collect.into_iter().collect(),
+            result_sender: tx,
+        });
+        rx.await.expect("should receive response")
+    }
+
+    /// Use `prev_epoch` to remove collect rx and return rx.
+    pub async fn remove_collect_rx(&self, prev_epoch: u64) -> StreamResult<CompleteReceiver> {
+        let (tx, rx) = oneshot::channel();
+        self.send_event(LocalBarrierEvent::CollectEpoch {
+            epoch: prev_epoch,
+            result_sender: tx,
+        });
+        rx.await.expect("should receive response")
+    }
+
+    /// Reset all internal states.
+    pub fn reset(&self) {
+        self.send_event(LocalBarrierEvent::Reset)
+    }
+
+    /// When a [`crate::executor::StreamConsumer`] (typically [`crate::executor::DispatchExecutor`]) get a barrier, it should report
+    /// and collect this barrier with its own `actor_id` using this function.
+    pub fn collect(&self, actor_id: ActorId, barrier: &Barrier) {
+        self.send_event(ReportActorCollected {
+            actor_id,
+            barrier: barrier.clone(),
+        })
+    }
+
+    /// When a actor exit unexpectedly, it should report this event using this function, so meta
+    /// will notice actor's exit while collecting.
+    pub fn notify_failure(&self, actor_id: ActorId, err: StreamError) {
+        self.send_event(ReportActorFailure { actor_id, err })
+    }
 }
 
 #[cfg(test)]
 impl LocalBarrierManager {
     pub fn for_test() -> Self {
-        Self::with_state(BarrierState::Local)
+        Self::new(
+            StateStoreImpl::for_test(),
+            Arc::new(StreamingMetrics::unused()),
+        )
     }
 
-    /// Returns whether [`BarrierState`] is `Local`.
-    pub fn is_local_mode(&self) -> bool {
-        !matches!(self.state, BarrierState::Managed(_))
+    pub async fn flush_all_events(&self) {
+        let (tx, rx) = oneshot::channel();
+        self.send_event(LocalBarrierEvent::Flush(tx));
+        rx.await.unwrap()
     }
 }
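Note: this file is the heart of the refactor: the `Arc<Mutex<LocalBarrierManager>>` is replaced by a spawned `LocalBarrierWorker` that owns all mutable state, plus a `Clone` handle that sends `LocalBarrierEvent`s; request-style calls such as `send_barrier` and `remove_collect_rx` get their results back over a `oneshot` channel. A self-contained sketch of this request-response actor pattern, with made-up `Event`/`Worker`/`Manager` types standing in for the real ones:

```rust
use tokio::sync::{mpsc, oneshot};

// Stand-in for LocalBarrierEvent: one request-response variant, one fire-and-forget variant.
enum Event {
    Inject {
        epoch: u64,
        result_sender: oneshot::Sender<Result<(), String>>,
    },
    Collected {
        actor_id: u32,
    },
}

// Stand-in for LocalBarrierWorker: owns all mutable state, so no locks are needed.
struct Worker {
    collected: Vec<u32>,
}

impl Worker {
    async fn run(mut self, mut rx: mpsc::UnboundedReceiver<Event>) {
        while let Some(event) = rx.recv().await {
            match event {
                Event::Inject { epoch, result_sender } => {
                    // The real worker transforms barrier state here.
                    println!("injecting barrier for epoch {epoch}");
                    let _ = result_sender.send(Ok(())); // ignore a vanished caller
                }
                Event::Collected { actor_id } => self.collected.push(actor_id),
            }
        }
    }
}

// Stand-in for the new LocalBarrierManager: a cloneable event sender.
#[derive(Clone)]
struct Manager {
    tx: mpsc::UnboundedSender<Event>,
}

impl Manager {
    fn new() -> Self {
        let (tx, rx) = mpsc::unbounded_channel();
        tokio::spawn(Worker { collected: Vec::new() }.run(rx));
        Self { tx }
    }

    // Request-response, like send_barrier / remove_collect_rx.
    async fn inject(&self, epoch: u64) -> Result<(), String> {
        let (result_sender, rx) = oneshot::channel();
        self.tx.send(Event::Inject { epoch, result_sender }).expect("worker alive");
        rx.await.expect("worker replies")
    }

    // Fire-and-forget, like collect / notify_failure.
    fn collected(&self, actor_id: u32) {
        self.tx.send(Event::Collected { actor_id }).expect("worker alive");
    }
}

#[tokio::main]
async fn main() {
    let manager = Manager::new();
    manager.collected(1); // returns immediately
    manager.inject(100).await.unwrap(); // awaits the worker's oneshot reply
}
```

One design consequence visible in the diff: because only the worker task mutates state, the test-only `BarrierState::Local` mode and every `match &mut self.state` dispatch disappear.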
diff --git a/src/stream/src/task/barrier_manager/progress.rs b/src/stream/src/task/barrier_manager/progress.rs
index e3ce3873146ed..2b83050a57d9a 100644
--- a/src/stream/src/task/barrier_manager/progress.rs
+++ b/src/stream/src/task/barrier_manager/progress.rs
@@ -12,9 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::sync::Arc;
-
-use super::{BarrierState, LocalBarrierManager};
+use super::LocalBarrierManager;
+use crate::task::barrier_manager::LocalBarrierEvent::ReportCreateProgress;
+use crate::task::barrier_manager::LocalBarrierWorker;
 use crate::task::{ActorId, SharedContext};
 
 type ConsumedEpoch = u64;
@@ -26,25 +26,33 @@ pub(super) enum BackfillState {
     Done(ConsumedRows),
 }
 
-impl LocalBarrierManager {
-    fn update_create_mview_progress(
+impl LocalBarrierWorker {
+    pub(crate) fn update_create_mview_progress(
         &mut self,
         current_epoch: u64,
         actor: ActorId,
         state: BackfillState,
     ) {
-        match &mut self.state {
-            #[cfg(test)]
-            BarrierState::Local => {}
+        self.state
+            .create_mview_progress
+            .entry(current_epoch)
+            .or_default()
+            .insert(actor, state);
+    }
+}
 
-            BarrierState::Managed(managed_state) => {
-                managed_state
-                    .create_mview_progress
-                    .entry(current_epoch)
-                    .or_default()
-                    .insert(actor, state);
-            }
-        }
+impl LocalBarrierManager {
+    fn update_create_mview_progress(
+        &self,
+        current_epoch: u64,
+        actor: ActorId,
+        state: BackfillState,
+    ) {
+        self.send_event(ReportCreateProgress {
+            current_epoch,
+            actor,
+            state,
+        })
     }
 }
 
@@ -79,7 +87,7 @@
 /// for arrangement backfill. We can use that to estimate the progress as well, and avoid recording
 /// `row_count` state for it.
 pub struct CreateMviewProgress {
-    barrier_manager: Arc<parking_lot::Mutex<LocalBarrierManager>>,
+    barrier_manager: LocalBarrierManager,
 
     /// The id of the actor containing the backfill executors.
     backfill_actor_id: ActorId,
@@ -88,10 +96,7 @@ pub struct CreateMviewProgress {
 }
 
 impl CreateMviewProgress {
-    pub fn new(
-        barrier_manager: Arc<parking_lot::Mutex<LocalBarrierManager>>,
-        backfill_actor_id: ActorId,
-    ) -> Self {
+    pub fn new(barrier_manager: LocalBarrierManager, backfill_actor_id: ActorId) -> Self {
         Self {
             barrier_manager,
             backfill_actor_id,
@@ -100,7 +105,7 @@ impl CreateMviewProgress {
     }
 
     #[cfg(test)]
-    pub fn for_test(barrier_manager: Arc<parking_lot::Mutex<LocalBarrierManager>>) -> Self {
+    pub fn for_test(barrier_manager: LocalBarrierManager) -> Self {
         Self::new(barrier_manager, 0)
     }
 
@@ -110,7 +115,7 @@ impl CreateMviewProgress {
     fn update_inner(&mut self, current_epoch: u64, state: BackfillState) {
         self.state = Some(state);
 
-        self.barrier_manager.lock().update_create_mview_progress(
+        self.barrier_manager.update_create_mview_progress(
             current_epoch,
             self.backfill_actor_id,
             state,
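Note: `update_create_mview_progress` is now split in two: the handle side merely sends a `ReportCreateProgress` event, while the worker side owns the epoch-to-actor progress map and performs a plain nested-map upsert. A minimal sketch of that worker-side update (the `BackfillState` here is a simplified stand-in for the real enum):

```rust
use std::collections::HashMap;

type ActorId = u32;

// Simplified stand-in for the real BackfillState.
#[derive(Clone, Copy, Debug, PartialEq)]
enum BackfillState {
    ConsumingUpstream(u64), // consumed row count
    Done,
}

fn main() {
    // epoch -> (actor -> latest reported state); owned solely by the worker,
    // so no synchronization is needed around it.
    let mut create_mview_progress: HashMap<u64, HashMap<ActorId, BackfillState>> =
        HashMap::new();

    // Each ReportCreateProgress event becomes an upsert like this:
    create_mview_progress
        .entry(100) // current_epoch
        .or_default()
        .insert(7, BackfillState::ConsumingUpstream(1024));
    create_mview_progress
        .entry(100)
        .or_default()
        .insert(7, BackfillState::Done);

    assert_eq!(create_mview_progress[&100][&7], BackfillState::Done);
}
```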
diff --git a/src/stream/src/task/barrier_manager/tests.rs b/src/stream/src/task/barrier_manager/tests.rs
index fa129b411b0f2..49e8f0819f7fb 100644
--- a/src/stream/src/task/barrier_manager/tests.rs
+++ b/src/stream/src/task/barrier_manager/tests.rs
@@ -21,8 +21,7 @@ use super::*;
 
 #[tokio::test]
 async fn test_managed_barrier_collection() -> StreamResult<()> {
-    let mut manager = LocalBarrierManager::new(StateStoreImpl::for_test());
-    assert!(!manager.is_local_mode());
+    let manager = LocalBarrierManager::for_test();
 
     let register_sender = |actor_id: u32| {
         let (barrier_tx, barrier_rx) = unbounded_channel();
@@ -43,9 +42,10 @@ async fn test_managed_barrier_collection() -> StreamResult<()> {
     let epoch = 114514;
     let barrier = Barrier::new_test_barrier(epoch);
     manager
-        .send_barrier(&barrier, actor_ids.clone(), actor_ids, None)
+        .send_barrier(barrier.clone(), actor_ids.clone(), actor_ids)
+        .await
         .unwrap();
-    let mut complete_receiver = manager.remove_collect_rx(barrier.epoch.prev)?;
+    let mut complete_receiver = manager.remove_collect_rx(barrier.epoch.prev).await?;
     // Collect barriers from actors
     let collected_barriers = rxs
         .iter_mut()
@@ -59,6 +59,7 @@ async fn test_managed_barrier_collection() -> StreamResult<()> {
     // Report to local barrier manager
     for (i, (actor_id, barrier)) in collected_barriers.into_iter().enumerate() {
         manager.collect(actor_id, &barrier);
+        manager.flush_all_events().await;
         let notified = complete_receiver
             .complete_receiver
             .as_mut()
@@ -73,8 +74,7 @@ async fn test_managed_barrier_collection() -> StreamResult<()> {
 
 #[tokio::test]
 async fn test_managed_barrier_collection_before_send_request() -> StreamResult<()> {
-    let mut manager = LocalBarrierManager::new(StateStoreImpl::for_test());
-    assert!(!manager.is_local_mode());
+    let manager = LocalBarrierManager::for_test();
 
     let register_sender = |actor_id: u32| {
         let (barrier_tx, barrier_rx) = unbounded_channel();
@@ -107,9 +107,10 @@ async fn test_managed_barrier_collection_before_send_request() -> StreamResult<()> {
 
     // Send the barrier to all actors
     manager
-        .send_barrier(&barrier, actor_ids_to_send, actor_ids_to_collect, None)
+        .send_barrier(barrier.clone(), actor_ids_to_send, actor_ids_to_collect)
+        .await
         .unwrap();
-    let mut complete_receiver = manager.remove_collect_rx(barrier.epoch.prev)?;
+    let mut complete_receiver = manager.remove_collect_rx(barrier.epoch.prev).await?;
 
     // Collect barriers from actors
     let collected_barriers = rxs
@@ -124,6 +125,7 @@ async fn test_managed_barrier_collection_before_send_request() -> StreamResult<()> {
     // Report to local barrier manager
     for (i, (actor_id, barrier)) in collected_barriers.into_iter().enumerate() {
         manager.collect(actor_id, &barrier);
+        manager.flush_all_events().await;
         let notified = complete_receiver
             .complete_receiver
             .as_mut()
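Note: the new `manager.flush_all_events().await` calls exist because `collect` is now fire-and-forget: it only enqueues a `ReportActorCollected` event, so a test must wait for the worker to drain its queue before polling the completion receiver. Since the event channel is FIFO, one `Flush` round-trip proves every earlier event has been handled. A toy demonstration of that ordering guarantee (simplified event type, not the real one):

```rust
use tokio::sync::{mpsc, oneshot};

enum Event {
    Bump,
    // Test-only: reply once the worker reaches this event in the queue.
    Flush(oneshot::Sender<u64>),
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    tokio::spawn(async move {
        let mut count = 0u64;
        while let Some(event) = rx.recv().await {
            match event {
                Event::Bump => count += 1,
                // FIFO channel: every Bump sent before this Flush is counted.
                Event::Flush(reply) => reply.send(count).unwrap(),
            }
        }
    });

    tx.send(Event::Bump).unwrap();
    tx.send(Event::Bump).unwrap();

    let (reply_tx, reply_rx) = oneshot::channel();
    tx.send(Event::Flush(reply_tx)).unwrap();
    // Awaiting the reply is what flush_all_events() does: it guarantees the
    // worker has processed everything enqueued before the flush.
    assert_eq!(reply_rx.await.unwrap(), 2);
}
```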
diff --git a/src/stream/src/task/mod.rs b/src/stream/src/task/mod.rs
index 42ad57aa86d79..b1f1da526bcc5 100644
--- a/src/stream/src/task/mod.rs
+++ b/src/stream/src/task/mod.rs
@@ -34,6 +34,8 @@ pub use env::*;
 use risingwave_storage::StateStoreImpl;
 pub use stream_manager::*;
 
+use crate::executor::monitor::StreamingMetrics;
+
 pub type ConsumableChannelPair = (Option<Sender>, Option<Receiver>);
 pub type ActorId = u32;
 pub type FragmentId = u32;
@@ -77,7 +79,7 @@ pub struct SharedContext {
     // disconnected.
     pub(crate) compute_client_pool: ComputeClientPool,
 
-    pub(crate) barrier_manager: Arc<Mutex<LocalBarrierManager>>,
+    pub(crate) barrier_manager: LocalBarrierManager,
 
     pub(crate) config: StreamingConfig,
 }
@@ -91,13 +93,18 @@ impl std::fmt::Debug for SharedContext {
 }
 
 impl SharedContext {
-    pub fn new(addr: HostAddr, state_store: StateStoreImpl, config: &StreamingConfig) -> Self {
+    pub fn new(
+        addr: HostAddr,
+        state_store: StateStoreImpl,
+        config: &StreamingConfig,
+        streaming_metrics: Arc<StreamingMetrics>,
+    ) -> Self {
         Self {
             channel_map: Default::default(),
             actor_infos: Default::default(),
             addr,
             compute_client_pool: ComputeClientPool::default(),
-            barrier_manager: Arc::new(Mutex::new(LocalBarrierManager::new(state_store))),
+            barrier_manager: LocalBarrierManager::new(state_store, streaming_metrics),
             config: config.clone(),
         }
     }
@@ -111,9 +118,7 @@ impl SharedContext {
             actor_infos: Default::default(),
             addr: LOCAL_TEST_ADDR.clone(),
             compute_client_pool: ComputeClientPool::default(),
-            barrier_manager: Arc::new(Mutex::new(LocalBarrierManager::new(
-                StateStoreImpl::for_test(),
-            ))),
+            barrier_manager: LocalBarrierManager::for_test(),
             config: StreamingConfig {
                 developer: StreamingDeveloperConfig {
                     exchange_initial_permits: permit::for_test::INITIAL_PERMITS,
@@ -126,8 +131,8 @@ impl SharedContext {
         }
     }
 
-    pub fn lock_barrier_manager(&self) -> MutexGuard<'_, LocalBarrierManager> {
-        self.barrier_manager.lock()
+    pub fn barrier_manager(&self) -> &LocalBarrierManager {
+        &self.barrier_manager
     }
 
     /// Get the channel pair for the given actor ids. If the channel pair does not exist, create one
diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs
index a984f2cee58a3..57c5f12612aae 100644
--- a/src/stream/src/task/stream_manager.rs
+++ b/src/stream/src/task/stream_manager.rs
@@ -101,7 +101,6 @@ pub struct LocalStreamManager {
     // Maintain a copy of the core to reduce async locks
     state_store: StateStoreImpl,
     context: Arc<SharedContext>,
-    streaming_metrics: Arc<StreamingMetrics>,
 
     total_mem_val: Arc<TrAdder<i64>>,
 }
@@ -174,7 +173,6 @@ impl LocalStreamManager {
         Self {
             state_store: core.state_store.clone(),
             context: core.context.clone(),
-            streaming_metrics: core.streaming_metrics.clone(),
             total_mem_val: core.total_mem_val.clone(),
             core: Mutex::new(core),
         }
@@ -238,40 +236,33 @@ impl LocalStreamManager {
     /// Broadcast a barrier to all senders. Save a receiver in barrier manager
     pub async fn send_barrier(
         &self,
-        barrier: &Barrier,
+        barrier: Barrier,
         actor_ids_to_send: impl IntoIterator<Item = ActorId>,
         actor_ids_to_collect: impl IntoIterator<Item = ActorId>,
     ) -> StreamResult<()> {
-        let timer = self
-            .streaming_metrics
-            .barrier_inflight_latency
-            .start_timer();
         if barrier.kind == BarrierKind::Initial {
             let core = self.core.lock().await;
             core.get_watermark_epoch()
                 .store(barrier.epoch.curr, std::sync::atomic::Ordering::SeqCst);
         }
-        let mut barrier_manager = self.context.lock_barrier_manager();
-        barrier_manager.send_barrier(
-            barrier,
-            actor_ids_to_send,
-            actor_ids_to_collect,
-            Some(timer),
-        )?;
+        let barrier_manager = self.context.barrier_manager();
+        barrier_manager
+            .send_barrier(barrier, actor_ids_to_send, actor_ids_to_collect)
+            .await?;
         Ok(())
     }
 
     /// Reset the state of the barrier manager.
     pub fn reset_barrier_manager(&self) {
-        self.context.lock_barrier_manager().reset();
+        self.context.barrier_manager().reset();
     }
 
     /// Use `epoch` to find collect rx. And wait for all actor to be collected before
     /// returning.
     pub async fn collect_barrier(&self, epoch: u64) -> StreamResult<CollectResult> {
         let complete_receiver = {
-            let mut barrier_manager = self.context.lock_barrier_manager();
-            barrier_manager.remove_collect_rx(epoch)?
+            let barrier_manager = self.context.barrier_manager();
+            barrier_manager.remove_collect_rx(epoch).await?
         };
         // Wait for all actors finishing this barrier.
         let result = complete_receiver
@@ -279,10 +270,7 @@ impl LocalStreamManager {
             .expect("no rx for local mode")
             .await
             .context("failed to collect barrier")??;
-        complete_receiver
-            .barrier_inflight_timer
-            .expect("no timer for test")
-            .observe_duration();
+        complete_receiver.barrier_inflight_timer.observe_duration();
         Ok(result)
     }
@@ -314,23 +302,6 @@ impl LocalStreamManager {
         });
     }
 
-    /// Broadcast a barrier to all senders. Returns immediately, and caller won't be notified when
-    /// this barrier is finished.
-    #[cfg(test)]
-    pub fn send_barrier_for_test(&self, barrier: &Barrier) -> StreamResult<()> {
-        use std::iter::empty;
-
-        let mut barrier_manager = self.context.lock_barrier_manager();
-        assert!(barrier_manager.is_local_mode());
-        let timer = self
-            .streaming_metrics
-            .barrier_inflight_latency
-            .start_timer();
-        barrier_manager.send_barrier(barrier, empty(), empty(), Some(timer))?;
-        barrier_manager.remove_collect_rx(barrier.epoch.prev)?;
-        Ok(())
-    }
-
     /// Drop the resources of the given actors.
     pub async fn drop_actors(&self, actors: &[ActorId]) -> StreamResult<()> {
         let mut core = self.core.lock().await;
@@ -404,7 +375,12 @@ impl LocalStreamManagerCore {
         config: StreamingConfig,
         await_tree_config: Option<await_tree::Config>,
     ) -> Self {
-        let context = SharedContext::new(addr, state_store.clone(), &config);
+        let context = SharedContext::new(
+            addr,
+            state_store.clone(),
+            &config,
+            streaming_metrics.clone(),
+        );
         Self::new_inner(
             state_store,
             context,
@@ -693,7 +669,7 @@ impl LocalStreamManagerCore {
                 // TODO: check error type and panic if it's unexpected.
                 // Intentionally use `?` on the report to also include the backtrace.
                 tracing::error!(actor_id, error = ?err.as_report(), "actor exit with error");
-                context.lock_barrier_manager().notify_failure(actor_id, err);
+                context.barrier_manager().notify_failure(actor_id, err);
             }
         };
         let traced = match &mut self.await_tree_reg {
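Note: with the worker owning `StreamingMetrics`, the in-flight timer is started when `InjectBarrier` is handled and stored unconditionally (`barrier_inflight_timer` loses its `Option`), then consumed when collection completes, which is why `LocalStreamManager` can drop its `streaming_metrics` field. A sketch of that timer lifecycle against the `prometheus` crate directly (the metric name here is made up):

```rust
use prometheus::{Histogram, HistogramOpts, HistogramTimer};

fn main() {
    // Stand-in for StreamingMetrics::barrier_inflight_latency.
    let barrier_inflight_latency = Histogram::with_opts(HistogramOpts::new(
        "barrier_inflight_latency_demo",
        "time a barrier stays in flight",
    ))
    .unwrap();

    // On InjectBarrier: start the timer and keep it with the epoch's state.
    let timer: HistogramTimer = barrier_inflight_latency.start_timer();

    // ... the barrier flows through all actors and is collected ...

    // On collection: consume the timer to record the elapsed duration.
    timer.observe_duration();

    assert_eq!(barrier_inflight_latency.get_sample_count(), 1);
}
```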