From 9f8f533fc3b02bcaa3000bca447da7a6583d7496 Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 25 Mar 2024 13:54:19 +0800 Subject: [PATCH] fix cdc table stuck. due to track id --- proto/stream_plan.proto | 1 + src/meta/src/barrier/progress.rs | 20 ++++++++++++++++++++ src/meta/src/barrier/schedule.rs | 11 +++++++++-- src/meta/src/manager/catalog/database.rs | 2 +- src/meta/src/model/stream.rs | 7 +++++-- src/meta/src/stream/stream_manager.rs | 5 +++++ 6 files changed, 41 insertions(+), 5 deletions(-) diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 0d3f8aa16613f..b4393153b57a8 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -876,6 +876,7 @@ enum FragmentTypeFlag { FRAGMENT_TYPE_FLAG_MVIEW = 2; FRAGMENT_TYPE_FLAG_SINK = 4; FRAGMENT_TYPE_FLAG_NOW = 8; // TODO: Remove this and insert a `BarrierRecv` instead. + // Include StreamScan and StreamCdcScan FRAGMENT_TYPE_FLAG_STREAM_SCAN = 16; FRAGMENT_TYPE_FLAG_BARRIER_RECV = 32; FRAGMENT_TYPE_FLAG_VALUES = 64; diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index 1374e2e0a736b..a33380ea1cdd5 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -238,6 +238,25 @@ impl TrackingJob { } } +impl std::fmt::Debug for TrackingJob { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + TrackingJob::New(command) => write!( + f, + "TrackingJob::New({:?})", + command.context.table_to_create() + ), + TrackingJob::Recovered(recovered) => { + write!( + f, + "TrackingJob::Recovered({:?})", + recovered.fragments.table_id() + ) + } + } + } +} + pub struct RecoveredTrackingJob { pub fragments: TableFragments, pub finished: Notifier, @@ -367,6 +386,7 @@ impl CreateMviewProgressTracker { /// /// Returns whether there are still remaining stashed jobs to finish. pub(super) async fn finish_jobs(&mut self, checkpoint: bool) -> MetaResult { + tracing::trace!(finished_jobs=?self.finished_jobs, progress_map=?self.progress_map, "finishing jobs"); for job in self .finished_jobs .extract_if(|job| checkpoint || !job.is_checkpoint_required()) diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs index 662a1006baae9..596a8bfd15d60 100644 --- a/src/meta/src/barrier/schedule.rs +++ b/src/meta/src/barrier/schedule.rs @@ -279,15 +279,18 @@ impl BarrierScheduler { for (injected_rx, collect_rx, finish_rx) in contexts { // Wait for this command to be injected, and record the result. + tracing::trace!("waiting for injected_rx"); let info = injected_rx.await.ok().context("failed to inject barrier")?; infos.push(info); + tracing::trace!("waiting for collect_rx"); // Throw the error if it occurs when collecting this barrier. collect_rx .await .ok() .context("failed to collect barrier")??; + tracing::trace!("waiting for finish_rx"); // Wait for this command to be finished. finish_rx.await.ok().context("failed to finish command")??; } @@ -316,9 +319,13 @@ impl BarrierScheduler { /// /// Returns the barrier info of the actual command. pub async fn run_command(&self, command: Command) -> MetaResult { - self.run_multiple_commands(vec![command]) + tracing::trace!("run_command: {:?}", command); + let ret = self + .run_multiple_commands(vec![command]) .await - .map(|i| i[0]) + .map(|i| i[0]); + tracing::trace!("run_command finished"); + ret } /// Flush means waiting for the next barrier to collect. diff --git a/src/meta/src/manager/catalog/database.rs b/src/meta/src/manager/catalog/database.rs index fee5eacc38471..e6c2da61018b1 100644 --- a/src/meta/src/manager/catalog/database.rs +++ b/src/meta/src/manager/catalog/database.rs @@ -227,7 +227,7 @@ impl DatabaseManager { && x.name.eq(&relation_key.2) }) { if t.stream_job_status == StreamJobStatus::Creating as i32 { - bail!("table is in creating procedure: {}", t.id); + bail!("table is in creating procedure, table id: {}", t.id); } else { Err(MetaError::catalog_duplicated("table", &relation_key.2)) } diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index da1a310ee8d86..670ecb99e2172 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -370,9 +370,12 @@ impl TableFragments { /// Returns actor ids that need to be tracked when creating MV. pub fn tracking_progress_actor_ids(&self) -> Vec { Self::filter_actor_ids(self, |fragment_type_mask| { - (fragment_type_mask + let is_value_or_scan = (fragment_type_mask & (FragmentTypeFlag::Values as u32 | FragmentTypeFlag::StreamScan as u32)) - != 0 + != 0; + // Note: CDC table fragment is both StreamScan and CdcFilter fragment. We don't want to track CDC progress. + let is_cdc = (fragment_type_mask & FragmentTypeFlag::CdcFilter as u32) == 0; + is_value_or_scan && !is_cdc }) } diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index f9cf11512cfd2..7cb85d91af7d0 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -434,6 +434,10 @@ impl GlobalStreamManager { self.build_actors(&table_fragments, &building_locations, &existing_locations) .await?; + tracing::debug!( + table_id = %table_fragments.table_id(), + "built actors finished" + ); if let Some((streaming_job, context, table_fragments)) = replace_table_job_info { self.build_actors( @@ -495,6 +499,7 @@ impl GlobalStreamManager { ddl_type, replace_table: replace_table_command, }; + tracing::debug!("sending Command::CreateStreamingJob"); if let Err(err) = self.barrier_scheduler.run_command(command).await { if create_type == CreateType::Foreground || err.is_cancelled() { let mut table_ids = HashSet::from_iter(std::iter::once(table_id));