Skip to content

Commit

Permalink
fix(meta): make drop streaming job catalog and notify frontend transa…
Browse files Browse the repository at this point in the history
…ctional (#17578)
  • Loading branch information
kwannoel authored Jul 5, 2024
1 parent 096889f commit 3c88611
Showing 1 changed file with 45 additions and 50 deletions.
95 changes: 45 additions & 50 deletions src/meta/src/manager/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1295,63 +1295,57 @@ impl CatalogManager {
table_id: TableId,
internal_table_ids: Vec<TableId>,
) -> MetaResult<bool> {
let (table, internal_tables) = {
let core = &mut self.core.lock().await;
let database_core = &mut core.database;
let tables = &mut database_core.tables;
let Some(table) = tables.get(&table_id).cloned() else {
tracing::warn!(
"table_id {} missing when attempting to cancel job, could be cleaned on recovery",
table_id
);
return Ok(false);
};
let mut internal_tables = vec![];
for internal_table_id in &internal_table_ids {
if let Some(table) = tables.get(internal_table_id) {
internal_tables.push(table.clone());
}
let core = &mut self.core.lock().await;
let database_core = &mut core.database;
let tables = &mut database_core.tables;
let Some(table) = tables.get(&table_id).cloned() else {
tracing::warn!(
"table_id {} missing when attempting to cancel job, could be cleaned on recovery",
table_id
);
return Ok(false);
};
let mut internal_tables = vec![];
for internal_table_id in &internal_table_ids {
if let Some(table) = tables.get(internal_table_id) {
internal_tables.push(table.clone());
}
}

// `Unspecified` maps to Created state, due to backwards compatibility.
// `Created` states should not be cancelled.
if table
.get_stream_job_status()
.unwrap_or(StreamJobStatus::Created)
!= StreamJobStatus::Creating
{
return Err(MetaError::invalid_parameter(format!(
"table is not in creating state id={:#?}",
table_id
)));
}
// `Unspecified` maps to Created state, due to backwards compatibility.
// `Created` states should not be cancelled.
if table
.get_stream_job_status()
.unwrap_or(StreamJobStatus::Created)
!= StreamJobStatus::Creating
{
return Err(MetaError::invalid_parameter(format!(
"table is not in creating state id={:#?}",
table_id
)));
}

tracing::trace!("cleanup tables for {}", table.id);
let mut table_ids = vec![table.id];
table_ids.extend(internal_table_ids);
tracing::trace!("cleanup tables for {}", table.id);
let mut table_ids = vec![table.id];
table_ids.extend(internal_table_ids);

let tables = &mut database_core.tables;
let mut tables = BTreeMapTransaction::new(tables);
for table_id in table_ids {
let res = tables.remove(table_id);
assert!(res.is_some(), "table_id {} missing", table_id);
}
commit_meta!(self, tables)?;
(table, internal_tables)
};
let tables = &mut database_core.tables;
let mut tables = BTreeMapTransaction::new(tables);
for table_id in table_ids {
let res = tables.remove(table_id);
assert!(res.is_some(), "table_id {} missing", table_id);
}
commit_meta!(self, tables)?;

{
let core = &mut self.core.lock().await;
{
let user_core = &mut core.user;
user_core.decrease_ref(table.owner);
}
let user_core = &mut core.user;
user_core.decrease_ref(table.owner);
}

{
let database_core = &mut core.database;
for &dependent_relation_id in &table.dependent_relations {
database_core.decrease_relation_ref_count(dependent_relation_id);
}
{
let database_core = &mut core.database;
for &dependent_relation_id in &table.dependent_relations {
database_core.decrease_relation_ref_count(dependent_relation_id);
}
}

Expand All @@ -1371,6 +1365,7 @@ impl CatalogManager {
}),
)
.await;

Ok(true)
}

Expand Down

0 comments on commit 3c88611

Please sign in to comment.