diff --git a/src/meta/model_v2/migration/src/m20230908_072257_init.rs b/src/meta/model_v2/migration/src/m20230908_072257_init.rs index 2da479531abf0..0c3d9d36c4853 100644 --- a/src/meta/model_v2/migration/src/m20230908_072257_init.rs +++ b/src/meta/model_v2/migration/src/m20230908_072257_init.rs @@ -519,6 +519,14 @@ impl MigrationTrait for Migration { .to(Connection::Table, Connection::ConnectionId) .to_owned(), ) + .foreign_key( + &mut ForeignKey::create() + .name("FK_source_optional_associated_table_id") + .from(Source::Table, Source::OptionalAssociatedTableId) + .to(Object::Table, Object::Oid) + .on_delete(ForeignKeyAction::Cascade) + .to_owned(), + ) .to_owned(), ) .await?; diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 76eab54f631a0..6077efa7f88c1 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -468,6 +468,7 @@ impl CatalogController { .clone() .into_iter() .chain(state_table_ids.clone().into_iter()) + .chain(associated_source_ids.clone().into_iter()) .collect(); let res = Object::delete_many() diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index f4004e25b500c..4fc84dda21d55 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -401,6 +401,14 @@ impl CatalogController { .all(&txn) .await?; + let associated_source_id: Option = Table::find_by_id(job_id) + .select_only() + .column(table::Column::OptionalAssociatedSourceId) + .filter(table::Column::OptionalAssociatedSourceId.is_not_null()) + .into_tuple() + .one(&txn) + .await?; + Object::delete_by_id(job_id).exec(&txn).await?; if !internal_table_ids.is_empty() { Object::delete_many() @@ -408,6 +416,9 @@ impl CatalogController { .exec(&txn) .await?; } + if let Some(source_id) = associated_source_id { + Object::delete_by_id(source_id).exec(&txn).await?; + } txn.commit().await?; Ok(true)