From 62abf0ac24a75d904af9967ebe50655f311eae61 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Mon, 5 Feb 2024 20:11:03 +0800 Subject: [PATCH] fix: only processing streaming jobs that are in the 'created' state in auto-scaling (#15000) --- src/meta/src/barrier/recovery.rs | 4 +++- src/meta/src/controller/catalog.rs | 3 ++- src/meta/src/stream/scale.rs | 5 +++-- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 7943930ebc660..8382fb90fbacd 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -25,6 +25,7 @@ use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_meta_model_v2::StreamingParallelism; use risingwave_pb::common::ActorInfo; +use risingwave_pb::meta::table_fragments::State; use risingwave_pb::meta::PausedReason; use risingwave_pb::stream_plan::barrier::BarrierKind; use risingwave_pb::stream_plan::barrier_mutation::Mutation; @@ -614,7 +615,7 @@ impl GlobalBarrierManagerContext { let table_parallelisms: HashMap<_, _> = { let streaming_parallelisms = mgr .catalog_controller - .get_all_streaming_parallelisms() + .get_all_created_streaming_parallelisms() .await?; streaming_parallelisms @@ -758,6 +759,7 @@ impl GlobalBarrierManagerContext { guard .table_fragments() .iter() + .filter(|&(_, table)| matches!(table.state(), State::Created)) .map(|(table_id, table)| { let target_parallelism = derive_target_parallelism_for_custom(current_parallelism, table); diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 8d34e4076c1e9..f76f38e84dde7 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -2134,12 +2134,13 @@ impl CatalogController { .collect()) } - pub async fn get_all_streaming_parallelisms( + pub async fn get_all_created_streaming_parallelisms( &self, ) -> MetaResult> { let inner = self.inner.read().await; let job_parallelisms = StreamingJob::find() + .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created)) .select_only() .columns([ streaming_job::Column::JobId, diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index c7ee64d93c2ac..c034eb846fc4b 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -36,7 +36,7 @@ use risingwave_pb::meta::get_reschedule_plan_request::{Policy, StableResizePolic use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::table_fragments::actor_status::ActorState; use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType; -use risingwave_pb::meta::table_fragments::{self, ActorStatus, Fragment}; +use risingwave_pb::meta::table_fragments::{self, ActorStatus, Fragment, State}; use risingwave_pb::meta::FragmentParallelUnitMappings; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{DispatcherType, FragmentTypeFlag, StreamActor, StreamNode}; @@ -2545,6 +2545,7 @@ impl GlobalStreamManager { guard .table_fragments() .iter() + .filter(|&(_, table)| matches!(table.state(), State::Created)) .map(|(table_id, table)| (table_id.table_id, table.assigned_parallelism)) .collect() }; @@ -2591,7 +2592,7 @@ impl GlobalStreamManager { let table_parallelisms: HashMap<_, _> = { let streaming_parallelisms = mgr .catalog_controller - .get_all_streaming_parallelisms() + .get_all_created_streaming_parallelisms() .await?; streaming_parallelisms