Commit

fix cancel to remove tables
kwannoel committed Sep 27, 2023
1 parent 87a7dad commit 919adb3
Showing 2 changed files with 29 additions and 7 deletions.
21 changes: 19 additions & 2 deletions src/meta/src/manager/catalog/mod.rs
@@ -783,10 +783,12 @@ impl CatalogManager {

         let core = &mut *self.core.lock().await;
         let database_core = &mut core.database;
+        let user_core = &mut core.user;
         for table in &tables {
             for relation_id in &table.dependent_relations {
                 database_core.decrease_ref_count(*relation_id);
             }
+            user_core.decrease_ref(table.owner);
         }

         let tables = &mut database_core.tables;
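
The hunk above adds `user_core.decrease_ref(table.owner)` next to the existing dependency ref-count decrements, so bulk-dropping tables also releases each owner's reference. A minimal sketch of that owner ref-counting pattern, using a plain `HashMap` as a hypothetical stand-in for the real user manager (the `UserRefCounts` type and `drop_tables` helper are illustrative, not the meta service's API):

use std::collections::HashMap;

/// Hypothetical stand-in for the catalog's user core: it only tracks how
/// many objects each user currently owns.
struct UserRefCounts {
    ref_count: HashMap<u32, usize>, // owner user id -> number of owned objects
}

impl UserRefCounts {
    fn increase_ref(&mut self, owner: u32) {
        *self.ref_count.entry(owner).or_insert(0) += 1;
    }

    fn decrease_ref(&mut self, owner: u32) {
        match self.ref_count.get_mut(&owner) {
            Some(count) if *count > 1 => *count -= 1,
            // Dropping the last owned object removes the owner's entry, so the
            // user is no longer pinned by any catalog object.
            Some(_) => {
                self.ref_count.remove(&owner);
            }
            None => panic!("ref count underflow for owner {owner}"),
        }
    }
}

/// Bulk drop: every dropped table decrements its owner's count, mirroring the
/// `user_core.decrease_ref(table.owner)` added in this hunk.
fn drop_tables(owners_of_dropped_tables: &[u32], users: &mut UserRefCounts) {
    for &owner in owners_of_dropped_tables {
        users.decrease_ref(owner);
    }
}

Keeping the decrement paired with every drop path is what lets the counterpart `increase_ref` at creation time stay an invariant, so a user can only be dropped once it owns nothing.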
@@ -844,7 +846,7 @@ impl CatalogManager {
         &self,
         table: &Table,
         fragment_manager: FragmentManagerRef,
-    ) {
+    ) -> MetaResult<()> {
         let core = &mut *self.core.lock().await;
         let database_core = &mut core.database;
         let user_core = &mut core.user;
@@ -854,7 +856,22 @@
             database_core.decrease_ref_count(dependent_relation_id);
         }
         user_core.decrease_ref(table.owner);
-        self.clean_dirty_tables(fragment_manager).await.unwrap();
+
+        let mut table_ids = vec![table.id];
+        let fragment = fragment_manager
+            .select_table_fragments_by_table_id(&table.id.into())
+            .await?;
+        let internal_table_ids = fragment.internal_table_ids();
+        table_ids.extend(internal_table_ids);
+
+        let tables = &mut database_core.tables;
+        let mut tables = BTreeMapTransaction::new(tables);
+        for table_id in table_ids {
+            let table = tables.remove(table_id);
+            assert!(table.is_some())
+        }
+        commit_meta!(self, tables)?;
+        Ok(())
     }

     /// return id of streaming jobs in the database which need to be dropped by stream manager.
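
The rewritten `cancel_create_table_procedure` above no longer calls `clean_dirty_tables(..).unwrap()`; it collects the cancelled table's id together with its internal table ids from the fragment manager, removes them all inside one catalog transaction, and returns `MetaResult<()>` so failures propagate. A simplified, self-contained sketch of that collect-then-remove-then-commit shape (the `TableTxn` type and `cancel_table` function are stand-ins, not the real `BTreeMapTransaction`/`commit_meta!` machinery):

use std::collections::BTreeMap;

/// Stand-in for a transactional view over the catalog's table map:
/// removals are staged and only applied when the transaction commits.
struct TableTxn<'a> {
    tables: &'a mut BTreeMap<u32, String>, // table id -> table name (simplified)
    staged_removals: Vec<u32>,
}

impl<'a> TableTxn<'a> {
    fn new(tables: &'a mut BTreeMap<u32, String>) -> Self {
        Self { tables, staged_removals: Vec::new() }
    }

    /// Stage a removal; report whether the table currently exists.
    fn remove(&mut self, id: u32) -> bool {
        self.staged_removals.push(id);
        self.tables.contains_key(&id)
    }

    /// Apply all staged removals, failing if any entry has disappeared.
    fn commit(self) -> Result<(), String> {
        for id in self.staged_removals {
            self.tables
                .remove(&id)
                .ok_or_else(|| format!("table {id} vanished before commit"))?;
        }
        Ok(())
    }
}

/// Remove a cancelled table together with its internal state tables.
fn cancel_table(
    tables: &mut BTreeMap<u32, String>,
    table_id: u32,
    internal_table_ids: Vec<u32>,
) -> Result<(), String> {
    let mut table_ids = vec![table_id];
    table_ids.extend(internal_table_ids);

    let mut txn = TableTxn::new(tables);
    for id in table_ids {
        // The cancelled job's tables are expected to still be in the catalog.
        assert!(txn.remove(id), "table {id} missing during cancel");
    }
    txn.commit()
}

Staging every removal and committing once mirrors the patch's single `commit_meta!` call: the catalog never observes a half-cancelled job with some tables gone and its internal tables still present.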
15 changes: 10 additions & 5 deletions src/meta/src/rpc/ddl_controller.rs
@@ -455,7 +455,7 @@ impl DdlController {
         let (ctx, table_fragments) = match result {
             Ok(r) => r,
             Err(e) => {
-                self.cancel_stream_job(&stream_job, internal_tables).await;
+                self.cancel_stream_job(&stream_job, internal_tables).await?;
                 return Err(e);
             }
         };
@@ -512,7 +512,7 @@ impl DdlController {
             .create_streaming_job(table_fragments, ctx)
             .await;
         if let Err(e) = result {
-            self.cancel_stream_job(&stream_job, internal_tables).await;
+            self.cancel_stream_job(&stream_job, internal_tables).await?;
             return Err(e);
         };
         self.finish_stream_job(stream_job, internal_tables).await
@@ -724,7 +724,11 @@ impl DdlController {
     }

     /// `cancel_stream_job` cancels a stream job and clean some states.
-    async fn cancel_stream_job(&self, stream_job: &StreamingJob, internal_tables: Vec<Table>) {
+    async fn cancel_stream_job(
+        &self,
+        stream_job: &StreamingJob,
+        internal_tables: Vec<Table>,
+    ) -> MetaResult<()> {
         let mut creating_internal_table_ids =
             internal_tables.into_iter().map(|t| t.id).collect_vec();
         // 1. cancel create procedure.
@@ -733,7 +737,7 @@
                 creating_internal_table_ids.push(table.id);
                 self.catalog_manager
                     .cancel_create_table_procedure(table, self.fragment_manager.clone())
-                    .await;
+                    .await?;
             }
             StreamingJob::Sink(sink) => {
                 self.catalog_manager
@@ -749,7 +753,7 @@
                 } else {
                     self.catalog_manager
                         .cancel_create_table_procedure(table, self.fragment_manager.clone())
-                        .await;
+                        .await?;
                 }
             }
             StreamingJob::Index(index, table) => {
@@ -763,6 +767,7 @@
         self.catalog_manager
             .unmark_creating_tables(&creating_internal_table_ids, true)
             .await;
+        Ok(())
     }

     /// `finish_stream_job` finishes a stream job and clean some states.
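
The ddl_controller changes above are the matching plumbing: `cancel_stream_job` now returns `MetaResult<()>`, the catalog call sites use `.await?` instead of a bare `.await;`, and both creation failure paths forward cancellation errors with `?` rather than letting the cleanup unwrap internally. A minimal sketch of this error-propagation pattern with simplified stand-in types (the `MetaResult` alias and structs here are illustrative, not the real definitions):

type MetaResult<T> = Result<T, String>; // simplified stand-in error type

struct CatalogManager;

impl CatalogManager {
    /// Cleanup now reports failure instead of unwrapping internally.
    async fn cancel_create_table_procedure(&self, table_id: u32) -> MetaResult<()> {
        if table_id == 0 {
            return Err(format!("no catalog entry for table {table_id}"));
        }
        Ok(())
    }
}

struct DdlController {
    catalog_manager: CatalogManager,
}

impl DdlController {
    /// Cancellation itself is fallible, so its signature becomes MetaResult<()>.
    async fn cancel_stream_job(&self, table_id: u32) -> MetaResult<()> {
        // `?` forwards a failed cleanup to the caller instead of dropping it.
        self.catalog_manager
            .cancel_create_table_procedure(table_id)
            .await?;
        Ok(())
    }

    async fn create_stream_job(&self, table_id: u32) -> MetaResult<()> {
        // Pretend the streaming job failed to build, which triggers cancel.
        let build_result: MetaResult<()> = Err("stream graph build failed".to_string());
        if let Err(e) = build_result {
            // A failure while cancelling is surfaced too, not swallowed.
            self.cancel_stream_job(table_id).await?;
            return Err(e);
        }
        Ok(())
    }
}

With this shape, a failed cleanup during job creation reaches the DDL caller as an error instead of being ignored or turning into a panic inside the catalog manager.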
