From 02e9a44a809cd2029269269ca3fff28c4d5e7c65 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Sun, 18 Feb 2024 08:51:33 +0000 Subject: [PATCH] fix(sql-backend): clean up dirty source catalogs for failed table with connectors and recovery (#15110) (#15111) Co-authored-by: August --- .../model_v2/migration/src/m20230908_072257_init.rs | 8 ++++++++ src/meta/src/controller/catalog.rs | 1 + src/meta/src/controller/streaming_job.rs | 11 +++++++++++ 3 files changed, 20 insertions(+) diff --git a/src/meta/model_v2/migration/src/m20230908_072257_init.rs b/src/meta/model_v2/migration/src/m20230908_072257_init.rs index 96b459e584647..d3a3c6a916e0f 100644 --- a/src/meta/model_v2/migration/src/m20230908_072257_init.rs +++ b/src/meta/model_v2/migration/src/m20230908_072257_init.rs @@ -519,6 +519,14 @@ impl MigrationTrait for Migration { .to(Connection::Table, Connection::ConnectionId) .to_owned(), ) + .foreign_key( + &mut ForeignKey::create() + .name("FK_source_optional_associated_table_id") + .from(Source::Table, Source::OptionalAssociatedTableId) + .to(Object::Table, Object::Oid) + .on_delete(ForeignKeyAction::Cascade) + .to_owned(), + ) .to_owned(), ) .await?; diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 250cac6d9dde0..55b6e77ef7ce9 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -468,6 +468,7 @@ impl CatalogController { .clone() .into_iter() .chain(state_table_ids.clone().into_iter()) + .chain(associated_source_ids.clone().into_iter()) .collect(); let res = Object::delete_many() diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index f4004e25b500c..4fc84dda21d55 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -401,6 +401,14 @@ impl CatalogController { .all(&txn) .await?; + let associated_source_id: Option = Table::find_by_id(job_id) + .select_only() + .column(table::Column::OptionalAssociatedSourceId) + .filter(table::Column::OptionalAssociatedSourceId.is_not_null()) + .into_tuple() + .one(&txn) + .await?; + Object::delete_by_id(job_id).exec(&txn).await?; if !internal_table_ids.is_empty() { Object::delete_many() @@ -408,6 +416,9 @@ impl CatalogController { .exec(&txn) .await?; } + if let Some(source_id) = associated_source_id { + Object::delete_by_id(source_id).exec(&txn).await?; + } txn.commit().await?; Ok(true)