From e52d5c4dfe6a4bcef31ecd1721238fca934aa1e2 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Mon, 5 Feb 2024 16:31:51 +0800 Subject: [PATCH] Update recovery with new method & created-state filter. --- src/meta/src/barrier/recovery.rs | 6 +++++- src/meta/src/controller/catalog.rs | 3 ++- src/meta/src/stream/scale.rs | 2 +- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index d6fae40f44d2f..a25cdb48180a2 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -22,10 +22,13 @@ use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_meta_model_v2::StreamingParallelism; use risingwave_pb::common::ActorInfo; +use risingwave_pb::meta::table_fragments::actor_status::ActorState; +use risingwave_pb::meta::table_fragments::State; use risingwave_pb::meta::PausedReason; use risingwave_pb::stream_plan::barrier::BarrierKind; use risingwave_pb::stream_plan::barrier_mutation::Mutation; use risingwave_pb::stream_plan::AddMutation; +use risingwave_pb::PbFieldNotFound; use thiserror_ext::AsReport; use tokio::sync::oneshot; use tokio_retry::strategy::{jitter, ExponentialBackoff}; @@ -607,7 +610,7 @@ impl GlobalBarrierManagerContext { let table_parallelisms: HashMap<_, _> = { let streaming_parallelisms = mgr .catalog_controller - .get_all_streaming_parallelisms() + .get_all_created_streaming_parallelisms() .await?; streaming_parallelisms @@ -751,6 +754,7 @@ impl GlobalBarrierManagerContext { guard .table_fragments() .iter() + .filter(|&(_, table)| matches!(table.state(), State::Created)) .map(|(table_id, table)| { let target_parallelism = derive_target_parallelism_for_custom(current_parallelism, table); diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 8d34e4076c1e9..f76f38e84dde7 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -2134,12 +2134,13 @@ impl CatalogController { .collect()) } - pub async fn get_all_streaming_parallelisms( + pub async fn get_all_created_streaming_parallelisms( &self, ) -> MetaResult> { let inner = self.inner.read().await; let job_parallelisms = StreamingJob::find() + .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created)) .select_only() .columns([ streaming_job::Column::JobId, diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 6dd0d1e3b72cb..6f16cb0a0b931 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -2574,7 +2574,7 @@ impl GlobalStreamManager { let table_parallelisms: HashMap<_, _> = { let streaming_parallelisms = mgr .catalog_controller - .get_all_streaming_parallelisms() + .get_all_created_streaming_parallelisms() .await?; streaming_parallelisms