Skip to content

Commit

Permalink
fix(sql-backend): clean up dirty source catalogs for failed table wit…
Browse files Browse the repository at this point in the history
…h connectors and recovery (#15110)
  • Loading branch information
yezizp2012 authored Feb 18, 2024
1 parent 16ae65b commit 0975be2
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 0 deletions.
8 changes: 8 additions & 0 deletions src/meta/model_v2/migration/src/m20230908_072257_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,14 @@ impl MigrationTrait for Migration {
.to(Connection::Table, Connection::ConnectionId)
.to_owned(),
)
.foreign_key(
&mut ForeignKey::create()
.name("FK_source_optional_associated_table_id")
.from(Source::Table, Source::OptionalAssociatedTableId)
.to(Object::Table, Object::Oid)
.on_delete(ForeignKeyAction::Cascade)
.to_owned(),
)
.to_owned(),
)
.await?;
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ impl CatalogController {
.clone()
.into_iter()
.chain(state_table_ids.clone().into_iter())
.chain(associated_source_ids.clone().into_iter())
.collect();

let res = Object::delete_many()
Expand Down
11 changes: 11 additions & 0 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,13 +401,24 @@ impl CatalogController {
.all(&txn)
.await?;

let associated_source_id: Option<SourceId> = Table::find_by_id(job_id)
.select_only()
.column(table::Column::OptionalAssociatedSourceId)
.filter(table::Column::OptionalAssociatedSourceId.is_not_null())
.into_tuple()
.one(&txn)
.await?;

Object::delete_by_id(job_id).exec(&txn).await?;
if !internal_table_ids.is_empty() {
Object::delete_many()
.filter(object::Column::Oid.is_in(internal_table_ids))
.exec(&txn)
.await?;
}
if let Some(source_id) = associated_source_id {
Object::delete_by_id(source_id).exec(&txn).await?;
}
txn.commit().await?;

Ok(true)
Expand Down

0 comments on commit 0975be2

Please sign in to comment.