fix: don't panic and trigger recovery when applying cancel command for created job #19291

Merged: 1 commit, Nov 8, 2024
54 changes: 23 additions & 31 deletions src/meta/src/barrier/command.rs
@@ -979,46 +979,24 @@ impl CommandContext {
            tracing::debug!(id = ?table_fragments.table_id(), "cancelling stream job");
            self.clean_up(table_fragments.actor_ids()).await?;

-           // NOTE(kwannoel): At this point, meta has already registered the table ids.
-           // We should unregister them.
-           // This is required for background ddl, for foreground ddl this is a no-op.
-           // Foreground ddl is handled entirely by stream manager, so it will unregister
-           // the table ids on failure.
-           // On the other hand background ddl could be handled by barrier manager.
-           // It won't clean the tables on failure,
-           // since the failure could be recoverable.
-           // As such it needs to be handled here.
-           self.barrier_manager_context
-               .hummock_manager
-               .unregister_table_ids(table_fragments.all_table_ids().map(TableId::new))
-               .await?;

Member Author: If the job is already created, unregistering from hummock will lead to data inconsistency: meta will crash-loop during commit epoch because of the missing state table id. We only unregister after the catalog has been successfully deleted.

Member: Is this change needed in main/2.1?

Member Author: No, the behavior already changed for the SQL backend in main/2.1.
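
To make the ordering concrete, here is a minimal, self-contained sketch of the invariant described in this thread. `MetaStub` and its methods are hypothetical stand-ins for `cancel_create_materialized_view_procedure` and `hummock_manager.unregister_table_ids`, not the real meta-service API.

```rust
// Sketch of the ordering discussed above: only unregister state table ids
// from the state store after the catalog entry has been successfully deleted.
struct MetaStub;

impl MetaStub {
    /// Hypothetical stand-in for `cancel_create_materialized_view_procedure`:
    /// deletes the catalog entry, or fails (e.g. the job already completed).
    fn delete_catalog_entry(&self, table_id: u32) -> Result<(), String> {
        println!("deleted catalog entry for table {table_id}");
        Ok(())
    }

    /// Hypothetical stand-in for `hummock_manager.unregister_table_ids`.
    fn unregister_state_tables(&self, table_id: u32) {
        println!("unregistered state tables for table {table_id}");
    }

    fn cancel(&self, table_id: u32) -> Result<(), String> {
        // Old order: unregister first, then delete the catalog entry. If the
        // job was in fact already created, commit-epoch would then crash-loop
        // on the missing state table id.
        //
        // New order: delete the catalog entry first; unregister from the
        // state store only once that has succeeded.
        self.delete_catalog_entry(table_id)?;
        self.unregister_state_tables(table_id);
        Ok(())
    }
}

fn main() {
    MetaStub.cancel(42).unwrap();
}
```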

            match &self.barrier_manager_context.metadata_manager {
                MetadataManager::V1(mgr) => {
                    // NOTE(kwannoel): At this point, catalog manager has persisted the tables already.
                    // We need to cleanup the table state. So we can do it here.
                    // The logic is the same as above, for hummock_manager.unregister_table_ids.
-                   if let Err(e) = mgr
-                       .catalog_manager
+                   mgr.catalog_manager
                        .cancel_create_materialized_view_procedure(
                            table_fragments.table_id().table_id,
                            table_fragments.internal_table_ids(),
                        )
                        .await
-                   {
-                       let table_id = table_fragments.table_id().table_id;
-                       tracing::warn!(
-                           table_id,
-                           error = %e.as_report(),
-                           "cancel_create_table_procedure failed for CancelStreamingJob",
-                       );
-                       // If failed, check that table is not in meta store.
-                       // If any table is, just panic, let meta do bootstrap recovery.
-                       // Otherwise our persisted state is dirty.
-                       let mut table_ids = table_fragments.internal_table_ids();
-                       table_ids.push(table_id);
-                       mgr.catalog_manager.assert_tables_deleted(table_ids).await;
-                   }
+                   .inspect_err(|e| {
+                       let table_id = table_fragments.table_id().table_id;
+                       tracing::warn!(
+                           table_id,
+                           error = %e.as_report(),
+                           "cancel_create_table_procedure failed for CancelStreamingJob",
+                       );
+                   })?;

Member Author: The error returned by cancel_create_materialized_view_procedure only occurs when (1) writing to the metastore fails, or (2) the job has already been successfully created. In the latter case, this assertion would cause a meta panic. Because the cancel command has already stopped the actors on the CNs, we directly return an error here and let recovery rebuild the state.
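
For reference, a small self-contained sketch of the `Result::inspect_err` pattern the new code adopts (stable since Rust 1.76): log the failure as a side effect, then propagate it with `?` so the caller can trigger recovery, instead of asserting, which would panic the meta node. `cancel_stub` and `apply_cancel` are hypothetical stand-ins.

```rust
// Hypothetical stand-in for `cancel_create_materialized_view_procedure`:
// fails when the job has already been successfully created.
fn cancel_stub(already_created: bool) -> Result<(), String> {
    if already_created {
        Err("job already created; catalog entry cannot be cancelled".to_owned())
    } else {
        Ok(())
    }
}

fn apply_cancel(already_created: bool) -> Result<(), String> {
    cancel_stub(already_created)
        // Side effect only: log the error without consuming it.
        .inspect_err(|e| eprintln!("cancel_create_table_procedure failed: {e}"))?;
    // The `?` above propagates the error to the caller; no panic here.
    Ok(())
}

fn main() {
    assert!(apply_cancel(false).is_ok());
    assert!(apply_cancel(true).is_err()); // error surfaces; the process survives
}
```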

                    // We need to drop table fragments here,
                    // since this is not done in stream manager (foreground ddl)

@@ -1038,6 +1016,20 @@ impl CommandContext {
                        .await?;
                }
            }

+
+           // NOTE(kwannoel): At this point, meta has already registered the table ids.
+           // We should unregister them.
+           // This is required for background ddl, for foreground ddl this is a no-op.
+           // Foreground ddl is handled entirely by stream manager, so it will unregister
+           // the table ids on failure.
+           // On the other hand background ddl could be handled by barrier manager.
+           // It won't clean the tables on failure,
+           // since the failure could be recoverable.
+           // As such it needs to be handled here.
+           self.barrier_manager_context
+               .hummock_manager
+               .unregister_table_ids(table_fragments.all_table_ids().map(TableId::new))
+               .await?;
        }

        Command::CreateStreamingJob { info, job_type } => {
8 changes: 0 additions & 8 deletions src/meta/src/manager/catalog/mod.rs

@@ -1132,14 +1132,6 @@ impl CatalogManager {
            };
        }

-   pub async fn assert_tables_deleted(&self, table_ids: Vec<TableId>) {
-       let core = self.core.lock().await;
-       let tables = &core.database.tables;
-       for id in table_ids {
-           assert_eq!(tables.get(&id), None,)
-       }
-   }

    /// We clean the following tables:
    /// 1. Those which belonged to incomplete Foreground jobs.
    /// 2. Those which did not persist their table fragments, we can't recover these.