Skip to content

Commit

Permalink
ignore drop notification if not exists
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Oct 18, 2024
1 parent 2e753f3 commit 54b30c5
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 18 deletions.
62 changes: 44 additions & 18 deletions src/frontend/src/catalog/schema_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,11 @@ impl SchemaCatalog {
}

pub fn drop_table(&mut self, id: TableId) {
let table_ref = self.table_by_id.remove(&id).unwrap();
let Some(table_ref) = self.table_by_id.remove(&id) else {
tracing::warn!(?id, "table to drop not found, cleaning up?");
return;
};

self.table_by_name.remove(&table_ref.name).unwrap();
self.indexes_by_table_id.remove(&table_ref.id);
}
Expand Down Expand Up @@ -190,7 +194,11 @@ impl SchemaCatalog {
}

pub fn drop_index(&mut self, id: IndexId) {
let index_ref = self.index_by_id.remove(&id).unwrap();
let Some(index_ref) = self.index_by_id.remove(&id) else {
tracing::warn!(?id, "index to drop not found, cleaning up?");
return;
};

self.index_by_name.remove(&index_ref.name).unwrap();
match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
Occupied(mut entry) => {
Expand Down Expand Up @@ -225,7 +233,11 @@ impl SchemaCatalog {
}

pub fn drop_source(&mut self, id: SourceId) {
let source_ref = self.source_by_id.remove(&id).unwrap();
let Some(source_ref) = self.source_by_id.remove(&id) else {
tracing::warn!(?id, "source to drop not found, cleaning up?");
return;
};

self.source_by_name.remove(&source_ref.name).unwrap();
if let Some(connection_id) = source_ref.connection_id {
if let Occupied(mut e) = self.connection_source_ref.entry(connection_id) {
Expand Down Expand Up @@ -274,7 +286,11 @@ impl SchemaCatalog {
}

pub fn drop_sink(&mut self, id: SinkId) {
let sink_ref = self.sink_by_id.remove(&id).unwrap();
let Some(sink_ref) = self.sink_by_id.remove(&id) else {
tracing::warn!(?id, "sink to drop not found, cleaning up?");
return;
};

self.sink_by_name.remove(&sink_ref.name).unwrap();
if let Some(connection_id) = sink_ref.connection_id {
if let Occupied(mut e) = self.connection_sink_ref.entry(connection_id.0) {
Expand Down Expand Up @@ -318,7 +334,11 @@ impl SchemaCatalog {
}

pub fn drop_subscription(&mut self, id: SubscriptionId) {
let subscription_ref = self.subscription_by_id.remove(&id).unwrap();
let Some(subscription_ref) = self.subscription_by_id.remove(&id) else {
tracing::warn!(?id, "subscription to drop not found, cleaning up?");
return;
};

self.subscription_by_name
.remove(&subscription_ref.name)
.unwrap();
Expand Down Expand Up @@ -354,7 +374,11 @@ impl SchemaCatalog {
}

pub fn drop_view(&mut self, id: ViewId) {
let view_ref = self.view_by_id.remove(&id).unwrap();
let Some(view_ref) = self.view_by_id.remove(&id) else {
tracing::warn!(?id, "view to drop not found, cleaning up?");
return;
};

self.view_by_name.remove(&view_ref.name).unwrap();
}

Expand Down Expand Up @@ -411,10 +435,10 @@ impl SchemaCatalog {
}

pub fn drop_function(&mut self, id: FunctionId) {
let function_ref = self
.function_by_id
.remove(&id)
.expect("function not found by id");
let Some(function_ref) = self.function_by_id.remove(&id) else {
tracing::warn!(?id, "function to drop not found, cleaning up?");
return;
};

self.function_registry
.remove(Self::get_func_sign(&function_ref))
Expand Down Expand Up @@ -483,10 +507,11 @@ impl SchemaCatalog {
}

pub fn drop_connection(&mut self, connection_id: ConnectionId) {
let connection_ref = self
.connection_by_id
.remove(&connection_id)
.expect("connection not found by id");
let Some(connection_ref) = self.connection_by_id.remove(&connection_id) else {
tracing::warn!(?connection_id, "connection to drop not found, cleaning up?");
return;
};

self.connection_by_name
.remove(&connection_ref.name)
.expect("connection not found by name");
Expand Down Expand Up @@ -523,10 +548,11 @@ impl SchemaCatalog {
}

pub fn drop_secret(&mut self, secret_id: SecretId) {
let secret_ref = self
.secret_by_id
.remove(&secret_id)
.expect("secret not found by id");
let Some(secret_ref) = self.secret_by_id.remove(&secret_id) else {
tracing::warn!(?secret_id, "secret to drop not found, cleaning up?");
return;
};

self.secret_by_name
.remove(&secret_ref.name)
.expect("secret not found by name");
Expand Down
4 changes: 4 additions & 0 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,10 @@ impl CatalogController {
txn.commit().await?;

if !objs.is_empty() {
// We **may** also have notified the frontend about these objects,
// so we need to notify the frontend to delete them here.
// The frontend will ignore the request if the object does not exist,
// so it's safe to always notify.
self.notify_frontend(Operation::Delete, build_relation_group_for_delete(objs))
.await;
}
Expand Down

0 comments on commit 54b30c5

Please sign in to comment.