Skip to content

Commit

Permalink
fix: don't panic and trigger recovery when applying cancel command fo…
Browse files Browse the repository at this point in the history
…r created job
  • Loading branch information
yezizp2012 committed Nov 7, 2024
1 parent c61f06f commit 5cac022
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 39 deletions.
54 changes: 23 additions & 31 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -979,46 +979,24 @@ impl CommandContext {
tracing::debug!(id = ?table_fragments.table_id(), "cancelling stream job");
self.clean_up(table_fragments.actor_ids()).await?;

// NOTE(kwannoel): At this point, meta has already registered the table ids.
// We should unregister them.
// This is required for background ddl, for foreground ddl this is a no-op.
// Foreground ddl is handled entirely by stream manager, so it will unregister
// the table ids on failure.
// On the other hand background ddl could be handled by barrier manager.
// It won't clean the tables on failure,
// since the failure could be recoverable.
// As such it needs to be handled here.
self.barrier_manager_context
.hummock_manager
.unregister_table_ids(table_fragments.all_table_ids().map(TableId::new))
.await?;

match &self.barrier_manager_context.metadata_manager {
MetadataManager::V1(mgr) => {
// NOTE(kwannoel): At this point, catalog manager has persisted the tables already.
// We need to clean up the table state, so we do it here.
// The logic is the same as above, for hummock_manager.unregister_table_ids.
if let Err(e) = mgr
.catalog_manager
mgr.catalog_manager
.cancel_create_materialized_view_procedure(
table_fragments.table_id().table_id,
table_fragments.internal_table_ids(),
)
.await
{
let table_id = table_fragments.table_id().table_id;
tracing::warn!(
table_id,
error = %e.as_report(),
"cancel_create_table_procedure failed for CancelStreamingJob",
);
// If failed, check that table is not in meta store.
// If any table is, just panic, let meta do bootstrap recovery.
// Otherwise our persisted state is dirty.
let mut table_ids = table_fragments.internal_table_ids();
table_ids.push(table_id);
mgr.catalog_manager.assert_tables_deleted(table_ids).await;
}
.inspect_err(|e| {
let table_id = table_fragments.table_id().table_id;
tracing::warn!(
table_id,
error = %e.as_report(),
"cancel_create_table_procedure failed for CancelStreamingJob",
);
})?;

// We need to drop table fragments here,
// since this is not done in stream manager (foreground ddl)
Expand All @@ -1038,6 +1016,20 @@ impl CommandContext {
.await?;
}
}

// NOTE(kwannoel): At this point, meta has already registered the table ids.
// We should unregister them.
// This is required for background ddl, for foreground ddl this is a no-op.
// Foreground ddl is handled entirely by stream manager, so it will unregister
// the table ids on failure.
// On the other hand background ddl could be handled by barrier manager.
// It won't clean the tables on failure,
// since the failure could be recoverable.
// As such it needs to be handled here.
self.barrier_manager_context
.hummock_manager
.unregister_table_ids(table_fragments.all_table_ids().map(TableId::new))
.await?;
}

Command::CreateStreamingJob { info, job_type } => {
Expand Down
8 changes: 0 additions & 8 deletions src/meta/src/manager/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1132,14 +1132,6 @@ impl CatalogManager {
};
}

/// Asserts that none of `table_ids` is still present in the in-memory
/// `core.database.tables` map.
///
/// # Panics
///
/// Panics (via `assert_eq!`) on the first id that still has an entry.
pub async fn assert_tables_deleted(&self, table_ids: Vec<TableId>) {
    // The guard returned by `self.core.lock()` stays alive until the end of
    // the function, so every lookup below goes through the same guard.
    let guard = self.core.lock().await;
    let known_tables = &guard.database.tables;
    table_ids.into_iter().for_each(|table_id| {
        assert_eq!(known_tables.get(&table_id), None,);
    });
}

/// We clean the following tables:
/// 1. Those which belonged to incomplete Foreground jobs.
/// 2. Those which did not persist their table fragments, we can't recover these.
Expand Down

0 comments on commit 5cac022

Please sign in to comment.