diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs
index 2078828911a13..4cf0151c717f3 100644
--- a/src/meta/src/barrier/mod.rs
+++ b/src/meta/src/barrier/mod.rs
@@ -14,16 +14,17 @@
 use std::assert_matches::assert_matches;
 use std::collections::hash_map::Entry;
-use std::collections::{HashMap, HashSet, VecDeque};
+use std::collections::{BTreeMap, HashMap, HashSet};
 use std::future::{poll_fn, Future};
-use std::iter::empty;
-use std::mem::take;
+use std::mem::{replace, take};
 use std::sync::Arc;
-use std::task::Poll;
+use std::task::{ready, Poll};
 use std::time::Duration;

+use anyhow::anyhow;
 use arc_swap::ArcSwap;
 use fail::fail_point;
+use futures::FutureExt;
 use itertools::Itertools;
 use prometheus::HistogramTimer;
 use risingwave_common::bail;
@@ -41,6 +42,7 @@ use risingwave_pb::ddl_service::DdlProgress;
 use risingwave_pb::meta::subscribe_response::{Info, Operation};
 use risingwave_pb::meta::PausedReason;
 use risingwave_pb::stream_plan::barrier::BarrierKind;
+use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
 use risingwave_pb::stream_service::{
     streaming_control_stream_response, BarrierCompleteResponse, StreamingControlStreamResponse,
 };
@@ -48,7 +50,7 @@ use thiserror_ext::AsReport;
 use tokio::sync::oneshot::{Receiver, Sender};
 use tokio::sync::Mutex;
 use tokio::task::JoinHandle;
-use tracing::{info, warn, Instrument};
+use tracing::{error, info, warn, Instrument};

 use self::command::CommandContext;
 use self::notifier::Notifier;
@@ -199,7 +201,11 @@ pub struct GlobalBarrierManager {
 /// Controls the concurrent execution of commands.
 struct CheckpointControl {
     /// Save the state and message of barrier in order.
-    command_ctx_queue: VecDeque<EpochNode>,
+    command_ctx_queue: BTreeMap<u64, EpochNode>,
+
+    /// Command that has been collected but is still completing.
+    /// The join handle of the completing future is stored.
+    completing_command: CompletingCommand,

     context: GlobalBarrierManagerContext,
 }
@@ -208,22 +214,31 @@ impl CheckpointControl {
     fn new(context: GlobalBarrierManagerContext) -> Self {
         Self {
             command_ctx_queue: Default::default(),
+            completing_command: CompletingCommand::None,
             context,
         }
     }

+    fn total_command_num(&self) -> usize {
+        self.command_ctx_queue.len()
+            + match &self.completing_command {
+                CompletingCommand::Completing { .. } => 1,
+                _ => 0,
+            }
+    }
+
     /// Update the metrics of barrier nums.
     fn update_barrier_nums_metrics(&self) {
         self.context.metrics.in_flight_barrier_nums.set(
             self.command_ctx_queue
-                .iter()
+                .values()
                 .filter(|x| x.state.is_inflight())
                 .count() as i64,
         );
         self.context
             .metrics
             .all_barrier_nums
-            .set(self.command_ctx_queue.len() as i64);
+            .set(self.total_command_num() as i64);
     }

     /// Enqueue a barrier command, and init its state to `InFlight`.
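
Note: switching `command_ctx_queue` from `VecDeque<EpochNode>` to `BTreeMap<u64, EpochNode>` keyed by `prev_epoch` is what lets the later hunks replace the linear `find`/`position` scans with `get_mut`, `last_key_value`, and `pop_first`. A minimal, self-contained sketch of that access pattern (with a simplified `Node` stand-in, not the actual meta-service types):

```rust
use std::collections::BTreeMap;

/// Simplified stand-in for `EpochNode`: only tracks whether the barrier is still in flight.
struct Node {
    in_flight: bool,
}

fn main() {
    // Barriers are enqueued in epoch order, keyed by `prev_epoch`.
    let mut queue: BTreeMap<u64, Node> = BTreeMap::new();
    queue.insert(1, Node { in_flight: false });
    queue.insert(2, Node { in_flight: false });
    queue.insert(3, Node { in_flight: true });

    // The newest entry is reached with `last_key_value` (used for the prev/curr epoch assertion).
    assert_eq!(queue.last_key_value().map(|(epoch, _)| *epoch), Some(3));

    // Collected barriers are drained from the front, in epoch order, until an in-flight one is hit.
    while queue.first_key_value().is_some_and(|(_, node)| !node.in_flight) {
        let (epoch, _node) = queue.pop_first().expect("non-empty");
        println!("complete barrier with prev_epoch {epoch}");
    }
    assert_eq!(queue.len(), 1);
}
```
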
@@ -235,42 +250,35 @@ impl CheckpointControl {
     ) {
         let timer = self.context.metrics.barrier_latency.start_timer();

-        self.command_ctx_queue.push_back(EpochNode {
-            timer: Some(timer),
-            wait_commit_timer: None,
-
-            state: BarrierEpochState {
-                node_to_collect,
-                resps: vec![],
+        if let Some((_, node)) = self.command_ctx_queue.last_key_value() {
+            assert_eq!(
+                command_ctx.prev_epoch.value(),
+                node.command_ctx.curr_epoch.value()
+            );
+        }
+        self.command_ctx_queue.insert(
+            command_ctx.prev_epoch.value().0,
+            EpochNode {
+                enqueue_time: timer,
+                state: BarrierEpochState {
+                    node_to_collect,
+                    resps: vec![],
+                },
+                command_ctx,
+                notifiers,
             },
-            command_ctx,
-            notifiers,
-        });
-    }
-
-    fn wait_completed_barrier(&self) -> impl Future<Output = ()> + '_ {
-        poll_fn(|_cx| {
-            if let Some(node) = self.command_ctx_queue.front()
-                && !node.state.is_inflight()
-            {
-                Poll::Ready(())
-            } else {
-                Poll::Pending
-            }
-        })
+        );
     }

+    /// Mark the barrier of `prev_epoch` as collected from `worker_id` and record the
+    /// `BarrierCompleteResponse` for the later completion of this barrier.
     fn barrier_collected(
         &mut self,
         worker_id: WorkerId,
         prev_epoch: u64,
         resp: BarrierCompleteResponse,
     ) {
-        if let Some(node) = self
-            .command_ctx_queue
-            .iter_mut()
-            .find(|x| x.command_ctx.prev_epoch.value().0 == prev_epoch)
-        {
+        if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
             assert!(node.state.node_to_collect.remove(&worker_id));
             node.state.resps.push(resp);
         } else {
@@ -281,42 +289,11 @@ impl CheckpointControl {
         }
     }

-    /// Change the state of this `prev_epoch` to `Completed`. Return continuous nodes
-    /// with `Completed` starting from first node [`Completed`..`InFlight`) and remove them.
-    fn completed_barriers(&mut self) -> Vec<EpochNode> {
-        // Find all continuous nodes with 'Complete' starting from first node
-        let index = self
-            .command_ctx_queue
-            .iter()
-            .position(|x| x.state.is_inflight())
-            .unwrap_or(self.command_ctx_queue.len());
-        let complete_nodes = self
-            .command_ctx_queue
-            .drain(..index)
-            .map(|mut node| {
-                // change state to complete, and wait for nodes with the smaller epoch to commit
-                let wait_commit_timer = self
-                    .context
-                    .metrics
-                    .barrier_wait_commit_latency
-                    .start_timer();
-                node.wait_commit_timer = Some(wait_commit_timer);
-                node
-            })
-            .collect_vec();
-        complete_nodes
-    }
-
-    /// Remove all nodes from queue and return them.
-    fn barrier_failed(&mut self) -> Vec<EpochNode> {
-        self.command_ctx_queue.drain(..).collect_vec()
-    }
-
     /// Pause inject barrier until True.
     fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
         let in_flight_not_full = self
             .command_ctx_queue
-            .iter()
+            .values()
             .filter(|x| x.state.is_inflight())
             .count()
             < in_flight_barrier_nums;
@@ -324,26 +301,96 @@ impl CheckpointControl {
         // Whether some command requires pausing concurrent barrier. If so, it must be the last one.
         let should_pause = self
             .command_ctx_queue
-            .back()
-            .map(|x| x.command_ctx.command.should_pause_inject_barrier())
+            .last_key_value()
+            .map(|(_, x)| &x.command_ctx)
+            .or(match &self.completing_command {
+                CompletingCommand::None | CompletingCommand::Err(_) => None,
+                CompletingCommand::Completing { command_ctx, .. } => Some(command_ctx),
+            })
+            .map(|command_ctx| command_ctx.command.should_pause_inject_barrier())
             .unwrap_or(false);
         debug_assert_eq!(
             self.command_ctx_queue
-                .iter()
-                .any(|x| x.command_ctx.command.should_pause_inject_barrier()),
+                .values()
+                .map(|node| &node.command_ctx)
+                .chain(
+                    match &self.completing_command {
+                        CompletingCommand::None | CompletingCommand::Err(_) => None,
+                        CompletingCommand::Completing { command_ctx, .. } => Some(command_ctx),
+                    }
+                    .into_iter()
+                )
+                .any(|command_ctx| command_ctx.command.should_pause_inject_barrier()),
             should_pause
         );

         in_flight_not_full && !should_pause
     }
+
+    /// Make sure nothing is left half-done before recovery: wait for the in-progress
+    /// completing command, finish the barriers that are already collected, and fail the rest.
+    pub async fn clear_on_err(&mut self, err: &MetaError) {
+        // Wait for the spawned completing task to finish, no matter whether it succeeds or not.
+        let is_err = match replace(&mut self.completing_command, CompletingCommand::None) {
+            CompletingCommand::None => false,
+            CompletingCommand::Completing {
+                command_ctx,
+                join_handle,
+            } => {
+                info!(
+                    prev_epoch = ?command_ctx.prev_epoch,
+                    curr_epoch = ?command_ctx.curr_epoch,
+                    "waiting for completing command to finish in recovery"
+                );
+                match join_handle.await {
+                    Err(e) => {
+                        warn!(err = ?e.as_report(), "failed to join completing task");
+                    }
+                    Ok(Err(e)) => {
+                        warn!(err = ?e.as_report(), "failed to complete barrier during clear");
+                    }
+                    Ok(Ok(_)) => {}
+                };
+                false
+            }
+            CompletingCommand::Err(_) => true,
+        };
+        if !is_err {
+            // continue to finish the pending collected barriers.
+            while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value()
+                && !state.is_inflight()
+            {
+                let (_, node) = self.command_ctx_queue.pop_first().expect("non-empty");
+                let command_ctx = node.command_ctx.clone();
+                if let Err(e) = self.context.clone().complete_barrier(node).await {
+                    error!(
+                        prev_epoch = ?command_ctx.prev_epoch,
+                        curr_epoch = ?command_ctx.curr_epoch,
+                        err = ?e.as_report(),
+                        "failed to complete barrier during recovery"
+                    );
+                    break;
+                } else {
+                    info!(
+                        prev_epoch = ?command_ctx.prev_epoch,
+                        curr_epoch = ?command_ctx.curr_epoch,
+                        "succeeded in completing barrier during recovery"
+                    )
+                }
+            }
+        }
+        for (_, node) in take(&mut self.command_ctx_queue) {
+            for notifier in node.notifiers {
+                notifier.notify_collection_failed(err.clone());
+            }
+            node.enqueue_time.observe_duration();
+        }
+    }
 }

 /// The state and message of this barrier, a node for concurrent checkpoint.
 pub struct EpochNode {
     /// Timer for recording barrier latency, taken after `complete_barriers`.
-    timer: Option<HistogramTimer>,
-    /// The timer of `barrier_wait_commit_latency`
-    wait_commit_timer: Option<HistogramTimer>,
+    enqueue_time: HistogramTimer,

     /// Whether this barrier is in-flight or completed.
     state: BarrierEpochState,
@@ -366,6 +413,19 @@ impl BarrierEpochState {
     }
 }

+enum CompletingCommand {
+    None,
+    Completing {
+        command_ctx: Arc<CommandContext>,
+
+        // The join handle of a spawned task that completes the barrier.
+        // The return value indicates whether there is some create streaming job command
+        // that has finished but is not yet checkpointed. If there is any, we will force a
+        // checkpoint on the next barrier.
+        join_handle: JoinHandle<MetaResult<bool>>,
+    },
+    Err(MetaError),
+}
+
 impl GlobalBarrierManager {
     /// Create a new [`crate::barrier::GlobalBarrierManager`].
     #[allow(clippy::too_many_arguments)]
@@ -615,20 +675,31 @@ impl GlobalBarrierManager {
                         }
                         Err(e) => {
-                            self.failure_recovery(e, empty()).await;
+                            self.failure_recovery(e).await;
                         }
                     }
                 }
-                // Barrier completes.
-                () = self.checkpoint_control.wait_completed_barrier() => {
-                    self.handle_barrier_complete().await;
-                }
+                complete_result = self.checkpoint_control.next_completed_barrier() => {
+                    match complete_result {
+                        Ok((command_context, remaining)) => {
+                            // If there are remaining commands (that require a checkpoint to
+                            // finish), we force the next barrier to be a checkpoint.
+                            if remaining {
+                                assert_matches!(command_context.kind, BarrierKind::Barrier);
+                                self.scheduled_barriers.force_checkpoint_in_next_barrier();
+                            }
+                        }
+                        Err(e) => {
+                            self.failure_recovery(e).await;
+                        }
+                    }
+                },
                 scheduled = self.scheduled_barriers.next_barrier(),
                     if self
                         .checkpoint_control
                         .can_inject_barrier(self.in_flight_barrier_nums) => {
                     if let Err(e) = self.handle_new_barrier(scheduled) {
-                        self.failure_recovery(e, empty()).await;
+                        self.failure_recovery(e).await;
                     }
                 }
             }
@@ -701,51 +772,8 @@ impl GlobalBarrierManager {
         Ok(())
     }

-    /// Changes the state to `Complete`, and try to commit all epoch that state is `Complete` in
-    /// order. If commit is err, all nodes will be handled.
-    async fn handle_barrier_complete(&mut self) {
-        let mut complete_nodes = self.checkpoint_control.completed_barriers();
-        // try commit complete nodes
-        let (mut index, mut err_msg) = (0, None);
-        for (i, node) in complete_nodes.iter_mut().enumerate() {
-            let span = node.command_ctx.span.clone();
-            if let Err(err) = self.complete_barrier(node).instrument(span).await {
-                index = i;
-                err_msg = Some(err);
-                break;
-            }
-        }
-        // Handle the error node and the nodes after it
-        if let Some(err) = err_msg {
-            warn!(
-                prev_epoch = ?complete_nodes[index].command_ctx.prev_epoch,
-                error = %err.as_report(),
-                "Failed to commit epoch"
-            );
-            let fail_nodes = complete_nodes.drain(index..);
-            self.failure_recovery(err, fail_nodes).await;
-        }
-    }
-
-    async fn failure_recovery(
-        &mut self,
-        err: MetaError,
-        fail_nodes: impl IntoIterator<Item = EpochNode>,
-    ) {
-        for node in fail_nodes
-            .into_iter()
-            .chain(self.checkpoint_control.barrier_failed().into_iter())
-        {
-            if let Some(timer) = node.timer {
-                timer.observe_duration();
-            }
-            if let Some(wait_commit_timer) = node.wait_commit_timer {
-                wait_commit_timer.observe_duration();
-            }
-            node.notifiers
-                .into_iter()
-                .for_each(|notifier| notifier.notify_collection_failed(err.clone()));
-        }
+    async fn failure_recovery(&mut self, err: MetaError) {
+        self.checkpoint_control.clear_on_err(&err).await;

         if self.enable_recovery {
             self.context
@@ -768,20 +796,53 @@ impl GlobalBarrierManager {
             panic!("failed to execute barrier: {}", err.as_report());
         }
     }
+}

+impl GlobalBarrierManagerContext {
     /// Try to commit this node. If err, returns
-    async fn complete_barrier(&mut self, node: &mut EpochNode) -> MetaResult<()> {
-        let prev_epoch = node.command_ctx.prev_epoch.value().0;
-        assert!(!node.state.is_inflight());
-        let resps = &mut node.state.resps;
+    async fn complete_barrier(self, node: EpochNode) -> MetaResult<bool> {
+        let EpochNode {
+            command_ctx,
+            mut notifiers,
+            enqueue_time,
+            state,
+            ..
+        } = node;
+        assert!(state.node_to_collect.is_empty());
+        let resps = state.resps;
+        let wait_commit_timer = self.metrics.barrier_wait_commit_latency.start_timer();
+        let (commit_info, create_mview_progress) = collect_commit_epoch_info(resps);
+        if let Err(e) = self.update_snapshot(&command_ctx, commit_info).await {
+            for notifier in notifiers {
+                notifier.notify_collection_failed(e.clone());
+            }
+            return Err(e);
+        };
+        notifiers.iter_mut().for_each(|notifier| {
+            notifier.notify_collected();
+        });
+        let result = self
+            .update_tracking_jobs(notifiers, command_ctx.clone(), create_mview_progress)
+            .await;
+        let duration_sec = enqueue_time.stop_and_record();
+        self.report_complete_event(duration_sec, &command_ctx);
+        wait_commit_timer.observe_duration();
+        result
+    }
+
+    async fn update_snapshot(
+        &self,
+        command_ctx: &CommandContext,
+        commit_info: CommitEpochInfo,
+    ) -> MetaResult<()> {
         {
             {
+                let prev_epoch = command_ctx.prev_epoch.value().0;
                 // We must ensure all epochs are committed in ascending order,
                 // because the storage engine will query from new to old in the order in which
                 // the L0 layer files are generated.
                 // See https://github.com/risingwave-labs/risingwave/issues/1251
-                let kind = node.command_ctx.kind;
-                let commit_info = collect_commit_epoch_info(resps);
+                let kind = command_ctx.kind;

                 // hummock_manager commit epoch.
                 let mut new_snapshot = None;
@@ -793,25 +854,20 @@ impl GlobalBarrierManager {
                     ),
                     BarrierKind::Checkpoint => {
                         new_snapshot = self
-                            .context
                             .hummock_manager
-                            .commit_epoch(node.command_ctx.prev_epoch.value().0, commit_info)
+                            .commit_epoch(command_ctx.prev_epoch.value().0, commit_info)
                             .await?;
                     }
                     BarrierKind::Barrier => {
-                        new_snapshot = Some(
-                            self.context
-                                .hummock_manager
-                                .update_current_epoch(prev_epoch),
-                        );
+                        new_snapshot = Some(self.hummock_manager.update_current_epoch(prev_epoch));
                         // if we collect a barrier(checkpoint = false),
                         // we need to ensure that command is Plain and the notifier's checkpoint is
                         // false
-                        assert!(!node.command_ctx.command.need_checkpoint());
+                        assert!(!command_ctx.command.need_checkpoint());
                     }
                 }

-                node.command_ctx.post_collect().await?;
+                command_ctx.post_collect().await?;
                 // Notify new snapshot after fragment_mapping changes have been notified in
                 // `post_collect`.
                 if let Some(snapshot) = new_snapshot {
@@ -822,16 +878,22 @@ impl GlobalBarrierManager {
                         Info::HummockSnapshot(snapshot),
                     );
                 }
+                Ok(())
+            }
+        }
+    }

+    async fn update_tracking_jobs(
+        &self,
+        notifiers: Vec<Notifier>,
+        command_ctx: Arc<CommandContext>,
+        create_mview_progress: Vec<CreateMviewProgress>,
+    ) -> MetaResult<bool> {
+        {
+            {
                 // Notify about collected.
-                let mut notifiers = take(&mut node.notifiers);
-                notifiers.iter_mut().for_each(|notifier| {
-                    notifier.notify_collected();
-                });
-
-                // Notify about collected.
-                let version_stats = self.context.hummock_manager.get_version_stats().await;
-                let mut tracker = self.context.tracker.lock().await;
+                let version_stats = self.hummock_manager.get_version_stats().await;
+                let mut tracker = self.tracker.lock().await;

                 // Save `finished_commands` for Create MVs.
                 let finished_commands = {
@@ -839,7 +901,7 @@ impl GlobalBarrierManager {
                         // Add the command to tracker.
                         if let Some(command) = tracker.add(
                             TrackingCommand {
-                                context: node.command_ctx.clone(),
+                                context: command_ctx.clone(),
                                 notifiers,
                             },
                             &version_stats,
@@ -848,9 +910,9 @@ impl GlobalBarrierManager {
                             commands.push(command);
                         }
                         // Update the progress of all commands.
-                        for progress in resps.iter().flat_map(|r| &r.create_mview_progress) {
+                        for progress in create_mview_progress {
                             // Those with actors complete can be finished immediately.
-                            if let Some(command) = tracker.update(progress, &version_stats) {
+                            if let Some(command) = tracker.update(&progress, &version_stats) {
                                 tracing::trace!(?progress, "finish progress");
                                 commands.push(command);
                             } else {
@@ -864,41 +926,87 @@ impl GlobalBarrierManager {
                     tracker.stash_command_to_finish(command);
                 }

-                if let Some(table_id) = node.command_ctx.table_to_cancel() {
+                if let Some(table_id) = command_ctx.table_to_cancel() {
                     // the cancelled command is possibly stashed in `finished_commands` and waiting
                     // for checkpoint, we should also clear it.
                     tracker.cancel_command(table_id);
                 }

-                let remaining = tracker.finish_jobs(kind.is_checkpoint()).await?;
-                // If there are remaining commands (that requires checkpoint to finish), we force
-                // the next barrier to be a checkpoint.
-                if remaining {
-                    assert_matches!(kind, BarrierKind::Barrier);
-                    self.scheduled_barriers.force_checkpoint_in_next_barrier();
-                }
+                let has_remaining_job = tracker
+                    .finish_jobs(command_ctx.kind.is_checkpoint())
+                    .await?;

-                let duration_sec = node.timer.take().unwrap().stop_and_record();
-                node.wait_commit_timer.take().unwrap().observe_duration();
+                Ok(has_remaining_job)
+            }
+        }
+    }

+    fn report_complete_event(&self, duration_sec: f64, command_ctx: &CommandContext) {
+        {
+            {
                 {
                     // Record barrier latency in event log.
                     use risingwave_pb::meta::event_log;
                     let event = event_log::EventBarrierComplete {
-                        prev_epoch: node.command_ctx.prev_epoch.value().0,
-                        cur_epoch: node.command_ctx.curr_epoch.value().0,
+                        prev_epoch: command_ctx.prev_epoch.value().0,
+                        cur_epoch: command_ctx.curr_epoch.value().0,
                         duration_sec,
-                        command: node.command_ctx.command.to_string(),
-                        barrier_kind: node.command_ctx.kind.as_str_name().to_string(),
+                        command: command_ctx.command.to_string(),
+                        barrier_kind: command_ctx.kind.as_str_name().to_string(),
                     };
                     self.env
                         .event_log_manager_ref()
                         .add_event_logs(vec![event_log::Event::BarrierComplete(event)]);
                 }
+            }
+        }
+    }
+}

-        Ok(())
+impl CheckpointControl {
+    pub(super) fn next_completed_barrier(
+        &mut self,
+    ) -> impl Future<Output = MetaResult<(Arc<CommandContext>, bool)>> + '_ {
+        if matches!(&self.completing_command, CompletingCommand::None) {
+            // If there is no completing barrier, try to start completing the earliest barrier if
+            // it has been collected.
+            if let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value()
+                && !state.is_inflight()
+            {
+                let (_, node) = self.command_ctx_queue.pop_first().expect("non-empty");
+                let command_ctx = node.command_ctx.clone();
+                let join_handle = tokio::spawn(self.context.clone().complete_barrier(node));
+                self.completing_command = CompletingCommand::Completing {
+                    command_ctx,
+                    join_handle,
+                };
             }
         }
+
+        poll_fn(|cx| {
+            if let CompletingCommand::Completing {
+                join_handle,
+                command_ctx,
+            } = &mut self.completing_command
+            {
+                let join_result = ready!(join_handle.poll_unpin(cx))
+                    .map_err(|e| {
+                        anyhow!("failed to join completing command: {:?}", e.as_report()).into()
+                    })
+                    .and_then(|result| result);
+                let command_ctx = command_ctx.clone();
+                if let Err(e) = &join_result {
+                    self.completing_command = CompletingCommand::Err(e.clone());
+                } else {
+                    self.completing_command = CompletingCommand::None;
+                }
+                let result = join_result.map(move |has_remaining| (command_ctx, has_remaining));
+
+                Poll::Ready(result)
+            } else {
+                Poll::Pending
+            }
+        })
     }
 }

@@ -989,37 +1097,39 @@ impl GlobalBarrierManagerContext {

 pub type BarrierManagerRef = GlobalBarrierManagerContext;

-fn collect_commit_epoch_info(resps: &mut [BarrierCompleteResponse]) -> CommitEpochInfo {
+fn collect_commit_epoch_info(
+    resps: Vec<BarrierCompleteResponse>,
+) -> (CommitEpochInfo, Vec<CreateMviewProgress>) {
     let mut sst_to_worker: HashMap<HummockSstableObjectId, WorkerId> = HashMap::new();
     let mut synced_ssts: Vec<ExtendedSstableInfo> = vec![];
-    for resp in &mut *resps {
-        let mut t: Vec<ExtendedSstableInfo> = resp
-            .synced_sstables
-            .iter_mut()
-            .map(|grouped| {
-                let sst_info = std::mem::take(&mut grouped.sst).expect("field not None");
-                sst_to_worker.insert(sst_info.get_object_id(), resp.worker_id);
-                ExtendedSstableInfo::new(
-                    grouped.compaction_group_id,
-                    sst_info,
-                    std::mem::take(&mut grouped.table_stats_map),
-                )
-            })
-            .collect_vec();
-        synced_ssts.append(&mut t);
+    let mut table_watermarks = Vec::with_capacity(resps.len());
+    let mut progresses = Vec::new();
+    for resp in resps {
+        let ssts_iter = resp.synced_sstables.into_iter().map(|grouped| {
+            let sst_info = grouped.sst.expect("field not None");
+            sst_to_worker.insert(sst_info.get_object_id(), resp.worker_id);
+            ExtendedSstableInfo::new(
+                grouped.compaction_group_id,
+                sst_info,
+                grouped.table_stats_map,
+            )
+        });
+        synced_ssts.extend(ssts_iter);
+        table_watermarks.push(resp.table_watermarks);
+        progresses.extend(resp.create_mview_progress);
     }
-    CommitEpochInfo::new(
+    let info = CommitEpochInfo::new(
         synced_ssts,
         merge_multiple_new_table_watermarks(
-            resps
-                .iter()
-                .map(|resp| {
-                    resp.table_watermarks
-                        .iter()
+            table_watermarks
+                .into_iter()
+                .map(|watermarks| {
+                    watermarks
+                        .into_iter()
                         .map(|(table_id, watermarks)| {
                             (
-                                TableId::new(*table_id),
-                                TableWatermarks::from_protobuf(watermarks),
+                                TableId::new(table_id),
+                                TableWatermarks::from_protobuf(&watermarks),
                             )
                         })
                         .collect()
@@ -1027,5 +1137,6 @@ fn collect_commit_epoch_info(resps: &mut [BarrierCompleteResponse]) -> CommitEpochInfo {
                 .collect_vec(),
         ),
         sst_to_worker,
-    )
+    );
+    (info, progresses)
 }
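
Note: the core of this change is that barrier completion now runs on a detached tokio task, and `next_completed_barrier` only polls the stored `JoinHandle` inside `poll_fn`, so the `select!` loop in `run` stays responsive while an epoch is being committed. A rough, runnable sketch of that polling pattern, assuming the `tokio` (with `rt`/`macros` features) and `futures` crates; `fake_complete` and the `String` error type are placeholders, not from this PR:

```rust
use std::future::poll_fn;
use std::task::{ready, Poll};

use futures::FutureExt;
use tokio::task::JoinHandle;

// Placeholder for the spawned `complete_barrier` task: the boolean mimics "some create
// streaming job finished but is not yet checkpointed", which forces the next checkpoint.
async fn fake_complete(has_remaining_job: bool) -> Result<bool, String> {
    Ok(has_remaining_job)
}

#[tokio::main]
async fn main() {
    // The completing command is detached onto its own task so the caller's loop stays responsive.
    let mut join_handle: JoinHandle<Result<bool, String>> = tokio::spawn(fake_complete(true));

    // `next_completed_barrier`-style polling: poll the join handle in place until it is ready,
    // then fold the join error into the ordinary error type.
    let result = poll_fn(|cx| {
        let join_result = ready!(join_handle.poll_unpin(cx))
            .map_err(|e| format!("failed to join completing task: {e}"))
            .and_then(|inner| inner);
        Poll::Ready(join_result)
    })
    .await;

    assert_eq!(result, Ok(true));
}
```

Keeping the join handle in `CompletingCommand` (rather than awaiting it inline) is also what lets `clear_on_err` wait for the in-flight completion before recovery, instead of racing with it.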