diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index d6fae40f44d2f..e4b93a286a3f8 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -22,6 +22,7 @@ use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_meta_model_v2::StreamingParallelism; use risingwave_pb::common::ActorInfo; +use risingwave_pb::meta::table_fragments::State; use risingwave_pb::meta::PausedReason; use risingwave_pb::stream_plan::barrier::BarrierKind; use risingwave_pb::stream_plan::barrier_mutation::Mutation; @@ -607,7 +608,7 @@ impl GlobalBarrierManagerContext { let table_parallelisms: HashMap<_, _> = { let streaming_parallelisms = mgr .catalog_controller - .get_all_streaming_parallelisms() + .get_all_created_streaming_parallelisms() .await?; streaming_parallelisms @@ -751,6 +752,7 @@ impl GlobalBarrierManagerContext { guard .table_fragments() .iter() + .filter(|&(_, table)| matches!(table.state(), State::Created)) .map(|(table_id, table)| { let target_parallelism = derive_target_parallelism_for_custom(current_parallelism, table); diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 8d34e4076c1e9..f76f38e84dde7 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -2134,12 +2134,13 @@ impl CatalogController { .collect()) } - pub async fn get_all_streaming_parallelisms( + pub async fn get_all_created_streaming_parallelisms( &self, ) -> MetaResult> { let inner = self.inner.read().await; let job_parallelisms = StreamingJob::find() + .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created)) .select_only() .columns([ streaming_job::Column::JobId, diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 6dd0d1e3b72cb..e598aefb291d3 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -36,7 +36,7 @@ use risingwave_pb::meta::get_reschedule_plan_request::{Policy, StableResizePolic use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::table_fragments::actor_status::ActorState; use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType; -use risingwave_pb::meta::table_fragments::{self, ActorStatus, Fragment}; +use risingwave_pb::meta::table_fragments::{self, ActorStatus, Fragment, State}; use risingwave_pb::meta::FragmentParallelUnitMappings; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{DispatcherType, FragmentTypeFlag, StreamActor, StreamNode}; @@ -2528,6 +2528,7 @@ impl GlobalStreamManager { guard .table_fragments() .iter() + .filter(|&(_, table)| matches!(table.state(), State::Created)) .map(|(table_id, table)| (table_id.table_id, table.assigned_parallelism)) .collect() }; @@ -2574,7 +2575,7 @@ impl GlobalStreamManager { let table_parallelisms: HashMap<_, _> = { let streaming_parallelisms = mgr .catalog_controller - .get_all_streaming_parallelisms() + .get_all_created_streaming_parallelisms() .await?; streaming_parallelisms