diff --git a/src/meta/src/barrier/checkpoint/control.rs b/src/meta/src/barrier/checkpoint/control.rs
index ddf0edff777fb..86044a77a463d 100644
--- a/src/meta/src/barrier/checkpoint/control.rs
+++ b/src/meta/src/barrier/checkpoint/control.rs
@@ -31,7 +31,7 @@ use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamin
 use crate::barrier::checkpoint::state::BarrierWorkerState;
 use crate::barrier::command::CommandContext;
 use crate::barrier::complete_task::{
-    BarrierCompleteOutput, CompleteBarrierTask, CreatingJobCompleteBarrierTask,
+    BarrierCommitOutput, CompleteBarrierTask, CreatingJobCompleteBarrierTask,
     DatabaseCompleteBarrierTask,
 };
 use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
@@ -66,7 +66,7 @@ impl CheckpointControl {
     pub(crate) fn ack_completed(
         &mut self,
-        output: BarrierCompleteOutput,
+        output: BarrierCommitOutput,
         control_stream_manager: &mut ControlStreamManager,
     ) {
         self.hummock_version_stats = output.hummock_version_stats;
diff --git a/src/meta/src/barrier/complete_task.rs b/src/meta/src/barrier/complete_task.rs
index 2ea0dbd656fc8..cbb3a054097ee 100644
--- a/src/meta/src/barrier/complete_task.rs
+++ b/src/meta/src/barrier/complete_task.rs
@@ -134,9 +134,9 @@ impl CompleteBarrierTask {
     }
 }
 
-impl CompleteBarrierTask {
-    pub(super) async fn complete_barrier(
-        self,
+impl CommittingTask {
+    pub(super) async fn commit_barrier(
+        task: CompleteBarrierTask,
         commit_info: CommitEpochInfo,
         context: &impl GlobalBarrierWorkerContext,
         env: MetaSrvEnv,
@@ -146,7 +146,7 @@ impl CompleteBarrierTask {
             .barrier_wait_commit_latency
             .start_timer();
         let version_stats = context.commit_epoch(commit_info).await?;
-        for command_ctx in self
+        for command_ctx in task
             .tasks
             .values()
             .flat_map(|(command, _)| command.as_ref().map(|task| &task.command))
@@ -162,22 +162,22 @@
         let version_stats = match result {
             Ok(version_stats) => version_stats,
             Err(e) => {
-                for notifier in self.notifiers {
+                for notifier in task.notifiers {
                     notifier.notify_collection_failed(e.clone());
                 }
                 return Err(e);
             }
         };
-        self.notifiers.into_iter().for_each(|notifier| {
+        task.notifiers.into_iter().for_each(|notifier| {
             notifier.notify_collected();
         });
         try_join_all(
-            self.finished_jobs
+            task.finished_jobs
                 .into_iter()
                 .map(|finished_job| context.finish_creating_job(finished_job)),
         )
         .await?;
-        for task in self.tasks.into_values().flat_map(|(task, _)| task) {
+        for task in task.tasks.into_values().flat_map(|(task, _)| task) {
             let duration_sec = task.enqueue_time.stop_and_record();
             Self::report_complete_event(&env, duration_sec, &task.command);
             GLOBAL_META_METRICS
@@ -189,9 +189,7 @@ impl CompleteBarrierTask {
 
         Ok(version_stats)
     }
-}
 
-impl CompleteBarrierTask {
     fn report_complete_event(env: &MetaSrvEnv, duration_sec: f64, command_ctx: &CommandContext) {
         // Record barrier latency in event log.
         use risingwave_pb::meta::event_log;
@@ -211,20 +209,20 @@ impl CompleteBarrierTask {
     }
 }
 
-pub(super) struct BarrierCompleteOutput {
+pub(super) struct BarrierCommitOutput {
     #[expect(clippy::type_complexity)]
     /// `database_id` -> (`Some(database_graph_committed_epoch)`, vec(`creating_job_id`, `creating_job_committed_epoch`, `is_finished`))
     pub epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(TableId, u64, bool)>)>,
     pub hummock_version_stats: HummockVersionStats,
 }
 
-pub(super) struct SyncingTask {
+pub(super) struct CompletingTask {
     node_to_collect: HashSet<WorkerId>,
     collected_resps: HashMap<WorkerId, BarrierCompleteResponse>,
     task: CompleteBarrierTask,
 }
 
-impl SyncingTask {
+impl CompletingTask {
     fn new(
         task_id: u64,
         task: CompleteBarrierTask,
@@ -239,11 +237,11 @@ impl SyncingTask {
         })
     }
 
-    pub(super) fn is_collected(&self) -> bool {
+    fn is_completed(&self) -> bool {
         self.node_to_collect.is_empty()
     }
 
-    pub(super) fn into_commit_info(self) -> (CommitEpochInfo, CompleteBarrierTask) {
+    fn into_commit_info(self) -> (CommitEpochInfo, CompleteBarrierTask) {
         assert!(self.node_to_collect.is_empty());
         let (mut commit_info, old_value_ssts) = collect_resp_info(self.collected_resps);
         for (database_task, creating_jobs) in self.task.tasks.values() {
@@ -277,18 +275,16 @@ impl SyncingTask {
     }
 }
 
-pub(super) struct CompletingTask {
+pub(super) struct CompletingTasks {
     next_task_id: u64,
-    pub(super) committing_task: CommittingTask,
-    pub(super) syncing_tasks: BTreeMap<u64, SyncingTask>,
+    tasks: BTreeMap<u64, CompletingTask>,
 }
 
-impl CompletingTask {
+impl CompletingTasks {
     pub(super) fn new() -> Self {
         Self {
             next_task_id: 0,
-            committing_task: CommittingTask::None,
-            syncing_tasks: Default::default(),
+            tasks: Default::default(),
         }
     }
 
@@ -299,42 +295,70 @@ impl CompletingTask {
     ) -> MetaResult<()> {
         let task_id = self.next_task_id;
         self.next_task_id += 1;
-        let task = SyncingTask::new(task_id, task, control_stream_manager)?;
-        self.syncing_tasks.insert(task_id, task);
+        let task = CompletingTask::new(task_id, task, control_stream_manager)?;
+        self.tasks.insert(task_id, task);
         Ok(())
     }
 
-    pub(super) fn next_completed_barrier<'a>(
+    pub(super) fn next_completed_task(&mut self) -> Option<(CommitEpochInfo, CompleteBarrierTask)> {
+        if let Some((_, task)) = self.tasks.first_key_value()
+            && task.is_completed()
+        {
+            let (_, task) = self.tasks.pop_first().expect("non-empty");
+            Some(task.into_commit_info())
+        } else {
+            None
+        }
+    }
+
+    pub(super) fn on_barrier_complete_resp(
+        &mut self,
+        worker_id: WorkerId,
+        resp: BarrierCompleteResponse,
+    ) {
+        let task = self.tasks.get_mut(&resp.task_id).expect("should exist");
+        assert!(task.node_to_collect.remove(&worker_id));
+        task.collected_resps
+            .try_insert(worker_id, resp)
+            .expect("non-duplicate");
+    }
+
+    pub(super) fn is_failed_at_worker_err(&self, worker_id: WorkerId) -> bool {
+        self.tasks
+            .values()
+            .any(|task| task.node_to_collect.contains(&worker_id))
+    }
+}
+
+impl CommittingTask {
+    pub(super) fn next_committed_barrier<'a>(
         &'a mut self,
+        completing_tasks: &mut CompletingTasks,
         context: &Arc<impl GlobalBarrierWorkerContext>,
         env: &MetaSrvEnv,
-    ) -> impl Future<Output = MetaResult<BarrierCompleteOutput>> + 'a {
+    ) -> impl Future<Output = MetaResult<BarrierCommitOutput>> + 'a {
         // If there is no completing barrier, try to start completing the earliest barrier if
         // it has been collected.
-        if let CommittingTask::None = &self.committing_task {
-            if let Some((_, task)) = self.syncing_tasks.first_key_value()
-                && task.is_collected()
-            {
-                let (_, task) = self.syncing_tasks.pop_first().expect("non-empty");
-                let (commit_info, task) = task.into_commit_info();
+        if let CommittingTask::None = &self {
+            if let Some((commit_info, task)) = completing_tasks.next_completed_task() {
                 let epochs_to_ack = task.epochs_to_ack();
                 let context = context.clone();
                 let env = env.clone();
                 let join_handle = tokio::spawn(async move {
-                    task.complete_barrier(commit_info, &*context, env).await
+                    CommittingTask::commit_barrier(task, commit_info, &*context, env).await
                 });
-                self.committing_task = CommittingTask::Committing {
+                *self = CommittingTask::Committing {
                     epochs_to_ack,
                     join_handle,
                 };
             }
         }
 
-        self.next_completed_barrier_inner()
+        self.next_committed_barrier_inner()
     }
 
-    async fn next_completed_barrier_inner(&mut self) -> MetaResult<BarrierCompleteOutput> {
-        let CommittingTask::Committing { join_handle, .. } = &mut self.committing_task else {
+    async fn next_committed_barrier_inner(&mut self) -> MetaResult<BarrierCommitOutput> {
+        let CommittingTask::Committing { join_handle, .. } = self else {
             return pending().await;
        };
 
@@ -347,20 +371,19 @@ impl CompletingTask {
         };
         // It's important to reset the completing_command after await no matter the result is err
         // or not, and otherwise the join handle will be polled again after ready.
-        let next_completing_command_status = if let Err(e) = &join_result {
+        let next_committing_task_status = if let Err(e) = &join_result {
             CommittingTask::Err(e.clone())
         } else {
             CommittingTask::None
         };
-        let completed_task =
-            replace(&mut self.committing_task, next_completing_command_status);
+        let committed_task = replace(self, next_committing_task_status);
 
         let hummock_version_stats = join_result?;
-        must_match!(completed_task, CommittingTask::Committing {
+        must_match!(committed_task, CommittingTask::Committing {
             epochs_to_ack,
             ..
         } => {
-            Ok(BarrierCompleteOutput {
+            Ok(BarrierCommitOutput {
                 epochs_to_ack,
                 hummock_version_stats,
             })
@@ -368,25 +391,4 @@ impl CompletingTask {
         })
     }
-
-    pub(super) fn on_barrier_complete_resp(
-        &mut self,
-        worker_id: WorkerId,
-        resp: BarrierCompleteResponse,
-    ) {
-        let task = self
-            .syncing_tasks
-            .get_mut(&resp.task_id)
-            .expect("should exist");
-        assert!(task.node_to_collect.remove(&worker_id));
-        task.collected_resps
-            .try_insert(worker_id, resp)
-            .expect("non-duplicate");
-    }
-
-    pub(super) fn is_failed_at_worker_err(&self, worker_id: WorkerId) -> bool {
-        self.syncing_tasks
-            .values()
-            .any(|task| task.node_to_collect.contains(&worker_id))
-    }
 }
diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs
index 5b84244e23ad2..75d94515ec470 100644
--- a/src/meta/src/barrier/rpc.rs
+++ b/src/meta/src/barrier/rpc.rs
@@ -15,7 +15,7 @@ use std::collections::{HashMap, HashSet};
 use std::error::Error;
 use std::future::poll_fn;
-use std::task::Poll;
+use std::task::{Context, Poll};
 use std::time::Duration;
 
 use anyhow::anyhow;
@@ -38,7 +38,6 @@ use risingwave_pb::stream_service::{
     InjectBarrierRequest, StreamingControlStreamRequest,
 };
 use risingwave_rpc_client::StreamingControlHandle;
-use rw_futures_util::pending_on_none;
 use thiserror_ext::AsReport;
 use tokio::time::{sleep, timeout};
 use tokio_retry::strategy::ExponentialBackoff;
@@ -170,16 +169,17 @@ impl ControlStreamManager {
         *self = Self::new(self.env.clone());
     }
 
-    async fn next_response_inner(
+    fn poll_next_response(
         &mut self,
-    ) -> Option<(
+        cx: &mut Context<'_>,
+    ) -> Poll<(
         WorkerId,
         MetaResult<Response>,
     )> {
         if self.nodes.is_empty() {
-            return None;
+            return Poll::Pending;
         }
-        let (worker_id, result) = poll_fn(|cx| {
+        let result: Poll<(WorkerId, MetaResult<_>)> = {
             for (worker_id, node) in &mut self.nodes {
                 match node.handle.response_stream.poll_next_unpin(cx) {
                     Poll::Ready(result) => {
@@ -213,18 +213,17 @@ impl ControlStreamManager {
                 }
             }
             Poll::Pending
-        })
-        .await;
+        };
 
-        if let Err(err) = &result {
+        if let Poll::Ready((worker_id, Err(err))) = &result {
             let node = self
                 .nodes
-                .remove(&worker_id)
+                .remove(worker_id)
                 .expect("should exist when get shutdown resp");
             warn!(node = ?node.worker, err = %err.as_report(), "get error from response stream");
         }
 
-        Some((worker_id, result))
+        result
     }
 
     pub(super) async fn next_response(
         &mut self,
     ) -> (
         WorkerId,
         MetaResult<Response>,
     ) {
-        pending_on_none(self.next_response_inner()).await
+        poll_fn(|cx| self.poll_next_response(cx)).await
     }
 
     pub(super) async fn collect_errors(
@@ -244,14 +243,17 @@
         worker_id: WorkerId,
         first_err: MetaError,
     ) -> Vec<(WorkerId, MetaError)> {
         let mut errors = vec![(worker_id, first_err)];
         #[cfg(not(madsim))]
         {
-            let _ = timeout(COLLECT_ERROR_TIMEOUT, async {
-                while let Some((worker_id, result)) = self.next_response_inner().await {
-                    if let Err(e) = result {
-                        errors.push((worker_id, e));
+            if !self.nodes.is_empty() {
+                let _ = timeout(COLLECT_ERROR_TIMEOUT, async {
+                    loop {
+                        let (worker_id, result) = self.next_response().await;
+                        if let Err(e) = result {
+                            errors.push((worker_id, e));
+                        }
                     }
-                }
-            })
-            .await;
+                })
+                .await;
+            }
         }
         tracing::debug!(?errors, "collected stream errors");
         errors
diff --git a/src/meta/src/barrier/worker.rs b/src/meta/src/barrier/worker.rs
index 37b770011e3c2..51f0257178547 100644
--- a/src/meta/src/barrier/worker.rs
+++ b/src/meta/src/barrier/worker.rs
@@ -40,7 +40,7 @@ use tracing::{debug, error, info, warn, Instrument};
 use crate::barrier::checkpoint::{
     BarrierWorkerState, CheckpointControl, DatabaseCheckpointControl,
 };
-use crate::barrier::complete_task::{CommittingTask, CompletingTask};
+use crate::barrier::complete_task::{CommittingTask, CompletingTasks};
 use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
 use crate::barrier::info::BarrierInfo;
 use crate::barrier::progress::CreateMviewProgressTracker;
@@ -88,8 +88,11 @@ pub(super) struct GlobalBarrierWorker {
     checkpoint_control: CheckpointControl,
 
     /// Command that has been collected but is still completing.
-    /// The join handle of the completing future is stored.
-    completing_task: CompletingTask,
+    completing_tasks: CompletingTasks,
+
+    /// Command that has been completed but is still committing.
+    /// The join handle of the committing future is stored.
+    committing_task: CommittingTask,
 
     request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
 
@@ -148,7 +151,8 @@ impl GlobalBarrierWorker {
             context,
             env,
             checkpoint_control: CheckpointControl::default(),
-            completing_task: CompletingTask::new(),
+            completing_tasks: CompletingTasks::new(),
+            committing_task: CommittingTask::None,
             request_rx,
             active_streaming_nodes,
             sink_manager,
@@ -252,7 +256,7 @@ impl GlobalBarrierWorker {
                 .next_complete_barrier_task(&mut self.periodic_barriers)
             {
                 if let Err(e) = self
-                    .completing_task
+                    .completing_tasks
                     .push(next_complete_barrier_task, &mut self.control_stream_manager)
                 {
                     self.failure_recovery(e).await;
@@ -313,8 +317,9 @@ impl GlobalBarrierWorker {
                     }
                 }
                 complete_result = self
-                    .completing_task
-                    .next_completed_barrier(
+                    .committing_task
+                    .next_committed_barrier(
+                        &mut self.completing_tasks,
                         &self.context,
                         &self.env,
                     ) => {
@@ -331,7 +336,7 @@ impl GlobalBarrierWorker {
                     if let Err(e) = resp_result.and_then(|resp| {
                         match resp {
                             Response::CompleteBarrier(resp) => {
-                                self.completing_task.on_barrier_complete_resp(worker_id, resp);
+                                self.completing_tasks.on_barrier_complete_resp(worker_id, resp);
                                 Ok(())
                             },
                             Response::CollectBarrier(resp) => {
@@ -344,7 +349,7 @@ impl GlobalBarrierWorker {
                     }) {
                         {
-                            if self.checkpoint_control.is_failed_at_worker_err(worker_id) || self.completing_task.is_failed_at_worker_err(worker_id) {
+                            if self.checkpoint_control.is_failed_at_worker_err(worker_id) || self.completing_tasks.is_failed_at_worker_err(worker_id) {
                                 let errors = self.control_stream_manager.collect_errors(worker_id, e).await;
                                 let err = merge_node_rpc_errors("get error from control stream", errors);
                                 self.report_collect_failure(&err);
@@ -370,14 +375,15 @@ impl GlobalBarrierWorker {
 }
 
 // TODO: move this method to `complete_task.rs` and mark some structs and fields as private before merge
-impl CompletingTask {
+impl CommittingTask {
     pub(super) async fn clear_on_err(
         &mut self,
+        completing_tasks: &mut CompletingTasks,
         context: &impl GlobalBarrierWorkerContext,
         env: &MetaSrvEnv,
     ) {
         // join spawned completing command to finish no matter it succeeds or not.
-        let is_err = match replace(&mut self.committing_task, CommittingTask::None) {
+        let is_err = match replace(self, CommittingTask::None) {
             CommittingTask::None => false,
             CommittingTask::Committing { join_handle, .. } => {
                 info!("waiting for completing command to finish in recovery");
@@ -397,13 +403,9 @@ impl CompletingTask {
         };
         if !is_err {
             // continue to finish the pending collected barrier.
-            while let Some((_, task)) = self.syncing_tasks.pop_first()
-                && task.is_collected()
-            {
-                let (commit_info, task) = task.into_commit_info();
-                if let Err(e) = task
-                    .complete_barrier(commit_info, context, env.clone())
-                    .await
+            while let Some((commit_info, task)) = completing_tasks.next_completed_task() {
+                if let Err(e) =
+                    CommittingTask::commit_barrier(task, commit_info, context, env.clone()).await
                 {
                     error!(
                         err = ?e.as_report(),
@@ -413,16 +415,16 @@ impl CompletingTask {
                 }
             }
         }
-        *self = Self::new();
     }
 }
 
 impl GlobalBarrierWorker {
     /// We need to make sure there are no changes when doing recovery
     pub async fn clear_on_err(&mut self, err: &MetaError) {
-        self.completing_task
-            .clear_on_err(&*self.context, &self.env)
+        self.committing_task
+            .clear_on_err(&mut self.completing_tasks, &*self.context, &self.env)
             .await;
+        self.completing_tasks = CompletingTasks::new();
         self.checkpoint_control.clear_on_err(err);
     }
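The `CompletingTasks` half of the split is essentially an in-order completion queue: tasks are keyed by a monotonically increasing id in a `BTreeMap`, and `next_completed_task` only ever releases the smallest key once all of its worker responses are in, so barriers commit in issue order even when responses arrive out of order. A minimal sketch of that invariant, written for this review rather than taken from the PR; `TaskQueue`, `PendingTask`, and the `u64` worker ids are made-up stand-ins for the real types:

```rust
use std::collections::{BTreeMap, HashSet};

struct PendingTask {
    // Workers whose barrier-complete response is still outstanding.
    node_to_collect: HashSet<u64>,
}

struct TaskQueue {
    next_task_id: u64,
    tasks: BTreeMap<u64, PendingTask>,
}

impl TaskQueue {
    fn push(&mut self, workers: impl IntoIterator<Item = u64>) -> u64 {
        let task_id = self.next_task_id;
        self.next_task_id += 1;
        self.tasks.insert(
            task_id,
            PendingTask { node_to_collect: workers.into_iter().collect() },
        );
        task_id
    }

    fn on_resp(&mut self, task_id: u64, worker_id: u64) {
        let task = self.tasks.get_mut(&task_id).expect("should exist");
        assert!(task.node_to_collect.remove(&worker_id));
    }

    /// Only the earliest task may leave the queue, and only once all of its
    /// workers have responded, so commit order always equals issue order.
    fn next_completed_task(&mut self) -> Option<u64> {
        let ready = self
            .tasks
            .first_key_value()
            .is_some_and(|(_, task)| task.node_to_collect.is_empty());
        if ready {
            Some(self.tasks.pop_first().expect("non-empty").0)
        } else {
            None
        }
    }
}

fn main() {
    let mut queue = TaskQueue { next_task_id: 0, tasks: BTreeMap::new() };
    let t0 = queue.push([1, 2]);
    let t1 = queue.push([1]);
    queue.on_resp(t1, 1); // the newer task finishes first...
    assert_eq!(queue.next_completed_task(), None); // ...but cannot overtake t0
    queue.on_resp(t0, 1);
    queue.on_resp(t0, 2);
    assert_eq!(queue.next_completed_task(), Some(t0));
    assert_eq!(queue.next_completed_task(), Some(t1));
}
```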
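The `rpc.rs` change swaps `pending_on_none` for a hand-written poll function: returning `Poll::Pending` when no node is ready, including when `nodes` is empty, lets `next_response` be expressed directly as `poll_fn(...).await`. A rough sketch of the same pattern, assuming `futures` and `tokio` as dependencies; `Manager`, `WorkerId`, and the `String` response type are placeholders, and stream termination is ignored for brevity:

```rust
use std::collections::HashMap;
use std::future::poll_fn;
use std::task::{Context, Poll};

use futures::stream::{self, BoxStream};
use futures::StreamExt;

type WorkerId = u64;

struct Manager {
    nodes: HashMap<WorkerId, BoxStream<'static, String>>,
}

impl Manager {
    fn poll_next_response(&mut self, cx: &mut Context<'_>) -> Poll<(WorkerId, String)> {
        for (worker_id, stream) in &mut self.nodes {
            if let Poll::Ready(Some(resp)) = stream.poll_next_unpin(cx) {
                return Poll::Ready((*worker_id, resp));
            }
        }
        // No node is ready (or there are no nodes at all): stay pending. The
        // old code returned `None` here and needed `pending_on_none` upstream.
        Poll::Pending
    }

    async fn next_response(&mut self) -> (WorkerId, String) {
        poll_fn(|cx| self.poll_next_response(cx)).await
    }
}

#[tokio::main]
async fn main() {
    let mut mgr = Manager {
        nodes: HashMap::from([(1, stream::iter(["ok".to_string()]).boxed())]),
    };
    let (worker, resp) = mgr.next_response().await;
    println!("worker {worker} replied: {resp}");
}
```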
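Finally, `next_committed_barrier_inner` takes the state out with `mem::replace` before inspecting the join result, so a finished `JoinHandle` is never polled twice. A compressed sketch of that take-and-swap pattern under invented types (plain `match`/`unreachable!` stands in for RisingWave's `must_match!` macro, and a `Result` stands in for the join handle):

```rust
use std::mem::replace;

enum CommittingTask {
    None,
    Committing { epochs_to_ack: Vec<u64> },
    Err(String),
}

fn finish(
    state: &mut CommittingTask,
    join_result: Result<u64, String>,
) -> Result<(Vec<u64>, u64), String> {
    // Decide the successor state first, whether the result is Ok or Err...
    let next = if let Err(e) = &join_result {
        CommittingTask::Err(e.clone())
    } else {
        CommittingTask::None
    };
    // ...then swap it in, taking ownership of the old state so the finished
    // task can never be awaited again.
    let committed = replace(state, next);
    let stats = join_result?;
    match committed {
        CommittingTask::Committing { epochs_to_ack } => Ok((epochs_to_ack, stats)),
        _ => unreachable!("finish is only called in the Committing state"),
    }
}

fn main() {
    let mut state = CommittingTask::Committing { epochs_to_ack: vec![1, 2] };
    assert_eq!(finish(&mut state, Ok(42)), Ok((vec![1, 2], 42)));
    assert!(matches!(state, CommittingTask::None));
}
```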