Skip to content

Commit

Permalink
fix cdc table stuck. due to track id
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Mar 25, 2024
1 parent 27db3f6 commit 9f8f533
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 5 deletions.
1 change: 1 addition & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,7 @@ enum FragmentTypeFlag {
FRAGMENT_TYPE_FLAG_MVIEW = 2;
FRAGMENT_TYPE_FLAG_SINK = 4;
FRAGMENT_TYPE_FLAG_NOW = 8; // TODO: Remove this and insert a `BarrierRecv` instead.
// Include StreamScan and StreamCdcScan
FRAGMENT_TYPE_FLAG_STREAM_SCAN = 16;
FRAGMENT_TYPE_FLAG_BARRIER_RECV = 32;
FRAGMENT_TYPE_FLAG_VALUES = 64;
Expand Down
20 changes: 20 additions & 0 deletions src/meta/src/barrier/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,25 @@ impl TrackingJob {
}
}

impl std::fmt::Debug for TrackingJob {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TrackingJob::New(command) => write!(
f,
"TrackingJob::New({:?})",
command.context.table_to_create()
),
TrackingJob::Recovered(recovered) => {
write!(
f,
"TrackingJob::Recovered({:?})",
recovered.fragments.table_id()
)
}
}
}
}

pub struct RecoveredTrackingJob {
pub fragments: TableFragments,
pub finished: Notifier,
Expand Down Expand Up @@ -367,6 +386,7 @@ impl CreateMviewProgressTracker {
///
/// Returns whether there are still remaining stashed jobs to finish.
pub(super) async fn finish_jobs(&mut self, checkpoint: bool) -> MetaResult<bool> {
tracing::trace!(finished_jobs=?self.finished_jobs, progress_map=?self.progress_map, "finishing jobs");
for job in self
.finished_jobs
.extract_if(|job| checkpoint || !job.is_checkpoint_required())
Expand Down
11 changes: 9 additions & 2 deletions src/meta/src/barrier/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,15 +279,18 @@ impl BarrierScheduler {

for (injected_rx, collect_rx, finish_rx) in contexts {
// Wait for this command to be injected, and record the result.
tracing::trace!("waiting for injected_rx");
let info = injected_rx.await.ok().context("failed to inject barrier")?;
infos.push(info);

tracing::trace!("waiting for collect_rx");
// Throw the error if it occurs when collecting this barrier.
collect_rx
.await
.ok()
.context("failed to collect barrier")??;

tracing::trace!("waiting for finish_rx");
// Wait for this command to be finished.
finish_rx.await.ok().context("failed to finish command")??;
}
Expand Down Expand Up @@ -316,9 +319,13 @@ impl BarrierScheduler {
///
/// Returns the barrier info of the actual command.
// NOTE(review): this hunk appears to show both the removed and the added lines of the diff
// without +/- markers: the bare `self.run_multiple_commands(vec![command])` line and the first
// `.map(|i| i[0])` look like the pre-change text. Confirm against the real file before editing.
pub async fn run_command(&self, command: Command) -> MetaResult<BarrierInfo> {
self.run_multiple_commands(vec![command])
// Trace-log the command (via its `Debug` output) before handing it off.
tracing::trace!("run_command: {:?}", command);
// Submit the command as a one-element batch, wait for the batch to complete, and on success
// keep the first element of the returned collection.
let ret = self
.run_multiple_commands(vec![command])
.await
.map(|i| i[0])
.map(|i| i[0]);
// Trace-log completion; this is emitted whether `ret` is `Ok` or `Err`.
tracing::trace!("run_command finished");
ret
}

/// Flush means waiting for the next barrier to collect.
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/manager/catalog/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ impl DatabaseManager {
&& x.name.eq(&relation_key.2)
}) {
if t.stream_job_status == StreamJobStatus::Creating as i32 {
bail!("table is in creating procedure: {}", t.id);
bail!("table is in creating procedure, table id: {}", t.id);
} else {
Err(MetaError::catalog_duplicated("table", &relation_key.2))
}
Expand Down
7 changes: 5 additions & 2 deletions src/meta/src/model/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,9 +370,12 @@ impl TableFragments {
/// Returns actor ids that need to be tracked when creating MV.
pub fn tracking_progress_actor_ids(&self) -> Vec<ActorId> {
Self::filter_actor_ids(self, |fragment_type_mask| {
(fragment_type_mask
let is_value_or_scan = (fragment_type_mask
& (FragmentTypeFlag::Values as u32 | FragmentTypeFlag::StreamScan as u32))
!= 0
!= 0;
// Note: CDC table fragment is both StreamScan and CdcFilter fragment. We don't want to track CDC progress.
let is_cdc = (fragment_type_mask & FragmentTypeFlag::CdcFilter as u32) == 0;
is_value_or_scan && !is_cdc
})
}

Expand Down
5 changes: 5 additions & 0 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,10 @@ impl GlobalStreamManager {

self.build_actors(&table_fragments, &building_locations, &existing_locations)
.await?;
tracing::debug!(
table_id = %table_fragments.table_id(),
"built actors finished"
);

if let Some((streaming_job, context, table_fragments)) = replace_table_job_info {
self.build_actors(
Expand Down Expand Up @@ -495,6 +499,7 @@ impl GlobalStreamManager {
ddl_type,
replace_table: replace_table_command,
};
tracing::debug!("sending Command::CreateStreamingJob");
if let Err(err) = self.barrier_scheduler.run_command(command).await {
if create_type == CreateType::Foreground || err.is_cancelled() {
let mut table_ids = HashSet::from_iter(std::iter::once(table_id));
Expand Down

0 comments on commit 9f8f533

Please sign in to comment.