From 0da9bf926cf8579b00eab1d9d9de90efc362779e Mon Sep 17 00:00:00 2001
From: William Wen
Date: Mon, 17 Jun 2024 14:52:47 +0800
Subject: [PATCH 1/2] fix(meta): purge hummock state table after pre apply
 drop command

---
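Notes: after this change, aborting a cancelled streaming job no longer
returns the job's state table ids for the caller to unregister from the
hummock manager one call site at a time. Recovery instead relies on a
single later `purge_state_table_from_hummock` call, which (per the comment
added in recovery.rs below) is expected to run after `pre_apply_drop_cancel`
returns. The following is a minimal, self-contained sketch of that
reconciliation idea only; the types and method are illustrative stand-ins,
not RisingWave's real APIs.

    use std::collections::HashSet;

    struct HummockManager {
        registered_table_ids: HashSet<u32>,
    }

    impl HummockManager {
        // Drop every registered state table id that the catalog no longer
        // knows about; abort paths can rely on this instead of
        // unregistering ids themselves.
        fn purge_state_tables(&mut self, catalog_table_ids: &HashSet<u32>) {
            self.registered_table_ids
                .retain(|id| catalog_table_ids.contains(id));
        }
    }

    fn main() {
        let mut hummock = HummockManager {
            registered_table_ids: [1, 2, 3, 4].into_iter().collect(),
        };
        // Say tables 3 and 4 belonged to a cancelled job that
        // try_abort_creating_streaming_job already removed from the catalog.
        let catalog: HashSet<u32> = [1, 2].into_iter().collect();
        hummock.purge_state_tables(&catalog);
        assert_eq!(hummock.registered_table_ids, catalog);
    }

Because such a purge retains exactly the catalog-known ids, it is
idempotent and safe to run unconditionally, which is what lets the
per-call-site unregistration in this diff be deleted.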
 src/meta/src/barrier/recovery.rs         | 28 +++++++++---------------
 src/meta/src/controller/streaming_job.rs | 19 ++++------------
 src/meta/src/rpc/ddl_controller_v2.rs    |  2 +-
 3 files changed, 15 insertions(+), 34 deletions(-)

diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs
index f17b4e163901c..f58f588e95466 100644
--- a/src/meta/src/barrier/recovery.rs
+++ b/src/meta/src/barrier/recovery.rs
@@ -313,30 +313,22 @@ impl GlobalBarrierManagerContext {
         let (dropped_actors, cancelled) = scheduled_barriers.pre_apply_drop_cancel_scheduled();
         let applied = !dropped_actors.is_empty() || !cancelled.is_empty();
         if !cancelled.is_empty() {
-            let unregister_table_ids = match &self.metadata_manager {
+            match &self.metadata_manager {
                 MetadataManager::V1(mgr) => {
                     mgr.fragment_manager
                         .drop_table_fragments_vec(&cancelled)
-                        .await?
+                        .await?;
                 }
                 MetadataManager::V2(mgr) => {
-                    let mut unregister_table_ids = Vec::new();
                     for job_id in cancelled {
-                        let (_, table_ids_to_unregister) = mgr
-                            .catalog_controller
+                        mgr.catalog_controller
                             .try_abort_creating_streaming_job(job_id.table_id as _, true)
                             .await?;
-                        unregister_table_ids.extend(table_ids_to_unregister);
                     }
-                    unregister_table_ids
-                        .into_iter()
-                        .map(|table_id| table_id as u32)
-                        .collect()
                 }
             };
-            self.hummock_manager
-                .unregister_table_ids(&unregister_table_ids)
-                .await?;
+            // no need to unregister state table id from hummock manager here, because it's expected that
+            // we call `purge_state_table_from_hummock` anyway after the current method returns.
         }
         Ok(applied)
     }
@@ -377,11 +369,6 @@ impl GlobalBarrierManager {
                 .await
                 .context("clean dirty streaming jobs")?;

-            self.context
-                .purge_state_table_from_hummock()
-                .await
-                .context("purge state table from hummock")?;
-
             // Mview progress needs to be recovered.
             tracing::info!("recovering mview progress");
             self.context
@@ -462,6 +449,11 @@
                 })?
             }

+            self.context
+                .purge_state_table_from_hummock()
+                .await
+                .context("purge state table from hummock")?;
+
             // update and build all actors.
             self.context.update_actors(&info).await.inspect_err(|err| {
                 warn!(error = %err.as_report(), "update actors failed");
diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs
index 3066ce223785e..ad852cf8ac51c 100644
--- a/src/meta/src/controller/streaming_job.rs
+++ b/src/meta/src/controller/streaming_job.rs
@@ -411,7 +411,7 @@ impl CatalogController {
         &self,
         job_id: ObjectId,
         is_cancelled: bool,
-    ) -> MetaResult<(bool, Vec<TableId>)> {
+    ) -> MetaResult<bool> {
         let inner = self.inner.write().await;
         let txn = inner.db.begin().await?;

@@ -421,7 +421,7 @@
                 id = job_id,
                 "streaming job not found when aborting creating, might be cleaned by recovery"
             );
-            return Ok((true, Vec::new()));
+            return Ok(true);
         }

         if !is_cancelled {
@@ -436,7 +436,7 @@
                         id = job_id,
                         "streaming job is created in background and still in creating status"
                     );
-                    return Ok((false, Vec::new()));
+                    return Ok(false);
                 }
             }
         }
@@ -449,13 +449,6 @@
             .all(&txn)
             .await?;

-        let mv_table_id: Option<TableId> = Table::find_by_id(job_id)
-            .select_only()
-            .column(table::Column::TableId)
-            .into_tuple()
-            .one(&txn)
-            .await?;
-
         let associated_source_id: Option<SourceId> = Table::find_by_id(job_id)
             .select_only()
             .column(table::Column::OptionalAssociatedSourceId)
@@ -476,11 +469,7 @@
         }
         txn.commit().await?;

-        let mut state_table_ids = internal_table_ids;
-
-        state_table_ids.extend(mv_table_id.into_iter());
-
-        Ok((true, state_table_ids))
+        Ok(true)
     }

     pub async fn post_collect_table_fragments(
diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs
index 13ecfca13c782..37448e50a07d9 100644
--- a/src/meta/src/rpc/ddl_controller_v2.rs
+++ b/src/meta/src/rpc/ddl_controller_v2.rs
@@ -104,7 +104,7 @@ impl DdlController {
         self.env.event_log_manager_ref().add_event_logs(vec![
             risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
         ]);
-        let (aborted, _) = mgr
+        let aborted = mgr
            .catalog_controller
            .try_abort_creating_streaming_job(job_id as _, false)
            .await?;

From 453d4a8c64d73a7757dee8e3606c838790e116f7 Mon Sep 17 00:00:00 2001
From: William Wen
Date: Mon, 17 Jun 2024 14:56:40 +0800
Subject: [PATCH 2/2] pre-apply and purge earlier

---
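Notes: this commit only moves existing code: `pre_apply_drop_cancel` and
`purge_state_table_from_hummock` now run before the control streams and
the sink manager are reset. The sketch below is a dependency-free
restatement of the resulting order; the function names paraphrase the
calls in the diff and are not real RisingWave signatures.

    fn pre_apply_drop_cancel() {
        println!("1. pre-apply drop/cancel and abort cancelled jobs in the catalog");
    }
    fn purge_state_table_from_hummock() {
        println!("2. purge state table ids the catalog no longer references");
    }
    fn reset_control_streams() {
        println!("3. reset control streams to the compute nodes");
    }
    fn reset_sink_manager() {
        println!("4. reset the sink manager");
    }
    fn update_and_build_actors() {
        println!("5. update and build all actors");
    }

    fn main() {
        // Catalog and hummock cleanup now happens before any compute-node
        // state is rebuilt, so actors are never built against table ids
        // that are about to be purged.
        pre_apply_drop_cancel();
        purge_state_table_from_hummock();
        reset_control_streams();
        reset_sink_manager();
        update_and_build_actors();
    }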
 src/meta/src/barrier/recovery.rs | 24 ++++++++++++------------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs
index f58f588e95466..7200426d78300 100644
--- a/src/meta/src/barrier/recovery.rs
+++ b/src/meta/src/barrier/recovery.rs
@@ -423,18 +423,6 @@
                 })?
             };
-            let mut control_stream_manager =
-                ControlStreamManager::new(self.context.clone());
-
-            control_stream_manager
-                .reset(prev_epoch.value().0, active_streaming_nodes.current())
-                .await
-                .inspect_err(|err| {
-                    warn!(error = %err.as_report(), "reset compute nodes failed");
-                })?;
-
-            self.context.sink_manager.reset().await;
-
             if self
                 .context
                 .pre_apply_drop_cancel(&self.scheduled_barriers)
                 .await?
@@ -454,6 +442,18 @@
                 .await
                 .context("purge state table from hummock")?;

+            let mut control_stream_manager =
+                ControlStreamManager::new(self.context.clone());
+
+            control_stream_manager
+                .reset(prev_epoch.value().0, active_streaming_nodes.current())
+                .await
+                .inspect_err(|err| {
+                    warn!(error = %err.as_report(), "reset compute nodes failed");
+                })?;
+
+            self.context.sink_manager.reset().await;
+
             // update and build all actors.
             self.context.update_actors(&info).await.inspect_err(|err| {
                 warn!(error = %err.as_report(), "update actors failed");