diff --git a/proto/meta.proto b/proto/meta.proto
index b6c9521ad673..15a16f36bddd 100644
--- a/proto/meta.proto
+++ b/proto/meta.proto
@@ -753,8 +753,8 @@ message EventLog {
     string error = 3;
   }
   message EventCollectBarrierFail {
-    uint64 prev_epoch = 1;
-    uint64 cur_epoch = 2;
+    reserved 1, 2;
+    reserved "prev_epoch", "cur_epoch";
     string error = 3;
   }
   message EventWorkerNodePanic {
diff --git a/src/meta/src/barrier/checkpoint/control.rs b/src/meta/src/barrier/checkpoint/control.rs
index a064848dc267..beb77b3217ad 100644
--- a/src/meta/src/barrier/checkpoint/control.rs
+++ b/src/meta/src/barrier/checkpoint/control.rs
@@ -21,8 +21,10 @@ use fail::fail_point;
 use prometheus::HistogramTimer;
 use risingwave_common::catalog::{DatabaseId, TableId};
 use risingwave_meta_model::WorkerId;
+use risingwave_pb::ddl_service::DdlProgress;
 use risingwave_pb::hummock::HummockVersionStats;
 use risingwave_pb::meta::PausedReason;
+use risingwave_pb::stream_plan::PbSubscriptionUpstreamInfo;
 use risingwave_pb::stream_service::BarrierCompleteResponse;
 use tracing::{debug, warn};

@@ -35,22 +37,32 @@ use crate::barrier::notifier::Notifier;
 use crate::barrier::progress::{CreateMviewProgressTracker, TrackingCommand, TrackingJob};
 use crate::barrier::rpc::{from_partial_graph_id, ControlStreamManager};
 use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
-use crate::barrier::utils::{collect_commit_epoch_info, collect_creating_job_commit_epoch_info};
+use crate::barrier::utils::collect_creating_job_commit_epoch_info;
 use crate::barrier::{
     BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
     SnapshotBackfillInfo, TracedEpoch,
 };
 use crate::manager::ActiveStreamingWorkerNodes;
 use crate::rpc::metrics::GLOBAL_META_METRICS;
-use crate::MetaResult;
+use crate::{MetaError, MetaResult};

 #[derive(Default)]
 pub(crate) struct CheckpointControl {
-    pub(crate) databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
-    pub(crate) hummock_version_stats: HummockVersionStats,
+    databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
+    hummock_version_stats: HummockVersionStats,
 }

 impl CheckpointControl {
+    pub(crate) fn new(
+        databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
+        hummock_version_stats: HummockVersionStats,
+    ) -> Self {
+        Self {
+            databases,
+            hummock_version_stats,
+        }
+    }
+
     pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
         self.hummock_version_stats = output.hummock_version_stats;
         for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
@@ -204,23 +216,88 @@ impl CheckpointControl {
             .values()
             .for_each(|database| database.update_barrier_nums_metrics());
     }
+
+    pub(crate) fn gen_ddl_progress(&self) -> HashMap<u32, DdlProgress> {
+        let mut progress = HashMap::new();
+        for database_checkpoint_control in self.databases.values() {
+            // Progress of normal backfill
+            progress.extend(
+                database_checkpoint_control
+                    .create_mview_tracker
+                    .gen_ddl_progress(),
+            );
+            // Progress of snapshot backfill
+            for creating_job in database_checkpoint_control
+                .creating_streaming_job_controls
+                .values()
+            {
+                progress.extend([(
+                    creating_job.info.table_fragments.table_id().table_id,
+                    creating_job.gen_ddl_progress(),
+                )]);
+            }
+        }
+        progress
+    }
+
+    pub(crate) fn is_failed_at_worker_err(&self, worker_id: WorkerId) -> bool {
+        for database_checkpoint_control in self.databases.values() {
+            let failed_barrier =
+                database_checkpoint_control.barrier_wait_collect_from_worker(worker_id as _);
+            if failed_barrier.is_some()
+                || database_checkpoint_control
+                    .state
+                    .inflight_graph_info
+                    .contains_worker(worker_id as _)
+                || database_checkpoint_control
+                    .creating_streaming_job_controls
+                    .values()
+                    .any(|job| job.is_wait_on_worker(worker_id))
+            {
+                return true;
+            }
+        }
+        false
+    }
+
+    pub(crate) fn clear_on_err(&mut self, err: &MetaError) {
+        for (_, node) in self
+            .databases
+            .values_mut()
+            .flat_map(|database| take(&mut database.command_ctx_queue))
+        {
+            for notifier in node.notifiers {
+                notifier.notify_failed(err.clone());
+            }
+            node.enqueue_time.observe_duration();
+        }
+        self.databases
+            .values_mut()
+            .for_each(|database| database.create_mview_tracker.abort_all());
+    }
+
+    pub(crate) fn subscriptions(&self) -> impl Iterator<Item = PbSubscriptionUpstreamInfo> + '_ {
+        self.databases
+            .values()
+            .flat_map(|database| &database.state.inflight_subscription_info)
+    }
 }

 /// Controls the concurrent execution of commands.
 pub(crate) struct DatabaseCheckpointControl {
     database_id: DatabaseId,
-    pub(crate) state: BarrierWorkerState,
+    state: BarrierWorkerState,

     /// Save the state and message of barrier in order.
     /// Key is the `prev_epoch`.
-    pub(crate) command_ctx_queue: BTreeMap<u64, EpochNode>,
+    command_ctx_queue: BTreeMap<u64, EpochNode>,
     /// The barrier that are completing.
     /// Some((`prev_epoch`, `should_pause_inject_barrier`))
     completing_barrier: Option<(u64, bool)>,

-    pub(crate) creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,
+    creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,

-    pub(crate) create_mview_tracker: CreateMviewProgressTracker,
+    create_mview_tracker: CreateMviewProgressTracker,
 }

 impl DatabaseCheckpointControl {
@@ -531,9 +608,12 @@ impl DatabaseCheckpointControl {
             let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty");
             assert!(node.state.creating_jobs_to_wait.is_empty());
             assert!(node.state.node_to_collect.is_empty());
-            let mut finished_jobs = self
-                .create_mview_tracker
-                .apply_collected_command(&node, hummock_version_stats);
+            let mut finished_jobs = self.create_mview_tracker.apply_collected_command(
+                node.command_ctx.command.as_ref(),
+                &node.command_ctx.barrier_info,
+                &node.state.resps,
+                hummock_version_stats,
+            );
             if !node.command_ctx.barrier_info.kind.is_checkpoint() {
                 assert!(finished_jobs.is_empty());
                 node.notifiers.into_iter().for_each(|notifier| {
@@ -561,10 +641,9 @@ impl DatabaseCheckpointControl {
                 }));
             });
             let task = task.get_or_insert_default();
-            collect_commit_epoch_info(
+            node.command_ctx.collect_commit_epoch_info(
                 &mut task.commit_info,
                 take(&mut node.state.resps),
-                &node.command_ctx,
                 self.collect_backfill_pinned_upstream_log_epoch(),
             );
             self.completing_barrier = Some((
@@ -630,25 +709,25 @@ impl DatabaseCheckpointControl {
 }

 /// The state and message of this barrier, a node for concurrent checkpoint.
-pub(crate) struct EpochNode {
+struct EpochNode {
     /// Timer for recording barrier latency, taken after `complete_barriers`.
-    pub(crate) enqueue_time: HistogramTimer,
+    enqueue_time: HistogramTimer,

     /// Whether this barrier is in-flight or completed.
-    pub(crate) state: BarrierEpochState,
+    state: BarrierEpochState,

     /// Context of this command to generate barrier and do some post jobs.
-    pub(crate) command_ctx: CommandContext,
+    command_ctx: CommandContext,
     /// Notifiers of this barrier.
-    pub(crate) notifiers: Vec<Notifier>,
+    notifiers: Vec<Notifier>,
 }

 #[derive(Debug)]
 /// The state of barrier.
-pub(crate) struct BarrierEpochState {
-    pub(crate) node_to_collect: HashSet<WorkerId>,
+struct BarrierEpochState {
+    node_to_collect: HashSet<WorkerId>,

-    pub(crate) resps: Vec<BarrierCompleteResponse>,
+    resps: Vec<BarrierCompleteResponse>,

     creating_jobs_to_wait: HashSet<TableId>,
diff --git a/src/meta/src/barrier/checkpoint/mod.rs b/src/meta/src/barrier/checkpoint/mod.rs
index a5144913258c..f9840f5eb387 100644
--- a/src/meta/src/barrier/checkpoint/mod.rs
+++ b/src/meta/src/barrier/checkpoint/mod.rs
@@ -16,5 +16,5 @@ mod control;
 mod creating_job;
 mod state;

-pub(super) use control::{CheckpointControl, DatabaseCheckpointControl, EpochNode};
+pub(super) use control::{CheckpointControl, DatabaseCheckpointControl};
 pub(super) use state::BarrierWorkerState;
diff --git a/src/meta/src/barrier/checkpoint/state.rs b/src/meta/src/barrier/checkpoint/state.rs
index 871ff7e7faf6..8f1f91272b2f 100644
--- a/src/meta/src/barrier/checkpoint/state.rs
+++ b/src/meta/src/barrier/checkpoint/state.rs
@@ -35,16 +35,16 @@ pub(crate) struct BarrierWorkerState {
     pending_non_checkpoint_barriers: Vec<u64>,

     /// Inflight running actors info.
-    pub(crate) inflight_graph_info: InflightDatabaseInfo,
+    pub(super) inflight_graph_info: InflightDatabaseInfo,

-    pub(crate) inflight_subscription_info: InflightSubscriptionInfo,
+    pub(super) inflight_subscription_info: InflightSubscriptionInfo,

     /// Whether the cluster is paused and the reason.
     paused_reason: Option<PausedReason>,
 }

 impl BarrierWorkerState {
-    pub fn new() -> Self {
+    pub(super) fn new() -> Self {
         Self {
             in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
             pending_non_checkpoint_barriers: vec![],
diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index 23a8857e9ad1..73ebc8d44629 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -12,15 +12,18 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+use std::collections::hash_map::Entry;
 use std::collections::{HashMap, HashSet};
 use std::fmt::Formatter;

 use risingwave_common::bitmap::Bitmap;
 use risingwave_common::catalog::TableId;
 use risingwave_common::hash::ActorMapping;
+use risingwave_common::must_match;
 use risingwave_common::types::Timestamptz;
 use risingwave_common::util::epoch::Epoch;
 use risingwave_connector::source::SplitImpl;
+use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
 use risingwave_meta_model::WorkerId;
 use risingwave_pb::catalog::{CreateType, Table};
 use risingwave_pb::common::PbWorkerNode;
@@ -36,12 +39,15 @@ use risingwave_pb::stream_plan::{
     DropSubscriptionsMutation, PauseMutation, ResumeMutation, SourceChangeSplitMutation,
     StopMutation, StreamActor, SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
 };
+use risingwave_pb::stream_service::BarrierCompleteResponse;
 use tracing::warn;

 use super::info::{CommandFragmentChanges, InflightStreamingJobInfo};
 use crate::barrier::info::BarrierInfo;
+use crate::barrier::utils::collect_resp_info;
 use crate::barrier::InflightSubscriptionInfo;
 use crate::controller::fragment::InflightFragmentInfo;
+use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
 use crate::manager::{DdlType, StreamingJob};
 use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism};
 use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig};
@@ -430,23 +436,23 @@ impl BarrierKind {

 /// [`CommandContext`] is used for generating barrier and doing post stuffs according to the given
 /// [`Command`].
-pub struct CommandContext {
+pub(super) struct CommandContext {
     /// Resolved info in this barrier loop.
-    pub node_map: HashMap<WorkerId, PbWorkerNode>,
-    pub subscription_info: InflightSubscriptionInfo,
+    pub(super) node_map: HashMap<WorkerId, PbWorkerNode>,
+    subscription_info: InflightSubscriptionInfo,

-    pub barrier_info: BarrierInfo,
+    pub(super) barrier_info: BarrierInfo,

-    pub table_ids_to_commit: HashSet<TableId>,
+    pub(super) table_ids_to_commit: HashSet<TableId>,

-    pub command: Option<Command>,
+    pub(super) command: Option<Command>,

     /// The tracing span of this command.
     ///
     /// Differs from [`crate::barrier::TracedEpoch`], this span focuses on the lifetime of the corresponding
     /// barrier, including the process of waiting for the barrier to be sent, flowing through the
     /// stream graph on compute nodes, and finishing its `post_collect` stuffs.
-    pub _span: tracing::Span,
+    _span: tracing::Span,
 }

 impl std::fmt::Debug for CommandContext {
@@ -477,7 +483,7 @@ impl CommandContext {
         }
     }

-    pub fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
+    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
         let Some(truncate_timestamptz) = Timestamptz::from_secs(
             self.barrier_info
                 .prev_epoch
@@ -491,6 +497,86 @@ impl CommandContext {
         };
         Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
     }
+
+    pub(super) fn collect_commit_epoch_info(
+        &self,
+        info: &mut CommitEpochInfo,
+        resps: Vec<BarrierCompleteResponse>,
+        backfill_pinned_log_epoch: HashMap<TableId, (u64, HashSet<TableId>)>,
+    ) {
+        let (sst_to_context, synced_ssts, new_table_watermarks, old_value_ssts) =
+            collect_resp_info(resps);
+
+        let new_table_fragment_infos = if let Some(Command::CreateStreamingJob { info, job_type }) =
+            &self.command
+            && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
+        {
+            let table_fragments = &info.table_fragments;
+            let mut table_ids: HashSet<_> = table_fragments
+                .internal_table_ids()
+                .into_iter()
+                .map(TableId::new)
+                .collect();
+            if let Some(mv_table_id) = table_fragments.mv_table_id() {
+                table_ids.insert(TableId::new(mv_table_id));
+            }
+
+            vec![NewTableFragmentInfo { table_ids }]
+        } else {
+            vec![]
+        };
+
+        let mut mv_log_store_truncate_epoch = HashMap::new();
+        let mut update_truncate_epoch =
+            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch
+                .entry(table_id.table_id)
+            {
+                Entry::Occupied(mut entry) => {
+                    let prev_truncate_epoch = entry.get_mut();
+                    if truncate_epoch < *prev_truncate_epoch {
+                        *prev_truncate_epoch = truncate_epoch;
+                    }
+                }
+                Entry::Vacant(entry) => {
+                    entry.insert(truncate_epoch);
+                }
+            };
+        for (mv_table_id, subscriptions) in &self.subscription_info.mv_depended_subscriptions {
+            if let Some(truncate_epoch) = subscriptions
+                .values()
+                .max()
+                .map(|max_retention| self.get_truncate_epoch(*max_retention).0)
+            {
+                update_truncate_epoch(*mv_table_id, truncate_epoch);
+            }
+        }
+        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
+            for mv_table_id in upstream_mv_table_ids {
+                update_truncate_epoch(mv_table_id, backfill_epoch);
+            }
+        }
+
+        let table_new_change_log = build_table_change_log_delta(
+            old_value_ssts.into_iter(),
+            synced_ssts.iter().map(|sst| &sst.sst_info),
+            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
+            mv_log_store_truncate_epoch.into_iter(),
+        );
+
+        let epoch = self.barrier_info.prev_epoch();
+        for table_id in &self.table_ids_to_commit {
+            info.tables_to_commit
+                .try_insert(*table_id, epoch)
+                .expect("non duplicate");
+        }
+
+        info.sstables.extend(synced_ssts);
+        info.new_table_watermarks.extend(new_table_watermarks);
+        info.sst_to_context.extend(sst_to_context);
+        info.new_table_fragment_infos
+            .extend(new_table_fragment_infos);
+        info.change_log_delta.extend(table_new_change_log);
+    }
 }

 impl Command {
diff --git a/src/meta/src/barrier/context/mod.rs b/src/meta/src/barrier/context/mod.rs
index f1e073792951..e69b9644de8d 100644
--- a/src/meta/src/barrier/context/mod.rs
+++ b/src/meta/src/barrier/context/mod.rs
@@ -66,18 +66,44 @@ pub(super) trait GlobalBarrierWorkerContext: Send + Sync + 'static {
     async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot>;
 }

-pub(crate) struct GlobalBarrierWorkerContextImpl {
-    pub(crate) scheduled_barriers: ScheduledBarriers,
+pub(super) struct GlobalBarrierWorkerContextImpl {
+    scheduled_barriers: ScheduledBarriers,

-    pub(crate) status: Arc<ArcSwap<BarrierManagerStatus>>,
+    status: Arc<ArcSwap<BarrierManagerStatus>>,

-    pub(crate) metadata_manager: MetadataManager,
+    pub(super) metadata_manager: MetadataManager,

-    pub(crate) hummock_manager: HummockManagerRef,
+    hummock_manager: HummockManagerRef,

-    pub(crate) source_manager: SourceManagerRef,
+    source_manager: SourceManagerRef,

-    pub(crate) scale_controller: ScaleControllerRef,
+    scale_controller: ScaleControllerRef,

-    pub(crate) env: MetaSrvEnv,
+    pub(super) env: MetaSrvEnv,
+}
+
+impl GlobalBarrierWorkerContextImpl {
+    pub(super) fn new(
+        scheduled_barriers: ScheduledBarriers,
+        status: Arc<ArcSwap<BarrierManagerStatus>>,
+        metadata_manager: MetadataManager,
+        hummock_manager: HummockManagerRef,
+        source_manager: SourceManagerRef,
+        scale_controller: ScaleControllerRef,
+        env: MetaSrvEnv,
+    ) -> Self {
+        Self {
+            scheduled_barriers,
+            status,
+            metadata_manager,
+            hummock_manager,
+            source_manager,
+            scale_controller,
+            env,
+        }
+    }
+
+    pub(super) fn status(&self) -> Arc<ArcSwap<BarrierManagerStatus>> {
+        self.status.clone()
+    }
 }
diff --git a/src/meta/src/barrier/manager.rs b/src/meta/src/barrier/manager.rs
index 4b6573372123..1a8ff9be53f2 100644
--- a/src/meta/src/barrier/manager.rs
+++ b/src/meta/src/barrier/manager.rs
@@ -113,6 +113,8 @@ impl GlobalBarrierManager {
         scale_controller: ScaleControllerRef,
     ) -> (Arc<Self>, JoinHandle<()>, oneshot::Sender<()>) {
         let (request_tx, request_rx) = unbounded_channel();
+        let hummock_manager_clone = hummock_manager.clone();
+        let metadata_manager_clone = metadata_manager.clone();
         let barrier_worker = GlobalBarrierWorker::new(
             scheduled_barriers,
             env,
@@ -125,10 +127,10 @@ impl GlobalBarrierManager {
         )
         .await;
         let manager = Self {
-            status: barrier_worker.context.status.clone(),
-            hummock_manager: barrier_worker.context.hummock_manager.clone(),
+            status: barrier_worker.context.status(),
+            hummock_manager: hummock_manager_clone,
             request_tx,
-            metadata_manager: barrier_worker.context.metadata_manager.clone(),
+            metadata_manager: metadata_manager_clone,
         };
         let (join_handle, shutdown_tx) = barrier_worker.start();
         (Arc::new(manager), join_handle, shutdown_tx)
diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs
index 17f044142b02..36d1a9a0b242 100644
--- a/src/meta/src/barrier/progress.rs
+++ b/src/meta/src/barrier/progress.rs
@@ -23,8 +23,9 @@ use risingwave_pb::catalog::CreateType;
 use risingwave_pb::ddl_service::DdlProgress;
 use risingwave_pb::hummock::HummockVersionStats;
 use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
+use risingwave_pb::stream_service::PbBarrierCompleteResponse;

-use crate::barrier::checkpoint::EpochNode;
+use crate::barrier::info::BarrierInfo;
 use crate::barrier::{
     Command, CreateStreamingJobCommandInfo, CreateStreamingJobType, ReplaceTablePlan,
 };
@@ -419,12 +420,13 @@ impl CreateMviewProgressTracker {
     /// Return the finished jobs when the barrier kind is `Checkpoint`
     pub(super) fn apply_collected_command(
         &mut self,
-        epoch_node: &EpochNode,
+        command: Option<&Command>,
+        barrier_info: &BarrierInfo,
+        resps: impl IntoIterator<Item = &PbBarrierCompleteResponse>,
         version_stats: &HummockVersionStats,
     ) -> Vec<TrackingJob> {
-        let command_ctx = &epoch_node.command_ctx;
         let new_tracking_job_info =
-            if let Some(Command::CreateStreamingJob { info, job_type }) = &command_ctx.command {
+            if let Some(Command::CreateStreamingJob { info, job_type }) = command {
                 match job_type {
                     CreateStreamingJobType::Normal => Some((info, None)),
                     CreateStreamingJobType::SinkIntoTable(replace_table) => {
@@ -438,26 +440,19 @@ impl CreateMviewProgressTracker {
             } else {
                 None
             };
-        assert!(epoch_node.state.node_to_collect.is_empty());
         self.update_tracking_jobs(
             new_tracking_job_info,
-            epoch_node
-                .state
-                .resps
-                .iter()
+            resps
+                .into_iter()
                 .flat_map(|resp| resp.create_mview_progress.iter()),
             version_stats,
         );
-        if let Some(table_id) = command_ctx
-            .command
-            .as_ref()
-            .and_then(Command::table_to_cancel)
-        {
+        if let Some(table_id) = command.and_then(Command::table_to_cancel) {
             // the cancelled command is possibly stashed in `finished_commands` and waiting
             // for checkpoint, we should also clear it.
             self.cancel_command(table_id);
         }
-        if command_ctx.barrier_info.kind.is_checkpoint() {
+        if barrier_info.kind.is_checkpoint() {
             self.take_finished_jobs()
         } else {
             vec![]
diff --git a/src/meta/src/barrier/utils.rs b/src/meta/src/barrier/utils.rs
index dfdc2a6ab7e8..7e49a0f9d49a 100644
--- a/src/meta/src/barrier/utils.rs
+++ b/src/meta/src/barrier/utils.rs
@@ -12,13 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use std::collections::hash_map::Entry;
 use std::collections::{HashMap, HashSet};

 use itertools::Itertools;
 use risingwave_common::catalog::TableId;
-use risingwave_common::must_match;
-use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
 use risingwave_hummock_sdk::sstable_info::SstableInfo;
 use risingwave_hummock_sdk::table_stats::from_prost_table_stats_map;
 use risingwave_hummock_sdk::table_watermark::{
@@ -27,8 +24,6 @@ use risingwave_hummock_sdk::table_watermark::{
 use risingwave_hummock_sdk::{HummockSstableObjectId, LocalSstableInfo};
 use risingwave_pb::stream_service::BarrierCompleteResponse;

-use crate::barrier::command::CommandContext;
-use crate::barrier::{BarrierKind, Command, CreateStreamingJobType};
 use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};

 #[expect(clippy::type_complexity)]
@@ -80,86 +75,6 @@ pub(super) fn collect_resp_info(
     )
 }

-pub(super) fn collect_commit_epoch_info(
-    info: &mut CommitEpochInfo,
-    resps: Vec<BarrierCompleteResponse>,
-    command_ctx: &CommandContext,
-    backfill_pinned_log_epoch: HashMap<TableId, (u64, HashSet<TableId>)>,
-) {
-    let (sst_to_context, synced_ssts, new_table_watermarks, old_value_ssts) =
-        collect_resp_info(resps);
-
-    let new_table_fragment_infos = if let Some(Command::CreateStreamingJob { info, job_type }) =
-        &command_ctx.command
-        && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
-    {
-        let table_fragments = &info.table_fragments;
-        let mut table_ids: HashSet<_> = table_fragments
-            .internal_table_ids()
-            .into_iter()
-            .map(TableId::new)
-            .collect();
-        if let Some(mv_table_id) = table_fragments.mv_table_id() {
-            table_ids.insert(TableId::new(mv_table_id));
-        }
-
-        vec![NewTableFragmentInfo { table_ids }]
-    } else {
-        vec![]
-    };
-
-    let mut mv_log_store_truncate_epoch = HashMap::new();
-    let mut update_truncate_epoch =
-        |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch
-            .entry(table_id.table_id)
-        {
-            Entry::Occupied(mut entry) => {
-                let prev_truncate_epoch = entry.get_mut();
-                if truncate_epoch < *prev_truncate_epoch {
-                    *prev_truncate_epoch = truncate_epoch;
-                }
-            }
-            Entry::Vacant(entry) => {
-                entry.insert(truncate_epoch);
-            }
-        };
-    for (mv_table_id, subscriptions) in &command_ctx.subscription_info.mv_depended_subscriptions {
-        if let Some(truncate_epoch) = subscriptions
-            .values()
-            .max()
-            .map(|max_retention| command_ctx.get_truncate_epoch(*max_retention).0)
-        {
-            update_truncate_epoch(*mv_table_id, truncate_epoch);
-        }
-    }
-    for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
-        for mv_table_id in upstream_mv_table_ids {
-            update_truncate_epoch(mv_table_id, backfill_epoch);
-        }
-    }
-
-    let table_new_change_log = build_table_change_log_delta(
-        old_value_ssts.into_iter(),
-        synced_ssts.iter().map(|sst| &sst.sst_info),
-        must_match!(&command_ctx.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
-        mv_log_store_truncate_epoch.into_iter(),
-    );
-
-    let epoch = command_ctx.barrier_info.prev_epoch();
-    for table_id in &command_ctx.table_ids_to_commit {
-        info.tables_to_commit
-            .try_insert(*table_id, epoch)
-            .expect("non duplicate");
-    }
-
-    info.sstables.extend(synced_ssts);
-    info.new_table_watermarks.extend(new_table_watermarks);
-    info.sst_to_context.extend(sst_to_context);
-    info.new_table_fragment_infos
-        .extend(new_table_fragment_infos);
-    info.change_log_delta.extend(table_new_change_log);
-}
-
 pub(super) fn collect_creating_job_commit_epoch_info(
     commit_info: &mut CommitEpochInfo,
     epoch: u64,
diff --git a/src/meta/src/barrier/worker.rs b/src/meta/src/barrier/worker.rs
index dfc902f31cfc..ee0cdd97fb61 100644
--- a/src/meta/src/barrier/worker.rs
+++ b/src/meta/src/barrier/worker.rs
@@ -13,7 +13,7 @@
 // limitations under the License.

 use std::collections::HashMap;
-use std::mem::{replace, take};
+use std::mem::replace;
 use std::sync::Arc;
 use std::time::Duration;

@@ -118,15 +118,15 @@ impl GlobalBarrierWorker {

         let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting)));

-        let context = Arc::new(GlobalBarrierWorkerContextImpl {
+        let context = Arc::new(GlobalBarrierWorkerContextImpl::new(
             scheduled_barriers,
             status,
             metadata_manager,
             hummock_manager,
             source_manager,
             scale_controller,
-            env: env.clone(),
-        });
+            env.clone(),
+        ));

         let control_stream_manager = ControlStreamManager::new(env.clone());

@@ -258,15 +258,7 @@ impl GlobalBarrierWorker {
                 if let Some(request) = request {
                     match request {
                         BarrierManagerRequest::GetDdlProgress(result_tx) => {
-                            let mut progress = HashMap::new();
-                            for database_checkpoint_control in self.checkpoint_control.databases.values() {
-                                // Progress of normal backfill
-                                progress.extend(database_checkpoint_control.create_mview_tracker.gen_ddl_progress());
-                                // Progress of snapshot backfill
-                                for creating_job in database_checkpoint_control.creating_streaming_job_controls.values() {
-                                    progress.extend([(creating_job.info.table_fragments.table_id().table_id, creating_job.gen_ddl_progress())]);
-                                }
-                            }
+                            let progress = self.checkpoint_control.gen_ddl_progress();
                             if result_tx.send(progress).is_err() {
                                 error!("failed to send get ddl progress");
                             }
@@ -287,7 +279,7 @@ impl GlobalBarrierWorker {
                     info!(?changed_worker, "worker changed");

                     if let ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) = changed_worker {
-                        self.control_stream_manager.add_worker(node, self.checkpoint_control.databases.values().flat_map(|database| &database.state.inflight_subscription_info), &*self.context).await;
+                        self.control_stream_manager.add_worker(node, self.checkpoint_control.subscriptions(), &*self.context).await;
                     }
                 }

@@ -328,23 +320,11 @@ impl GlobalBarrierWorker {
                 (worker_id, resp_result) = self.control_stream_manager.next_collect_barrier_response() => {
                     if let Err(e) = resp_result.and_then(|resp| self.checkpoint_control.barrier_collected(resp, &mut self.control_stream_manager)) {
                         {
-                            let mut err = None;
-                            for database_checkpoint_control in self.checkpoint_control.databases.values() {
-                                let failed_barrier = database_checkpoint_control.barrier_wait_collect_from_worker(worker_id as _);
-                                if failed_barrier.is_some()
-                                    || database_checkpoint_control.state.inflight_graph_info.contains_worker(worker_id as _)
-                                    || database_checkpoint_control.creating_streaming_job_controls.values().any(|job| job.is_wait_on_worker(worker_id)) {
-
-                                    err = Some((e, failed_barrier));
-                                    break;
-                                }
-                            }
-                            if let Some((e, failed_barrier)) = err {
+
+                            if self.checkpoint_control.is_failed_at_worker_err(worker_id) {
                                 let errors = self.control_stream_manager.collect_errors(worker_id, e).await;
                                 let err = merge_node_rpc_errors("get error from control stream", errors);
-                                if let Some(failed_barrier) = failed_barrier {
-                                    self.report_collect_failure(failed_barrier, &err);
-                                }
+                                self.report_collect_failure(&err);
                                 self.failure_recovery(err).await;
                             } else {
                                 warn!(worker_id, "no barrier to collect from worker, ignore err");
@@ -424,21 +404,7 @@ impl GlobalBarrierWorker {
                 }
             }
         }

-        for (_, node) in self
-            .checkpoint_control
-            .databases
-            .values_mut()
-            .flat_map(|database| take(&mut database.command_ctx_queue))
-        {
-            for notifier in node.notifiers {
-                notifier.notify_failed(err.clone());
-            }
-            node.enqueue_time.observe_duration();
-        }
-        self.checkpoint_control
-            .databases
-            .values_mut()
-            .for_each(|database| database.create_mview_tracker.abort_all());
+        self.checkpoint_control.clear_on_err(err);
     }
 }

@@ -505,12 +471,10 @@ impl GlobalBarrierWorker {

 impl GlobalBarrierWorker {
     /// Send barrier-complete-rpc and wait for responses from all CNs
-    pub(super) fn report_collect_failure(&self, barrier_info: &BarrierInfo, error: &MetaError) {
+    pub(super) fn report_collect_failure(&self, error: &MetaError) {
         // Record failure in event log.
         use risingwave_pb::meta::event_log;
         let event = event_log::EventCollectBarrierFail {
-            prev_epoch: barrier_info.prev_epoch(),
-            cur_epoch: barrier_info.curr_epoch.value().0,
             error: error.to_report_string(),
         };
         self.env
@@ -728,10 +692,10 @@ impl GlobalBarrierWorker {
                 (
                     active_streaming_nodes,
                     control_stream_manager,
-                    CheckpointControl {
+                    CheckpointControl::new(
                         databases,
                         hummock_version_stats,
-                    },
+                    ),
                 )
             };
             if recovery_result.is_err() {