diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 814f9509bbef..dc7a40a41402 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -75,6 +75,8 @@ impl GlobalBarrierManager { // List stream jobs, includes both created / creating. // Need to rewrite as well. // List ALL stream jobs. + // The dirty tables should have been cleaned, + // so when we list the stream job ids here, they should be already purged. let stream_job_ids = self.catalog_manager.list_stream_job_ids().await?; let to_drop_table_fragments = self .fragment_manager @@ -84,6 +86,11 @@ impl GlobalBarrierManager { !stream_job_ids.contains(&tf.table_id().table_id) }) .await; + let fragment_ids = to_drop_table_fragments + .iter() + .map(|f| f.table_id()) + .collect_vec(); + println!("Cleaning dirty fragment ids {fragment_ids:?}"); let to_drop_streaming_ids = to_drop_table_fragments .iter() diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index 61af2d0ea122..bba35b874c5d 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -592,6 +592,11 @@ impl FragmentManager { commit_meta_with_trx!(self, trx, table_fragments)?; guard.table_revision = next_revision; } + let to_delete_ids = to_delete_table_fragments + .iter() + .map(|f| f.table_id()) + .collect_vec(); + println!("to_delete_table_fragments: {:#?}", to_delete_ids); for table_fragments in to_delete_table_fragments { if table_fragments.state() != State::Initial { diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index ff8a4620f0b8..f74ee494f8a5 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -763,6 +763,7 @@ impl CatalogManager { let mut table_ids = vec![]; let mv_table_ids = tables.iter().map(|t| t.id).collect_vec(); + println!("Cleaning dirty mv table ids: {mv_table_ids:?}"); for table_id in &mv_table_ids { let fragment = fragment_manager .select_table_fragments_by_table_id(&table_id.into()) @@ -840,6 +841,7 @@ impl CatalogManager { table_id: TableId, internal_table_ids: Vec, ) -> MetaResult<()> { + println!("remove create table with id: {table_id}"); let core = &mut self.core.lock().await; let table = { let database_core = &mut core.database; @@ -881,6 +883,7 @@ impl CatalogManager { table_id: TableId, fragment: &TableFragments, ) -> MetaResult<()> { + println!("remove create table with fragment: {table_id}"); let core = &mut self.core.lock().await; let table = { let database_core = &mut core.database;