diff --git a/proto/stream_service.proto b/proto/stream_service.proto index 663b7aa5bae5..08f0ff1e7684 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -58,6 +58,7 @@ message InjectBarrierRequest { repeated uint32 actor_ids_to_send = 3; repeated uint32 actor_ids_to_collect = 4; repeated uint32 table_ids_to_sync = 5; + uint32 partial_graph_id = 6; } message BarrierCompleteResponse { @@ -80,6 +81,9 @@ message BarrierCompleteResponse { uint32 worker_id = 5; map table_watermarks = 6; repeated hummock.SstableInfo old_value_sstables = 7; + uint32 partial_graph_id = 8; + // prev_epoch of barrier + uint64 epoch = 9; } // Before starting streaming, the leader node broadcast the actor-host table to needed workers. @@ -100,9 +104,14 @@ message StreamingControlStreamRequest { uint64 prev_epoch = 2; } + message RemovePartialGraphRequest { + repeated uint32 partial_graph_ids = 1; + } + oneof request { InitRequest init = 1; InjectBarrierRequest inject_barrier = 2; + RemovePartialGraphRequest remove_partial_graph = 3; } } diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 2c65f467b4a9..c67d018a1033 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -41,12 +41,10 @@ use risingwave_pb::stream_service::WaitEpochCommitRequest; use thiserror_ext::AsReport; use tracing::warn; -use super::info::{ - CommandActorChanges, CommandFragmentChanges, CommandNewFragmentInfo, InflightActorInfo, -}; +use super::info::{CommandActorChanges, CommandFragmentChanges, InflightActorInfo}; use super::trace::TracedEpoch; use crate::barrier::GlobalBarrierManagerContext; -use crate::manager::{DdlType, MetadataManager, StreamingJob, WorkerId}; +use crate::manager::{DdlType, InflightFragmentInfo, MetadataManager, StreamingJob, WorkerId}; use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism}; use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig}; use crate::MetaResult; @@ -109,8 +107,8 @@ impl ReplaceTablePlan { fn actor_changes(&self) -> CommandActorChanges { let mut fragment_changes = HashMap::new(); for fragment in self.new_table_fragments.fragments.values() { - let fragment_change = CommandFragmentChanges::NewFragment(CommandNewFragmentInfo { - new_actors: fragment + let fragment_change = CommandFragmentChanges::NewFragment(InflightFragmentInfo { + actors: fragment .actors .iter() .map(|actor| { @@ -124,7 +122,7 @@ impl ReplaceTablePlan { ) }) .collect(), - table_ids: fragment + state_table_ids: fragment .state_table_ids .iter() .map(|table_id| TableId::new(*table_id)) @@ -159,12 +157,12 @@ pub struct CreateStreamingJobCommandInfo { } impl CreateStreamingJobCommandInfo { - fn new_fragment_info(&self) -> impl Iterator + '_ { + fn new_fragment_info(&self) -> impl Iterator + '_ { self.table_fragments.fragments.values().map(|fragment| { ( fragment.fragment_id, - CommandNewFragmentInfo { - new_actors: fragment + InflightFragmentInfo { + actors: fragment .actors .iter() .map(|actor| { @@ -178,7 +176,7 @@ impl CreateStreamingJobCommandInfo { ) }) .collect(), - table_ids: fragment + state_table_ids: fragment .state_table_ids .iter() .map(|table_id| TableId::new(*table_id)) diff --git a/src/meta/src/barrier/info.rs b/src/meta/src/barrier/info.rs index 645d15e83a7e..44194c7f9eb3 100644 --- a/src/meta/src/barrier/info.rs +++ b/src/meta/src/barrier/info.rs @@ -22,16 +22,9 @@ use crate::barrier::Command; use crate::manager::{ActiveStreamingWorkerNodes, ActorInfos, InflightFragmentInfo, 
WorkerId}; use crate::model::{ActorId, FragmentId}; -#[derive(Debug, Clone)] -pub(crate) struct CommandNewFragmentInfo { - pub new_actors: HashMap, - pub table_ids: HashSet, - pub is_injectable: bool, -} - #[derive(Debug, Clone)] pub(crate) enum CommandFragmentChanges { - NewFragment(CommandNewFragmentInfo), + NewFragment(InflightFragmentInfo), Reschedule { new_actors: HashMap, to_remove: HashSet, @@ -65,9 +58,6 @@ pub struct InflightActorInfo { /// `node_id` => actors pub actor_map: HashMap>, - /// `node_id` => barrier inject actors - pub actor_map_to_send: HashMap>, - /// `actor_id` => `WorkerId` pub actor_location_map: HashMap, @@ -96,20 +86,6 @@ impl InflightActorInfo { map }; - let actor_map_to_send = { - let mut map: HashMap<_, HashSet<_>> = HashMap::new(); - for info in actor_infos - .fragment_infos - .values() - .filter(|info| info.is_injectable) - { - for (actor_id, worker_id) in &info.actors { - map.entry(*worker_id).or_default().insert(*actor_id); - } - } - map - }; - let actor_location_map = actor_infos .fragment_infos .values() @@ -124,7 +100,6 @@ impl InflightActorInfo { Self { node_map, actor_map, - actor_map_to_send, actor_location_map, mv_depended_subscriptions, fragment_infos: actor_infos.fragment_infos, @@ -167,28 +142,11 @@ impl InflightActorInfo { let mut to_add = HashMap::new(); for (fragment_id, change) in fragment_changes { match change { - CommandFragmentChanges::NewFragment(CommandNewFragmentInfo { - new_actors, - table_ids, - is_injectable, - .. - }) => { - for (actor_id, node_id) in &new_actors { - assert!(to_add - .insert(*actor_id, (*node_id, is_injectable)) - .is_none()); + CommandFragmentChanges::NewFragment(info) => { + for (actor_id, node_id) in &info.actors { + assert!(to_add.insert(*actor_id, *node_id).is_none()); } - assert!(self - .fragment_infos - .insert( - fragment_id, - InflightFragmentInfo { - actors: new_actors, - state_table_ids: table_ids, - is_injectable, - } - ) - .is_none()); + assert!(self.fragment_infos.insert(fragment_id, info).is_none()); } CommandFragmentChanges::Reschedule { new_actors, .. 
} => { let info = self @@ -197,30 +155,19 @@ impl InflightActorInfo { .expect("should exist"); let actors = &mut info.actors; for (actor_id, node_id) in new_actors { - assert!(to_add - .insert(actor_id, (node_id, info.is_injectable)) - .is_none()); + assert!(to_add.insert(actor_id, node_id).is_none()); assert!(actors.insert(actor_id, node_id).is_none()); } } CommandFragmentChanges::RemoveFragment => {} } } - for (actor_id, (node_id, is_injectable)) in to_add { + for (actor_id, node_id) in to_add { assert!(self.node_map.contains_key(&node_id)); assert!( self.actor_map.entry(node_id).or_default().insert(actor_id), "duplicate actor in command changes" ); - if is_injectable { - assert!( - self.actor_map_to_send - .entry(node_id) - .or_default() - .insert(actor_id), - "duplicate actor in command changes" - ); - } assert!( self.actor_location_map.insert(actor_id, node_id).is_none(), "duplicate actor in command changes" @@ -280,13 +227,8 @@ impl InflightActorInfo { .expect("actor not found"); let actor_ids = self.actor_map.get_mut(&node_id).expect("node not found"); assert!(actor_ids.remove(&actor_id), "actor not found"); - self.actor_map_to_send - .get_mut(&node_id) - .map(|actor_ids| actor_ids.remove(&actor_id)); } self.actor_map.retain(|_, actor_ids| !actor_ids.is_empty()); - self.actor_map_to_send - .retain(|_, actor_ids| !actor_ids.is_empty()); } if let Command::DropSubscription { subscription_id, @@ -310,27 +252,49 @@ impl InflightActorInfo { } /// Returns actor list to collect in the target worker node. - pub fn actor_ids_to_collect(&self, node_id: &WorkerId) -> impl Iterator { - self.actor_map - .get(node_id) - .cloned() - .unwrap_or_default() - .into_iter() + pub fn actor_ids_to_collect( + fragment_infos: &HashMap, + node_id: WorkerId, + ) -> impl Iterator + '_ { + fragment_infos.values().flat_map(move |info| { + info.actors + .iter() + .filter_map(move |(actor_id, actor_node_id)| { + if *actor_node_id == node_id { + Some(*actor_id) + } else { + None + } + }) + }) } /// Returns actor list to send in the target worker node. 
- pub fn actor_ids_to_send(&self, node_id: &WorkerId) -> impl Iterator { - self.actor_map_to_send - .get(node_id) - .cloned() - .unwrap_or_default() - .into_iter() + pub fn actor_ids_to_send( + fragment_infos: &HashMap, + node_id: WorkerId, + ) -> impl Iterator + '_ { + fragment_infos + .values() + .filter(|info| info.is_injectable) + .flat_map(move |info| { + info.actors + .iter() + .filter_map(move |(actor_id, actor_node_id)| { + if *actor_node_id == node_id { + Some(*actor_id) + } else { + None + } + }) + }) } - pub fn existing_table_ids(&self) -> HashSet { - self.fragment_infos + pub fn existing_table_ids( + fragment_infos: &HashMap, + ) -> impl Iterator + '_ { + fragment_infos .values() .flat_map(|info| info.state_table_ids.iter().cloned()) - .collect() } } diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index ad6d5f675415..f58cb3d39503 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -54,7 +54,7 @@ use self::notifier::Notifier; use crate::barrier::info::InflightActorInfo; use crate::barrier::notifier::BarrierInfo; use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob}; -use crate::barrier::rpc::ControlStreamManager; +use crate::barrier::rpc::{merge_node_rpc_errors, ControlStreamManager}; use crate::barrier::state::BarrierManagerState; use crate::error::MetaErrorInner; use crate::hummock::{CommitEpochInfo, HummockManagerRef, NewTableFragmentInfo}; @@ -277,12 +277,10 @@ impl CheckpointControl { /// Change the state of this `prev_epoch` to `Completed`. Return continuous nodes /// with `Completed` starting from first node [`Completed`..`InFlight`) and remove them. - fn barrier_collected( - &mut self, - worker_id: WorkerId, - prev_epoch: u64, - resp: BarrierCompleteResponse, - ) { + fn barrier_collected(&mut self, resp: BarrierCompleteResponse) { + let worker_id = resp.worker_id; + let prev_epoch = resp.epoch; + assert_eq!(resp.partial_graph_id, u32::MAX); if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) { assert!(node.state.node_to_collect.remove(&worker_id)); node.state.resps.push(resp); @@ -391,10 +389,20 @@ impl CheckpointControl { node.enqueue_time.observe_duration(); } } + + /// Return the earliest command waiting on the `worker_id`. + fn command_wait_collect_from_worker(&self, worker_id: WorkerId) -> Option<&CommandContext> { + for epoch_node in self.command_ctx_queue.values() { + if epoch_node.state.node_to_collect.contains(&worker_id) { + return Some(&epoch_node.command_ctx); + } + } + None + } } /// The state and message of this barrier, a node for concurrent checkpoint. -pub struct EpochNode { +struct EpochNode { /// Timer for recording barrier latency, taken after `complete_barriers`. 
enqueue_time: HistogramTimer, @@ -684,14 +692,25 @@ impl GlobalBarrierManager { _ => {} } } - resp_result = self.control_stream_manager.next_complete_barrier_response() => { + (worker_id, resp_result) = self.control_stream_manager.next_complete_barrier_response() => { match resp_result { - Ok((worker_id, prev_epoch, resp)) => { - self.checkpoint_control.barrier_collected(worker_id, prev_epoch, resp); + Ok(resp) => { + self.checkpoint_control.barrier_collected(resp); } Err(e) => { - self.failure_recovery(e).await; + let failed_command = self.checkpoint_control.command_wait_collect_from_worker(worker_id); + if failed_command.is_some() + || self.state.inflight_actor_infos.actor_map.contains_key(&worker_id) { + let errors = self.control_stream_manager.collect_errors(worker_id, e).await; + let err = merge_node_rpc_errors("get error from control stream", errors); + if let Some(failed_command) = failed_command { + self.context.report_collect_failure(failed_command, &err); + } + self.failure_recovery(err).await; + } else { + warn!(e = ?e.as_report(), worker_id, "no barrier to collect from worker, ignore err"); + } } } } @@ -765,8 +784,9 @@ impl GlobalBarrierManager { send_latency_timer.observe_duration(); let node_to_collect = match self.control_stream_manager.inject_barrier( - command_ctx.clone(), - self.state.inflight_actor_infos.existing_table_ids(), + &command_ctx, + &command_ctx.info.fragment_infos, + Some(&self.state.inflight_actor_infos.fragment_infos), ) { Ok(node_to_collect) => node_to_collect, Err(err) => { @@ -1190,6 +1210,7 @@ fn collect_commit_epoch_info( table_watermarks.push(resp.table_watermarks); old_value_ssts.extend(resp.old_value_sstables.into_iter().map(|s| s.into())); } + let new_table_fragment_info = if let Command::CreateStreamingJob { info, .. 
} = &command_ctx.command { let table_fragments = &info.table_fragments; @@ -1244,7 +1265,10 @@ fn collect_commit_epoch_info( sst_to_worker, new_table_fragment_info, table_new_change_log, - BTreeMap::from_iter([(epoch, command_ctx.info.existing_table_ids())]), + BTreeMap::from_iter([( + epoch, + InflightActorInfo::existing_table_ids(&command_ctx.info.fragment_infos).collect(), + )]), epoch, ) } diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 1ee367a78022..3c0c3440b0a6 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -335,7 +335,9 @@ impl GlobalBarrierManager { let info = info; self.context - .purge_state_table_from_hummock(&info.existing_table_ids()) + .purge_state_table_from_hummock( + &InflightActorInfo::existing_table_ids(&info.fragment_infos).collect(), + ) .await .context("purge state table from hummock")?; @@ -385,13 +387,17 @@ impl GlobalBarrierManager { tracing::Span::current(), // recovery span )); - let mut node_to_collect = control_stream_manager - .inject_barrier(command_ctx.clone(), info.existing_table_ids())?; + let mut node_to_collect = control_stream_manager.inject_barrier( + &command_ctx, + &info.fragment_infos, + Some(&info.fragment_infos), + )?; while !node_to_collect.is_empty() { - let (worker_id, prev_epoch, _) = control_stream_manager + let (worker_id, result) = control_stream_manager .next_complete_barrier_response() - .await?; - assert_eq!(prev_epoch, command_ctx.prev_epoch.value().0); + .await; + let resp = result?; + assert_eq!(resp.epoch, command_ctx.prev_epoch.value().0); assert!(node_to_collect.remove(&worker_id)); } diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index f3f56db1a9a8..ae12c439e0a9 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -12,10 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::{HashMap, HashSet, VecDeque}; +use std::collections::{HashMap, HashSet}; use std::error::Error; use std::future::Future; -use std::sync::Arc; use std::time::Duration; use anyhow::anyhow; @@ -24,7 +23,6 @@ use futures::future::try_join_all; use futures::stream::{BoxStream, FuturesUnordered}; use futures::{pin_mut, FutureExt, StreamExt}; use itertools::Itertools; -use risingwave_common::catalog::TableId; use risingwave_common::hash::ActorId; use risingwave_common::util::tracing::TracingContext; use risingwave_pb::common::{ActorInfo, WorkerNode}; @@ -47,7 +45,9 @@ use uuid::Uuid; use super::command::CommandContext; use super::GlobalBarrierManagerContext; -use crate::manager::{MetaSrvEnv, WorkerId}; +use crate::barrier::info::InflightActorInfo; +use crate::manager::{InflightFragmentInfo, MetaSrvEnv, WorkerId}; +use crate::model::FragmentId; use crate::{MetaError, MetaResult}; const COLLECT_ERROR_TIMEOUT: Duration = Duration::from_secs(3); @@ -55,8 +55,6 @@ const COLLECT_ERROR_TIMEOUT: Duration = Duration::from_secs(3); struct ControlStreamNode { worker: WorkerNode, sender: UnboundedSender, - // earlier epoch at the front - inflight_barriers: VecDeque>, } fn into_future( @@ -192,62 +190,39 @@ impl ControlStreamManager { pub(super) async fn next_complete_barrier_response( &mut self, - ) -> MetaResult<(WorkerId, u64, BarrierCompleteResponse)> { + ) -> (WorkerId, MetaResult) { use streaming_control_stream_response::Response; - loop { + { let (worker_id, result) = pending_on_none(self.next_response()).await; - match result { + let result = match result { Ok(resp) => match resp.response.unwrap() { Response::CompleteBarrier(resp) => { - let node = self - .nodes - .get_mut(&worker_id) - .expect("should exist when get collect resp"); - let command = node - .inflight_barriers - .pop_front() - .expect("should exist when get collect resp"); - break Ok((worker_id, command.prev_epoch.value().0, resp)); + assert_eq!(worker_id, resp.worker_id); + Ok(resp) } Response::Shutdown(_) => { - let _ = self - .nodes - .remove(&worker_id) - .expect("should exist when get shutdown resp"); - // TODO: if there's no actor running on the node, we can ignore and not trigger recovery. - break Err(anyhow!("worker node {worker_id} is shutting down").into()); + Err(anyhow!("worker node {worker_id} is shutting down").into()) } Response::Init(_) => { // This arm should be unreachable. - break Err(anyhow!("get unexpected init response").into()); + Err(anyhow!("get unexpected init response").into()) } }, - Err(err) => { - let node = self - .nodes - .remove(&worker_id) - .expect("should exist when get collect resp"); - // Note: No need to use `?` as the backtrace is from meta and not useful. - warn!(node = ?node.worker, err = %err.as_report(), "get error from response stream"); - - if let Some(command) = node.inflight_barriers.into_iter().next() { - // FIXME: this future can be cancelled during collection, so the error collection - // might not work as expected. 
- let errors = self.collect_errors(node.worker.id, err).await; - let err = merge_node_rpc_errors("get error from control stream", errors); - self.context.report_collect_failure(&command, &err); - break Err(err); - } else { - // for node with no inflight barrier, simply ignore the error - info!(node = ?node.worker, error = %err.as_report(), "no inflight barrier in the node, ignore error"); - } - } + Err(err) => Err(err), + }; + if let Err(err) = &result { + let node = self + .nodes + .remove(&worker_id) + .expect("should exist when get shutdown resp"); + warn!(node = ?node.worker, err = %err.as_report(), "get error from response stream"); } + (worker_id, result) } } - async fn collect_errors( + pub(super) async fn collect_errors( &mut self, worker_id: WorkerId, first_err: MetaError, @@ -273,8 +248,9 @@ impl ControlStreamManager { /// Send inject-barrier-rpc to stream service and wait for its response before returns. pub(super) fn inject_barrier( &mut self, - command_context: Arc, - table_ids_to_sync: HashSet, + command_context: &CommandContext, + pre_applied_fragment_infos: &HashMap, + applied_fragment_infos: Option<&HashMap>, ) -> MetaResult> { fail_point!("inject_barrier_err", |_| risingwave_common::bail!( "inject_barrier_err" @@ -286,12 +262,24 @@ impl ControlStreamManager { info.node_map .iter() .map(|(node_id, worker_node)| { - let actor_ids_to_send = info.actor_ids_to_send(node_id).collect_vec(); - let actor_ids_to_collect = info.actor_ids_to_collect(node_id).collect_vec(); + let actor_ids_to_send: Vec<_> = + InflightActorInfo::actor_ids_to_send(pre_applied_fragment_infos, *node_id) + .collect(); + let actor_ids_to_collect: Vec<_> = + InflightActorInfo::actor_ids_to_collect(pre_applied_fragment_infos, *node_id) + .collect(); if actor_ids_to_collect.is_empty() { // No need to send or collect barrier for this node. assert!(actor_ids_to_send.is_empty()); } + let table_ids_to_sync = if let Some(fragment_infos) = applied_fragment_infos { + InflightActorInfo::existing_table_ids(fragment_infos) + .map(|table_id| table_id.table_id) + .collect() + } else { + Default::default() + }; + { let Some(node) = self.nodes.get_mut(node_id) else { if actor_ids_to_collect.is_empty() { @@ -326,10 +314,8 @@ impl ControlStreamManager { barrier: Some(barrier), actor_ids_to_send, actor_ids_to_collect, - table_ids_to_sync: table_ids_to_sync - .iter() - .map(|table_id| table_id.table_id) - .collect(), + table_ids_to_sync, + partial_graph_id: u32::MAX, }, ), ), @@ -342,7 +328,6 @@ impl ControlStreamManager { )) })?; - node.inflight_barriers.push_back(command_context.clone()); node_need_collect.insert(*node_id); Result::<_, MetaError>::Ok(()) } @@ -385,14 +370,17 @@ impl GlobalBarrierManagerContext { ControlStreamNode { worker: node.clone(), sender: handle.request_sender, - inflight_barriers: VecDeque::new(), }, handle.response_stream, )) } /// Send barrier-complete-rpc and wait for responses from all CNs - fn report_collect_failure(&self, command_context: &CommandContext, error: &MetaError) { + pub(super) fn report_collect_failure( + &self, + command_context: &CommandContext, + error: &MetaError, + ) { // Record failure in event log. 
use risingwave_pb::meta::event_log; let event = event_log::EventCollectBarrierFail { @@ -562,7 +550,7 @@ where Err(results_err) } -fn merge_node_rpc_errors( +pub(super) fn merge_node_rpc_errors( message: &str, errors: impl IntoIterator, ) -> MetaError { diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index 2ff0fc1ac971..eab1da29f4d7 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -173,7 +173,7 @@ pub struct FragmentManager { core: RwLock, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct InflightFragmentInfo { pub actors: HashMap, pub state_table_ids: HashSet, diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 6219f85bc6bd..42745a80423d 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -841,7 +841,6 @@ mod tests { #[cfg(feature = "failpoints")] use tokio::sync::Notify; use tokio::task::JoinHandle; - use tokio::time::sleep; use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::{Request, Response, Status, Streaming}; @@ -851,7 +850,7 @@ mod tests { use crate::manager::sink_coordination::SinkCoordinatorManager; use crate::manager::{ CatalogManager, CatalogManagerRef, ClusterManager, FragmentManager, FragmentManagerRef, - RelationIdEnum, StreamingClusterInfo, + RelationIdEnum, StreamingClusterInfo, WorkerId, }; use crate::rpc::ddl_controller::DropMode; use crate::rpc::metrics::MetaMetrics; @@ -865,6 +864,7 @@ mod tests { } struct FakeStreamService { + worker_id: WorkerId, inner: Arc, } @@ -932,6 +932,7 @@ mod tests { let (tx, rx) = unbounded_channel(); let mut request_stream = request.into_inner(); let inner = self.inner.clone(); + let worker_id = self.worker_id; let _join_handle = spawn(async move { while let Ok(Some(request)) = request_stream.try_next().await { match request.request.unwrap() { @@ -945,15 +946,21 @@ mod tests { )), })); } - streaming_control_stream_request::Request::InjectBarrier(_) => { + streaming_control_stream_request::Request::InjectBarrier(req) => { let _ = tx.send(Ok(StreamingControlStreamResponse { response: Some( streaming_control_stream_response::Response::CompleteBarrier( - BarrierCompleteResponse::default(), + BarrierCompleteResponse { + epoch: req.barrier.unwrap().epoch.unwrap().prev, + worker_id, + partial_graph_id: req.partial_graph_id, + ..BarrierCompleteResponse::default() + }, ), ), })); } + streaming_control_stream_request::Request::RemovePartialGraph(..) 
=> {} } } }); @@ -985,21 +992,7 @@ mod tests { actor_infos: Mutex::new(HashMap::new()), }); - let fake_service = FakeStreamService { - inner: state.clone(), - }; - let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); - let stream_srv = StreamServiceServer::new(fake_service); - let join_handle = tokio::spawn(async move { - tonic::transport::Server::builder() - .add_service(stream_srv) - .serve_with_shutdown(addr, async move { shutdown_rx.await.unwrap() }) - .await - .unwrap(); - }); - - sleep(Duration::from_secs(1)).await; let env = MetaSrvEnv::for_test_opts(MetaOpts::test(enable_recovery)).await; let system_params = env.system_params_reader().await; @@ -1011,7 +1004,7 @@ mod tests { port: port as i32, }; let fake_parallelism = 4; - cluster_manager + let worker_node = cluster_manager .add_worker_node( WorkerType::ComputeNode, host.clone(), @@ -1026,6 +1019,19 @@ mod tests { .await?; cluster_manager.activate_worker_node(host).await?; + let fake_service = FakeStreamService { + worker_id: worker_node.id, + inner: state.clone(), + }; + let stream_srv = StreamServiceServer::new(fake_service); + let join_handle = tokio::spawn(async move { + tonic::transport::Server::builder() + .add_service(stream_srv) + .serve_with_shutdown(addr, async move { shutdown_rx.await.unwrap() }) + .await + .unwrap(); + }); + let catalog_manager = Arc::new(CatalogManager::new(env.clone()).await?); let fragment_manager = Arc::new(FragmentManager::new(env.clone()).await?); diff --git a/src/storage/src/hummock/event_handler/uploader/mod.rs b/src/storage/src/hummock/event_handler/uploader/mod.rs index 884b29de5edd..8210a998974c 100644 --- a/src/storage/src/hummock/event_handler/uploader/mod.rs +++ b/src/storage/src/hummock/event_handler/uploader/mod.rs @@ -850,7 +850,26 @@ impl UnsyncData { // called `start_epoch` because we have stopped writing on it. if !table_data.unsync_epochs.contains_key(&next_epoch) { if let Some(stopped_next_epoch) = table_data.stopped_next_epoch { - assert_eq!(stopped_next_epoch, next_epoch); + if stopped_next_epoch != next_epoch { + let table_id = table_data.table_id.table_id; + let unsync_epochs = table_data.unsync_epochs.keys().collect_vec(); + if cfg!(debug_assertions) { + panic!( + "table_id {} stop epoch {} different to prev stop epoch {}. unsync epochs: {:?}, syncing epochs {:?}, max_synced_epoch {:?}", + table_id, next_epoch, stopped_next_epoch, unsync_epochs, table_data.syncing_epochs, table_data.max_synced_epoch + ); + } else { + warn!( + table_id, + stopped_next_epoch, + next_epoch, + ?unsync_epochs, + syncing_epochs = ?table_data.syncing_epochs, + max_synced_epoch = ?table_data.max_synced_epoch, + "different stop epoch" + ); + } + } } else { if let Some(max_epoch) = table_data.max_epoch() { assert_gt!(next_epoch, max_epoch); diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs index 7920e8dceee8..e3979496731b 100644 --- a/src/stream/src/executor/backfill/arrangement_backfill.rs +++ b/src/stream/src/executor/backfill/arrangement_backfill.rs @@ -462,7 +462,7 @@ where // May need to revisit it. // Need to check it after scale-in / scale-out. 
self.progress.update( - barrier.epoch.curr, + barrier.epoch, snapshot_read_epoch, total_snapshot_processed_rows, ); @@ -577,7 +577,7 @@ where } self.progress - .finish(barrier.epoch.curr, total_snapshot_processed_rows); + .finish(barrier.epoch, total_snapshot_processed_rows); yield msg; break; } diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index 59686f4bb8fd..943059355f05 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -655,7 +655,7 @@ impl CdcBackfillExecutor { // mark progress as finished if let Some(progress) = self.progress.as_mut() { - progress.finish(barrier.epoch.curr, total_snapshot_row_count); + progress.finish(barrier.epoch, total_snapshot_row_count); } yield msg; // break after the state have been saved diff --git a/src/stream/src/executor/backfill/no_shuffle_backfill.rs b/src/stream/src/executor/backfill/no_shuffle_backfill.rs index e368086a9773..a65c5f1c4487 100644 --- a/src/stream/src/executor/backfill/no_shuffle_backfill.rs +++ b/src/stream/src/executor/backfill/no_shuffle_backfill.rs @@ -417,7 +417,7 @@ where snapshot_read_epoch = barrier.epoch.prev; self.progress.update( - barrier.epoch.curr, + barrier.epoch, snapshot_read_epoch, total_snapshot_processed_rows, ); @@ -540,7 +540,7 @@ where // and backfill which just finished, we need to update mview tracker, // it does not persist this information. self.progress - .finish(barrier.epoch.curr, total_snapshot_processed_rows); + .finish(barrier.epoch, total_snapshot_processed_rows); tracing::trace!( epoch = ?barrier.epoch, "Updated CreateMaterializedTracker" diff --git a/src/stream/src/executor/chain.rs b/src/stream/src/executor/chain.rs index 96b9422a97b2..8a4131bb2635 100644 --- a/src/stream/src/executor/chain.rs +++ b/src/stream/src/executor/chain.rs @@ -64,7 +64,7 @@ impl ChainExecutor { // If the barrier is a conf change of creating this mview, and the snapshot is not to be // consumed, we can finish the progress immediately. if barrier.is_newly_added(self.actor_id) && self.upstream_only { - self.progress.finish(barrier.epoch.curr, 0); + self.progress.finish(barrier.epoch, 0); } // The first barrier message should be propagated. @@ -88,7 +88,7 @@ impl ChainExecutor { for msg in upstream { let msg = msg?; if to_consume_snapshot && let Message::Barrier(barrier) = &msg { - self.progress.finish(barrier.epoch.curr, 0); + self.progress.finish(barrier.epoch, 0); } yield msg; } diff --git a/src/stream/src/executor/rearranged_chain.rs b/src/stream/src/executor/rearranged_chain.rs index 19ebfeabc298..37717d270d90 100644 --- a/src/stream/src/executor/rearranged_chain.rs +++ b/src/stream/src/executor/rearranged_chain.rs @@ -155,7 +155,7 @@ impl RearrangedChainExecutor { // Update the progress since we've consumed all chunks before this // phantom. 
self.progress.update( - last_rearranged_epoch.curr, + last_rearranged_epoch, barrier.epoch.curr, processed_rows, ); @@ -201,7 +201,7 @@ impl RearrangedChainExecutor { continue; }; if let Some(barrier) = msg.as_barrier() { - self.progress.finish(barrier.epoch.curr, processed_rows); + self.progress.finish(barrier.epoch, processed_rows); } yield msg; } @@ -214,7 +214,7 @@ impl RearrangedChainExecutor { for msg in upstream { let msg: Message = msg?; if let Some(barrier) = msg.as_barrier() { - self.progress.finish(barrier.epoch.curr, processed_rows); + self.progress.finish(barrier.epoch, processed_rows); } yield msg; } diff --git a/src/stream/src/executor/values.rs b/src/stream/src/executor/values.rs index dfa5579d66b4..f2e737701cd2 100644 --- a/src/stream/src/executor/values.rs +++ b/src/stream/src/executor/values.rs @@ -121,7 +121,7 @@ impl ValuesExecutor { while let Some(barrier) = barrier_receiver.recv().await { if emit { - progress.finish(barrier.epoch.curr, 0); + progress.finish(barrier.epoch, 0); } yield Message::Barrier(barrier); } diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index f3ad76789ff7..94dbc0764497 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -39,7 +39,8 @@ use tonic::{Code, Status}; use self::managed_state::ManagedBarrierState; use crate::error::{IntoUnexpectedExit, StreamError, StreamResult}; use crate::task::{ - ActorHandle, ActorId, AtomicU64Ref, SharedContext, StreamEnvironment, UpDownActorIds, + ActorHandle, ActorId, AtomicU64Ref, PartialGraphId, SharedContext, StreamEnvironment, + UpDownActorIds, }; mod managed_state; @@ -225,7 +226,7 @@ pub(super) enum LocalBarrierEvent { epoch: EpochPair, }, ReportCreateProgress { - current_epoch: u64, + epoch: EpochPair, actor: ActorId, state: BackfillState, }, @@ -462,8 +463,8 @@ impl LocalBarrierWorker { (sender, create_actors_result) = self.actor_manager_state.next_created_actors() => { self.handle_actor_created(sender, create_actors_result); } - completed_epoch = self.state.next_completed_epoch() => { - let result = self.on_epoch_completed(completed_epoch); + (partial_graph_id, completed_epoch) = self.state.next_completed_epoch() => { + let result = self.on_epoch_completed(partial_graph_id, completed_epoch); if let Err(err) = result { self.notify_other_failure(err, "failed to complete epoch").await; } @@ -544,9 +545,16 @@ impl LocalBarrierWorker { .into_iter() .map(TableId::new) .collect(), + PartialGraphId::new(req.partial_graph_id), )?; Ok(()) } + Request::RemovePartialGraph(req) => { + self.remove_partial_graphs( + req.partial_graph_ids.into_iter().map(PartialGraphId::new), + ); + Ok(()) + } Request::Init(_) => { unreachable!() } @@ -559,11 +567,11 @@ impl LocalBarrierWorker { self.collect(actor_id, epoch) } LocalBarrierEvent::ReportCreateProgress { - current_epoch, + epoch, actor, state, } => { - self.update_create_mview_progress(current_epoch, actor, state); + self.update_create_mview_progress(epoch, actor, state); } LocalBarrierEvent::SubscribeBarrierMutation { actor_id, @@ -633,9 +641,17 @@ impl LocalBarrierWorker { // event handler impl LocalBarrierWorker { - fn on_epoch_completed(&mut self, epoch: u64) -> StreamResult<()> { - let result = self + fn on_epoch_completed( + &mut self, + partial_graph_id: PartialGraphId, + epoch: u64, + ) -> StreamResult<()> { + let state = self .state + .graph_states + .get_mut(&partial_graph_id) + .expect("should exist"); + let result = state .pop_completed_epoch(epoch) .expect("should 
exist") .expect("should have completed")?; @@ -660,6 +676,8 @@ impl LocalBarrierWorker { streaming_control_stream_response::Response::CompleteBarrier( BarrierCompleteResponse { request_id: "todo".to_string(), + partial_graph_id: partial_graph_id.into(), + epoch, status: None, create_mview_progress, synced_sstables: synced_sstables @@ -717,6 +735,7 @@ impl LocalBarrierWorker { to_send: HashSet, to_collect: HashSet, table_ids: HashSet, + partial_graph_id: PartialGraphId, ) -> StreamResult<()> { if !cfg!(test) { // The barrier might be outdated and been injected after recovery in some certain extreme @@ -764,7 +783,7 @@ impl LocalBarrierWorker { } self.state - .transform_to_issued(barrier, to_collect, table_ids); + .transform_to_issued(barrier, to_collect, table_ids, partial_graph_id); for actor_id in to_send { match self.barrier_senders.get(&actor_id) { @@ -804,6 +823,23 @@ impl LocalBarrierWorker { Ok(()) } + fn remove_partial_graphs(&mut self, partial_graph_ids: impl Iterator) { + for partial_graph_id in partial_graph_ids { + if let Some(graph) = self.state.graph_states.remove(&partial_graph_id) { + assert!( + graph.is_empty(), + "non empty graph to be removed: {}", + &graph + ); + } else { + warn!( + partial_graph_id = partial_graph_id.0, + "no partial graph to remove" + ); + } + } + } + /// Reset all internal states. pub(super) fn reset_state(&mut self) { *self = Self::new(self.actor_manager.clone()); @@ -821,14 +857,12 @@ impl LocalBarrierWorker { self.add_failure(actor_id, err.clone()); let root_err = self.try_find_root_failure().await.unwrap(); // always `Some` because we just added one - let failed_epochs = self.state.epochs_await_on_actor(actor_id).collect_vec(); - if !failed_epochs.is_empty() { + if let Some(actor_state) = self.state.actor_states.get(&actor_id) + && (!actor_state.inflight_barriers.is_empty() || actor_state.is_running()) + { self.control_stream_handle.reset_stream_with_err( anyhow!(root_err) - .context(format!( - "failed to collect barrier for epoch {:?}", - failed_epochs - )) + .context("failed to collect barrier") .to_status_unnamed(Code::Internal), ); } @@ -1181,6 +1215,7 @@ pub(crate) mod barrier_test_utils { actor_ids_to_send: actor_to_send.into_iter().collect(), actor_ids_to_collect: actor_to_collect.into_iter().collect(), table_ids_to_sync: vec![], + partial_graph_id: u32::MAX, }, )), })) diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index ae1a576fe7c4..3651dcc44d5e 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -15,9 +15,10 @@ use std::assert_matches::assert_matches; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::fmt::{Debug, Display, Formatter}; -use std::future::Future; +use std::future::{poll_fn, Future}; use std::mem::replace; use std::sync::Arc; +use std::task::{ready, Context, Poll}; use anyhow::anyhow; use await_tree::InstrumentAwait; @@ -32,7 +33,6 @@ use risingwave_hummock_sdk::SyncResult; use risingwave_pb::stream_plan::barrier::BarrierKind; use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress; use risingwave_storage::{dispatch_state_store, StateStore, StateStoreImpl}; -use rw_futures_util::pending_on_none; use thiserror_ext::AsReport; use tokio::sync::mpsc; @@ -41,7 +41,7 @@ use super::{BarrierCompleteResult, SubscribeMutationItem}; use crate::error::StreamResult; use crate::executor::monitor::StreamingMetrics; use crate::executor::{Barrier, 
Mutation}; -use crate::task::{await_tree_key, ActorId}; +use crate::task::{await_tree_key, ActorId, PartialGraphId}; struct IssuedState { pub mutation: Option>, @@ -114,17 +114,24 @@ fn sync_epoch( .boxed() } -#[derive(Debug)] pub(super) struct ManagedBarrierStateDebugInfo<'a> { - epoch_barrier_state_map: &'a BTreeMap, - - create_mview_progress: &'a HashMap>, + graph_states: &'a HashMap, } impl Display for ManagedBarrierStateDebugInfo<'_> { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + for (partial_graph_id, graph_states) in self.graph_states { + writeln!(f, "--- Partial Group {}", partial_graph_id.0)?; + write!(f, "{}", graph_states)?; + } + Ok(()) + } +} + +impl Display for &'_ PartialGraphManagedBarrierState { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let mut prev_epoch = 0u64; - for (epoch, barrier_state) in self.epoch_barrier_state_map { + for (epoch, barrier_state) in &self.epoch_barrier_state_map { write!(f, "> Epoch {}: ", epoch)?; match &barrier_state.inner { ManagedBarrierStateInner::Issued(state) => { @@ -172,7 +179,7 @@ impl Display for ManagedBarrierStateDebugInfo<'_> { if !self.create_mview_progress.is_empty() { writeln!(f, "Create MView Progress:")?; - for (epoch, progress) in self.create_mview_progress { + for (epoch, progress) in &self.create_mview_progress { write!(f, "> Epoch {}:", epoch)?; for (actor_id, state) in progress { write!(f, ">> Actor {}: {}, ", actor_id, state)?; @@ -184,26 +191,131 @@ impl Display for ManagedBarrierStateDebugInfo<'_> { } } -#[derive(Default)] -struct ActorMutationSubscribers { +enum InflightActorStatus { + /// The actor is just spawned and not issued any barrier yet + NotStarted, + /// The actor has been issued some barriers, and not issued any stop barrier yet + Running(u64), + /// The actor has been issued a stop barrier + Stopping(u64), +} + +impl InflightActorStatus { + pub(super) fn is_stopping(&self) -> bool { + matches!(self, InflightActorStatus::Stopping(_)) + } + + fn max_issued_epoch(&self) -> Option { + match self { + InflightActorStatus::NotStarted => None, + InflightActorStatus::Running(epoch) | InflightActorStatus::Stopping(epoch) => { + Some(*epoch) + } + } + } +} + +pub(crate) struct InflightActorState { pending_subscribers: BTreeMap>>, - started_subscribers: Vec>, + /// `prev_epoch` -> partial graph id + pub(super) inflight_barriers: BTreeMap, + /// `prev_epoch` -> (`mutation`, `curr_epoch`) + barrier_mutations: BTreeMap>, u64)>, + status: InflightActorStatus, } -impl ActorMutationSubscribers { - fn is_empty(&self) -> bool { - self.pending_subscribers.is_empty() && self.started_subscribers.is_empty() +impl InflightActorState { + pub(super) fn not_started() -> Self { + Self { + pending_subscribers: Default::default(), + inflight_barriers: BTreeMap::default(), + barrier_mutations: Default::default(), + status: InflightActorStatus::NotStarted, + } + } + + pub(super) fn issue_barrier( + &mut self, + partial_graph_id: PartialGraphId, + barrier: &Barrier, + is_stop: bool, + ) { + if let Some(max_issued_epoch) = self.status.max_issued_epoch() { + assert!(barrier.epoch.prev > max_issued_epoch); + } + + if let Some((first_epoch, _)) = self.pending_subscribers.first_key_value() { + assert!( + *first_epoch >= barrier.epoch.prev, + "barrier epoch {:?} skip subscribed epoch {}", + barrier.epoch, + first_epoch + ); + if *first_epoch == barrier.epoch.prev { + let (_, mut subscribers) = self.pending_subscribers.pop_first().expect("non empty"); + subscribers.retain(|tx| { + tx.send((barrier.epoch.prev, 
barrier.mutation.clone())) + .is_ok() + }); + if !is_stop && !subscribers.is_empty() { + self.pending_subscribers + .entry(barrier.epoch.curr) + .or_default() + .extend(subscribers); + } + } + } + + assert!(self + .inflight_barriers + .insert(barrier.epoch.prev, partial_graph_id) + .is_none()); + + assert!(self + .barrier_mutations + .insert( + barrier.epoch.prev, + (barrier.mutation.clone(), barrier.epoch.curr), + ) + .is_none()); + if is_stop { + assert!(self.pending_subscribers.is_empty()); + assert!( + !self.status.is_stopping(), + "stopped actor should not issue barrier" + ); + self.status = InflightActorStatus::Stopping(barrier.epoch.prev); + } else { + self.status = InflightActorStatus::Running(barrier.epoch.prev); + } + } + + pub(super) fn collect(&mut self, epoch: EpochPair) -> (PartialGraphId, bool) { + let (prev_epoch, prev_partial_graph_id) = + self.inflight_barriers.pop_first().expect("should exist"); + assert_eq!(prev_epoch, epoch.prev); + let (min_mutation_epoch, _) = self.barrier_mutations.pop_first().expect("should exist"); + assert_eq!(min_mutation_epoch, epoch.prev); + ( + prev_partial_graph_id, + self.inflight_barriers.is_empty() && self.status.is_stopping(), + ) + } + + pub(super) fn is_running(&self) -> bool { + matches!(&self.status, InflightActorStatus::Running(_)) } } -pub(super) struct ManagedBarrierState { +pub(super) struct PartialGraphManagedBarrierState { + /// This is a temporary workaround for the need to still calling `seal_epoch` for storage. + /// Can be removed after `seal_epoch` is deprecated in storage. + need_seal_epoch: bool, /// Record barrier state for each epoch of concurrent checkpoints. /// /// The key is `prev_epoch`, and the first value is `curr_epoch` epoch_barrier_state_map: BTreeMap, - mutation_subscribers: HashMap, - prev_barrier_table_ids: Option<(EpochPair, HashSet)>, /// Record the progress updates of creating mviews for each epoch of concurrent checkpoints. @@ -220,16 +332,54 @@ pub(super) struct ManagedBarrierState { barrier_await_tree_reg: Option, } -impl ManagedBarrierState { +impl PartialGraphManagedBarrierState { + fn new( + need_seal_epoch: bool, + state_store: StateStoreImpl, + streaming_metrics: Arc, + barrier_await_tree_reg: Option, + ) -> Self { + Self { + need_seal_epoch, + epoch_barrier_state_map: Default::default(), + prev_barrier_table_ids: None, + create_mview_progress: Default::default(), + await_epoch_completed_futures: Default::default(), + state_store, + streaming_metrics, + barrier_await_tree_reg, + } + } + #[cfg(test)] pub(crate) fn for_test() -> Self { Self::new( + true, StateStoreImpl::for_test(), Arc::new(StreamingMetrics::unused()), None, ) } + pub(super) fn is_empty(&self) -> bool { + self.epoch_barrier_state_map.is_empty() + } +} + +pub(super) struct ManagedBarrierState { + pub(super) actor_states: HashMap, + + pub(super) graph_states: HashMap, + + pub(super) state_store: StateStoreImpl, + + pub(super) streaming_metrics: Arc, + + /// Manages the await-trees of all barriers. + barrier_await_tree_reg: Option, +} + +impl ManagedBarrierState { /// Create a barrier manager state. This will be called only once. 
pub(super) fn new( state_store: StateStoreImpl, @@ -237,90 +387,152 @@ impl ManagedBarrierState { barrier_await_tree_reg: Option, ) -> Self { Self { - epoch_barrier_state_map: BTreeMap::default(), - mutation_subscribers: Default::default(), - prev_barrier_table_ids: None, - create_mview_progress: Default::default(), + actor_states: Default::default(), + graph_states: Default::default(), state_store, streaming_metrics, - await_epoch_completed_futures: FuturesOrdered::new(), barrier_await_tree_reg, } } pub(super) fn to_debug_info(&self) -> ManagedBarrierStateDebugInfo<'_> { ManagedBarrierStateDebugInfo { - epoch_barrier_state_map: &self.epoch_barrier_state_map, - create_mview_progress: &self.create_mview_progress, + graph_states: &self.graph_states, } } +} +impl InflightActorState { pub(super) fn subscribe_actor_mutation( &mut self, - actor_id: ActorId, start_prev_epoch: u64, tx: mpsc::UnboundedSender, ) { - let subscribers = self.mutation_subscribers.entry(actor_id).or_default(); - if let Some(state) = self.epoch_barrier_state_map.get(&start_prev_epoch) { - match &state.inner { - ManagedBarrierStateInner::Issued(issued_state) => { - assert!(issued_state.remaining_actors.contains(&actor_id)); - for (prev_epoch, state) in - self.epoch_barrier_state_map.range(start_prev_epoch..) - { - match &state.inner { - ManagedBarrierStateInner::Issued(issued_state) => { - if issued_state.remaining_actors.contains(&actor_id) { - if tx - .send((*prev_epoch, issued_state.mutation.clone())) - .is_err() - { - // No more subscribe on the mutation. Simply return. - return; - } - } else { - // The barrier no more collect from such actor. End subscribe on mutation. - return; - } - } - state @ ManagedBarrierStateInner::AllCollected - | state @ ManagedBarrierStateInner::Completed(_) => { - unreachable!( - "should be Issued when having new subscriber, but current state: {:?}", - state - ) - } - } - } - subscribers.started_subscribers.push(tx); - } - state @ ManagedBarrierStateInner::AllCollected - | state @ ManagedBarrierStateInner::Completed(_) => { - unreachable!( - "should be Issued when having new subscriber, but current state: {:?}", - state - ) + if let Some((mutation, start_curr_epoch)) = self.barrier_mutations.get(&start_prev_epoch) { + if tx.send((start_prev_epoch, mutation.clone())).is_err() { + return; + } + let mut prev_epoch = *start_curr_epoch; + for (mutation_prev_epoch, (mutation, mutation_curr_epoch)) in + self.barrier_mutations.range(start_curr_epoch..) + { + assert_eq!(prev_epoch, *mutation_prev_epoch); + if tx.send((prev_epoch, mutation.clone())).is_err() { + // No more subscribe on the mutation. Simply return. + return; } + prev_epoch = *mutation_curr_epoch; + } + if !self.status.is_stopping() { + // Only add the subscribers when the actor is not stopped yet. + self.pending_subscribers + .entry(prev_epoch) + .or_default() + .push(tx); } } else { // Barrier has not issued yet. Store the pending tx - if let Some((last_epoch, _)) = self.epoch_barrier_state_map.last_key_value() { + if let Some(max_issued_epoch) = self.status.max_issued_epoch() { assert!( - *last_epoch < start_prev_epoch, + max_issued_epoch < start_prev_epoch, "later barrier {} has been issued, but skip the start epoch {:?}", - last_epoch, + max_issued_epoch, start_prev_epoch ); + } else { + assert!(!self.status.is_stopping(), "actor has been stopped and has not inflight barrier. 
unlikely to get further barrier"); } - subscribers - .pending_subscribers + self.pending_subscribers .entry(start_prev_epoch) .or_default() .push(tx); } } +} +impl ManagedBarrierState { + pub(super) fn subscribe_actor_mutation( + &mut self, + actor_id: ActorId, + start_prev_epoch: u64, + tx: mpsc::UnboundedSender, + ) { + self.actor_states + .entry(actor_id) + .or_insert_with(InflightActorState::not_started) + .subscribe_actor_mutation(start_prev_epoch, tx); + } + + pub(super) fn transform_to_issued( + &mut self, + barrier: &Barrier, + actor_ids_to_collect: HashSet, + table_ids: HashSet, + partial_graph_id: PartialGraphId, + ) { + let actor_to_stop = barrier.all_stop_actors(); + let graph_state = self + .graph_states + .entry(partial_graph_id) + .or_insert_with(|| { + PartialGraphManagedBarrierState::new( + partial_graph_id.is_global_graph(), + self.state_store.clone(), + self.streaming_metrics.clone(), + self.barrier_await_tree_reg.clone(), + ) + }); + + graph_state.transform_to_issued(barrier, actor_ids_to_collect.clone(), table_ids); + + // Note: it's important to issue barrier to actor after issuing to graph to ensure that + // we call `start_epoch` on the graph before the actors receive the barrier + for actor_id in actor_ids_to_collect { + self.actor_states + .entry(actor_id) + .or_insert_with(InflightActorState::not_started) + .issue_barrier( + partial_graph_id, + barrier, + actor_to_stop + .map(|actors| actors.contains(&actor_id)) + .unwrap_or(false), + ); + } + } + + pub(super) fn next_completed_epoch( + &mut self, + ) -> impl Future + '_ { + poll_fn(|cx| { + for (partial_graph_id, graph_state) in &mut self.graph_states { + if let Poll::Ready(epoch) = graph_state.poll_next_completed_epoch(cx) { + let partial_graph_id = *partial_graph_id; + return Poll::Ready((partial_graph_id, epoch)); + } + } + Poll::Pending + }) + } + + pub(super) fn collect(&mut self, actor_id: ActorId, epoch: EpochPair) { + let (prev_partial_graph_id, is_finished) = self + .actor_states + .get_mut(&actor_id) + .expect("should exist") + .collect(epoch); + if is_finished { + self.actor_states.remove(&actor_id); + } + let prev_graph_state = self + .graph_states + .get_mut(&prev_partial_graph_id) + .expect("should exist"); + prev_graph_state.collect(actor_id, epoch); + } +} + +impl PartialGraphManagedBarrierState { /// This method is called when barrier state is modified in either `Issued` or `Stashed` /// to transform the state to `AllCollected` and start state store `sync` when the barrier /// has been collected from all actors for an `Issued` barrier. @@ -396,14 +608,18 @@ impl ManagedBarrierState { None } BarrierKind::Barrier => { - dispatch_state_store!(&self.state_store, state_store, { - state_store.seal_epoch(prev_epoch, kind.is_checkpoint()); - }); + if self.need_seal_epoch { + dispatch_state_store!(&self.state_store, state_store, { + state_store.seal_epoch(prev_epoch, kind.is_checkpoint()); + }); + } None } BarrierKind::Checkpoint => { dispatch_state_store!(&self.state_store, state_store, { - state_store.seal_epoch(prev_epoch, kind.is_checkpoint()); + if self.need_seal_epoch { + state_store.seal_epoch(prev_epoch, kind.is_checkpoint()); + } Some(sync_epoch( state_store, &self.streaming_metrics, @@ -446,35 +662,6 @@ impl ManagedBarrierState { } } - /// Returns an iterator on epochs that is awaiting on `actor_id`. - /// This is used on notifying actor failure. 
On actor failure, the - /// barrier manager can call this method to iterate on epochs that - /// waits on the failed actor and then notify failure on the result - /// sender of the epoch. - pub(crate) fn epochs_await_on_actor( - &self, - actor_id: ActorId, - ) -> impl Iterator + '_ { - self.epoch_barrier_state_map - .iter() - .filter_map(move |(prev_epoch, barrier_state)| { - #[allow(clippy::single_match)] - match barrier_state.inner { - ManagedBarrierStateInner::Issued(IssuedState { - ref remaining_actors, - .. - }) => { - if remaining_actors.contains(&actor_id) { - Some(*prev_epoch) - } else { - None - } - } - _ => None, - } - }) - } - /// Collect a `barrier` from the actor with `actor_id`. pub(super) fn collect(&mut self, actor_id: ActorId, epoch: EpochPair) { tracing::debug!( @@ -566,11 +753,11 @@ impl ManagedBarrierState { if let Some((prev_epoch, prev_table_ids)) = self .prev_barrier_table_ids .replace((barrier.epoch, table_ids)) + && prev_epoch.curr == barrier.epoch.prev { - assert_eq!(prev_epoch.curr, barrier.epoch.prev); prev_table_ids } else { - info!(epoch = ?barrier.epoch, "initialize at Checkpoint barrier"); + debug!(epoch = ?barrier.epoch, "reinitialize at Checkpoint barrier"); HashSet::new() }, ), @@ -587,45 +774,6 @@ impl ManagedBarrierState { } }; - for (actor_id, subscribers) in &mut self.mutation_subscribers { - if actor_ids_to_collect.contains(actor_id) { - if let Some((first_epoch, _)) = subscribers.pending_subscribers.first_key_value() { - assert!( - *first_epoch >= barrier.epoch.prev, - "barrier epoch {:?} skip subscribed epoch {}", - barrier.epoch, - first_epoch - ); - if *first_epoch == barrier.epoch.prev { - subscribers.started_subscribers.extend( - subscribers - .pending_subscribers - .pop_first() - .expect("should exist") - .1, - ); - } - } - subscribers.started_subscribers.retain(|tx| { - tx.send((barrier.epoch.prev, barrier.mutation.clone())) - .is_ok() - }); - } else { - subscribers.started_subscribers.clear(); - if let Some((first_epoch, _)) = subscribers.pending_subscribers.first_key_value() { - assert!( - *first_epoch > barrier.epoch.prev, - "barrier epoch {:?} skip subscribed epoch {}", - barrier.epoch, - first_epoch - ); - } - } - } - - self.mutation_subscribers - .retain(|_, subscribers| !subscribers.is_empty()); - self.epoch_barrier_state_map.insert( barrier.epoch.prev, BarrierState { @@ -643,17 +791,20 @@ impl ManagedBarrierState { } /// Return a future that yields the next completed epoch. The future is cancellation safe. - pub(crate) fn next_completed_epoch(&mut self) -> impl Future + '_ { - pending_on_none(self.await_epoch_completed_futures.next()).map(|(prev_epoch, result)| { - let state = self - .epoch_barrier_state_map - .get_mut(&prev_epoch) - .expect("should exist"); - // sanity check on barrier state - assert_matches!(&state.inner, ManagedBarrierStateInner::AllCollected); - state.inner = ManagedBarrierStateInner::Completed(result); - prev_epoch - }) + pub(crate) fn poll_next_completed_epoch(&mut self, cx: &mut Context<'_>) -> Poll { + ready!(self.await_epoch_completed_futures.next().poll_unpin(cx)) + .map(|(prev_epoch, result)| { + let state = self + .epoch_barrier_state_map + .get_mut(&prev_epoch) + .expect("should exist"); + // sanity check on barrier state + assert_matches!(&state.inner, ManagedBarrierStateInner::AllCollected); + state.inner = ManagedBarrierStateInner::Completed(result); + prev_epoch + }) + .map(Poll::Ready) + .unwrap_or(Poll::Pending) } /// Pop the completion result of an completed epoch. 
@@ -696,7 +847,7 @@ impl ManagedBarrierState { #[cfg(test)] async fn pop_next_completed_epoch(&mut self) -> u64 { - let epoch = self.next_completed_epoch().await; + let epoch = poll_fn(|cx| self.poll_next_completed_epoch(cx)).await; let _ = self.pop_completed_epoch(epoch).unwrap().unwrap(); epoch } @@ -709,11 +860,11 @@ mod tests { use risingwave_common::util::epoch::test_epoch; use crate::executor::Barrier; - use crate::task::barrier_manager::managed_state::ManagedBarrierState; + use crate::task::barrier_manager::managed_state::PartialGraphManagedBarrierState; #[tokio::test] async fn test_managed_state_add_actor() { - let mut managed_barrier_state = ManagedBarrierState::for_test(); + let mut managed_barrier_state = PartialGraphManagedBarrierState::for_test(); let barrier1 = Barrier::new_test_barrier(test_epoch(1)); let barrier2 = Barrier::new_test_barrier(test_epoch(2)); let barrier3 = Barrier::new_test_barrier(test_epoch(3)); @@ -763,7 +914,7 @@ mod tests { #[tokio::test] async fn test_managed_state_stop_actor() { - let mut managed_barrier_state = ManagedBarrierState::for_test(); + let mut managed_barrier_state = PartialGraphManagedBarrierState::for_test(); let barrier1 = Barrier::new_test_barrier(test_epoch(1)); let barrier2 = Barrier::new_test_barrier(test_epoch(2)); let barrier3 = Barrier::new_test_barrier(test_epoch(3)); diff --git a/src/stream/src/task/barrier_manager/progress.rs b/src/stream/src/task/barrier_manager/progress.rs index 69c603a4b1ab..9a243c2e975d 100644 --- a/src/stream/src/task/barrier_manager/progress.rs +++ b/src/stream/src/task/barrier_manager/progress.rs @@ -14,6 +14,8 @@ use std::fmt::{Display, Formatter}; +use risingwave_common::util::epoch::EpochPair; + use super::LocalBarrierManager; use crate::task::barrier_manager::LocalBarrierEvent::ReportCreateProgress; use crate::task::barrier_manager::LocalBarrierWorker; @@ -42,27 +44,29 @@ impl Display for BackfillState { impl LocalBarrierWorker { pub(crate) fn update_create_mview_progress( &mut self, - current_epoch: u64, + epoch: EpochPair, actor: ActorId, state: BackfillState, ) { - self.state - .create_mview_progress - .entry(current_epoch) - .or_default() - .insert(actor, state); + if let Some(actor_state) = self.state.actor_states.get(&actor) + && let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev) + && let Some(graph_state) = self.state.graph_states.get_mut(partial_graph_id) + { + graph_state + .create_mview_progress + .entry(epoch.curr) + .or_default() + .insert(actor, state); + } else { + warn!(?epoch, actor, ?state, "ignore create mview progress"); + } } } impl LocalBarrierManager { - fn update_create_mview_progress( - &self, - current_epoch: u64, - actor: ActorId, - state: BackfillState, - ) { + fn update_create_mview_progress(&self, epoch: EpochPair, actor: ActorId, state: BackfillState) { self.send_event(ReportCreateProgress { - current_epoch, + epoch, actor, state, }) @@ -126,13 +130,10 @@ impl CreateMviewProgress { self.backfill_actor_id } - fn update_inner(&mut self, current_epoch: u64, state: BackfillState) { + fn update_inner(&mut self, epoch: EpochPair, state: BackfillState) { self.state = Some(state); - self.barrier_manager.update_create_mview_progress( - current_epoch, - self.backfill_actor_id, - state, - ); + self.barrier_manager + .update_create_mview_progress(epoch, self.backfill_actor_id, state); } /// Update the progress to `ConsumingUpstream(consumed_epoch, consumed_rows)`. 
The epoch must be @@ -141,7 +142,7 @@ impl CreateMviewProgress { /// `current_consumed_rows` is an accumulated value. pub fn update( &mut self, - current_epoch: u64, + epoch: EpochPair, consumed_epoch: ConsumedEpoch, current_consumed_rows: ConsumedRows, ) { @@ -159,18 +160,18 @@ impl CreateMviewProgress { None => {} }; self.update_inner( - current_epoch, + epoch, BackfillState::ConsumingUpstream(consumed_epoch, current_consumed_rows), ); } /// Finish the progress. If the progress is already finished, then perform no-op. /// `current_epoch` should be provided to locate the barrier under concurrent checkpoint. - pub fn finish(&mut self, current_epoch: u64, current_consumed_rows: ConsumedRows) { + pub fn finish(&mut self, epoch: EpochPair, current_consumed_rows: ConsumedRows) { if let Some(BackfillState::Done(_)) = self.state { return; } - self.update_inner(current_epoch, BackfillState::Done(current_consumed_rows)); + self.update_inner(epoch, BackfillState::Done(current_consumed_rows)); } } diff --git a/src/stream/src/task/mod.rs b/src/stream/src/task/mod.rs index b31537972bc5..b5382b341805 100644 --- a/src/stream/src/task/mod.rs +++ b/src/stream/src/task/mod.rs @@ -40,6 +40,25 @@ pub type DispatcherId = u64; pub type UpDownActorIds = (ActorId, ActorId); pub type UpDownFragmentIds = (FragmentId, FragmentId); +#[derive(Hash, Eq, PartialEq, Copy, Clone, Debug)] +struct PartialGraphId(u32); + +impl PartialGraphId { + fn new(id: u32) -> Self { + Self(id) + } + + fn is_global_graph(&self) -> bool { + self.0 == u32::MAX + } +} + +impl From for u32 { + fn from(val: PartialGraphId) -> u32 { + val.0 + } +} + /// Stores the information which may be modified from the data plane. /// /// The data structure is created in `LocalBarrierWorker` and is shared by actors created
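
The sketches below are illustrative notes on this patch, not part of it. First, the `PartialGraphId` newtype threaded through `InjectBarrierRequest` and `BarrierCompleteResponse`: the meta node currently always fills the field with `u32::MAX`, which marks the single global graph, while smaller ids are reserved for future partial graphs. A minimal, self-contained reproduction of the newtype (the `main` demo is illustrative only):

```rust
// Simplified reproduction of the PartialGraphId newtype added in this patch
// (the real type lives in src/stream/src/task/mod.rs). It only shows how the
// u32::MAX sentinel marks the single "global" graph.
#[derive(Hash, Eq, PartialEq, Copy, Clone, Debug)]
struct PartialGraphId(u32);

impl PartialGraphId {
    fn new(id: u32) -> Self {
        Self(id)
    }

    /// The meta node currently injects every barrier with u32::MAX, so all
    /// barriers still belong to one global graph; smaller ids are reserved
    /// for future partial graphs.
    fn is_global_graph(&self) -> bool {
        self.0 == u32::MAX
    }
}

impl From<PartialGraphId> for u32 {
    fn from(val: PartialGraphId) -> u32 {
        val.0
    }
}

fn main() {
    let global = PartialGraphId::new(u32::MAX);
    let partial = PartialGraphId::new(1);
    assert!(global.is_global_graph());
    assert!(!partial.is_global_graph());
    // The id is converted back to a plain u32 when filling the proto field.
    assert_eq!(u32::from(partial), 1);
}
```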
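
With `actor_map_to_send` removed from `InflightActorInfo`, the meta side now derives both per-worker actor sets directly from the fragment infos: every actor located on the worker goes into the collect set, and only actors of injectable fragments go into the send set. A standalone sketch of that filtering, using a simplified `Frag` stand-in for `InflightFragmentInfo` and plain `u32` ids (both are assumptions of the sketch, not the real types):

```rust
use std::collections::HashMap;

type ActorId = u32;
type WorkerId = u32;

// Stand-in for InflightFragmentInfo: actor -> worker location, plus whether
// barriers are injected at this fragment.
struct Frag {
    actors: HashMap<ActorId, WorkerId>,
    is_injectable: bool,
}

/// All actors of the graph located on `node_id` (the "collect" set).
fn actor_ids_to_collect(frags: &HashMap<u32, Frag>, node_id: WorkerId) -> Vec<ActorId> {
    frags
        .values()
        .flat_map(|f| f.actors.iter())
        .filter(|(_, worker)| **worker == node_id)
        .map(|(actor, _)| *actor)
        .collect()
}

/// Only injectable fragments contribute to the "send" set.
fn actor_ids_to_send(frags: &HashMap<u32, Frag>, node_id: WorkerId) -> Vec<ActorId> {
    frags
        .values()
        .filter(|f| f.is_injectable)
        .flat_map(|f| f.actors.iter())
        .filter(|(_, worker)| **worker == node_id)
        .map(|(actor, _)| *actor)
        .collect()
}

fn main() {
    let mut frags = HashMap::new();
    frags.insert(1, Frag { actors: HashMap::from([(10, 1), (11, 2)]), is_injectable: true });
    frags.insert(2, Frag { actors: HashMap::from([(20, 1)]), is_injectable: false });

    let mut collect = actor_ids_to_collect(&frags, 1);
    collect.sort();
    assert_eq!(collect, vec![10, 20]); // every actor on worker 1
    assert_eq!(actor_ids_to_send(&frags, 1), vec![10]); // only the injectable fragment
}
```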
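
On the compute node, per-actor bookkeeping moves into `InflightActorState`, whose status walks NotStarted → Running → Stopping and which reports, on each collect, the partial graph the barrier was issued for plus whether the actor can be dropped from the barrier worker's maps. A reduced sketch of that state machine, with barrier mutations and mutation subscribers omitted and a plain `u32` standing in for `PartialGraphId`:

```rust
use std::collections::BTreeMap;

type PartialGraphId = u32;

// Reduced version of InflightActorStatus / InflightActorState from
// src/stream/src/task/barrier_manager/managed_state.rs.
enum Status {
    NotStarted,
    Running(u64),  // max issued prev_epoch
    Stopping(u64),
}

struct InflightActorState {
    // prev_epoch -> partial graph the barrier was issued for
    inflight_barriers: BTreeMap<u64, PartialGraphId>,
    status: Status,
}

impl InflightActorState {
    fn not_started() -> Self {
        Self { inflight_barriers: BTreeMap::new(), status: Status::NotStarted }
    }

    fn issue_barrier(&mut self, graph: PartialGraphId, prev_epoch: u64, is_stop: bool) {
        if let Status::Running(e) | Status::Stopping(e) = &self.status {
            assert!(prev_epoch > *e, "barriers must be issued in epoch order");
        }
        assert!(self.inflight_barriers.insert(prev_epoch, graph).is_none());
        self.status = if is_stop { Status::Stopping(prev_epoch) } else { Status::Running(prev_epoch) };
    }

    /// Returns the graph of the collected barrier and whether the actor has
    /// finished (stop barrier issued and no barrier left in flight).
    fn collect(&mut self, prev_epoch: u64) -> (PartialGraphId, bool) {
        let (epoch, graph) = self.inflight_barriers.pop_first().expect("barrier issued");
        assert_eq!(epoch, prev_epoch, "barriers are collected in issue order");
        let finished = self.inflight_barriers.is_empty() && matches!(self.status, Status::Stopping(_));
        (graph, finished)
    }
}

fn main() {
    let mut actor = InflightActorState::not_started();
    actor.issue_barrier(u32::MAX, 1, false);
    actor.issue_barrier(u32::MAX, 2, true); // stop barrier
    assert_eq!(actor.collect(1), (u32::MAX, false));
    assert_eq!(actor.collect(2), (u32::MAX, true)); // actor can be dropped now
}
```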
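
`ManagedBarrierState::next_completed_epoch` now has to multiplex over several `PartialGraphManagedBarrierState`s, so it becomes a `poll_fn` future that polls each graph's `poll_next_completed_epoch` and yields the first ready `(partial_graph_id, epoch)`; since nothing is consumed until a value is actually ready, the combined future stays cancellation safe. A toy version of the same multiplexing pattern, with an unbounded channel standing in for each graph state (assumes the `futures` and `tokio` crates as dependencies; not the real types):

```rust
use std::collections::HashMap;
use std::future::poll_fn;
use std::task::Poll;

use futures::channel::mpsc;
use futures::StreamExt;

// Poll every per-graph source and return the first (graph_id, epoch) that is
// ready, leaving the others untouched.
async fn next_completed_epoch(
    graphs: &mut HashMap<u32, mpsc::UnboundedReceiver<u64>>,
) -> (u32, u64) {
    poll_fn(|cx| {
        for (graph_id, rx) in graphs.iter_mut() {
            if let Poll::Ready(Some(epoch)) = rx.poll_next_unpin(cx) {
                return Poll::Ready((*graph_id, epoch));
            }
        }
        Poll::Pending
    })
    .await
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::unbounded();
    let mut graphs = HashMap::from([(u32::MAX, rx)]);
    tx.unbounded_send(100).unwrap();
    assert_eq!(next_completed_epoch(&mut graphs).await, (u32::MAX, 100));
}
```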
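
Finally, because `BarrierCompleteResponse` now carries `worker_id`, `partial_graph_id`, and the barrier's `prev_epoch`, the meta node no longer needs the per-stream `inflight_barriers` queue: `CheckpointControl::barrier_collected` can index its command queue directly by the epoch reported in the response. A reduced view of that lookup (`Resp` and `EpochNode` are simplified stand-ins for the real protobuf and meta-side types):

```rust
use std::collections::{BTreeMap, HashSet};

// Reduced view of CheckpointControl::barrier_collected after the patch: the
// response itself says which epoch and worker it belongs to.
struct Resp {
    worker_id: u32,
    epoch: u64,          // prev_epoch of the collected barrier
    partial_graph_id: u32,
}

struct EpochNode {
    node_to_collect: HashSet<u32>,
    resps: Vec<Resp>,
}

fn barrier_collected(queue: &mut BTreeMap<u64, EpochNode>, resp: Resp) {
    // Only the global graph exists for now.
    assert_eq!(resp.partial_graph_id, u32::MAX);
    let node = queue.get_mut(&resp.epoch).expect("collected an epoch that was never injected");
    assert!(node.node_to_collect.remove(&resp.worker_id));
    node.resps.push(resp);
}

fn main() {
    let mut queue = BTreeMap::new();
    queue.insert(100, EpochNode { node_to_collect: HashSet::from([1, 2]), resps: vec![] });
    barrier_collected(&mut queue, Resp { worker_id: 1, epoch: 100, partial_graph_id: u32::MAX });
    assert_eq!(queue[&100].node_to_collect.len(), 1); // still waiting on worker 2
}
```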