Skip to content

Commit

Permalink
fix(meta): persist internal tables of CREATE TABLE (#13039)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored Oct 25, 2023
1 parent 09a67ab commit 7f82929
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 19 deletions.
33 changes: 14 additions & 19 deletions src/meta/src/manager/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,8 @@ impl CatalogManager {
self.start_create_table_procedure_with_source(source, table)
.await
} else {
self.start_create_table_procedure(table, vec![]).await
self.start_create_table_procedure(table, internal_tables)
.await
}
}
}
Expand Down Expand Up @@ -765,7 +766,9 @@ impl CatalogManager {
/// 2. Not belonging to a background stream job.
/// Clean up these hanging tables by the id.
pub async fn clean_dirty_tables(&self, fragment_manager: FragmentManagerRef) -> MetaResult<()> {
let creating_tables: Vec<Table> = self.list_persisted_creating_tables().await;
let core = &mut *self.core.lock().await;
let database_core = &mut core.database;
let creating_tables: Vec<Table> = database_core.list_persisted_creating_tables();
tracing::debug!(
"creating_tables ids: {:#?}",
creating_tables.iter().map(|t| t.id).collect_vec()
Expand Down Expand Up @@ -839,14 +842,13 @@ impl CatalogManager {
}
}

let core = &mut *self.core.lock().await;
let database_core = &mut core.database;
let tables = &mut database_core.tables;
let mut tables = BTreeMapTransaction::new(tables);
for table in &tables_to_clean {
tracing::debug!("cleaning table_id: {}", table.id);
let table = tables.remove(table.id);
assert!(table.is_some())
let table_id = table.id;
tracing::debug!("cleaning table_id: {}", table_id);
let table = tables.remove(table_id);
assert!(table.is_some(), "table_id {} missing", table_id)
}
commit_meta!(self, tables)?;

Expand Down Expand Up @@ -929,25 +931,20 @@ impl CatalogManager {
);
return Ok(());
};
table
};

tracing::trace!("cleanup tables for {}", table.id);
{
let core = &mut self.core.lock().await;
let database_core = &mut core.database;

tracing::trace!("cleanup tables for {}", table.id);
let mut table_ids = vec![table.id];
table_ids.extend(internal_table_ids);

let tables = &mut database_core.tables;
let mut tables = BTreeMapTransaction::new(tables);
for table_id in table_ids {
let res = tables.remove(table_id);
assert!(res.is_some());
assert!(res.is_some(), "table_id {} missing", table_id);
}
commit_meta!(self, tables)?;
}
table
};

{
let core = &mut self.core.lock().await;
Expand Down Expand Up @@ -1984,9 +1981,7 @@ impl CatalogManager {
let table_key = (table.database_id, table.schema_id, table.name.clone());
assert!(
!database_core.sources.contains_key(&source.id)
&& !database_core.tables.contains_key(&table.id)
&& database_core.has_in_progress_creation(&source_key)
&& database_core.has_in_progress_creation(&table_key),
&& !database_core.tables.contains_key(&table.id),
"table and source must be in creating procedure"
);

Expand Down
1 change: 1 addition & 0 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ impl DdlController {

let env = StreamEnvironment::from_protobuf(fragment_graph.get_env().unwrap());

// Persist tables
tracing::debug!(id = stream_job.id(), "preparing stream job");
let fragment_graph = self
.prepare_stream_job(&mut stream_job, fragment_graph)
Expand Down

0 comments on commit 7f82929

Please sign in to comment.