fix(stream): clean dirty tables for barrier recovery (#12990)
kwannoel authored Oct 24, 2023
1 parent 23b3346 commit fcad5e1
Showing 5 changed files with 27 additions and 25 deletions.
1 change: 1 addition & 0 deletions ci/scripts/deterministic-recovery-test.sh
@@ -11,6 +11,7 @@ chmod +x ./risingwave_simulation

export RUST_LOG="info,\
risingwave_meta::barrier::recovery=debug,\
risingwave_meta::manager::catalog=debug,\
risingwave_meta::rpc::ddl_controller=debug,\
risingwave_meta::barrier::mod=debug,\
risingwave_simulation=debug"
7 changes: 2 additions & 5 deletions src/meta/src/barrier/mod.rs
@@ -626,7 +626,7 @@ impl GlobalBarrierManager
let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);
let paused_reason = paused.then_some(PausedReason::Manual);

self.recovery(prev_epoch, paused_reason, true)
self.recovery(prev_epoch, paused_reason)
.instrument(span)
.await
};
@@ -981,10 +981,7 @@ impl GlobalBarrierManager

// No need to clean dirty tables for barrier recovery,
// The foreground stream job should cleanup their own tables.
*state = self
.recovery(prev_epoch, None, false)
.instrument(span)
.await;
*state = self.recovery(prev_epoch, None).instrument(span).await;
self.set_status(BarrierManagerStatus::Running).await;
} else {
panic!("failed to execute barrier: {:?}", err);
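
The two hunks above drop the `bootstrap_recovery` flag from the call sites, so dirty-table cleanup is no longer opt-in. The following is a rough, synchronous sketch of the resulting shape, with simplified stand-in types rather than the real meta-node API:

struct GlobalBarrierManager;

impl GlobalBarrierManager {
    // Before this commit the signature carried an extra flag:
    //     recovery(&self, prev_epoch, paused_reason, bootstrap_recovery: bool)
    // and only bootstrap recovery cleaned dirty tables.
    // After: every recovery path cleans dirty tables before rebuilding state.
    fn recovery(&self, prev_epoch: u64, paused_reason: Option<&str>) {
        self.clean_dirty_tables();
        self.clean_dirty_fragments();
        println!("recovered at epoch {prev_epoch}, paused: {paused_reason:?}");
    }

    fn clean_dirty_tables(&self) { /* drop catalog entries of uncommitted stream jobs */ }
    fn clean_dirty_fragments(&self) { /* drop their uncommitted table fragments */ }
}

fn main() {
    // Both bootstrap recovery and failure recovery now go through the same call shape.
    GlobalBarrierManager.recovery(42, None);
}
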
9 changes: 3 additions & 6 deletions src/meta/src/barrier/recovery.rs
@@ -219,19 +219,16 @@ impl GlobalBarrierManager
&self,
prev_epoch: TracedEpoch,
paused_reason: Option<PausedReason>,
bootstrap_recovery: bool,
) -> BarrierManagerState {
// Mark blocked and abort buffered schedules, they might be dirty already.
self.scheduled_barriers
.abort_and_mark_blocked("cluster is under recovering")
.await;

tracing::info!("recovery start!");
if bootstrap_recovery {
self.clean_dirty_tables()
.await
.expect("clean dirty tables should not fail");
}
self.clean_dirty_tables()
.await
.expect("clean dirty tables should not fail");
self.clean_dirty_fragments()
.await
.expect("clean dirty fragments");
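
With the hunk above, recovery always calls clean_dirty_tables. As a hypothetical, heavily simplified illustration of what that cleanup targets, "dirty" tables are the ones whose creating stream job never committed before the failure; the real CatalogManager does this against persisted metadata inside transactions:

#[derive(Debug)]
struct Table {
    id: u32,
    committed: bool, // false => the creating stream job never passed its first barrier
}

struct Catalog {
    tables: Vec<Table>,
}

impl Catalog {
    // Remove tables whose creation was still in flight when the cluster failed.
    fn clean_dirty_tables(&mut self) -> Vec<Table> {
        let (clean, dirty): (Vec<_>, Vec<_>) = self.tables.drain(..).partition(|t| t.committed);
        self.tables = clean;
        dirty
    }
}

fn main() {
    let mut catalog = Catalog {
        tables: vec![
            Table { id: 1, committed: true },
            Table { id: 2, committed: false },
        ],
    };
    let dirty = catalog.clean_dirty_tables();
    println!("cleaned dirty tables: {dirty:?}"); // only table 2 is dropped
}
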
32 changes: 18 additions & 14 deletions src/meta/src/manager/catalog/mod.rs
@@ -853,14 +853,18 @@ impl CatalogManager
database_core.clear_creating_stream_jobs();
let user_core = &mut core.user;
for table in &tables_to_clean {
// Recovered when init database manager.
for relation_id in &table.dependent_relations {
database_core.decrease_ref_count(*relation_id);
// If table type is internal, no need to update the ref count OR
// user ref count.
if table.table_type != TableType::Internal as i32 {
// Recovered when init database manager.
for relation_id in &table.dependent_relations {
database_core.decrease_ref_count(*relation_id);
}
// Recovered when init user manager.
tracing::debug!("decrease ref for {}", table.id);
user_core.decrease_ref(table.owner);
}
// Recovered when init user manager.
user_core.decrease_ref(table.owner);
}

Ok(())
}
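
The hunk above fixes the ref-count bookkeeping during cleanup: internal (state) tables never took a reference on dependent relations or on their owner, so releasing those references must be skipped for them as well. A hypothetical, simplified sketch of that rule:

use std::collections::HashMap;

#[derive(PartialEq)]
enum TableType {
    Internal,
    MaterializedView,
}

struct Table {
    owner: u32,
    table_type: TableType,
    dependent_relations: Vec<u32>,
}

// Release the references a table holds, but only for non-internal tables,
// since internal tables never acquired them in the first place.
fn release_refs(table: &Table, relation_refs: &mut HashMap<u32, u32>, user_refs: &mut HashMap<u32, u32>) {
    if table.table_type == TableType::Internal {
        return;
    }
    for rel in &table.dependent_relations {
        if let Some(count) = relation_refs.get_mut(rel) {
            *count -= 1;
        }
    }
    if let Some(count) = user_refs.get_mut(&table.owner) {
        *count -= 1;
    }
}

fn main() {
    let mut relation_refs = HashMap::from([(10u32, 1u32)]);
    let mut user_refs = HashMap::from([(7u32, 2u32)]);
    let mv = Table { owner: 7, table_type: TableType::MaterializedView, dependent_relations: vec![10] };
    let state_table = Table { owner: 7, table_type: TableType::Internal, dependent_relations: vec![] };
    release_refs(&mv, &mut relation_refs, &mut user_refs);
    release_refs(&state_table, &mut relation_refs, &mut user_refs); // no-op for internal tables
    println!("{relation_refs:?} {user_refs:?}"); // {10: 0} {7: 1}
}
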

@@ -919,10 +923,11 @@ impl CatalogManager
let database_core = &mut core.database;
let tables = &mut database_core.tables;
let Some(table) = tables.get(&table_id).cloned() else {
bail!(
"table_id {} missing when attempting to cancel job",
tracing::warn!(
"table_id {} missing when attempting to cancel job, could be cleaned on recovery",
table_id
)
);
return Ok(());
};
table
};
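
The hunk above turns a hard error into a warning: if the table is already gone when a create job is cancelled, for example because recovery just cleaned it as a dirty table, cancellation logs and returns. A minimal sketch of that let-else pattern, with hypothetical names and a plain HashMap standing in for the catalog:

use std::collections::HashMap;

fn cancel_create_table_procedure(tables: &mut HashMap<u32, String>, table_id: u32) -> Result<(), String> {
    // Previously this bailed with an error; now a missing entry is tolerated,
    // because recovery may already have cleaned the dirty table.
    let Some(table) = tables.remove(&table_id) else {
        eprintln!("table_id {table_id} missing when attempting to cancel job, could be cleaned on recovery");
        return Ok(());
    };
    eprintln!("cancelled creation of table {table}");
    Ok(())
}

fn main() {
    let mut tables = HashMap::from([(1u32, "m1".to_string())]);
    cancel_create_table_procedure(&mut tables, 1).unwrap();
    cancel_create_table_procedure(&mut tables, 2).unwrap(); // missing: warns and returns Ok
}
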
@@ -938,7 +943,8 @@ impl CatalogManager
let tables = &mut database_core.tables;
let mut tables = BTreeMapTransaction::new(tables);
for table_id in table_ids {
tables.remove(table_id);
let res = tables.remove(table_id);
assert!(res.is_some());
}
commit_meta!(self, tables)?;
}
@@ -2032,8 +2038,7 @@ impl CatalogManager
let user_core = &mut core.user;
let key = (index.database_id, index.schema_id, index.name.clone());
assert!(
!database_core.indexes.contains_key(&index.id)
&& database_core.has_in_progress_creation(&key),
!database_core.indexes.contains_key(&index.id),
"index must be in creating procedure"
);

@@ -2188,8 +2193,7 @@ impl CatalogManager
let user_core = &mut core.user;
let key = (sink.database_id, sink.schema_id, sink.name.clone());
assert!(
!database_core.sinks.contains_key(&sink.id)
&& database_core.has_in_progress_creation(&key),
!database_core.sinks.contains_key(&sink.id),
"sink must be in creating procedure"
);

@@ -61,6 +61,9 @@ async fn test_background_mv_barrier_recovery() -> Result<()>
.run("create materialized view m1 as select * from t1;")
.await?;

// If the CN is killed before first barrier pass for the MV, the MV will be dropped.
// This is because its table fragments will NOT be committed until the first barrier passes.
sleep(Duration::from_secs(5)).await;
kill_cn_and_wait_recover(&cluster).await;

// Send some upstream updates.
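
The added sleep gives the background materialized view time to pass its first barrier, so its table fragments are committed before the compute node is killed; otherwise the job is still dirty and recovery simply drops it. A schematic version of that flow, using hypothetical stand-ins for the simulation helpers and assuming a tokio dev-dependency:

use std::time::Duration;
use tokio::time::sleep;

// Hypothetical stand-ins for the simulation helpers used by the test above.
async fn create_background_mv() { /* run "create materialized view m1 as select * from t1;" */ }
async fn kill_cn_and_wait_recover() { /* kill the compute node and wait for barrier recovery */ }

#[tokio::main]
async fn main() {
    create_background_mv().await;
    // Let the first barrier pass so the MV's table fragments are committed;
    // killing the CN earlier leaves a dirty job that recovery drops.
    sleep(Duration::from_secs(5)).await;
    kill_cn_and_wait_recover().await;
}
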
