Skip to content

Commit

Permalink
debug
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Sep 28, 2023
1 parent 85d3516 commit d99813a
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 0 deletions.
7 changes: 7 additions & 0 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ impl GlobalBarrierManager {
// List stream jobs, includes both created / creating.
// Need to rewrite as well.
// List ALL stream jobs.
// The dirty tables should have been cleaned,
// so when we list the stream job ids here, they should be already purged.
let stream_job_ids = self.catalog_manager.list_stream_job_ids().await?;
let to_drop_table_fragments = self
.fragment_manager
Expand All @@ -84,6 +86,11 @@ impl GlobalBarrierManager {
!stream_job_ids.contains(&tf.table_id().table_id)
})
.await;
let fragment_ids = to_drop_table_fragments
.iter()
.map(|f| f.table_id())
.collect_vec();
println!("Cleaning dirty fragment ids {fragment_ids:?}");

let to_drop_streaming_ids = to_drop_table_fragments
.iter()
Expand Down
5 changes: 5 additions & 0 deletions src/meta/src/manager/catalog/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,11 @@ impl FragmentManager {
commit_meta_with_trx!(self, trx, table_fragments)?;
guard.table_revision = next_revision;
}
let to_delete_ids = to_delete_table_fragments
.iter()
.map(|f| f.table_id())
.collect_vec();
println!("to_delete_table_fragments: {:#?}", to_delete_ids);

for table_fragments in to_delete_table_fragments {
if table_fragments.state() != State::Initial {
Expand Down
3 changes: 3 additions & 0 deletions src/meta/src/manager/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,7 @@ impl CatalogManager {

let mut table_ids = vec![];
let mv_table_ids = tables.iter().map(|t| t.id).collect_vec();
println!("Cleaning dirty mv table ids: {mv_table_ids:?}");
for table_id in &mv_table_ids {
let fragment = fragment_manager
.select_table_fragments_by_table_id(&table_id.into())
Expand Down Expand Up @@ -840,6 +841,7 @@ impl CatalogManager {
table_id: TableId,
internal_table_ids: Vec<TableId>,
) -> MetaResult<()> {
println!("remove create table with id: {table_id}");
let core = &mut self.core.lock().await;
let table = {
let database_core = &mut core.database;
Expand Down Expand Up @@ -881,6 +883,7 @@ impl CatalogManager {
table_id: TableId,
fragment: &TableFragments,
) -> MetaResult<()> {
println!("remove create table with fragment: {table_id}");
let core = &mut self.core.lock().await;
let table = {
let database_core = &mut core.database;
Expand Down

0 comments on commit d99813a

Please sign in to comment.