From c017c75dbe7a13d55df885cd2fc3febe91eea084 Mon Sep 17 00:00:00 2001
From: William Wen
Date: Wed, 31 Jan 2024 11:00:55 +0800
Subject: [PATCH] feat(meta): inject and collect barrier in bidi stream

---
 proto/stream_service.proto                    |  22 +-
 src/compute/src/rpc/service/stream_service.rs | 121 ++-----
 src/meta/src/stream/stream_manager.rs         |  34 +-
 src/rpc_client/src/stream_client.rs           |  43 ++-
 src/stream/src/task/barrier_manager.rs        | 294 ++++++++++--------
 .../src/task/barrier_manager/managed_state.rs |  29 --
 src/stream/src/task/stream_manager.rs         |  41 +--
 7 files changed, 294 insertions(+), 290 deletions(-)

diff --git a/proto/stream_service.proto b/proto/stream_service.proto
index 462f5ff0256a..d5778f0c0966 100644
--- a/proto/stream_service.proto
+++ b/proto/stream_service.proto
@@ -103,15 +103,31 @@ message WaitEpochCommitResponse {
   common.Status status = 1;
 }
 
+message StreamingControlStreamRequest {
+  message InitRequest {}
+
+  oneof request {
+    InitRequest init = 1;
+    InjectBarrierRequest inject_barrier = 2;
+  }
+}
+
+message StreamingControlStreamResponse {
+  message InitResponse {}
+
+  oneof response {
+    InitResponse init = 1;
+    BarrierCompleteResponse complete_barrier = 2;
+  }
+}
+
 service StreamService {
   rpc UpdateActors(UpdateActorsRequest) returns (UpdateActorsResponse);
   rpc BuildActors(BuildActorsRequest) returns (BuildActorsResponse);
   rpc BroadcastActorInfoTable(BroadcastActorInfoTableRequest) returns (BroadcastActorInfoTableResponse);
   rpc DropActors(DropActorsRequest) returns (DropActorsResponse);
-  rpc ForceStopActors(ForceStopActorsRequest) returns (ForceStopActorsResponse);
-  rpc InjectBarrier(InjectBarrierRequest) returns (InjectBarrierResponse);
-  rpc BarrierComplete(BarrierCompleteRequest) returns (BarrierCompleteResponse);
   rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse);
+  rpc StreamingControlStream(stream StreamingControlStreamRequest) returns (stream StreamingControlStreamResponse);
 }
 
 // TODO: Lifecycle management for actors.
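Note: the two new messages above replace the unary ForceStopActors / InjectBarrier / BarrierComplete RPCs with a single long-lived bidirectional stream. The caller opens the stream with an Init request, the compute node acknowledges with an Init response, and from then on InjectBarrierRequest flows downstream while BarrierCompleteResponse flows back as barriers are collected. A minimal client-side sketch of that handshake follows; it is illustrative only and not part of the patch. The function name, channel capacity, and error handling are placeholders, while the tonic-generated streaming_control_stream call and the message types match the client code added later in this patch.

    use risingwave_pb::stream_service::stream_service_client::StreamServiceClient;
    use risingwave_pb::stream_service::streaming_control_stream_request::{InitRequest, Request as Req};
    use risingwave_pb::stream_service::StreamingControlStreamRequest;
    use tokio_stream::wrappers::ReceiverStream;

    async fn open_control_stream(
        mut client: StreamServiceClient<tonic::transport::Channel>,
    ) -> anyhow::Result<()> {
        let (tx, rx) = tokio::sync::mpsc::channel(16);
        // The very first message on the request stream must be Init.
        tx.send(StreamingControlStreamRequest {
            request: Some(Req::Init(InitRequest {})),
        })
        .await
        .expect("request stream closed");
        let mut responses = client
            .streaming_control_stream(ReceiverStream::new(rx))
            .await?
            .into_inner();
        // The compute node answers with InitResponse before any barrier traffic flows.
        let first = responses.message().await?;
        println!("handshake reply: {:?}", first);
        // Subsequent InjectBarrier requests go through `tx`; BarrierCompleteResponse
        // messages arrive on `responses`.
        Ok(())
    }

The patch suggests the motivation is to keep one persistent control channel per compute node instead of issuing per-epoch RPCs, so completions can be pushed back as soon as barriers are collected.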
diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs
index def9a534586b..121f31787b03 100644
--- a/src/compute/src/rpc/service/stream_service.rs
+++ b/src/compute/src/rpc/service/stream_service.rs
@@ -13,18 +13,17 @@
 // limitations under the License.
 
 use await_tree::InstrumentAwait;
-use itertools::Itertools;
-use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
-use risingwave_hummock_sdk::LocalSstableInfo;
-use risingwave_pb::stream_service::barrier_complete_response::GroupedSstableInfo;
+use futures::{Stream, StreamExt, TryStreamExt};
 use risingwave_pb::stream_service::stream_service_server::StreamService;
+use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest;
 use risingwave_pb::stream_service::*;
 use risingwave_storage::dispatch_state_store;
 use risingwave_stream::error::StreamError;
-use risingwave_stream::executor::Barrier;
-use risingwave_stream::task::{BarrierCompleteResult, LocalStreamManager, StreamEnvironment};
+use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
 use thiserror_ext::AsReport;
-use tonic::{Request, Response, Status};
+use tokio::sync::mpsc::unbounded_channel;
+use tokio_stream::wrappers::UnboundedReceiverStream;
+use tonic::{Request, Response, Status, Streaming};
 
 #[derive(Clone)]
 pub struct StreamServiceImpl {
@@ -40,6 +39,9 @@ impl StreamServiceImpl {
 #[async_trait::async_trait]
 impl StreamService for StreamServiceImpl {
+    type StreamingControlStreamStream =
+        impl Stream<Item = std::result::Result<StreamingControlStreamResponse, Status>>;
+
     #[cfg_attr(coverage, coverage(off))]
     async fn update_actors(
         &self,
         request: Request<UpdateActorsRequest>,
@@ -110,86 +112,6 @@ impl StreamService for StreamServiceImpl {
         }))
     }
 
-    #[cfg_attr(coverage, coverage(off))]
-    async fn force_stop_actors(
-        &self,
-        request: Request<ForceStopActorsRequest>,
-    ) -> std::result::Result<Response<ForceStopActorsResponse>, Status> {
-        let req = request.into_inner();
-        self.mgr.reset().await;
-        Ok(Response::new(ForceStopActorsResponse {
-            request_id: req.request_id,
-            status: None,
-        }))
-    }
-
-    #[cfg_attr(coverage, coverage(off))]
-    async fn inject_barrier(
-        &self,
-        request: Request<InjectBarrierRequest>,
-    ) -> Result<Response<InjectBarrierResponse>, Status> {
-        let req = request.into_inner();
-        let barrier =
-            Barrier::from_protobuf(req.get_barrier().unwrap()).map_err(StreamError::from)?;
-
-        self.mgr
-            .send_barrier(barrier, req.actor_ids_to_send, req.actor_ids_to_collect)
-            .await?;
-
-        Ok(Response::new(InjectBarrierResponse {
-            request_id: req.request_id,
-            status: None,
-        }))
-    }
-
-    #[cfg_attr(coverage, coverage(off))]
-    async fn barrier_complete(
-        &self,
-        request: Request<BarrierCompleteRequest>,
-    ) -> Result<Response<BarrierCompleteResponse>, Status> {
-        let req = request.into_inner();
-        let BarrierCompleteResult {
-            create_mview_progress,
-            sync_result,
-        } = self
-            .mgr
-            .collect_barrier(req.prev_epoch)
-            .instrument_await(format!("collect_barrier (epoch {})", req.prev_epoch))
-            .await
-            .inspect_err(
-                |err| tracing::error!(error = %err.as_report(), "failed to collect barrier"),
-            )?;
-
-        let (synced_sstables, table_watermarks) = sync_result
-            .map(|sync_result| (sync_result.uncommitted_ssts, sync_result.table_watermarks))
-            .unwrap_or_default();
-
-        Ok(Response::new(BarrierCompleteResponse {
-            request_id: req.request_id,
-            status: None,
-            create_mview_progress,
-            synced_sstables: synced_sstables
-                .into_iter()
-                .map(
-                    |LocalSstableInfo {
-                         compaction_group_id,
-                         sst_info,
-                         table_stats,
-                     }| GroupedSstableInfo {
-                        compaction_group_id,
-                        sst: Some(sst_info),
-                        table_stats_map: to_prost_table_stats_map(table_stats),
-                    },
-                )
-                .collect_vec(),
-            worker_id: self.env.worker_id(),
-            table_watermarks: table_watermarks
-                .into_iter()
-                .map(|(key, value)| (key.table_id, value.to_protobuf()))
-                .collect(),
-        }))
-    }
-
     #[cfg_attr(coverage, coverage(off))]
     async fn wait_epoch_commit(
         &self,
@@ -210,4 +132,29 @@ impl StreamService for StreamServiceImpl {
 
         Ok(Response::new(WaitEpochCommitResponse { status: None }))
     }
+
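+    /// Bidirectional control stream between the meta node and this compute node.
+    /// The first message must be `Init`; the remaining request stream is handed over to
+    /// the `LocalStreamManager`, which acknowledges with `InitResponse` and then pushes
+    /// `BarrierCompleteResponse` messages through the returned receiver stream.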
+    async fn streaming_control_stream(
+        &self,
+        request: Request<Streaming<StreamingControlStreamRequest>>,
+    ) -> Result<Response<Self::StreamingControlStreamStream>, Status> {
+        let mut stream = request.into_inner().boxed();
+        let first_request = stream
+            .try_next()
+            .await?
+            .ok_or_else(|| Status::invalid_argument("failed to receive first request"))?;
+        match first_request {
+            StreamingControlStreamRequest {
+                request: Some(streaming_control_stream_request::Request::Init(InitRequest {})),
+            } => {}
+            other => {
+                return Err(Status::invalid_argument(format!(
+                    "unexpected first request: {:?}",
+                    other
+                )));
+            }
+        };
+        let (tx, rx) = unbounded_channel();
+        self.mgr.handle_new_control_stream(tx, stream);
+        Ok(Response::new(UnboundedReceiverStream::new(rx)))
+    }
 }
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index 52976d744c20..118c0ace468a 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -812,6 +812,8 @@ mod tests {
     use std::sync::{Arc, Mutex};
     use std::time::Duration;
 
+    use futures::stream::BoxStream;
+    use futures::Stream;
     use risingwave_common::catalog::TableId;
     use risingwave_common::hash::ParallelUnitMapping;
     use risingwave_common::system_param::reader::SystemParamsRead;
@@ -833,7 +835,7 @@ mod tests {
     use tokio::sync::Notify;
     use tokio::task::JoinHandle;
     use tokio::time::sleep;
-    use tonic::{Request, Response, Status};
+    use tonic::{Request, Response, Status, Streaming};
 
     use super::*;
     use crate::barrier::GlobalBarrierManager;
@@ -861,6 +863,9 @@ mod tests {
 
     #[async_trait::async_trait]
     impl StreamService for FakeStreamService {
+        type StreamingControlStreamStream =
+            impl Stream<Item = std::result::Result<StreamingControlStreamResponse, Status>>;
+
         async fn update_actors(
             &self,
             request: Request<UpdateActorsRequest>,
@@ -923,26 +928,25 @@ mod tests {
             Ok(Response::new(ForceStopActorsResponse::default()))
         }
 
-        async fn inject_barrier(
-            &self,
-            _request: Request<InjectBarrierRequest>,
-        ) -> std::result::Result<Response<InjectBarrierResponse>, Status> {
-            Ok(Response::new(InjectBarrierResponse::default()))
-        }
-
-        async fn barrier_complete(
-            &self,
-            _request: Request<BarrierCompleteRequest>,
-        ) -> std::result::Result<Response<BarrierCompleteResponse>, Status> {
-            Ok(Response::new(BarrierCompleteResponse::default()))
-        }
-
         async fn wait_epoch_commit(
            &self,
            _request: Request<WaitEpochCommitRequest>,
         ) -> std::result::Result<Response<WaitEpochCommitResponse>, Status> {
             Ok(Response::new(WaitEpochCommitResponse::default()))
         }
+
+        async fn streaming_control_stream(
+            &self,
+            _request: Request<Streaming<StreamingControlStreamRequest>>,
+        ) -> Result<Response<Self::StreamingControlStreamStream>, Status> {
+            Result::<
+                Response<BoxStream<
+                    'static,
+                    std::result::Result<StreamingControlStreamResponse, Status>,
+                >>,
+                _,
+            >::Err(Status::unimplemented("not implemented"))
+        }
     }
 
     struct MockServices {
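Note on the fake service above: the associated type StreamingControlStreamStream is declared as an `impl Stream` (an impl-trait-in-associated-type nightly feature), so even a stub that only ever returns an error has to give the opaque type a concrete hidden type somewhere in the impl; spelling out a BoxStream in the Result annotation is one way to do that. A tiny self-contained illustration of the same pattern follows; the trait and names are hypothetical, it requires nightly Rust, and it is not RisingWave code.

    #![feature(impl_trait_in_assoc_type)]

    use futures::stream::BoxStream;
    use futures::Stream;

    trait Service {
        type RespStream: Stream<Item = i32>;
        fn call(&self) -> Result<Self::RespStream, String>;
    }

    struct Stub;

    impl Service for Stub {
        // Even a stub that only ever fails must pin the opaque type to some concrete stream.
        type RespStream = impl Stream<Item = i32>;

        fn call(&self) -> Result<Self::RespStream, String> {
            // Naming BoxStream<'static, i32> on the Ok side gives the hidden type a defining use.
            Result::<BoxStream<'static, i32>, _>::Err("not implemented".to_string())
        }
    }

    fn main() {
        assert!(Stub.call().is_err());
    }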
diff --git a/src/rpc_client/src/stream_client.rs b/src/rpc_client/src/stream_client.rs
index 3a271b5660bb..0f15da25f9d2 100644
--- a/src/rpc_client/src/stream_client.rs
+++ b/src/rpc_client/src/stream_client.rs
@@ -15,17 +15,22 @@
 use std::sync::Arc;
 use std::time::Duration;
 
+use anyhow::anyhow;
 use async_trait::async_trait;
+use futures::TryStreamExt;
 use risingwave_common::config::MAX_CONNECTION_WINDOW_SIZE;
 use risingwave_common::monitor::connection::{EndpointExt, TcpConfig};
 use risingwave_common::util::addr::HostAddr;
 use risingwave_pb::stream_service::stream_service_client::StreamServiceClient;
+use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest;
+use risingwave_pb::stream_service::streaming_control_stream_response::InitResponse;
 use risingwave_pb::stream_service::*;
+use tokio_stream::wrappers::ReceiverStream;
 use tonic::transport::Endpoint;
 
-use crate::error::Result;
+use crate::error::{Result, RpcError};
 use crate::tracing::{Channel, TracingInjectedChannelExt};
-use crate::{rpc_client_method_impl, RpcClient, RpcClientPool};
+use crate::{rpc_client_method_impl, BidiStreamHandle, RpcClient, RpcClientPool};
 
 #[derive(Clone)]
 pub struct StreamClient(StreamServiceClient<Channel>);
@@ -68,9 +73,6 @@ macro_rules! for_all_stream_rpc {
             ,{ 0, build_actors, BuildActorsRequest, BuildActorsResponse }
             ,{ 0, broadcast_actor_info_table, BroadcastActorInfoTableRequest, BroadcastActorInfoTableResponse }
             ,{ 0, drop_actors, DropActorsRequest, DropActorsResponse }
-            ,{ 0, force_stop_actors, ForceStopActorsRequest, ForceStopActorsResponse}
-            ,{ 0, inject_barrier, InjectBarrierRequest, InjectBarrierResponse }
-            ,{ 0, barrier_complete, BarrierCompleteRequest, BarrierCompleteResponse }
             ,{ 0, wait_epoch_commit, WaitEpochCommitRequest, WaitEpochCommitResponse }
         }
     };
@@ -79,3 +81,34 @@ macro_rules! for_all_stream_rpc {
 impl StreamClient {
     for_all_stream_rpc! { rpc_client_method_impl }
 }
+
+pub type StreamingControlHandle =
+    BidiStreamHandle<StreamingControlStreamRequest, StreamingControlStreamResponse>;
+
+impl StreamClient {
+    pub async fn start_streaming_control(&self) -> Result<StreamingControlHandle> {
+        let first_request = StreamingControlStreamRequest {
+            request: Some(streaming_control_stream_request::Request::Init(
+                InitRequest {},
+            )),
+        };
+        let mut client = self.0.to_owned();
+        let (handle, first_rsp) = BidiStreamHandle::initialize(first_request, |rx| async move {
+            client
+                .streaming_control_stream(ReceiverStream::new(rx))
+                .await
+                .map(|response| response.into_inner().map_err(RpcError::from))
+                .map_err(RpcError::from)
+        })
+        .await?;
+        match first_rsp {
+            StreamingControlStreamResponse {
+                response: Some(streaming_control_stream_response::Response::Init(InitResponse {})),
+            } => {}
+            other => {
+                return Err(anyhow!("expect InitResponse but get {:?}", other).into());
+            }
+        };
+        Ok(handle)
+    }
+}
diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs
index b838314729ad..5800d2b17d14 100644
--- a/src/stream/src/task/barrier_manager.rs
+++ b/src/stream/src/task/barrier_manager.rs
@@ -13,19 +13,24 @@
 // limitations under the License.
 
use std::collections::{HashMap, HashSet}; +use std::future::pending; use std::sync::Arc; use anyhow::anyhow; -use futures::stream::FuturesUnordered; -use futures::StreamExt; +use futures::stream::{BoxStream, FuturesUnordered}; +use futures::{StreamExt, TryStreamExt}; +use itertools::Itertools; use parking_lot::Mutex; -use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress; +use risingwave_pb::stream_service::barrier_complete_response::{ + GroupedSstableInfo, PbCreateMviewProgress, +}; use rw_futures_util::{pending_on_none, AttachedFuture}; use thiserror_ext::AsReport; use tokio::select; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot; use tokio::task::JoinHandle; +use tonic::Status; use self::managed_state::ManagedBarrierState; use crate::error::{IntoUnexpectedExit, StreamError, StreamResult}; @@ -38,8 +43,16 @@ mod tests; pub use progress::CreateMviewProgress; use risingwave_common::util::runtime::BackgroundShutdownRuntime; +use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; +use risingwave_hummock_sdk::LocalSstableInfo; use risingwave_pb::stream_plan; use risingwave_pb::stream_plan::barrier::BarrierKind; +use risingwave_pb::stream_service::streaming_control_stream_request::Request; +use risingwave_pb::stream_service::streaming_control_stream_response::InitResponse; +use risingwave_pb::stream_service::{ + streaming_control_stream_response, BarrierCompleteResponse, StreamingControlStreamRequest, + StreamingControlStreamResponse, +}; use risingwave_storage::store::SyncResult; use crate::executor::monitor::StreamingMetrics; @@ -62,17 +75,14 @@ pub struct BarrierCompleteResult { } pub(super) enum LocalBarrierEvent { + NewControlStream { + sender: UnboundedSender>, + request_stream: BoxStream<'static, Result>, + }, RegisterSender { actor_id: ActorId, sender: UnboundedSender, }, - InjectBarrier { - barrier: Barrier, - actor_ids_to_send: HashSet, - actor_ids_to_collect: HashSet, - result_sender: oneshot::Sender>, - }, - Reset(oneshot::Sender<()>), ReportActorCollected { actor_id: ActorId, barrier: Barrier, @@ -81,10 +91,6 @@ pub(super) enum LocalBarrierEvent { actor_id: ActorId, err: StreamError, }, - AwaitEpochCompleted { - epoch: u64, - result_sender: oneshot::Sender>, - }, ReportCreateProgress { current_epoch: u64, actor: ActorId, @@ -186,10 +192,10 @@ pub(super) struct LocalBarrierWorker { /// Current barrier collection state. state: ManagedBarrierState, - /// Record all unexpected exited actors. 
- failure_actors: HashMap, - - epoch_result_sender: HashMap>>, + control_stream_handle: Option<( + UnboundedSender>, + BoxStream<'static, Result>, + )>, pub(super) actor_manager: Arc, @@ -200,12 +206,11 @@ impl LocalBarrierWorker { pub(super) fn new(actor_manager: Arc) -> Self { Self { barrier_senders: HashMap::new(), - failure_actors: HashMap::default(), state: ManagedBarrierState::new( actor_manager.env.state_store(), actor_manager.streaming_metrics.clone(), ), - epoch_result_sender: HashMap::default(), + control_stream_handle: None, actor_manager, actor_manager_state: StreamActorManagerState::new(), } @@ -218,14 +223,36 @@ impl LocalBarrierWorker { self.handle_actor_created(sender, create_actors_result); } completed_epoch = self.state.next_completed_epoch() => { - self.on_epoch_completed(completed_epoch); + let result = self.on_epoch_completed(completed_epoch); + self.inspect_result(result); + }, + result = async { + if let Some((_, stream)) = &mut self.control_stream_handle { + stream.try_next().await.and_then(|opt| opt.ok_or_else(|| Status::internal("end of stream"))) + } else { + pending().await + } + } => { + match result { + Ok(request) => { + let result = self.handle_streaming_control_request(request); + self.inspect_result(result); + }, + Err(e) => { + self.reset_stream_with_err(Status::internal(format!("failed to receive request: {:?}", e.as_report()))); + } + } }, event = event_rx.recv() => { if let Some(event) = event { match event { - LocalBarrierEvent::Reset(finish_sender) => { + LocalBarrierEvent::NewControlStream { sender, request_stream } => { + self.reset_stream_with_err(Status::internal("control stream has been reset to a new one")); self.reset().await; - let _ = finish_sender.send(()); + self.control_stream_handle = Some((sender, request_stream)); + self.send_response(StreamingControlStreamResponse { + response: Some(streaming_control_stream_response::Response::Init(InitResponse {})) + }); } event => { self.handle_event(event); @@ -252,35 +279,66 @@ impl LocalBarrierWorker { let _ = sender.send(result); } + fn reset_stream_with_err(&mut self, err: Status) { + let (sender, _) = self + .control_stream_handle + .take() + .expect("should not be empty when called"); + warn!("control stream reset with: {:?}", err.as_report()); + if sender.send(Err(err)).is_err() { + warn!("failed to notify finish of control stream"); + } + } + + fn inspect_result(&mut self, result: StreamResult<()>) { + if let Err(e) = result { + self.reset_stream_with_err(Status::internal(format!("get error: {:?}", e.as_report()))); + } + } + + fn send_response(&mut self, response: StreamingControlStreamResponse) { + let (sender, _) = self + .control_stream_handle + .as_ref() + .expect("should not be None"); + if sender.send(Ok(response)).is_err() { + self.control_stream_handle = None; + warn!("fail to send response. 
control stream reset"); + } + } + + fn handle_streaming_control_request( + &mut self, + request: StreamingControlStreamRequest, + ) -> StreamResult<()> { + match request.request.expect("should not be empty") { + Request::InjectBarrier(req) => { + let barrier = Barrier::from_protobuf(req.get_barrier().unwrap())?; + self.send_barrier( + &barrier, + req.actor_ids_to_send.into_iter().collect(), + req.actor_ids_to_collect.into_iter().collect(), + )?; + Ok(()) + } + Request::Init(_) => { + unreachable!() + } + } + } + fn handle_event(&mut self, event: LocalBarrierEvent) { match event { LocalBarrierEvent::RegisterSender { actor_id, sender } => { self.register_sender(actor_id, sender); } - LocalBarrierEvent::InjectBarrier { - barrier, - actor_ids_to_send, - actor_ids_to_collect, - result_sender, - } => { - let result = self.send_barrier(&barrier, actor_ids_to_send, actor_ids_to_collect); - let _ = result_sender.send(result).inspect_err(|e| { - warn!(err=?e, "fail to send inject barrier result"); - }); - } - LocalBarrierEvent::Reset(_) => { - unreachable!("Reset event should be handled separately in async context") + LocalBarrierEvent::NewControlStream { .. } => { + unreachable!("NewControlStream event should be handled separately in async context") } ReportActorCollected { actor_id, barrier } => self.collect(actor_id, &barrier), ReportActorFailure { actor_id, err } => { self.notify_failure(actor_id, err); } - LocalBarrierEvent::AwaitEpochCompleted { - epoch, - result_sender, - } => { - self.await_epoch_completed(epoch, result_sender); - } LocalBarrierEvent::ReportCreateProgress { current_epoch, actor, @@ -314,17 +372,55 @@ impl LocalBarrierWorker { // event handler impl LocalBarrierWorker { - fn on_epoch_completed(&mut self, epoch: u64) { - if let Some(sender) = self.epoch_result_sender.remove(&epoch) { - let result = self - .state - .pop_completed_epoch(epoch) - .expect("should exist") - .expect("should have completed"); - if sender.send(result).is_err() { - warn!(epoch, "fail to send epoch complete result"); - } - } + fn on_epoch_completed(&mut self, epoch: u64) -> StreamResult<()> { + let result = self + .state + .pop_completed_epoch(epoch) + .expect("should exist") + .expect("should have completed")?; + + let BarrierCompleteResult { + create_mview_progress, + sync_result, + } = result; + + let (synced_sstables, table_watermarks) = sync_result + .map(|sync_result| (sync_result.uncommitted_ssts, sync_result.table_watermarks)) + .unwrap_or_default(); + + let result = StreamingControlStreamResponse { + response: Some( + streaming_control_stream_response::Response::CompleteBarrier( + BarrierCompleteResponse { + request_id: "todo".to_string(), + status: None, + create_mview_progress, + synced_sstables: synced_sstables + .into_iter() + .map( + |LocalSstableInfo { + compaction_group_id, + sst_info, + table_stats, + }| GroupedSstableInfo { + compaction_group_id, + sst: Some(sst_info), + table_stats_map: to_prost_table_stats_map(table_stats), + }, + ) + .collect_vec(), + worker_id: self.actor_manager.env.worker_id(), + table_watermarks: table_watermarks + .into_iter() + .map(|(key, value)| (key.table_id, value.to_protobuf())) + .collect(), + }, + ), + ), + }; + + self.send_response(result); + Ok(()) } /// Register sender for source actors, used to send barriers. @@ -350,7 +446,6 @@ impl LocalBarrierWorker { ) -> StreamResult<()> { #[cfg(not(test))] { - use itertools::Itertools; // The barrier might be outdated and been injected after recovery in some certain extreme // scenarios. 
So some newly creating actors in the barrier are possibly not rebuilt during // recovery. Check it here and return an error here if some actors are not found to @@ -361,11 +456,10 @@ impl LocalBarrierWorker { .filter(|id| !self.actor_manager_state.handles.contains_key(id)) .collect_vec(); if !missing_actor_ids.is_empty() { - tracing::warn!( + panic!( "to collect actors not found, they should be cleaned when recovering: {:?}", missing_actor_ids ); - return Err(anyhow!("to collect actors not found: {:?}", to_collect).into()); } } @@ -385,15 +479,6 @@ impl LocalBarrierWorker { // There must be some actors to collect from. assert!(!to_collect.is_empty()); - for actor_id in &to_collect { - if let Some(e) = self.failure_actors.get(actor_id) { - // The failure actors could exit before the barrier is issued, while their - // up-downstream actors could be stuck somehow. Return error directly to trigger the - // recovery. - return Err(e.clone()); - } - } - self.state.transform_to_issued(barrier, to_collect); for actor_id in to_send { @@ -434,36 +519,6 @@ impl LocalBarrierWorker { Ok(()) } - /// Use `prev_epoch` to remove collect rx and return rx. - fn await_epoch_completed( - &mut self, - prev_epoch: u64, - result_sender: oneshot::Sender>, - ) { - match self.state.pop_completed_epoch(prev_epoch) { - Err(e) => { - let _ = result_sender.send(Err(e)); - } - Ok(Some(result)) => { - if result_sender.send(result).is_err() { - warn!(prev_epoch, "failed to send completed epoch result"); - } - } - Ok(None) => { - if let Some(prev_sender) = - self.epoch_result_sender.insert(prev_epoch, result_sender) - { - warn!(?prev_epoch, "duplicate await_collect_barrier on epoch"); - let _ = prev_sender.send(Err(anyhow!( - "duplicate await_collect_barrier on epoch {}", - prev_epoch - ) - .into())); - } - } - } - } - /// Reset all internal states. pub(super) fn reset_state(&mut self) { *self = Self::new(self.actor_manager.clone()); @@ -479,20 +534,7 @@ impl LocalBarrierWorker { /// will notice actor's exit while collecting. fn notify_failure(&mut self, actor_id: ActorId, err: StreamError) { let err = err.into_unexpected_exit(actor_id); - if let Some(prev_err) = self.failure_actors.insert(actor_id, err.clone()) { - warn!( - actor_id, - prev_err = %prev_err.as_report(), - "actor error overwritten" - ); - } - for fail_epoch in self.state.epochs_await_on_actor(actor_id) { - if let Some(result_sender) = self.epoch_result_sender.remove(&fail_epoch) { - if result_sender.send(Err(err.clone())).is_err() { - warn!(fail_epoch, actor_id, err = %err.as_report(), "fail to notify actor failure"); - } - } - } + self.inspect_result(Err(err)); } } @@ -568,29 +610,31 @@ impl LocalBarrierManager { /// barrier is finished, in managed mode. pub async fn send_barrier( &self, - barrier: Barrier, - actor_ids_to_send: impl IntoIterator, - actor_ids_to_collect: impl IntoIterator, + _barrier: Barrier, + _actor_ids_to_send: impl IntoIterator, + _actor_ids_to_collect: impl IntoIterator, ) -> StreamResult<()> { - self.send_and_await(move |result_sender| LocalBarrierEvent::InjectBarrier { - barrier, - actor_ids_to_send: actor_ids_to_send.into_iter().collect(), - actor_ids_to_collect: actor_ids_to_collect.into_iter().collect(), - result_sender, - }) - .await? + // self.send_and_await(move |result_sender| LocalBarrierEvent::InjectBarrier { + // barrier, + // actor_ids_to_send: actor_ids_to_send.into_iter().collect(), + // actor_ids_to_collect: actor_ids_to_collect.into_iter().collect(), + // result_sender, + // }) + // .await? 
+ todo!() } /// Use `prev_epoch` to remove collect rx and return rx. pub async fn await_epoch_completed( &self, - prev_epoch: u64, + _prev_epoch: u64, ) -> StreamResult { - self.send_and_await(|result_sender| LocalBarrierEvent::AwaitEpochCompleted { - epoch: prev_epoch, - result_sender, - }) - .await? + // self.send_and_await(|result_sender| LocalBarrierEvent::AwaitEpochCompleted { + // epoch: prev_epoch, + // result_sender, + // }) + // .await? + todo!() } /// When a [`crate::executor::StreamConsumer`] (typically [`crate::executor::DispatchExecutor`]) get a barrier, it should report diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index 3dce5dd35665..52a23a5fbc9c 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -248,35 +248,6 @@ impl ManagedBarrierState { } } - /// Returns an iterator on epochs that is awaiting on `actor_id`. - /// This is used on notifying actor failure. On actor failure, the - /// barrier manager can call this method to iterate on epochs that - /// waits on the failed actor and then notify failure on the result - /// sender of the epoch. - pub(crate) fn epochs_await_on_actor( - &self, - actor_id: ActorId, - ) -> impl Iterator + '_ { - self.epoch_barrier_state_map - .iter() - .filter_map(move |(prev_epoch, barrier_state)| { - #[allow(clippy::single_match)] - match barrier_state.inner { - ManagedBarrierStateInner::Issued { - ref remaining_actors, - .. - } => { - if remaining_actors.contains(&actor_id) { - Some(*prev_epoch) - } else { - None - } - } - _ => None, - } - }) - } - /// Collect a `barrier` from the actor with `actor_id`. pub(super) fn collect(&mut self, actor_id: ActorId, barrier: &Barrier) { tracing::debug!( diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 5a2bde99da49..e6e44c34a734 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use anyhow::anyhow; use async_recursion::async_recursion; +use futures::stream::BoxStream; use futures::FutureExt; use itertools::Itertools; use parking_lot::Mutex; @@ -33,14 +34,19 @@ use risingwave_pb::common::ActorInfo; use risingwave_pb::stream_plan; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{StreamActor, StreamNode}; +use risingwave_pb::stream_service::{ + StreamingControlStreamRequest, StreamingControlStreamResponse, +}; use risingwave_storage::monitor::HummockTraceFutureExt; use risingwave_storage::{dispatch_state_store, StateStore}; use rw_futures_util::AttachedFuture; use thiserror_ext::AsReport; +use tokio::sync::mpsc::UnboundedSender; use tokio::sync::oneshot; use tokio::task::JoinHandle; +use tonic::Status; -use super::{unique_executor_id, unique_operator_id, BarrierCompleteResult}; +use super::{unique_executor_id, unique_operator_id}; use crate::error::StreamResult; use crate::executor::monitor::StreamingMetrics; use crate::executor::subtask::SubtaskHandle; @@ -195,25 +201,16 @@ impl LocalStreamManager { } } - /// Broadcast a barrier to all senders. 
Save a receiver in barrier manager - pub async fn send_barrier( + pub fn handle_new_control_stream( &self, - barrier: Barrier, - actor_ids_to_send: impl IntoIterator, - actor_ids_to_collect: impl IntoIterator, - ) -> StreamResult<()> { - self.local_barrier_manager - .send_barrier(barrier, actor_ids_to_send, actor_ids_to_collect) - .await?; - Ok(()) - } - - /// Use `epoch` to find collect rx. And wait for all actor to be collected before - /// returning. - pub async fn collect_barrier(&self, epoch: u64) -> StreamResult { + sender: UnboundedSender>, + request_stream: BoxStream<'static, Result>, + ) { self.local_barrier_manager - .await_epoch_completed(epoch) - .await + .send_event(LocalBarrierEvent::NewControlStream { + sender, + request_stream, + }) } pub fn context(&self) -> &Arc { @@ -230,14 +227,6 @@ impl LocalStreamManager { .await } - /// Force stop all actors on this worker, and then drop their resources. - pub async fn reset(&self) { - self.local_barrier_manager - .send_and_await(LocalBarrierEvent::Reset) - .await - .expect("should receive reset") - } - pub async fn update_actors(&self, actors: Vec) -> StreamResult<()> { self.local_barrier_manager .send_and_await(|result_sender| LocalBarrierEvent::UpdateActors {