diff --git a/src/meta/src/serving/mod.rs b/src/meta/src/serving/mod.rs
index 521a8b9ad1c0d..a027348cc9619 100644
--- a/src/meta/src/serving/mod.rs
+++ b/src/meta/src/serving/mod.rs
@@ -43,7 +43,7 @@ impl ServingVnodeMapping {
 
     /// Upsert mapping for given fragments according to the latest `workers`.
     /// Returns (successful updates, failed updates).
-    fn upsert(
+    pub fn upsert(
         &self,
         streaming_parallelisms: HashMap,
         workers: &[WorkerNode],
@@ -79,7 +79,7 @@ impl ServingVnodeMapping {
     }
 }
 
-fn to_fragment_parallel_unit_mapping(
+pub(crate) fn to_fragment_parallel_unit_mapping(
     mappings: &HashMap,
 ) -> Vec {
     mappings
@@ -91,7 +91,7 @@
         .collect()
 }
 
-fn to_deleted_fragment_parallel_unit_mapping(
+pub(crate) fn to_deleted_fragment_parallel_unit_mapping(
     fragment_ids: &[FragmentId],
 ) -> Vec {
     fragment_ids
diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs
index 99a4d6fc76e92..9656b2409226c 100644
--- a/src/meta/src/stream/scale.rs
+++ b/src/meta/src/stream/scale.rs
@@ -27,9 +27,11 @@ use risingwave_common::hash::{ActorMapping, ParallelUnitId, VirtualNode};
 use risingwave_common::util::iter_util::ZipEqDebug;
 use risingwave_pb::common::{ActorInfo, ParallelUnit, WorkerNode};
 use risingwave_pb::meta::get_reschedule_plan_request::{Policy, StableResizePolicy};
+use risingwave_pb::meta::subscribe_response::{Info, Operation};
 use risingwave_pb::meta::table_fragments::actor_status::ActorState;
 use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
 use risingwave_pb::meta::table_fragments::{self, ActorStatus, Fragment};
+use risingwave_pb::meta::FragmentParallelUnitMappings;
 use risingwave_pb::stream_plan::stream_node::NodeBody;
 use risingwave_pb::stream_plan::{DispatcherType, FragmentTypeFlag, StreamActor, StreamNode};
 use risingwave_pb::stream_service::{
@@ -40,6 +42,10 @@ use uuid::Uuid;
 use crate::barrier::Reschedule;
 use crate::manager::{ClusterManagerRef, FragmentManagerRef, IdCategory, MetaSrvEnv, WorkerId};
 use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments};
+use crate::serving::{
+    to_deleted_fragment_parallel_unit_mapping, to_fragment_parallel_unit_mapping,
+    ServingVnodeMapping,
+};
 use crate::storage::{MetaStore, MetaStoreError, MetaStoreRef, Transaction, DEFAULT_COLUMN_FAMILY};
 use crate::stream::SourceManagerRef;
 use crate::{MetaError, MetaResult};
@@ -1532,6 +1538,48 @@ impl ScaleController {
             .post_apply_reschedules(reschedules.clone())
             .await?;
 
+        // Update serving fragment info after rescheduling in meta store.
+        if !reschedules.is_empty() {
+            let workers = self
+                .cluster_manager
+                .list_active_serving_compute_nodes()
+                .await;
+            let streaming_parallelisms = self
+                .fragment_manager
+                .running_fragment_parallelisms(Some(reschedules.keys().cloned().collect()))
+                .await;
+            let serving_vnode_mapping = Arc::new(ServingVnodeMapping::default());
+            let (upserted, failed) = serving_vnode_mapping.upsert(streaming_parallelisms, &workers);
+            if !upserted.is_empty() {
+                tracing::debug!(
+                    "Update serving vnode mapping for fragments {:?}.",
+                    upserted.keys()
+                );
+                self.env
+                    .notification_manager()
+                    .notify_frontend_without_version(
+                        Operation::Update,
+                        Info::ServingParallelUnitMappings(FragmentParallelUnitMappings {
+                            mappings: to_fragment_parallel_unit_mapping(&upserted),
+                        }),
+                    );
+            }
+            if !failed.is_empty() {
+                tracing::debug!(
+                    "Fail to update serving vnode mapping for fragments {:?}.",
+                    failed
+                );
+                self.env
+                    .notification_manager()
+                    .notify_frontend_without_version(
+                        Operation::Delete,
+                        Info::ServingParallelUnitMappings(FragmentParallelUnitMappings {
+                            mappings: to_deleted_fragment_parallel_unit_mapping(&failed),
+                        }),
+                    );
+            }
+        }
+
         let mut stream_source_actor_splits = HashMap::new();
         let mut stream_source_dropped_actors = HashSet::new();
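// A minimal, self-contained sketch of the pattern the scale.rs hunk above follows:
// recompute serving mappings, split the result into (upserted, failed) as `upsert`
// does, then broadcast an Update for successes and a Delete for failures. All names
// here (Operation, Notification, recompute, notifications_after_reschedule,
// worker_slots) are simplified stand-ins for illustration, not the RisingWave APIs.

use std::collections::HashMap;

type FragmentId = u32;

#[derive(Debug)]
enum Operation {
    Update,
    Delete,
}

#[derive(Debug)]
struct Notification {
    op: Operation,
    fragment_ids: Vec<FragmentId>,
}

/// Recompute mappings for `fragments`; a fragment "fails" here when no worker
/// slots are available, mirroring the (successful, failed) split of `upsert`.
fn recompute(
    fragments: &HashMap<FragmentId, usize>,
    worker_slots: usize,
) -> (HashMap<FragmentId, usize>, Vec<FragmentId>) {
    let mut upserted = HashMap::new();
    let mut failed = Vec::new();
    for (&fragment_id, &parallelism) in fragments {
        if worker_slots > 0 {
            upserted.insert(fragment_id, parallelism.min(worker_slots));
        } else {
            failed.push(fragment_id);
        }
    }
    (upserted, failed)
}

/// Build the notifications sent to frontends after a reschedule:
/// one Update covering the upserted fragments, one Delete covering the failed ones.
fn notifications_after_reschedule(
    fragments: &HashMap<FragmentId, usize>,
    worker_slots: usize,
) -> Vec<Notification> {
    let (upserted, failed) = recompute(fragments, worker_slots);
    let mut out = Vec::new();
    if !upserted.is_empty() {
        out.push(Notification {
            op: Operation::Update,
            fragment_ids: upserted.keys().copied().collect(),
        });
    }
    if !failed.is_empty() {
        out.push(Notification {
            op: Operation::Delete,
            fragment_ids: failed,
        });
    }
    out
}

fn main() {
    let fragments = HashMap::from([(1u32, 4usize), (2, 8)]);
    // No serving workers available: expect a single Delete notification.
    println!("{:?}", notifications_after_reschedule(&fragments, 0));
    // Two slots available: expect a single Update notification for both fragments.
    println!("{:?}", notifications_after_reschedule(&fragments, 2));
}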