diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index 49bb47eb984b6..65303bb83e917 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -143,7 +143,7 @@ steps: files: "*-junit.xml" format: "junit" - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 13 + timeout_in_minutes: 15 retry: *auto-retry - label: "end-to-end test (parallel, in-memory) (release)" diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index 45502ab3037c0..f34964cd49717 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -565,7 +565,7 @@ steps: config: ci/docker-compose.yml mount-buildkite-agent: true - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 20 + timeout_in_minutes: 25 retry: *auto-retry - label: "end-to-end test (deterministic simulation)" diff --git a/proto/stream_service.proto b/proto/stream_service.proto index 62cc8746aeca1..c6f8179002bb9 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -22,7 +22,7 @@ message InjectBarrierRequest { repeated stream_plan.SubscriptionUpstreamInfo subscriptions_to_remove = 11; } -message BarrierCompleteResponse { +message BarrierCollectResponse { message CreateMviewProgress { // Note: ideally we should use `executor_id`, but `actor_id` is ok-ish. // See . uint64 consumed_rows = 4; uint32 pending_barrier_num = 5; } - string request_id = 1; - common.Status status = 2; + uint64 partial_graph_id = 1; + // prev_epoch of barrier + uint64 epoch = 2; repeated CreateMviewProgress create_mview_progress = 3; +} + +message BarrierCompleteRequest { + uint64 task_id = 1; + map<uint64, uint64> partial_graph_sync_epochs = 2; +} + +message BarrierCompleteResponse { + uint64 task_id = 1; message LocalSstableInfo { reserved 1; reserved "compaction_group_id"; uint32 worker_id = 5; map<uint32, hummock.TableWatermarks> table_watermarks = 6; repeated hummock.SstableInfo old_value_sstables = 7; - uint64 partial_graph_id = 8; - // prev_epoch of barrier - uint64 epoch = 9; } message WaitEpochCommitRequest { @@ -85,6 +92,7 @@ message StreamingControlStreamRequest { InjectBarrierRequest inject_barrier = 2; RemovePartialGraphRequest remove_partial_graph = 3; CreatePartialGraphRequest create_partial_graph = 4; + BarrierCompleteRequest complete_barrier = 5; } } @@ -96,6 +104,7 @@ message StreamingControlStreamResponse { InitResponse init = 1; BarrierCompleteResponse complete_barrier = 2; ShutdownResponse shutdown = 3; + BarrierCollectResponse collect_barrier = 4; } } diff --git a/src/compute/src/rpc/service/monitor_service.rs b/src/compute/src/rpc/service/monitor_service.rs index 2671fd533262f..f7b6b270d6507 100644 --- a/src/compute/src/rpc/service/monitor_service.rs +++ b/src/compute/src/rpc/service/monitor_service.rs @@ -86,10 +86,16 @@ impl MonitorService for MonitorServiceImpl { Default::default() }; + let mut barrier_traces_next_key = 0; + let barrier_traces_next_key = &mut barrier_traces_next_key; let barrier_traces = if let Some(reg) = self.stream_mgr.await_tree_reg() { reg.collect::() .into_iter() - .map(|(k, v)| (k.prev_epoch, v.to_string())) + .map(|(k, v)| { + let key = *barrier_traces_next_key; + *barrier_traces_next_key += 1; + (key, format!("{:?}", (k.sync_graph_epochs, v.to_string()))) + }) .collect() } else { Default::default() diff --git a/src/meta/src/barrier/checkpoint/control.rs b/src/meta/src/barrier/checkpoint/control.rs index 1d7eef6f81b5e..0e06dc3828e53 100644 ---
a/src/meta/src/barrier/checkpoint/control.rs +++ b/src/meta/src/barrier/checkpoint/control.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::collections::hash_map::Entry; -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; use std::mem::take; use anyhow::anyhow; @@ -24,19 +24,21 @@ use risingwave_meta_model::WorkerId; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::meta::PausedReason; -use risingwave_pb::stream_service::BarrierCompleteResponse; +use risingwave_pb::stream_service::BarrierCollectResponse; use tracing::{debug, warn}; use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamingJobControl}; use crate::barrier::checkpoint::state::BarrierWorkerState; use crate::barrier::command::CommandContext; -use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask}; +use crate::barrier::complete_task::{ + BarrierCommitOutput, CompleteBarrierTask, CreatingJobCompleteBarrierTask, + DatabaseCompleteBarrierTask, +}; use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo}; use crate::barrier::notifier::Notifier; use crate::barrier::progress::{CreateMviewProgressTracker, TrackingCommand, TrackingJob}; use crate::barrier::rpc::{from_partial_graph_id, ControlStreamManager}; use crate::barrier::schedule::{NewBarrier, PeriodicBarriers}; -use crate::barrier::utils::collect_creating_job_commit_epoch_info; use crate::barrier::{ BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType, InflightSubscriptionInfo, SnapshotBackfillInfo, TracedEpoch, @@ -62,38 +64,50 @@ impl CheckpointControl { } } - pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) { + pub(crate) fn ack_completed( + &mut self, + output: BarrierCommitOutput, + control_stream_manager: &mut ControlStreamManager, + ) { self.hummock_version_stats = output.hummock_version_stats; for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack { self.databases .get_mut(&database_id) .expect("should exist") - .ack_completed(command_prev_epoch, creating_job_epochs); + .ack_completed( + command_prev_epoch, + creating_job_epochs, + control_stream_manager, + ); } } pub(crate) fn next_complete_barrier_task( &mut self, - mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>, + periodic_barriers: &mut PeriodicBarriers, ) -> Option<CompleteBarrierTask> { let mut task = None; for database in self.databases.values_mut() { - let context = context.as_mut().map(|(s, c)| (&mut **s, &mut **c)); - database.next_complete_barrier_task(&mut task, context, &self.hummock_version_stats); + database.next_complete_barrier_task( + &mut task, + periodic_barriers, + &self.hummock_version_stats, + ); } task } pub(crate) fn barrier_collected( &mut self, - resp: BarrierCompleteResponse, + worker_id: WorkerId, + resp: BarrierCollectResponse, control_stream_manager: &mut ControlStreamManager, ) -> MetaResult<()> { let database_id = from_partial_graph_id(resp.partial_graph_id).0; self.databases .get_mut(&database_id) .expect("should exist") - .barrier_collected(resp, control_stream_manager) + .barrier_collected(worker_id, resp, control_stream_manager) } pub(crate) fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool { @@ -298,8 +312,9 @@ pub(crate) struct DatabaseCheckpointControl { /// Key is the `prev_epoch`. command_ctx_queue: BTreeMap<u64, EpochNode>, /// The barrier that are completing.
- /// Some((`prev_epoch`, `should_pause_inject_barrier`)) - completing_barrier: Option<(u64, bool)>, + /// (`prev_epoch`, `should_pause_inject_barrier`) + /// Newer epoch at the front. + completing_barriers: VecDeque<(u64, bool)>, creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>, @@ -312,7 +327,7 @@ impl DatabaseCheckpointControl { database_id, state: BarrierWorkerState::new(), command_ctx_queue: Default::default(), - completing_barrier: None, + completing_barriers: Default::default(), creating_streaming_job_controls: Default::default(), create_mview_tracker: Default::default(), } } @@ -327,18 +342,14 @@ impl DatabaseCheckpointControl { database_id, state, command_ctx_queue: Default::default(), - completing_barrier: None, + completing_barriers: Default::default(), creating_streaming_job_controls: Default::default(), create_mview_tracker, } } fn total_command_num(&self) -> usize { - self.command_ctx_queue.len() - + match &self.completing_barrier { - Some(_) => 1, - None => 0, - } + self.command_ctx_queue.len() + self.completing_barriers.len() } /// Update the metrics of barrier nums. @@ -410,7 +421,7 @@ impl DatabaseCheckpointControl { enqueue_time: timer, state: BarrierEpochState { node_to_collect, - resps: vec![], + collected_resps: Default::default(), creating_jobs_to_wait, finished_jobs: HashMap::new(), }, @@ -424,10 +435,10 @@ impl DatabaseCheckpointControl { /// with `Completed` starting from first node [`Completed`..`InFlight`) and remove them. fn barrier_collected( &mut self, - resp: BarrierCompleteResponse, + worker_id: WorkerId, + resp: BarrierCollectResponse, control_stream_manager: &mut ControlStreamManager, ) -> MetaResult<()> { - let worker_id = resp.worker_id; let prev_epoch = resp.epoch; tracing::trace!( worker_id, @@ -440,8 +451,11 @@ impl DatabaseCheckpointControl { match creating_job_id { None => { if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) { - assert!(node.state.node_to_collect.remove(&(worker_id as _))); - node.state.resps.push(resp); + assert!(node.state.node_to_collect.remove(&worker_id)); + node.state + .collected_resps + .try_insert(worker_id, resp) + .expect("non duplicate"); } else { panic!( "collect barrier on non-existing barrier: {}, {}", @@ -472,15 +486,17 @@ impl DatabaseCheckpointControl { let should_pause = self .command_ctx_queue .last_key_value() - .and_then(|(_, x)| { + .map(|(_, x)| { x.command_ctx .command .as_ref() .map(Command::should_pause_inject_barrier) + .unwrap_or(false) }) .or(self - .completing_barrier - .map(|(_, should_pause)| should_pause)) + .completing_barriers + .front() + .map(|(_, should_pause)| *should_pause)) .unwrap_or(false); debug_assert_eq!( self.command_ctx_queue .values() .filter_map(|node| { node.command_ctx .command .as_ref() .map(Command::should_pause_inject_barrier) }) .chain( - self.completing_barrier - .map(|(_, should_pause)| should_pause) - .into_iter() + self.completing_barriers + .iter() + .map(|(_, should_pause)| *should_pause) ) .any(|should_pause| should_pause), should_pause @@ -520,7 +536,7 @@ impl DatabaseCheckpointControl { impl DatabaseCheckpointControl { /// return creating job table fragment id -> (backfill progress epoch , {`upstream_mv_table_id`}) - fn collect_backfill_pinned_upstream_log_epoch( + pub(crate) fn collect_backfill_pinned_upstream_log_epoch( &self, ) -> HashMap<TableId, (u64, HashSet<TableId>)> { self.creating_streaming_job_controls @@ -547,7 +563,7 @@ impl DatabaseCheckpointControl { fn next_complete_barrier_task( &mut self, task: &mut Option<CompleteBarrierTask>, - mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>, +
periodic_barriers: &mut PeriodicBarriers, hummock_version_stats: &HummockVersionStats, ) { // `Vec::new` is a const fn, and do not have memory allocation, and therefore is lightweight enough @@ -560,32 +576,33 @@ impl DatabaseCheckpointControl { .first_key_value() .map(|(epoch, _)| *epoch); for (table_id, job) in &mut self.creating_streaming_job_controls { - if let Some((epoch, resps, status)) = + if let Some((epoch, status, workers)) = job.start_completing(min_upstream_inflight_barrier) { - let is_first_time = match status { + let is_first_commit = match status { CompleteJobType::First => true, CompleteJobType::Normal => false, CompleteJobType::Finished => { - finished_jobs.push((*table_id, epoch, resps)); + finished_jobs.push((*table_id, epoch, workers)); continue; } }; - creating_jobs_task.push((*table_id, epoch, resps, is_first_time)); + creating_jobs_task.push(CreatingJobCompleteBarrierTask { + job_id: *table_id, + epoch, + is_first_commit, + tables_to_commit: job + .info + .stream_job_fragments + .all_table_ids() + .map(TableId::new) + .collect(), + workers, + is_finished: false, + }); } } - if !finished_jobs.is_empty() - && let Some((_, control_stream_manager)) = &mut context - { - control_stream_manager.remove_partial_graph( - self.database_id, - finished_jobs - .iter() - .map(|(table_id, _, _)| *table_id) - .collect(), - ); - } - for (table_id, epoch, resps) in finished_jobs { + for (table_id, epoch, workers) in finished_jobs { let epoch_state = &mut self .command_ctx_queue .get_mut(&epoch) @@ -593,20 +610,16 @@ impl DatabaseCheckpointControl { .state; assert!(epoch_state.creating_jobs_to_wait.remove(&table_id)); debug!(epoch, ?table_id, "finish creating job"); - // It's safe to remove the creating job, because on CompleteJobType::Finished, - // all previous barriers have been collected and completed. let creating_streaming_job = self .creating_streaming_job_controls - .remove(&table_id) + .get(&table_id) .expect("should exist"); - assert!(creating_streaming_job.is_finished()); assert!(epoch_state .finished_jobs - .insert(table_id, (creating_streaming_job.info, resps)) + .insert(table_id, (creating_streaming_job.info.clone(), workers)) .is_none()); } } - assert!(self.completing_barrier.is_none()); while let Some((_, EpochNode { state, .. 
})) = self.command_ctx_queue.first_key_value() && !state.is_inflight() { @@ -617,7 +630,7 @@ impl DatabaseCheckpointControl { let mut finished_jobs = self.create_mview_tracker.apply_collected_command( node.command_ctx.command.as_ref(), &node.command_ctx.barrier_info, - &node.state.resps, + node.state.collected_resps.values(), hummock_version_stats, ); if !node.command_ctx.barrier_info.kind.is_checkpoint() { @@ -625,34 +638,39 @@ impl DatabaseCheckpointControl { node.notifiers.into_iter().for_each(|notifier| { notifier.notify_collected(); }); - if let Some((scheduled_barriers, _)) = &mut context - && self.create_mview_tracker.has_pending_finished_jobs() + if self.create_mview_tracker.has_pending_finished_jobs() && self .command_ctx_queue .values() .all(|node| !node.command_ctx.barrier_info.kind.is_checkpoint()) { - scheduled_barriers.force_checkpoint_in_next_barrier(); + periodic_barriers.force_checkpoint_in_next_barrier(); } continue; } node.state .finished_jobs .drain() - .for_each(|(_, (info, resps))| { - node.state.resps.extend(resps); + .for_each(|(_, (info, workers))| { + creating_jobs_task.push(CreatingJobCompleteBarrierTask { + job_id: info.stream_job_fragments.stream_job_id(), + epoch: node.command_ctx.barrier_info.prev_epoch(), + is_first_commit: false, + tables_to_commit: info + .stream_job_fragments + .all_table_ids() + .map(TableId::new) + .collect(), + workers, + is_finished: true, + }); finished_jobs.push(TrackingJob::New(TrackingCommand { info, replace_stream_job: None, })); }); let task = task.get_or_insert_default(); - node.command_ctx.collect_commit_epoch_info( - &mut task.commit_info, - take(&mut node.state.resps), - self.collect_backfill_pinned_upstream_log_epoch(), - ); - self.completing_barrier = Some(( + self.completing_barriers.push_front(( node.command_ctx.barrier_info.prev_epoch(), node.command_ctx .command @@ -662,10 +680,19 @@ impl DatabaseCheckpointControl { )); task.finished_jobs.extend(finished_jobs); task.notifiers.extend(node.notifiers); - task.epoch_infos + task.tasks .try_insert( self.database_id, - (Some((node.command_ctx, node.enqueue_time)), vec![]), + ( + Some(DatabaseCompleteBarrierTask { + command: node.command_ctx, + enqueue_time: node.enqueue_time, + backfill_pinned_upstream_log_epoch: self + .collect_backfill_pinned_upstream_log_epoch(), + workers: node.state.collected_resps.keys().cloned().collect(), + }), + vec![], + ), ) .expect("non duplicate"); break; @@ -673,21 +700,9 @@ impl DatabaseCheckpointControl { } if !creating_jobs_task.is_empty() { let task = task.get_or_insert_default(); - for (table_id, epoch, resps, is_first_time) in creating_jobs_task { - collect_creating_job_commit_epoch_info( - &mut task.commit_info, - epoch, - resps, - self.creating_streaming_job_controls[&table_id] - .info - .stream_job_fragments - .all_table_ids() - .map(TableId::new), - is_first_time, - ); - let (_, creating_job_epochs) = - task.epoch_infos.entry(self.database_id).or_default(); - creating_job_epochs.push((table_id, epoch)); + for job_task in creating_jobs_task { + let (_, creating_job_epochs) = task.tasks.entry(self.database_id).or_default(); + creating_job_epochs.push(job_task); } } } @@ -695,20 +710,35 @@ impl DatabaseCheckpointControl { fn ack_completed( &mut self, command_prev_epoch: Option<u64>, - creating_job_epochs: Vec<(TableId, u64)>, + creating_job_epochs: Vec<(TableId, u64, bool)>, + control_stream_manager: &mut ControlStreamManager, ) { { - assert_eq!( - self.completing_barrier - .take() - .map(|(prev_epoch, _)| prev_epoch), - command_prev_epoch -
); - for (table_id, epoch) in creating_job_epochs { + if let Some(command_prev_epoch) = command_prev_epoch { + assert_eq!( + self.completing_barriers.pop_back().expect("should exist").0, + command_prev_epoch + ); + } + let mut finished_jobs: Option<Vec<TableId>> = None; + for (table_id, epoch, is_finished) in creating_job_epochs { self.creating_streaming_job_controls .get_mut(&table_id) .expect("should exist") - .ack_completed(epoch) + .ack_completed(epoch); + if is_finished { + let job = self + .creating_streaming_job_controls + .remove(&table_id) + .expect("should exist"); + assert!(job.is_finished()); + finished_jobs + .get_or_insert_default() + .push(job.info.stream_job_fragments.stream_job_id()); + } + } + if let Some(jobs) = finished_jobs { + control_stream_manager.remove_partial_graph(self.database_id, jobs); } } } @@ -732,12 +762,11 @@ struct EpochNode { /// The state of barrier. struct BarrierEpochState { node_to_collect: HashSet<WorkerId>, - - resps: Vec<BarrierCompleteResponse>, + collected_resps: HashMap<WorkerId, BarrierCollectResponse>, creating_jobs_to_wait: HashSet<TableId>, - finished_jobs: HashMap<TableId, (CreateStreamingJobCommandInfo, Vec<BarrierCompleteResponse>)>, + finished_jobs: HashMap<TableId, (CreateStreamingJobCommandInfo, HashSet<WorkerId>)>, } impl BarrierEpochState { diff --git a/src/meta/src/barrier/checkpoint/creating_job/barrier_control.rs b/src/meta/src/barrier/checkpoint/creating_job/barrier_control.rs index 7d238c37d4ebb..40c6383535c04 100644 --- a/src/meta/src/barrier/checkpoint/creating_job/barrier_control.rs +++ b/src/meta/src/barrier/checkpoint/creating_job/barrier_control.rs @@ -12,8 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashSet, VecDeque}; -use std::mem::take; +use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; use std::ops::Bound::Unbounded; use std::ops::{Bound, RangeBounds}; use std::time::Instant; @@ -22,7 +21,7 @@ use prometheus::HistogramTimer; use risingwave_common::catalog::TableId; use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge}; use risingwave_meta_model::WorkerId; -use risingwave_pb::stream_service::BarrierCompleteResponse; +use risingwave_pb::stream_service::BarrierCollectResponse; use tracing::debug; use crate::rpc::metrics::GLOBAL_META_METRICS; @@ -31,7 +30,7 @@ use crate::rpc::metrics::GLOBAL_META_METRICS; struct CreatingStreamingJobEpochState { epoch: u64, node_to_collect: HashSet<WorkerId>, - resps: Vec<BarrierCompleteResponse>, + collected_resps: HashMap<WorkerId, BarrierCollectResponse>, is_checkpoint: bool, enqueue_time: Instant, } @@ -46,7 +45,8 @@ pub(super) struct CreatingStreamingJobBarrierControl { max_collected_epoch: Option<u64>, // newer epoch at the front. pending_barriers_to_complete: VecDeque<CreatingStreamingJobEpochState>, - completing_barrier: Option<(CreatingStreamingJobEpochState, HistogramTimer)>, + // newer epoch at the front.
+ completing_barriers: VecDeque<(CreatingStreamingJobEpochState, HistogramTimer)>, // metrics consuming_snapshot_barrier_latency: LabelGuardedHistogram<2>, @@ -66,7 +66,7 @@ impl CreatingStreamingJobBarrierControl { initial_epoch: None, max_collected_epoch: None, pending_barriers_to_complete: Default::default(), - completing_barrier: None, + completing_barriers: Default::default(), consuming_snapshot_barrier_latency: GLOBAL_META_METRICS .snapshot_backfill_barrier_latency @@ -107,7 +107,7 @@ impl CreatingStreamingJobBarrierControl { pub(super) fn is_empty(&self) -> bool { self.inflight_barrier_queue.is_empty() && self.pending_barriers_to_complete.is_empty() - && self.completing_barrier.is_none() + && self.completing_barriers.is_empty() } pub(super) fn enqueue_epoch( @@ -132,7 +132,7 @@ impl CreatingStreamingJobBarrierControl { let epoch_state = CreatingStreamingJobEpochState { epoch, node_to_collect, - resps: vec![], + collected_resps: Default::default(), is_checkpoint, enqueue_time: Instant::now(), }; @@ -149,7 +149,7 @@ impl CreatingStreamingJobBarrierControl { &mut self, epoch: u64, worker_id: WorkerId, - resp: BarrierCompleteResponse, + resp: BarrierCollectResponse, ) { debug!( epoch, @@ -163,7 +163,10 @@ impl CreatingStreamingJobBarrierControl { .get_mut(&epoch) .expect("should exist"); assert!(state.node_to_collect.remove(&worker_id)); - state.resps.push(resp); + state + .collected_resps + .try_insert(worker_id, resp) + .expect("non-duplicate"); while let Some((_, state)) = self.inflight_barrier_queue.first_key_value() && state.node_to_collect.is_empty() { @@ -183,13 +186,12 @@ impl CreatingStreamingJobBarrierControl { pub(super) fn start_completing( &mut self, epoch_end_bound: Bound<u64>, - ) -> Option<(u64, Vec<BarrierCompleteResponse>, bool)> { - assert!(self.completing_barrier.is_none()); + ) -> Option<(u64, bool, HashSet<WorkerId>)> { let epoch_range: (Bound<u64>, Bound<u64>) = (Unbounded, epoch_end_bound); while let Some(epoch_state) = self.pending_barriers_to_complete.back() && epoch_range.contains(&epoch_state.epoch) { - let mut epoch_state = self + let epoch_state = self .pending_barriers_to_complete .pop_back() .expect("non-empty"); @@ -200,10 +202,10 @@ impl CreatingStreamingJobBarrierControl { } else if !epoch_state.is_checkpoint { continue; } - - let resps = take(&mut epoch_state.resps); - self.completing_barrier = Some((epoch_state, self.wait_commit_latency.start_timer())); - return Some((epoch, resps, is_first)); + let workers = epoch_state.collected_resps.keys().cloned().collect(); + self.completing_barriers + .push_front((epoch_state, self.wait_commit_latency.start_timer())); + return Some((epoch, is_first, workers)); } None } @@ -213,7 +215,7 @@ impl CreatingStreamingJobBarrierControl { /// Return the upstream epoch to be notified when there is any.
pub(super) fn ack_completed(&mut self, completed_epoch: u64) { let (epoch_state, wait_commit_timer) = - self.completing_barrier.take().expect("should exist"); + self.completing_barriers.pop_back().expect("should exist"); wait_commit_timer.observe_duration(); assert_eq!(epoch_state.epoch, completed_epoch); } diff --git a/src/meta/src/barrier/checkpoint/creating_job/mod.rs b/src/meta/src/barrier/checkpoint/creating_job/mod.rs index ef1f5a8a886c4..f0e752e4b5a6b 100644 --- a/src/meta/src/barrier/checkpoint/creating_job/mod.rs +++ b/src/meta/src/barrier/checkpoint/creating_job/mod.rs @@ -16,7 +16,7 @@ mod barrier_control; mod status; use std::cmp::max; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::ops::Bound::{Excluded, Unbounded}; use barrier_control::CreatingStreamingJobBarrierControl; @@ -26,7 +26,7 @@ use risingwave_meta_model::WorkerId; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::stream_plan::barrier_mutation::Mutation; -use risingwave_pb::stream_service::BarrierCompleteResponse; +use risingwave_pb::stream_service::BarrierCollectResponse; use status::{CreatingJobInjectBarrierInfo, CreatingStreamingJobStatus}; use tracing::info; @@ -254,7 +254,7 @@ impl CreatingStreamingJobControl { &mut self, epoch: u64, worker_id: WorkerId, - resp: BarrierCompleteResponse, + resp: BarrierCollectResponse, control_stream_manager: &mut ControlStreamManager, ) -> MetaResult<()> { let prev_barriers_to_inject = self.status.update_progress(&resp.create_mview_progress); @@ -301,7 +301,7 @@ impl CreatingStreamingJobControl { pub(super) fn start_completing( &mut self, min_upstream_inflight_epoch: Option<u64>, - ) -> Option<(u64, Vec<BarrierCompleteResponse>, CompleteJobType)> { + ) -> Option<(u64, CompleteJobType, HashSet<WorkerId>)> { let (finished_at_epoch, epoch_end_bound) = match &self.status { CreatingStreamingJobStatus::Finishing(finish_at_epoch) => { let epoch_end_bound = min_upstream_inflight_epoch .map(|upstream_epoch| { if upstream_epoch < *finish_at_epoch { Excluded(upstream_epoch) } else { Unbounded } }) .unwrap_or(Unbounded); (Some(*finish_at_epoch), epoch_end_bound) } CreatingStreamingJobStatus::ConsumingSnapshot { .. } | CreatingStreamingJobStatus::ConsumingLogStore { .. } => ( None, min_upstream_inflight_epoch .map(Excluded) .unwrap_or(Unbounded), ), }; self.barrier_control.start_completing(epoch_end_bound).map( - |(epoch, resps, is_first_commit)| { + |(epoch, is_first_commit, workers)| { let status = if let Some(finish_at_epoch) = finished_at_epoch { assert!(!is_first_commit); if epoch == finish_at_epoch { - self.barrier_control.ack_completed(epoch); - assert!(self.barrier_control.is_empty()); CompleteJobType::Finished } else { CompleteJobType::Normal } } else if is_first_commit { CompleteJobType::First } else { CompleteJobType::Normal }; - (epoch, resps, status) + (epoch, status, workers) }, ) } diff --git a/src/meta/src/barrier/checkpoint/creating_job/status.rs b/src/meta/src/barrier/checkpoint/creating_job/status.rs index f5d18400a98e1..e20f8bff2dbca 100644 --- a/src/meta/src/barrier/checkpoint/creating_job/status.rs +++ b/src/meta/src/barrier/checkpoint/creating_job/status.rs @@ -22,9 +22,7 @@ use risingwave_meta_model::WorkerId; use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::stream_plan::barrier_mutation::Mutation; use risingwave_pb::stream_plan::StreamActor; -use risingwave_pb::stream_service::barrier_complete_response::{ - CreateMviewProgress, PbCreateMviewProgress, -}; +use risingwave_pb::stream_service::barrier_collect_response::PbCreateMviewProgress; use tracing::warn; use crate::barrier::progress::CreateMviewProgressTracker; @@ -133,7 +131,7 @@ pub(super) struct CreatingJobInjectBarrierInfo { impl CreatingStreamingJobStatus { pub(super) fn update_progress( &mut self, - create_mview_progress: impl
IntoIterator<Item = &CreateMviewProgress>, + create_mview_progress: impl IntoIterator<Item = &PbCreateMviewProgress>, ) -> Option<Vec<CreatingJobInjectBarrierInfo>> { match self { Self::ConsumingSnapshot { diff --git a/src/meta/src/barrier/checkpoint/state.rs b/src/meta/src/barrier/checkpoint/state.rs index 8f1f91272b2f8..a4d54125ae344 100644 --- a/src/meta/src/barrier/checkpoint/state.rs +++ b/src/meta/src/barrier/checkpoint/state.rs @@ -21,7 +21,6 @@ use risingwave_pb::meta::PausedReason; use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo, InflightSubscriptionInfo}; use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch}; -use crate::controller::fragment::InflightFragmentInfo; /// The latest state of `GlobalBarrierWorker` after injecting the latest barrier. pub(crate) struct BarrierWorkerState { @@ -152,19 +151,16 @@ impl BarrierWorkerState { let info = self.inflight_graph_info.clone(); let subscription_info = self.inflight_subscription_info.clone(); + let table_ids_to_commit: HashSet<_> = info.existing_table_ids().collect(); if let Some(fragment_changes) = fragment_changes { self.inflight_graph_info.post_apply(&fragment_changes); } - let mut table_ids_to_commit: HashSet<_> = info.existing_table_ids().collect(); let mut jobs_to_wait = HashSet::new(); if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command { for (table_id, (_, graph_info)) in jobs_to_merge { jobs_to_wait.insert(*table_id); - table_ids_to_commit.extend(InflightFragmentInfo::existing_table_ids( - graph_info.fragment_infos(), - )); self.inflight_graph_info.extend(graph_info.clone()); } } diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 4e7a659c5029d..f91a4b4354a38 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -23,7 +23,9 @@ use risingwave_common::must_match; use risingwave_common::types::Timestamptz; use risingwave_common::util::epoch::Epoch; use risingwave_connector::source::SplitImpl; -use risingwave_hummock_sdk::change_log::build_table_change_log_delta; +use risingwave_hummock_sdk::change_log::{build_table_change_log_delta, ChangeLogDelta}; +use risingwave_hummock_sdk::sstable_info::SstableInfo; +use risingwave_hummock_sdk::LocalSstableInfo; use risingwave_meta_model::WorkerId; use risingwave_pb::catalog::{CreateType, Table}; use risingwave_pb::common::PbWorkerNode; @@ -39,15 +41,13 @@ use risingwave_pb::stream_plan::{ DropSubscriptionsMutation, PauseMutation, ResumeMutation, SourceChangeSplitMutation, StopMutation, StreamActor, SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation, }; -use risingwave_pb::stream_service::BarrierCompleteResponse; use tracing::warn; use super::info::{CommandFragmentChanges, InflightStreamingJobInfo}; use crate::barrier::info::BarrierInfo; -use crate::barrier::utils::collect_resp_info; use crate::barrier::InflightSubscriptionInfo; use crate::controller::fragment::InflightFragmentInfo; -use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo}; +use crate::hummock::NewTableFragmentInfo; use crate::manager::{DdlType, StreamingJob}; use crate::model::{ActorId, DispatcherId, FragmentId, StreamJobFragments, TableParallelism}; use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig}; @@ -505,16 +505,16 @@ impl CommandContext { Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64) } - pub(super) fn collect_commit_epoch_info( + pub(super) fn collect_extra_commit_epoch_info( &self, - info: &mut CommitEpochInfo, - resps: Vec<BarrierCompleteResponse>, + synced_ssts: &Vec<LocalSstableInfo>, + old_value_ssts: &Vec<SstableInfo>, backfill_pinned_log_epoch: HashMap<TableId, (u64, HashSet<TableId>)>, +
tables_to_commit: &mut HashMap<TableId, u64>, + new_table_fragment_infos: &mut Vec<NewTableFragmentInfo>, + change_log_delta: &mut HashMap<TableId, ChangeLogDelta>, ) { - let (sst_to_context, synced_ssts, new_table_watermarks, old_value_ssts) = - collect_resp_info(resps); - - let new_table_fragment_infos = if let Some(Command::CreateStreamingJob { info, job_type }) = + let new_table_fragment_info = if let Some(Command::CreateStreamingJob { info, job_type }) = &self.command && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)) { @@ -528,9 +528,9 @@ impl CommandContext { table_ids.insert(TableId::new(mv_table_id)); } - vec![NewTableFragmentInfo { table_ids }] + Some(NewTableFragmentInfo { table_ids }) } else { - vec![] + None }; let mut mv_log_store_truncate_epoch = HashMap::new(); @@ -564,7 +564,7 @@ impl CommandContext { } let table_new_change_log = build_table_change_log_delta( - old_value_ssts.into_iter(), + old_value_ssts.iter(), synced_ssts.iter().map(|sst| &sst.sst_info), must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs), mv_log_store_truncate_epoch.into_iter(), ); let epoch = self.barrier_info.prev_epoch(); for table_id in &self.table_ids_to_commit { - info.tables_to_commit + tables_to_commit .try_insert(*table_id, epoch) .expect("non duplicate"); } - info.sstables.extend(synced_ssts); - info.new_table_watermarks.extend(new_table_watermarks); - info.sst_to_context.extend(sst_to_context); - info.new_table_fragment_infos - .extend(new_table_fragment_infos); - info.change_log_delta.extend(table_new_change_log); + new_table_fragment_infos.extend(new_table_fragment_info); + change_log_delta.extend(table_new_change_log); } } diff --git a/src/meta/src/barrier/complete_task.rs b/src/meta/src/barrier/complete_task.rs index 85fb76b1941bb..cbb3a054097ee 100644 --- a/src/meta/src/barrier/complete_task.rs +++ b/src/meta/src/barrier/complete_task.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License.
-use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::future::{pending, Future}; use std::mem::replace; use std::sync::Arc; @@ -22,27 +22,28 @@ use futures::future::try_join_all; use prometheus::HistogramTimer; use risingwave_common::catalog::{DatabaseId, TableId}; use risingwave_common::must_match; +use risingwave_meta_model::WorkerId; use risingwave_pb::hummock::HummockVersionStats; +use risingwave_pb::stream_service::BarrierCompleteResponse; use tokio::task::JoinHandle; -use crate::barrier::checkpoint::CheckpointControl; use crate::barrier::command::CommandContext; use crate::barrier::context::GlobalBarrierWorkerContext; use crate::barrier::notifier::Notifier; use crate::barrier::progress::TrackingJob; use crate::barrier::rpc::ControlStreamManager; -use crate::barrier::schedule::PeriodicBarriers; -use crate::hummock::CommitEpochInfo; +use crate::barrier::utils::collect_resp_info; +use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo}; use crate::manager::MetaSrvEnv; use crate::rpc::metrics::GLOBAL_META_METRICS; use crate::{MetaError, MetaResult}; -pub(super) enum CompletingTask { +pub(super) enum CommittingTask { None, - Completing { + Committing { #[expect(clippy::type_complexity)] - /// `database_id` -> (`Some(database_graph_committed_epoch)`, [(`creating_job_id`, `creating_job_committed_epoch`)]) - epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(TableId, u64)>)>, + /// `database_id` -> (`Some(database_graph_committed_epoch)`, [(`creating_job_id`, `creating_job_committed_epoch`, `is_finished`)]) + epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(TableId, u64, bool)>)>, // The join handle of a spawned task that completes the barrier. // The return value indicate whether there is some create streaming job command join_handle: JoinHandle<MetaResult<HummockVersionStats>>, }, Err(MetaError), } +#[derive(Debug)] +pub(super) struct DatabaseCompleteBarrierTask { + pub(super) command: CommandContext, + pub(super) enqueue_time: HistogramTimer, + pub(super) backfill_pinned_upstream_log_epoch: HashMap<TableId, (u64, HashSet<TableId>)>, + pub(super) workers: HashSet<WorkerId>, +} + +#[derive(Debug)] +pub(super) struct CreatingJobCompleteBarrierTask { + pub(super) job_id: TableId, + pub(super) epoch: u64, + pub(super) is_first_commit: bool, + pub(super) tables_to_commit: HashSet<TableId>, + pub(super) workers: HashSet<WorkerId>, + pub(super) is_finished: bool, +} + #[derive(Default)] pub(super) struct CompleteBarrierTask { - pub(super) commit_info: CommitEpochInfo, pub(super) finished_jobs: Vec<TrackingJob>, pub(super) notifiers: Vec<Notifier>, - /// `database_id` -> (Some((`command_ctx`, `enqueue_time`)), vec!((`creating_job_id`, `epoch`))) - #[expect(clippy::type_complexity)] - pub(super) epoch_infos: HashMap< + pub(super) tasks: HashMap< DatabaseId, ( - Option<(CommandContext, HistogramTimer)>, - Vec<(TableId, u64)>, + Option<DatabaseCompleteBarrierTask>, + Vec<CreatingJobCompleteBarrierTask>, ), >, } impl CompleteBarrierTask { #[expect(clippy::type_complexity)] - pub(super) fn epochs_to_ack(&self) -> HashMap<DatabaseId, (Option<u64>, Vec<(TableId, u64)>)> { - self.epoch_infos + pub(super) fn epochs_to_ack( + &self, + ) -> HashMap<DatabaseId, (Option<u64>, Vec<(TableId, u64, bool)>)> { + self.tasks .iter() .map(|(database_id, (command_context, creating_job_epochs))| { ( *database_id, ( command_context .as_ref() - .map(|(command, _)| command.barrier_info.prev_epoch.value().0), - creating_job_epochs.clone(), + .map(|task| task.command.barrier_info.prev_epoch.value().0), + creating_job_epochs + .iter() + .map(|task| (task.job_id, task.epoch, task.is_finished)) + .collect(), ), ) }) .collect() } + + fn graph_to_complete( + &self, + ) -> impl Iterator<Item = (DatabaseId, Option<TableId>, &'_
HashSet<WorkerId>, u64)> + '_ { + self.tasks + .iter() + .flat_map(|(database_id, (database, creating_jobs))| { + database + .iter() + .map(|database| { + ( + *database_id, + None, + &database.workers, + database.command.barrier_info.prev_epoch(), + ) + }) + .chain( + creating_jobs.iter().map(|task| { + (*database_id, Some(task.job_id), &task.workers, task.epoch) + }), + ) + }) + } } -impl CompleteBarrierTask { - pub(super) async fn complete_barrier( - self, +impl CommittingTask { + pub(super) async fn commit_barrier( + task: CompleteBarrierTask, + commit_info: CommitEpochInfo, context: &impl GlobalBarrierWorkerContext, env: MetaSrvEnv, ) -> MetaResult<HummockVersionStats> { let result: MetaResult<HummockVersionStats> = try { let wait_commit_timer = GLOBAL_META_METRICS .barrier_wait_commit_latency .start_timer(); - let version_stats = context.commit_epoch(self.commit_info).await?; - for command_ctx in self - .epoch_infos + let version_stats = context.commit_epoch(commit_info).await?; + for command_ctx in task + .tasks .values() - .flat_map(|(command, _)| command.as_ref().map(|(command, _)| command)) + .flat_map(|(command, _)| command.as_ref().map(|task| &task.command)) { context.post_collect_command(command_ctx).await?; } wait_commit_timer.observe_duration(); version_stats }; let version_stats = match result { Ok(version_stats) => version_stats, Err(e) => { - for notifier in self.notifiers { + for notifier in task.notifiers { notifier.notify_collection_failed(e.clone()); } return Err(e); } }; - self.notifiers.into_iter().for_each(|notifier| { + task.notifiers.into_iter().for_each(|notifier| { notifier.notify_collected(); }); try_join_all( - self.finished_jobs + task.finished_jobs .into_iter() .map(|finished_job| context.finish_creating_job(finished_job)), ) .await?; - for (command_ctx, enqueue_time) in self - .epoch_infos - .into_values() - .flat_map(|(command_context, _)| command_context) - { - let duration_sec = enqueue_time.stop_and_record(); - Self::report_complete_event(&env, duration_sec, &command_ctx); + for task in task.tasks.into_values().flat_map(|(task, _)| task) { + let duration_sec = task.enqueue_time.stop_and_record(); + Self::report_complete_event(&env, duration_sec, &task.command); GLOBAL_META_METRICS .last_committed_barrier_time - .set(command_ctx.barrier_info.curr_epoch.value().as_unix_secs() as i64); + .set(task.command.barrier_info.curr_epoch.value().as_unix_secs() as i64); } version_stats }; Ok(version_stats) } -} -impl CompleteBarrierTask { fn report_complete_event(env: &MetaSrvEnv, duration_sec: f64, command_ctx: &CommandContext) { // Record barrier latency in event log.
use risingwave_pb::meta::event_log; @@ -169,47 +209,156 @@ impl CompleteBarrierTask { } } -pub(super) struct BarrierCompleteOutput { +pub(super) struct BarrierCommitOutput { #[expect(clippy::type_complexity)] - /// `database_id` -> (`Some(database_graph_committed_epoch)`, [(`creating_job_id`, `creating_job_committed_epoch`)]) - pub epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(TableId, u64)>)>, + /// `database_id` -> (`Some(database_graph_committed_epoch)`, [(`creating_job_id`, `creating_job_committed_epoch`, `is_finished`)]) + pub epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(TableId, u64, bool)>)>, pub hummock_version_stats: HummockVersionStats, } +pub(super) struct CompletingTask { + node_to_collect: HashSet<WorkerId>, + collected_resps: HashMap<WorkerId, BarrierCompleteResponse>, + task: CompleteBarrierTask, +} + impl CompletingTask { - pub(super) fn next_completed_barrier<'a>( - &'a mut self, - scheduled_barriers: &mut PeriodicBarriers, - checkpoint_control: &mut CheckpointControl, + fn new( + task_id: u64, + task: CompleteBarrierTask, + control_stream_manager: &mut ControlStreamManager, + ) -> MetaResult<Self> { + let node_to_collect = + control_stream_manager.complete_barrier(task_id, task.graph_to_complete())?; + Ok(Self { + node_to_collect, + collected_resps: HashMap::new(), + task, + }) + } + + fn is_completed(&self) -> bool { + self.node_to_collect.is_empty() + } + + fn into_commit_info(self) -> (CommitEpochInfo, CompleteBarrierTask) { + assert!(self.node_to_collect.is_empty()); + let (mut commit_info, old_value_ssts) = collect_resp_info(self.collected_resps); + for (database_task, creating_jobs) in self.task.tasks.values() { + if let Some(task) = database_task { + task.command.collect_extra_commit_epoch_info( + &commit_info.sstables, + &old_value_ssts, + task.backfill_pinned_upstream_log_epoch.clone(), + &mut commit_info.tables_to_commit, + &mut commit_info.new_table_fragment_infos, + &mut commit_info.change_log_delta, + ) + } + for task in creating_jobs { + task.tables_to_commit.iter().for_each(|table_id| { + commit_info + .tables_to_commit + .try_insert(*table_id, task.epoch) + .expect("non duplicate"); + }); + if task.is_first_commit { + commit_info + .new_table_fragment_infos + .push(NewTableFragmentInfo { + table_ids: task.tables_to_commit.clone(), + }); + }; + } + } + (commit_info, self.task) + } +} + +pub(super) struct CompletingTasks { + next_task_id: u64, + tasks: BTreeMap<u64, CompletingTask>, +} + +impl CompletingTasks { + pub(super) fn new() -> Self { + Self { + next_task_id: 0, + tasks: Default::default(), + } + } + + pub(super) fn push( + &mut self, + task: CompleteBarrierTask, + control_stream_manager: &mut ControlStreamManager, + ) -> MetaResult<()> { + let task_id = self.next_task_id; + self.next_task_id += 1; + let task = CompletingTask::new(task_id, task, control_stream_manager)?; + self.tasks.insert(task_id, task); + Ok(()) + } + + pub(super) fn next_completed_task(&mut self) -> Option<(CommitEpochInfo, CompleteBarrierTask)> { + if let Some((_, task)) = self.tasks.first_key_value() + && task.is_completed() + { + let (_, task) = self.tasks.pop_first().expect("non-empty"); + Some(task.into_commit_info()) + } else { + None + } + } + + pub(super) fn on_barrier_complete_resp( + &mut self, + worker_id: WorkerId, + resp: BarrierCompleteResponse, + ) { + let task = self.tasks.get_mut(&resp.task_id).expect("should exist"); + assert!(task.node_to_collect.remove(&worker_id)); + task.collected_resps + .try_insert(worker_id, resp) + .expect("non-duplicate"); + } + + pub(super) fn is_failed_at_worker_err(&self, worker_id: WorkerId) -> bool { + self.tasks + .values() +
.any(|task| task.node_to_collect.contains(&worker_id)) + } +} + +impl CommittingTask { + pub(super) fn next_committed_barrier<'a>( + &'a mut self, + completing_tasks: &mut CompletingTasks, context: &Arc<impl GlobalBarrierWorkerContext>, env: &MetaSrvEnv, - ) -> impl Future<Output = MetaResult<BarrierCompleteOutput>> + 'a { + ) -> impl Future<Output = MetaResult<BarrierCommitOutput>> + 'a { // If there is no completing barrier, try to start completing the earliest barrier if // it has been collected. - if let CompletingTask::None = self { - if let Some(task) = checkpoint_control - .next_complete_barrier_task(Some((scheduled_barriers, control_stream_manager))) - { - { - let epochs_to_ack = task.epochs_to_ack(); - let context = context.clone(); - let env = env.clone(); - let join_handle = - tokio::spawn(async move { task.complete_barrier(&*context, env).await }); - *self = CompletingTask::Completing { - epochs_to_ack, - join_handle, - }; - } + if let CommittingTask::None = &self { + if let Some((commit_info, task)) = completing_tasks.next_completed_task() { + let epochs_to_ack = task.epochs_to_ack(); + let context = context.clone(); + let env = env.clone(); + let join_handle = tokio::spawn(async move { + CommittingTask::commit_barrier(task, commit_info, &*context, env).await + }); + *self = CommittingTask::Committing { + epochs_to_ack, + join_handle, + }; } } - self.next_completed_barrier_inner() + self.next_committed_barrier_inner() } - async fn next_completed_barrier_inner(&mut self) -> MetaResult<BarrierCompleteOutput> { - let CompletingTask::Completing { join_handle, .. } = self else { + async fn next_committed_barrier_inner(&mut self) -> MetaResult<BarrierCommitOutput> { + let CommittingTask::Committing { join_handle, .. } = self else { return pending().await; }; @@ -222,19 +371,19 @@ impl CompletingTask { }; // It's important to reset the completing_command after await no matter the result is err // or not, and otherwise the join handle will be polled again after ready. - let next_completing_command_status = if let Err(e) = &join_result { - CompletingTask::Err(e.clone()) + let next_committing_task_status = if let Err(e) = &join_result { + CommittingTask::Err(e.clone()) } else { - CompletingTask::None + CommittingTask::None }; - let completed_command = replace(self, next_completing_command_status); + let committed_task = replace(self, next_committing_task_status); let hummock_version_stats = join_result?; - must_match!(completed_command, CompletingTask::Completing { + must_match!(committed_task, CommittingTask::Committing { epochs_to_ack, ..
} => { - Ok(BarrierCompleteOutput { + Ok(BarrierCommitOutput { epochs_to_ack, hummock_version_stats, }) diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index aa676c01b3bdb..1a327e9ebe7f7 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -22,8 +22,8 @@ use risingwave_meta_model::ObjectId; use risingwave_pb::catalog::CreateType; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::hummock::HummockVersionStats; -use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress; -use risingwave_pb::stream_service::PbBarrierCompleteResponse; +use risingwave_pb::stream_service::barrier_collect_response::PbCreateMviewProgress; +use risingwave_pb::stream_service::PbBarrierCollectResponse; use crate::barrier::info::BarrierInfo; use crate::barrier::{ @@ -383,7 +383,7 @@ impl CreateMviewProgressTracker { &CreateStreamingJobCommandInfo, Option<&ReplaceStreamJobPlan>, )>, - create_mview_progress: impl IntoIterator<Item = &CreateMviewProgress>, + create_mview_progress: impl IntoIterator<Item = &PbCreateMviewProgress>, version_stats: &HummockVersionStats, ) { { @@ -425,7 +425,7 @@ impl CreateMviewProgressTracker { &mut self, command: Option<&Command>, barrier_info: &BarrierInfo, - resps: impl IntoIterator<Item = &PbBarrierCompleteResponse>, + resps: impl IntoIterator<Item = &PbBarrierCollectResponse>, version_stats: &HummockVersionStats, ) -> Vec<TrackingJob> { let new_tracking_job_info = @@ -593,7 +593,7 @@ impl CreateMviewProgressTracker { /// If all actors in this MV have finished, returns the command. pub fn update( &mut self, - progress: &CreateMviewProgress, + progress: &PbCreateMviewProgress, version_stats: &HummockVersionStats, ) -> Option<TrackingJob> { tracing::trace!(?progress, "update progress"); diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index dfb9f1cc13d37..5c55035bd2704 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -15,7 +15,7 @@ use std::collections::{HashMap, HashSet}; use std::error::Error; use std::future::poll_fn; -use std::task::Poll; +use std::task::{Context, Poll}; use std::time::Duration; use anyhow::anyhow; @@ -34,11 +34,10 @@ use risingwave_pb::stream_service::streaming_control_stream_request::{ CreatePartialGraphRequest, PbInitRequest, PbInitialPartialGraph, RemovePartialGraphRequest, }; use risingwave_pb::stream_service::{ - streaming_control_stream_request, streaming_control_stream_response, BarrierCompleteResponse, + streaming_control_stream_request, streaming_control_stream_response, BarrierCompleteRequest, InjectBarrierRequest, StreamingControlStreamRequest, }; use risingwave_rpc_client::StreamingControlHandle; -use rw_futures_util::pending_on_none; use thiserror_ext::AsReport; use tokio::time::{sleep, timeout}; use tokio_retry::strategy::ExponentialBackoff; @@ -170,20 +169,22 @@ impl ControlStreamManager { *self = Self::new(self.env.clone()); } - async fn next_response( + fn poll_next_response( &mut self, - ) -> Option<( + cx: &mut Context<'_>, + ) -> Poll<( WorkerId, MetaResult<streaming_control_stream_response::Response>, )> { if self.nodes.is_empty() { - return None; + return Poll::Pending; } - let (worker_id, result) = poll_fn(|cx| { + let mut poll_result: Poll<(WorkerId, MetaResult<_>)> = Poll::Pending; + { for (worker_id, node) in &mut self.nodes { match node.handle.response_stream.poll_next_unpin(cx) { Poll::Ready(result) => { - return Poll::Ready(( + poll_result = Poll::Ready(( *worker_id, result .ok_or_else(|| anyhow!("end of stream").into()) .and_then(|result| { result.map_err(MetaError::from).and_then(|resp| { match resp .response .ok_or_else(||anyhow!("empty response"))? { streaming_control_stream_response::Response::Shutdown(_) => Err(anyhow!( "worker node {worker_id} is shutting down" ) .into()), streaming_control_stream_response::Response::Init(_) => { // This arm should be unreachable. Err(anyhow!("get unexpected init response").into()) } resp => Ok(resp), } }) - }), + }) )); + break; } Poll::Pending => { continue; } } } - Poll::Pending - }) - .await;
+ }; - if let Err(err) = &result { + if let Poll::Ready((worker_id, Err(err))) = &poll_result { let node = self .nodes - .remove(&worker_id) + .remove(worker_id) .expect("should exist when get shutdown resp"); warn!(node = ?node.worker, err = %err.as_report(), "get error from response stream"); } - Some((worker_id, result)) + poll_result } - pub(super) async fn next_collect_barrier_response( + pub(super) async fn next_response( &mut self, - ) -> (WorkerId, MetaResult<BarrierCompleteResponse>) { - use streaming_control_stream_response::Response; - - { - let (worker_id, result) = pending_on_none(self.next_response()).await; - - ( - worker_id, - result.map(|resp| match resp { - Response::CompleteBarrier(resp) => resp, - Response::Shutdown(_) | Response::Init(_) => { - unreachable!("should be treated as error") - } - }), - ) - } + ) -> ( + WorkerId, + MetaResult<streaming_control_stream_response::Response>, + ) { + poll_fn(|cx| self.poll_next_response(cx)).await } pub(super) async fn collect_errors( &mut self, worker_id: WorkerId, first_err: MetaError, ) -> Vec<(WorkerId, MetaError)> { let mut errors = vec![(worker_id, first_err)]; #[cfg(not(madsim))] { - let _ = timeout(COLLECT_ERROR_TIMEOUT, async { - while let Some((worker_id, result)) = self.next_response().await { - if let Err(e) = result { - errors.push((worker_id, e)); + { + let _ = timeout(COLLECT_ERROR_TIMEOUT, async { + while !self.nodes.is_empty() { + let (worker_id, result) = self.next_response().await; + if let Err(e) = result { + errors.push((worker_id, e)); + } } - } - }) - .await; + }) + .await; + } } tracing::debug!(?errors, "collected stream errors"); errors @@ -446,6 +438,45 @@ impl ControlStreamManager { Ok(node_need_collect) } + pub(super) fn complete_barrier( + &mut self, + task_id: u64, + infos: impl Iterator<Item = (DatabaseId, Option<TableId>, &HashSet<WorkerId>, u64)>, + ) -> MetaResult<HashSet<WorkerId>> { + let mut workers = HashSet::new(); + let mut worker_request: HashMap<_, HashMap<_, _>> = HashMap::new(); + for (database_id, creating_job_id, workers, epoch) in infos { + let partial_graph_id = to_partial_graph_id(database_id, creating_job_id); + for worker_id in workers { + worker_request + .entry(*worker_id) + .or_default() + .try_insert(partial_graph_id, epoch) + .expect("non-duplicate"); + } + } + + worker_request + .into_iter() + .try_for_each::<_, Result<_, MetaError>>(|(worker_id, partial_graph_sync_epochs)| { + workers.insert(worker_id); + self.nodes + .get_mut(&worker_id) + .ok_or_else(|| anyhow!("unconnected node: {}", worker_id))? + .handle + .send_request(StreamingControlStreamRequest { + request: Some(streaming_control_stream_request::Request::CompleteBarrier( + BarrierCompleteRequest { + task_id, + partial_graph_sync_epochs, + }, + )), + })?; + Ok(()) + })?; + Ok(workers) + } + pub(super) fn add_partial_graph( &mut self, database_id: DatabaseId, diff --git a/src/meta/src/barrier/utils.rs b/src/meta/src/barrier/utils.rs index 7e49a0f9d49af..01180df0775a0 100644 --- a/src/meta/src/barrier/utils.rs +++ b/src/meta/src/barrier/utils.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License.
-use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use itertools::Itertools; use risingwave_common::catalog::TableId; @@ -22,25 +22,20 @@ use risingwave_hummock_sdk::table_watermark::{ merge_multiple_new_table_watermarks, TableWatermarks, }; use risingwave_hummock_sdk::{HummockSstableObjectId, LocalSstableInfo}; +use risingwave_meta_model::WorkerId; use risingwave_pb::stream_service::BarrierCompleteResponse; -use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo}; +use crate::hummock::CommitEpochInfo; -#[expect(clippy::type_complexity)] pub(super) fn collect_resp_info( - resps: Vec<BarrierCompleteResponse>, -) -> ( - HashMap<HummockSstableObjectId, u32>, - Vec<LocalSstableInfo>, - HashMap<TableId, TableWatermarks>, - Vec<SstableInfo>, -) { + resps: HashMap<WorkerId, BarrierCompleteResponse>, +) -> (CommitEpochInfo, Vec<SstableInfo>) { let mut sst_to_worker: HashMap<HummockSstableObjectId, u32> = HashMap::new(); let mut synced_ssts: Vec<LocalSstableInfo> = vec![]; let mut table_watermarks = Vec::with_capacity(resps.len()); let mut old_value_ssts = Vec::with_capacity(resps.len()); - for resp in resps { + for resp in resps.into_values() { let ssts_iter = resp.synced_sstables.into_iter().map(|local_sst| { let sst_info = local_sst.sst.expect("field not None"); sst_to_worker.insert(sst_info.object_id, resp.worker_id); LocalSstableInfo::new( sst_info.into(), from_prost_table_stats_map(local_sst.table_stats_map), local_sst.created_at, ) }); synced_ssts.extend(ssts_iter); table_watermarks.push(resp.table_watermarks); old_value_ssts.extend(resp.old_value_sstables.into_iter().map(|sst| sst.into())); } ( - sst_to_worker, - synced_ssts, - merge_multiple_new_table_watermarks( - table_watermarks - .into_iter() - .map(|watermarks| { - watermarks - .into_iter() - .map(|(table_id, watermarks)| { - (TableId::new(table_id), TableWatermarks::from(&watermarks)) - }) - .collect() - }) - .collect_vec(), - ), + CommitEpochInfo { + sstables: synced_ssts, + new_table_watermarks: merge_multiple_new_table_watermarks( + table_watermarks + .into_iter() + .map(|watermarks| { + watermarks + .into_iter() + .map(|(table_id, watermarks)| { + (TableId::new(table_id), TableWatermarks::from(&watermarks)) + }) + .collect() + }) + .collect_vec(), + ), + sst_to_context: sst_to_worker, + new_table_fragment_infos: vec![], + change_log_delta: Default::default(), + tables_to_commit: Default::default(), + }, old_value_ssts, ) } - -pub(super) fn collect_creating_job_commit_epoch_info( - commit_info: &mut CommitEpochInfo, - epoch: u64, - resps: Vec<BarrierCompleteResponse>, - tables_to_commit: impl Iterator<Item = TableId>, - is_first_time: bool, -) { - let (sst_to_context, sstables, new_table_watermarks, old_value_sst) = collect_resp_info(resps); - assert!(old_value_sst.is_empty()); - commit_info.sst_to_context.extend(sst_to_context); - commit_info.sstables.extend(sstables); - commit_info - .new_table_watermarks - .extend(new_table_watermarks); - let tables_to_commit: HashSet<_> = tables_to_commit.collect(); - tables_to_commit.iter().for_each(|table_id| { - commit_info - .tables_to_commit - .try_insert(*table_id, epoch) - .expect("non duplicate"); - }); - if is_first_time { - commit_info - .new_table_fragment_infos - .push(NewTableFragmentInfo { - table_ids: tables_to_commit, - }); - }; -} diff --git a/src/meta/src/barrier/worker.rs b/src/meta/src/barrier/worker.rs index e69acda8f4651..a9db6ab73fd95 100644 --- a/src/meta/src/barrier/worker.rs +++ b/src/meta/src/barrier/worker.rs @@ -17,6 +17,7 @@ use std::mem::replace; use std::sync::{Arc, LazyLock}; use std::time::Duration; +use anyhow::anyhow; use arc_swap::ArcSwap; use itertools::Itertools; use risingwave_common::system_param::reader::SystemParamsRead; @@ -27,6 +28,7 @@ use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::{PausedReason, Recovery}; use risingwave_pb::stream_plan::barrier_mutation::Mutation; use risingwave_pb::stream_plan::AddMutation; +use
risingwave_pb::stream_service::streaming_control_stream_response::Response; use thiserror_ext::AsReport; use tokio::sync::mpsc; use tokio::sync::oneshot::{Receiver, Sender}; @@ -38,7 +40,7 @@ use tracing::{debug, error, info, warn, Instrument}; use crate::barrier::checkpoint::{ BarrierWorkerState, CheckpointControl, DatabaseCheckpointControl, }; -use crate::barrier::complete_task::{BarrierCompleteOutput, CompletingTask}; +use crate::barrier::complete_task::{CommittingTask, CompletingTasks}; use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl}; use crate::barrier::info::BarrierInfo; use crate::barrier::progress::CreateMviewProgressTracker; @@ -86,8 +88,11 @@ pub(super) struct GlobalBarrierWorker { checkpoint_control: CheckpointControl, /// Command that has been collected but is still completing. - /// The join handle of the completing future is stored. - completing_task: CompletingTask, + completing_tasks: CompletingTasks, + + /// Command that has been completed but is still committing. + /// The join handle of the committing future is stored. + committing_task: CommittingTask, request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>, @@ -146,7 +151,8 @@ impl GlobalBarrierWorker { context, env, checkpoint_control: CheckpointControl::default(), - completing_task: CompletingTask::None, + completing_tasks: CompletingTasks::new(), + committing_task: CommittingTask::None, request_rx, active_streaming_nodes, sink_manager, @@ -245,6 +251,17 @@ impl GlobalBarrierWorker { // Start the event loop. loop { + if let Some(next_complete_barrier_task) = self + .checkpoint_control + .next_complete_barrier_task(&mut self.periodic_barriers) + { + if let Err(e) = self + .completing_tasks + .push(next_complete_barrier_task, &mut self.control_stream_manager) + { + self.failure_recovery(e).await; + } + } tokio::select!
{ biased; @@ -300,28 +317,39 @@ impl GlobalBarrierWorker { } } complete_result = self - .completing_task - .next_completed_barrier( - &mut self.periodic_barriers, - &mut self.checkpoint_control, - &mut self.control_stream_manager, + .committing_task + .next_committed_barrier( + &mut self.completing_tasks, &self.context, &self.env, ) => { match complete_result { Ok(output) => { - self.checkpoint_control.ack_completed(output); + self.checkpoint_control.ack_completed(output, &mut self.control_stream_manager); } Err(e) => { self.failure_recovery(e).await; } } }, - (worker_id, resp_result) = self.control_stream_manager.next_collect_barrier_response() => { - if let Err(e) = resp_result.and_then(|resp| self.checkpoint_control.barrier_collected(resp, &mut self.control_stream_manager)) { + (worker_id, resp_result) = self.control_stream_manager.next_response() => { + if let Err(e) = resp_result.and_then(|resp| { + match resp { + Response::CompleteBarrier(resp) => { + self.completing_tasks.on_barrier_complete_resp(worker_id, resp); + Ok(()) + }, + Response::CollectBarrier(resp) => { + self.checkpoint_control.barrier_collected(worker_id, resp, &mut self.control_stream_manager) + }, + other => { + Err(anyhow!("got unexpected response: {:?}", other).into()) + } + } + }) { { - if self.checkpoint_control.is_failed_at_worker_err(worker_id) { + if self.checkpoint_control.is_failed_at_worker_err(worker_id) || self.completing_tasks.is_failed_at_worker_err(worker_id) { let errors = self.control_stream_manager.collect_errors(worker_id, e).await; let err = merge_node_rpc_errors("get error from control stream", errors); self.report_collect_failure(&err); @@ -346,17 +374,18 @@ impl GlobalBarrierWorker { } } -impl GlobalBarrierWorker { - /// We need to make sure there are no changes when doing recovery - pub async fn clear_on_err(&mut self, err: &MetaError) { +// TODO: move this method to `complete_task.rs` and mark some structs and fields as private before merge +impl CommittingTask { + pub(super) async fn clear_on_err( + &mut self, + completing_tasks: &mut CompletingTasks, + context: &impl GlobalBarrierWorkerContext, + env: &MetaSrvEnv, + ) { // join spawned completing command to finish no matter it succeeds or not. - let is_err = match replace(&mut self.completing_task, CompletingTask::None) { - CompletingTask::None => false, - CompletingTask::Completing { - epochs_to_ack, - join_handle, - .. - } => { + let is_err = match replace(self, CommittingTask::None) { + CommittingTask::None => false, + CommittingTask::Committing { join_handle, .. } => { info!("waiting for completing command to finish in recovery"); match join_handle.await { Err(e) => { warn!(err = ?e.as_report(), "failed to join completing task"); true } Ok(Err(e)) => { warn!(err = ?e.as_report(), "failed to complete barrier during clear"); true } - Ok(Ok(hummock_version_stats)) => { - self.checkpoint_control - .ack_completed(BarrierCompleteOutput { - epochs_to_ack, - hummock_version_stats, - }); - false - } + Ok(Ok(_)) => false, } } - CompletingTask::Err(_) => true, + CommittingTask::Err(_) => true, }; if !is_err { // continue to finish the pending collected barrier.
- while let Some(task) = self.checkpoint_control.next_complete_barrier_task(None) { - let epochs_to_ack = task.epochs_to_ack(); - match task - .complete_barrier(&*self.context, self.env.clone()) - .await + while let Some((commit_info, task)) = completing_tasks.next_completed_task() { + if let Err(e) = + CommittingTask::commit_barrier(task, commit_info, context, env.clone()).await { - Ok(hummock_version_stats) => { - self.checkpoint_control - .ack_completed(BarrierCompleteOutput { - epochs_to_ack, - hummock_version_stats, - }); - } - Err(e) => { - error!( - err = ?e.as_report(), - "failed to complete barrier during recovery" - ); - break; - } + error!( + err = ?e.as_report(), + "failed to complete barrier during recovery" + ); + break; } } } - self.checkpoint_control.clear_on_err(err); } } impl GlobalBarrierWorker { + /// We need to make sure there are no changes when doing recovery + pub async fn clear_on_err(&mut self, err: &MetaError) { + self.committing_task + .clear_on_err(&mut self.completing_tasks, &*self.context, &self.env) + .await; + self.completing_tasks = CompletingTasks::new(); + self.checkpoint_control.clear_on_err(err); + } + /// Set barrier manager status. async fn failure_recovery(&mut self, err: MetaError) { self.clear_on_err(&err).await; @@ -645,9 +664,16 @@ impl GlobalBarrierWorker { debug!(?node_to_collect, "inject initial barrier"); while !node_to_collect.is_empty() { let (worker_id, result) = - control_stream_manager.next_collect_barrier_response().await; + control_stream_manager.next_response().await; let resp = result?; - assert_eq!(resp.epoch, barrier_info.prev_epoch()); + match resp { + Response::CollectBarrier(resp) => { + assert_eq!(resp.epoch, barrier_info.prev_epoch()); + } + other => { + return Err(anyhow!("expected Response::CollectBarrier but got {:?}", other).into()); + } + } assert!(node_to_collect.remove(&worker_id)); } debug!("collected initial barrier"); diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index 777509d53ed88..688dd615f108f 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -198,10 +198,7 @@ impl HummockMetaClient for MockHummockMetaClient { BTreeSet::new() }; let table_change_log = build_table_change_log_delta( - sync_result - .old_value_ssts - .into_iter() - .map(|sst| sst.sst_info), + sync_result.old_value_ssts.iter().map(|sst| &sst.sst_info), sync_result.uncommitted_ssts.iter().map(|sst| &sst.sst_info), &vec![epoch], table_change_log_table_ids diff --git a/src/storage/hummock_sdk/src/change_log.rs b/src/storage/hummock_sdk/src/change_log.rs index cf3ded58b946e..0777beab46651 100644 --- a/src/storage/hummock_sdk/src/change_log.rs +++ b/src/storage/hummock_sdk/src/change_log.rs @@ -149,7 +149,7 @@ where } pub fn build_table_change_log_delta<'a>( - old_value_ssts: impl Iterator<Item = SstableInfo>, + old_value_ssts: impl Iterator<Item = &'a SstableInfo>, new_value_ssts: impl Iterator<Item = &'a SstableInfo>, epochs: &Vec<u64>, log_store_table_ids: impl Iterator, diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index fec0d74ab6d5f..22759c8fb51ca 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -26,10 +26,7 @@ use futures::stream::{BoxStream, FuturesOrdered}; use futures::{FutureExt, StreamExt, TryFutureExt}; use itertools::Itertools; use risingwave_common::error::tonic::extra::Score; -use risingwave_pb::stream_plan::barrier::BarrierKind; -use
risingwave_pb::stream_service::barrier_complete_response::{ - PbCreateMviewProgress, PbLocalSstableInfo, -}; +use risingwave_pb::stream_service::barrier_complete_response::PbLocalSstableInfo; use risingwave_rpc_client::error::{ToTonicStatus, TonicStatusWrapper}; use thiserror_ext::AsReport; use tokio::select; @@ -53,7 +50,7 @@ pub use progress::CreateMviewProgressReporter; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::runtime::BackgroundShutdownRuntime; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; -use risingwave_hummock_sdk::{LocalSstableInfo, SyncResult}; +use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo, SyncResult}; use risingwave_pb::stream_service::streaming_control_stream_request::{ InitRequest, InitialPartialGraph, Request, }; @@ -61,8 +58,8 @@ use risingwave_pb::stream_service::streaming_control_stream_response::{ InitResponse, ShutdownResponse, }; use risingwave_pb::stream_service::{ - streaming_control_stream_response, BarrierCompleteResponse, InjectBarrierRequest, - StreamingControlStreamRequest, StreamingControlStreamResponse, + streaming_control_stream_response, BarrierCollectResponse, BarrierCompleteResponse, + InjectBarrierRequest, StreamingControlStreamRequest, StreamingControlStreamResponse, }; use crate::executor::exchange::permit::Receiver; @@ -80,11 +77,9 @@ pub const ENABLE_BARRIER_AGGREGATION: bool = false; /// Collect result of some barrier on current compute node. Will be reported to the meta service. #[derive(Debug)] pub struct BarrierCompleteResult { + task_id: u64, /// The result returned from `sync` of `StateStore`. - pub sync_result: Option<SyncResult>, - - /// The updated creation progress of materialized view after this barrier. - pub create_mview_progress: Vec<PbCreateMviewProgress>, + pub sync_result: SyncResult, } pub(super) struct ControlStreamHandle { @@ -322,13 +317,23 @@ impl LocalBarrierWorker { loop { select!
{ biased; - (partial_graph_id, barrier) = self.state.next_collected_epoch() => { - self.complete_barrier(partial_graph_id, barrier.epoch.prev); + (partial_graph_id, barrier, create_mview_progress) = self.state.next_collected_epoch() => { + self.control_stream_handle.send_response(StreamingControlStreamResponse { + response: Some( + streaming_control_stream_response::Response::CollectBarrier( + BarrierCollectResponse { + partial_graph_id: partial_graph_id.0, + epoch: barrier.epoch.prev, + create_mview_progress, + }, + ), + ), + }); } - (partial_graph_id, barrier, result) = rw_futures_util::pending_on_none(self.await_epoch_completed_futures.next()) => { + result = rw_futures_util::pending_on_none(self.await_epoch_completed_futures.next()) => { match result { Ok(result) => { - self.on_epoch_completed(partial_graph_id, barrier.epoch.prev, result); + self.on_epoch_completed(result); } Err(err) => { self.notify_other_failure(err, "failed to complete epoch").await; @@ -396,6 +401,17 @@ impl LocalBarrierWorker { self.send_barrier(&barrier, req)?; Ok(()) } + Request::CompleteBarrier(req) => { + self.complete_barrier( + req.task_id, + req.partial_graph_sync_epochs + .iter() + .map(|(partial_graph_id, epoch)| { + (PartialGraphId::new(*partial_graph_id), *epoch) + }), + ); + Ok(()) + } Request::RemovePartialGraph(req) => { self.remove_partial_graphs( req.partial_graph_ids.into_iter().map(PartialGraphId::new), @@ -473,48 +489,30 @@ mod await_epoch_completed_future { use futures::future::BoxFuture; use futures::FutureExt; use risingwave_hummock_sdk::SyncResult; - use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress; use crate::error::StreamResult; - use crate::executor::Barrier; use crate::task::{await_tree_key, BarrierCompleteResult, PartialGraphId}; - pub(super) type AwaitEpochCompletedFuture = impl Future<Output = (PartialGraphId, Barrier, StreamResult<BarrierCompleteResult>)> - + 'static; + pub(super) type AwaitEpochCompletedFuture = + impl Future<Output = StreamResult<BarrierCompleteResult>> + 'static; pub(super) fn instrument_complete_barrier_future( - partial_graph_id: PartialGraphId, - complete_barrier_future: Option<BoxFuture<'static, StreamResult<SyncResult>>>, - barrier: Barrier, + task_id: u64, + complete_barrier_future: BoxFuture<'static, StreamResult<SyncResult>>, + sync_graph_epochs: Vec<(PartialGraphId, u64)>, barrier_await_tree_reg: Option<&await_tree::Registry>, - create_mview_progress: Vec<PbCreateMviewProgress>, ) -> AwaitEpochCompletedFuture { - let prev_epoch = barrier.epoch.prev; - let future = async move { - if let Some(future) = complete_barrier_future { - let result = future.await; - result.map(Some) - } else { - Ok(None) - } - } - .map(move |result| { - ( - partial_graph_id, - barrier, - result.map(|sync_result| BarrierCompleteResult { - sync_result, - create_mview_progress, - }), - ) + let future = complete_barrier_future.map(move |result| { + result.map(|sync_result| BarrierCompleteResult { + task_id, + sync_result, + }) }); if let Some(reg) = barrier_await_tree_reg { - reg.register( - await_tree_key::BarrierAwait { prev_epoch }, - format!("SyncEpoch({})", prev_epoch), - ) - .instrument(future) - .left_future() + let span = format!("SyncEpoch({:?})", sync_graph_epochs); + reg.register(await_tree_key::BarrierAwait { sync_graph_epochs }, span) + .instrument(future) + .left_future() } else { future.right_future() } @@ -528,26 +526,26 @@ use risingwave_storage::StateStoreImpl; fn sync_epoch( state_store: &StateStoreImpl, streaming_metrics: &StreamingMetrics, - prev_epoch: u64, - table_ids: HashSet<TableId>, + sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>, ) -> BoxFuture<'static, StreamResult<SyncResult>> { let timer =
streaming_metrics.barrier_sync_latency.start_timer(); let hummock = state_store.as_hummock().cloned(); + let sync_table_epochs_clone = sync_table_epochs.clone(); let future = async move { if let Some(hummock) = hummock { - hummock.sync(vec![(prev_epoch, table_ids)]).await + hummock.sync(sync_table_epochs_clone).await } else { Ok(SyncResult::default()) } }; future - .instrument_await(format!("sync_epoch (epoch {})", prev_epoch)) + .instrument_await(format!("sync_epoch (epoch {:?})", sync_table_epochs)) .inspect_ok(move |_| { timer.observe_duration(); }) .map_err(move |e| { tracing::error!( - prev_epoch, + ?sync_table_epochs, error = %e.as_report(), "Failed to sync state store", ); @@ -557,73 +555,56 @@ fn sync_epoch( } impl LocalBarrierWorker { - fn complete_barrier(&mut self, partial_graph_id: PartialGraphId, prev_epoch: u64) { + fn complete_barrier( + &mut self, + task_id: u64, + sync_graph_epochs: impl Iterator<Item = (PartialGraphId, u64)>, + ) { + let sync_graph_epochs = sync_graph_epochs.collect_vec(); { - let (barrier, table_ids, create_mview_progress) = self - .state - .pop_barrier_to_complete(partial_graph_id, prev_epoch); - - let complete_barrier_future = match &barrier.kind { - BarrierKind::Unspecified => unreachable!(), - BarrierKind::Initial => { - tracing::info!( - epoch = prev_epoch, - "ignore sealing data for the first barrier" - ); - tracing::info!(?prev_epoch, "ignored syncing data for the first barrier"); - None - } - BarrierKind::Barrier => None, - BarrierKind::Checkpoint => Some(sync_epoch( - &self.actor_manager.env.state_store(), - &self.actor_manager.streaming_metrics, - prev_epoch, - table_ids.expect("should be Some on BarrierKind::Checkpoint"), - )), - }; + let complete_barrier_future = sync_epoch( + &self.actor_manager.env.state_store(), + &self.actor_manager.streaming_metrics, + sync_graph_epochs + .iter() + .map(|(partial_graph_id, prev_epoch)| { + let (barrier, table_ids) = self + .state + .pop_barrier_to_complete(*partial_graph_id, *prev_epoch); + assert!(barrier.kind.is_checkpoint()); + (barrier.epoch.prev, table_ids) + }) + .collect_vec(), + ); self.await_epoch_completed_futures.push_back({ instrument_complete_barrier_future( - partial_graph_id, + task_id, complete_barrier_future, - barrier, + sync_graph_epochs, self.actor_manager.await_tree_reg.as_ref(), - create_mview_progress, ) }); } } - fn on_epoch_completed( - &mut self, - partial_graph_id: PartialGraphId, - epoch: u64, - result: BarrierCompleteResult, - ) { + fn on_epoch_completed(&mut self, result: BarrierCompleteResult) { let BarrierCompleteResult { - create_mview_progress, + task_id, sync_result, } = result; - let (synced_sstables, table_watermarks, old_value_ssts) = sync_result - .map(|sync_result| { - ( - sync_result.uncommitted_ssts, - sync_result.table_watermarks, - sync_result.old_value_ssts, - ) - }) - .unwrap_or_default(); + let (synced_sstables, table_watermarks, old_value_ssts) = ( + sync_result.uncommitted_ssts, + sync_result.table_watermarks, + sync_result.old_value_ssts, + ); let result = StreamingControlStreamResponse { response: Some( streaming_control_stream_response::Response::CompleteBarrier( BarrierCompleteResponse { - request_id: "todo".to_string(), - partial_graph_id: partial_graph_id.into(), - epoch, - status: None, - create_mview_progress, + task_id, synced_sstables: synced_sstables .into_iter() .map( diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index bd5c92570f13d..d3ea6c8ecb97a 100644 ---
a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::assert_matches::assert_matches; use std::cell::LazyCell; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::fmt::{Debug, Display, Formatter}; @@ -56,7 +57,7 @@ enum ManagedBarrierStateInner { Issued(IssuedState), /// The barrier has been collected by all remaining actors - AllCollected(Vec<PbCreateMviewProgress>), + AwaitComplete, } #[derive(Debug)] @@ -69,7 +70,7 @@ struct BarrierState { use risingwave_common::must_match; use risingwave_pb::stream_plan::SubscriptionUpstreamInfo; -use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress; +use risingwave_pb::stream_service::barrier_collect_response::PbCreateMviewProgress; use risingwave_pb::stream_service::streaming_control_stream_request::InitialPartialGraph; use risingwave_pb::stream_service::InjectBarrierRequest; @@ -129,7 +130,7 @@ impl Display for &'_ PartialGraphManagedBarrierState { } write!(f, "]")?; } - ManagedBarrierStateInner::AllCollected(_) => { + ManagedBarrierStateInner::AwaitComplete => { write!(f, "AwaitComplete")?; } } @@ -569,15 +570,16 @@ impl ManagedBarrierState { pub(super) fn next_collected_epoch( &mut self, - ) -> impl Future<Output = (PartialGraphId, Barrier)> + '_ { + ) -> impl Future<Output = (PartialGraphId, Barrier, Vec<PbCreateMviewProgress>)> + '_ { poll_fn(|_| { let mut output = None; for (partial_graph_id, graph_state) in &mut self.graph_states { - if let Some(barrier) = graph_state.may_have_collected_all() { + if let Some((barrier, create_mview_progress)) = graph_state.may_have_collected_all() + { if let Some(actors_to_stop) = barrier.all_stop_actors() { self.current_shared_context.drop_actors(actors_to_stop); } - output = Some((*partial_graph_id, barrier)); + output = Some((*partial_graph_id, barrier, create_mview_progress)); break; } } @@ -610,11 +612,7 @@ impl ManagedBarrierState { &mut self, partial_graph_id: PartialGraphId, prev_epoch: u64, - ) -> ( - Barrier, - Option<HashSet<TableId>>, - Vec<PbCreateMviewProgress>, - ) { + ) -> (Barrier, HashSet<TableId>) { self.graph_states .get_mut(&partial_graph_id) .expect("should exist") @@ -626,13 +624,13 @@ impl PartialGraphManagedBarrierState { /// This method is called when barrier state is modified in either `Issued` or `Stashed` /// to transform the state to `AwaitComplete` and start state store `sync` when the barrier /// has been collected from all actors for an `Issued` barrier. - fn may_have_collected_all(&mut self) -> Option<Barrier> { - for barrier_state in self.epoch_barrier_state_map.values_mut() { + fn may_have_collected_all(&mut self) -> Option<(Barrier, Vec<PbCreateMviewProgress>)> { + for (prev_epoch, barrier_state) in &mut self.epoch_barrier_state_map { match &barrier_state.inner { ManagedBarrierStateInner::Issued(IssuedState { remaining_actors, ..
}) if remaining_actors.is_empty() => {} - ManagedBarrierStateInner::AllCollected(_) => { + ManagedBarrierStateInner::AwaitComplete => { continue; } ManagedBarrierStateInner::Issued(_) => { @@ -650,10 +648,27 @@ impl PartialGraphManagedBarrierState { .map(|(actor, state)| state.to_pb(actor)) .collect(); - let prev_state = replace( - &mut barrier_state.inner, - ManagedBarrierStateInner::AllCollected(create_mview_progress), - ); + let (barrier, prev_state) = match &barrier_state.barrier.kind { + BarrierKind::Unspecified => { + unreachable!() + } + BarrierKind::Initial | BarrierKind::Barrier => { + let prev_epoch = *prev_epoch; + // a non-checkpoint barrier needs no further completion + let state = self + .epoch_barrier_state_map + .remove(&prev_epoch) + .expect("should exist"); + (state.barrier, state.inner) + } + BarrierKind::Checkpoint => ( + barrier_state.barrier.clone(), + replace( + &mut barrier_state.inner, + ManagedBarrierStateInner::AwaitComplete, + ), + ), + }; must_match!(prev_state, ManagedBarrierStateInner::Issued(IssuedState { barrier_inflight_latency: timer, @@ -662,33 +677,30 @@ impl PartialGraphManagedBarrierState { timer.observe_duration(); }); - return Some(barrier_state.barrier.clone()); + return Some((barrier, create_mview_progress)); } None } - fn pop_barrier_to_complete( - &mut self, - prev_epoch: u64, - ) -> ( - Barrier, - Option<HashSet<TableId>>, - Vec<PbCreateMviewProgress>, - ) { + fn pop_barrier_to_complete(&mut self, prev_epoch: u64) -> (Barrier, HashSet<TableId>) { let (popped_prev_epoch, barrier_state) = self .epoch_barrier_state_map .pop_first() .expect("should exist"); - assert_eq!(prev_epoch, popped_prev_epoch); + assert_eq!( + prev_epoch, popped_prev_epoch, + "barrier_state: {:?}", + barrier_state + ); + assert!(barrier_state.barrier.kind.is_checkpoint()); - let create_mview_progress = must_match!(barrier_state.inner, ManagedBarrierStateInner::AllCollected(create_mview_progress) => { - create_mview_progress - }); + assert_matches!(barrier_state.inner, ManagedBarrierStateInner::AwaitComplete); ( barrier_state.barrier, - barrier_state.table_ids, - create_mview_progress, + barrier_state + .table_ids + .expect("should be Some on checkpoint barrier"), ) } } @@ -820,7 +832,7 @@ impl PartialGraphManagedBarrierState { #[cfg(test)] async fn pop_next_completed_epoch(&mut self) -> u64 { - if let Some(barrier) = self.may_have_collected_all() { + if let Some((barrier, _)) = self.may_have_collected_all() { self.pop_barrier_to_complete(barrier.epoch.prev); return barrier.epoch.prev; } diff --git a/src/stream/src/task/barrier_manager/progress.rs b/src/stream/src/task/barrier_manager/progress.rs index c860b8f430fa1..6534217517aec 100644 --- a/src/stream/src/task/barrier_manager/progress.rs +++ b/src/stream/src/task/barrier_manager/progress.rs @@ -16,7 +16,7 @@ use std::assert_matches::assert_matches; use std::fmt::{Display, Formatter}; use risingwave_common::util::epoch::EpochPair; -use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress; +use risingwave_pb::stream_service::barrier_collect_response::PbCreateMviewProgress; use super::LocalBarrierManager; use crate::task::barrier_manager::LocalBarrierEvent::ReportCreateProgress; diff --git a/src/stream/src/task/barrier_manager/tests.rs b/src/stream/src/task/barrier_manager/tests.rs index a9ba0b4b7ed01..6a9a3689c81df 100644 --- a/src/stream/src/task/barrier_manager/tests.rs +++ b/src/stream/src/task/barrier_manager/tests.rs @@ -69,7 +69,7 @@ async fn test_managed_barrier_collection() -> StreamResult<()> { let resp: StreamingControlStreamResponse =
result.unwrap().unwrap(); let resp = resp.response.unwrap(); match resp { - streaming_control_stream_response::Response::CompleteBarrier(_complete_barrier) => {} + streaming_control_stream_response::Response::CollectBarrier(_) => {} _ => unreachable!(), } })); @@ -151,7 +151,7 @@ async fn test_managed_barrier_collection_separately() -> StreamResult<()> { let resp: StreamingControlStreamResponse = result.unwrap().unwrap(); let resp = resp.response.unwrap(); match resp { - streaming_control_stream_response::Response::CompleteBarrier(_complete_barrier) => {} + streaming_control_stream_response::Response::CollectBarrier(_) => {} _ => unreachable!(), } })); @@ -228,8 +228,8 @@ async fn test_late_register_barrier_sender() -> StreamResult<()> { let resp = test_env.response_rx.recv().await.unwrap().unwrap(); match resp.response.unwrap() { - streaming_control_stream_response::Response::CompleteBarrier(complete_barrier) => { - assert_eq!(complete_barrier.epoch, barrier1.epoch.prev); + streaming_control_stream_response::Response::CollectBarrier(resp) => { + assert_eq!(resp.epoch, barrier1.epoch.prev); } _ => unreachable!(), } @@ -240,8 +240,8 @@ async fn test_late_register_barrier_sender() -> StreamResult<()> { let resp: StreamingControlStreamResponse = result.unwrap().unwrap(); let resp = resp.response.unwrap(); match resp { - streaming_control_stream_response::Response::CompleteBarrier(complete_barrier) => { - assert_eq!(complete_barrier.epoch, barrier2.epoch.prev); + streaming_control_stream_response::Response::CollectBarrier(resp) => { + assert_eq!(resp.epoch, barrier2.epoch.prev); } _ => unreachable!(), } diff --git a/src/stream/src/task/mod.rs b/src/stream/src/task/mod.rs index 39da5e0b4ed93..4e34469bc774f 100644 --- a/src/stream/src/task/mod.rs +++ b/src/stream/src/task/mod.rs @@ -40,7 +40,7 @@ pub type UpDownActorIds = (ActorId, ActorId); pub type UpDownFragmentIds = (FragmentId, FragmentId); #[derive(Hash, Eq, PartialEq, Copy, Clone, Debug)] -struct PartialGraphId(u64); +pub struct PartialGraphId(u64); impl PartialGraphId { fn new(id: u64) -> Self { diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 648afb81ebc8d..52306076c829f 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -75,14 +75,16 @@ pub type ActorHandle = JoinHandle<()>; pub type AtomicU64Ref = Arc<AtomicU64>; pub mod await_tree_key { + use crate::task::PartialGraphId; + /// Await-tree key type for actors. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct Actor(pub crate::task::ActorId); /// Await-tree key type for barriers. - #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] + #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BarrierAwait { - pub prev_epoch: u64, + pub sync_graph_epochs: Vec<(PartialGraphId, u64)>, } }
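
For reference, below is a minimal standalone sketch of the two-phase flow this patch introduces: each partial graph reports collection to meta as a BarrierCollectResponse and parks the barrier in AwaitComplete, and the state-store sync runs only when meta sends a BarrierCompleteRequest that batches several (partial_graph_id, prev_epoch) pairs under one task_id. This is illustrative only and not part of the patch; names such as ToyWorker and BarrierState are hypothetical stand-ins for the real LocalBarrierWorker and ManagedBarrierStateInner.

use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct PartialGraphId(u64);

#[derive(Debug, PartialEq)]
enum BarrierState {
    // barrier injected; some actors have not yet reported collection
    Issued { remaining_actors: usize },
    // collected by all actors; waiting for a meta-issued CompleteBarrier request
    AwaitComplete,
}

#[derive(Default)]
struct ToyWorker {
    // per partial graph: prev_epoch -> barrier state
    graphs: HashMap<PartialGraphId, HashMap<u64, BarrierState>>,
}

impl ToyWorker {
    // one actor reports the barrier; returns true when the graph would send
    // a BarrierCollectResponse { partial_graph_id, epoch } to meta
    fn collect(&mut self, graph: PartialGraphId, prev_epoch: u64) -> bool {
        let g = self.graphs.get_mut(&graph).expect("graph must exist");
        let state = g.get_mut(&prev_epoch).expect("barrier must be issued first");
        let all_collected = match state {
            BarrierState::Issued { remaining_actors } => {
                *remaining_actors -= 1;
                *remaining_actors == 0
            }
            BarrierState::AwaitComplete => false,
        };
        if all_collected {
            // all actors reported: notify meta, then wait for CompleteBarrier
            // before syncing, instead of syncing eagerly per graph
            *state = BarrierState::AwaitComplete;
        }
        all_collected
    }

    // handle a BarrierCompleteRequest: pop every listed graph epoch and batch
    // them into one sync (the real worker makes a single `hummock.sync(..)`
    // call), then echo the task id back as BarrierCompleteResponse does
    fn complete(&mut self, task_id: u64, graph_epochs: &[(PartialGraphId, u64)]) -> u64 {
        for (graph, prev_epoch) in graph_epochs {
            let state = self
                .graphs
                .get_mut(graph)
                .and_then(|g| g.remove(prev_epoch))
                .expect("should exist");
            // only fully collected checkpoint barriers may be completed
            assert_eq!(state, BarrierState::AwaitComplete);
        }
        task_id
    }
}

fn main() {
    let graph = PartialGraphId(1);
    let mut worker = ToyWorker::default();
    worker
        .graphs
        .entry(graph)
        .or_default()
        .insert(100, BarrierState::Issued { remaining_actors: 2 });
    assert!(!worker.collect(graph, 100)); // one actor still pending
    assert!(worker.collect(graph, 100)); // all collected -> CollectBarrier response
    assert_eq!(worker.complete(42, &[(graph, 100)]), 42); // meta-driven batched sync
    println!("two-phase barrier flow ok");
}

Keying completion by a meta-assigned task_id rather than by (partial_graph_id, epoch) is what allows one sync call, and one BarrierCompleteResponse, to cover several partial graphs at a checkpoint, which matches the map of partial_graph_sync_epochs in the new BarrierCompleteRequest proto message.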