diff --git a/src/meta/model_v2/src/actor_dispatcher.rs b/src/meta/model_v2/src/actor_dispatcher.rs
index 81211cc572701..7d40af6967d35 100644
--- a/src/meta/model_v2/src/actor_dispatcher.rs
+++ b/src/meta/model_v2/src/actor_dispatcher.rs
@@ -12,13 +12,17 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::hash::Hash;
+
 use risingwave_pb::stream_plan::{PbDispatcher, PbDispatcherType};
 use sea_orm::entity::prelude::*;
 use serde::{Deserialize, Serialize};
 
 use crate::{ActorId, ActorMapping, FragmentId, I32Array};
 
-#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Deserialize, Serialize)]
+#[derive(
+    Hash, Copy, Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
+)]
 #[sea_orm(rs_type = "String", db_type = "String(None)")]
 pub enum DispatcherType {
     #[sea_orm(string_value = "HASH")]
diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs
index 65e730166aed8..c09a5c27b172c 100644
--- a/src/meta/node/src/lib.rs
+++ b/src/meta/node/src/lib.rs
@@ -290,7 +290,6 @@ pub fn start(
             ),
         },
     };
-    validate_config(&config);
 
     let total_memory_bytes = resource_util::memory::system_memory_available_bytes();
diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs
index 7228ddbb36eb9..76aec9b7fb949 100644
--- a/src/meta/src/controller/mod.rs
+++ b/src/meta/src/controller/mod.rs
@@ -38,6 +38,7 @@ pub mod cluster;
 pub mod fragment;
 pub mod id;
 pub mod rename;
+pub mod scale;
 pub mod session_params;
 pub mod streaming_job;
 pub mod system_param;
diff --git a/src/meta/src/controller/scale.rs b/src/meta/src/controller/scale.rs
new file mode 100644
index 0000000000000..bcb3c116462e1
--- /dev/null
+++ b/src/meta/src/controller/scale.rs
@@ -0,0 +1,352 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::{HashMap, HashSet};
+
+use itertools::Itertools;
+use risingwave_meta_model_migration::{
+    Alias, CommonTableExpression, Expr, IntoColumnRef, QueryStatementBuilder, SelectStatement,
+    UnionType, WithClause, WithQuery,
+};
+use risingwave_meta_model_v2::actor_dispatcher::DispatcherType;
+use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment};
+use risingwave_meta_model_v2::{actor, actor_dispatcher, fragment, ActorId, FragmentId, ObjectId};
+use sea_orm::{
+    ColumnTrait, ConnectionTrait, DbErr, EntityTrait, JoinType, QueryFilter, QuerySelect,
+    QueryTrait, RelationTrait, Statement, TransactionTrait,
+};
+
+use crate::controller::catalog::CatalogController;
+use crate::{MetaError, MetaResult};
+
+/// This function constructs a query using a recursive CTE to find the `no_shuffle` upstream relation graph for the target fragments.
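+///
+/// Starting from the given fragments, the recursive CTE joins `actor` with
+/// `actor_dispatcher`, follows `NO_SHUFFLE` dispatcher edges upstream, and
+/// collects every distinct `(fragment_id, dispatcher_type, dispatcher_id)`
+/// relation along the way.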
+///
+/// # Examples
+///
+/// ```
+/// use risingwave_meta::controller::scale::construct_no_shuffle_upstream_traverse_query;
+/// use sea_orm::sea_query::*;
+/// use sea_orm::*;
+///
+/// let query = construct_no_shuffle_upstream_traverse_query(vec![2, 3]);
+///
+/// assert_eq!(query.to_string(MysqlQueryBuilder), r#"WITH RECURSIVE `shuffle_deps` (`fragment_id`, `dispatcher_type`, `dispatcher_id`) AS (SELECT DISTINCT `actor`.`fragment_id`, `actor_dispatcher`.`dispatcher_type`, `actor_dispatcher`.`dispatcher_id` FROM `actor` INNER JOIN `actor_dispatcher` ON `actor`.`actor_id` = `actor_dispatcher`.`actor_id` WHERE `actor_dispatcher`.`dispatcher_type` = 'NO_SHUFFLE' AND `actor_dispatcher`.`dispatcher_id` IN (2, 3) UNION ALL (SELECT DISTINCT `actor`.`fragment_id`, `actor_dispatcher`.`dispatcher_type`, `actor_dispatcher`.`dispatcher_id` FROM `actor` INNER JOIN `actor_dispatcher` ON `actor`.`actor_id` = `actor_dispatcher`.`actor_id` INNER JOIN `shuffle_deps` ON `shuffle_deps`.`fragment_id` = `actor_dispatcher`.`dispatcher_id` WHERE `actor_dispatcher`.`dispatcher_type` = 'NO_SHUFFLE')) SELECT DISTINCT `fragment_id`, `dispatcher_type`, `dispatcher_id` FROM `shuffle_deps`"#);
+/// assert_eq!(query.to_string(PostgresQueryBuilder), r#"WITH RECURSIVE "shuffle_deps" ("fragment_id", "dispatcher_type", "dispatcher_id") AS (SELECT DISTINCT "actor"."fragment_id", "actor_dispatcher"."dispatcher_type", "actor_dispatcher"."dispatcher_id" FROM "actor" INNER JOIN "actor_dispatcher" ON "actor"."actor_id" = "actor_dispatcher"."actor_id" WHERE "actor_dispatcher"."dispatcher_type" = 'NO_SHUFFLE' AND "actor_dispatcher"."dispatcher_id" IN (2, 3) UNION ALL (SELECT DISTINCT "actor"."fragment_id", "actor_dispatcher"."dispatcher_type", "actor_dispatcher"."dispatcher_id" FROM "actor" INNER JOIN "actor_dispatcher" ON "actor"."actor_id" = "actor_dispatcher"."actor_id" INNER JOIN "shuffle_deps" ON "shuffle_deps"."fragment_id" = "actor_dispatcher"."dispatcher_id" WHERE "actor_dispatcher"."dispatcher_type" = 'NO_SHUFFLE')) SELECT DISTINCT "fragment_id", "dispatcher_type", "dispatcher_id" FROM "shuffle_deps""#);
+/// assert_eq!(query.to_string(SqliteQueryBuilder), r#"WITH RECURSIVE "shuffle_deps" ("fragment_id", "dispatcher_type", "dispatcher_id") AS (SELECT DISTINCT "actor"."fragment_id", "actor_dispatcher"."dispatcher_type", "actor_dispatcher"."dispatcher_id" FROM "actor" INNER JOIN "actor_dispatcher" ON "actor"."actor_id" = "actor_dispatcher"."actor_id" WHERE "actor_dispatcher"."dispatcher_type" = 'NO_SHUFFLE' AND "actor_dispatcher"."dispatcher_id" IN (2, 3) UNION ALL SELECT DISTINCT "actor"."fragment_id", "actor_dispatcher"."dispatcher_type", "actor_dispatcher"."dispatcher_id" FROM "actor" INNER JOIN "actor_dispatcher" ON "actor"."actor_id" = "actor_dispatcher"."actor_id" INNER JOIN "shuffle_deps" ON "shuffle_deps"."fragment_id" = "actor_dispatcher"."dispatcher_id" WHERE "actor_dispatcher"."dispatcher_type" = 'NO_SHUFFLE') SELECT DISTINCT "fragment_id", "dispatcher_type", "dispatcher_id" FROM "shuffle_deps""#);
+/// ```
+pub fn construct_no_shuffle_upstream_traverse_query(fragment_ids: Vec<FragmentId>) -> WithQuery {
+    construct_no_shuffle_traverse_query_helper(fragment_ids, NoShuffleResolveDirection::UPSTREAM)
+}
+
+pub fn construct_no_shuffle_downstream_traverse_query(fragment_ids: Vec<FragmentId>) -> WithQuery {
+    construct_no_shuffle_traverse_query_helper(fragment_ids, NoShuffleResolveDirection::DOWNSTREAM)
+}
+
+enum NoShuffleResolveDirection {
+    UPSTREAM,
+    DOWNSTREAM,
+}
+
+fn construct_no_shuffle_traverse_query_helper(
+    fragment_ids: Vec<FragmentId>,
+    direction: NoShuffleResolveDirection,
+) -> WithQuery {
+    let cte_alias = Alias::new("shuffle_deps");
+
+    // If we need to look upwards, resolve from fragment_id to dispatcher_id;
+    // if downwards, resolve from dispatcher_id to fragment_id.
+    let (cte_ref_column, compared_column) = match direction {
+        NoShuffleResolveDirection::UPSTREAM => (
+            (cte_alias.clone(), actor::Column::FragmentId).into_column_ref(),
+            (ActorDispatcher, actor_dispatcher::Column::DispatcherId).into_column_ref(),
+        ),
+        NoShuffleResolveDirection::DOWNSTREAM => (
+            (cte_alias.clone(), actor_dispatcher::Column::DispatcherId).into_column_ref(),
+            (Actor, actor::Column::FragmentId).into_column_ref(),
+        ),
+    };
+
+    let mut base_query = SelectStatement::new()
+        .column((Actor, actor::Column::FragmentId))
+        .column((ActorDispatcher, actor_dispatcher::Column::DispatcherType))
+        .column((ActorDispatcher, actor_dispatcher::Column::DispatcherId))
+        .distinct()
+        .from(Actor)
+        .inner_join(
+            ActorDispatcher,
+            Expr::col((Actor, actor::Column::ActorId)).eq(Expr::col((
+                ActorDispatcher,
+                actor_dispatcher::Column::ActorId,
+            ))),
+        )
+        .and_where(
+            Expr::col((ActorDispatcher, actor_dispatcher::Column::DispatcherType))
+                .eq(DispatcherType::NoShuffle),
+        )
+        .and_where(Expr::col(compared_column.clone()).is_in(fragment_ids.clone()))
+        .to_owned();
+
+    let cte_referencing = SelectStatement::new()
+        .column((Actor, actor::Column::FragmentId))
+        .column((ActorDispatcher, actor_dispatcher::Column::DispatcherType))
+        .column((ActorDispatcher, actor_dispatcher::Column::DispatcherId))
+        .distinct()
+        .from(Actor)
+        .inner_join(
+            ActorDispatcher,
+            Expr::col((Actor, actor::Column::ActorId)).eq(Expr::col((
+                ActorDispatcher,
+                actor_dispatcher::Column::ActorId,
+            ))),
+        )
+        .inner_join(
+            cte_alias.clone(),
+            Expr::col(cte_ref_column).eq(Expr::col(compared_column)),
+        )
+        .and_where(
+            Expr::col((ActorDispatcher, actor_dispatcher::Column::DispatcherType))
+                .eq(DispatcherType::NoShuffle),
+        )
+        .to_owned();
+
+    let common_table_expr = CommonTableExpression::new()
+        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
+        .column(actor::Column::FragmentId)
+        .column(actor_dispatcher::Column::DispatcherType)
+        .column(actor_dispatcher::Column::DispatcherId)
+        .table_name(cte_alias.clone())
+        .to_owned();
+
+    SelectStatement::new()
+        .column(actor::Column::FragmentId)
+        .column(actor_dispatcher::Column::DispatcherType)
+        .column(actor_dispatcher::Column::DispatcherId)
+        .distinct()
+        .from(cte_alias.clone())
+        .to_owned()
+        .with(
+            WithClause::new()
+                .recursive(true)
+                .cte(common_table_expr)
+                .to_owned(),
+        )
+        .to_owned()
+}
+
+/// Everything a reschedule of the given fragments needs to touch: the
+/// fragments themselves, their actors and dispatchers, and the fragment-level
+/// upstream/downstream relations.
+#[derive(Debug, Clone)]
+pub struct RescheduleWorkingSet {
+    pub fragments: HashMap<FragmentId, fragment::Model>,
+    pub actors: HashMap<ActorId, actor::Model>,
+    pub actor_dispatchers: HashMap<ActorId, Vec<actor_dispatcher::Model>>,
+
+    pub fragment_downstreams: HashMap<FragmentId, Vec<(FragmentId, DispatcherType)>>,
+    pub fragment_upstreams: HashMap<FragmentId, Vec<(FragmentId, DispatcherType)>>,
+}
+
+/// Runs one of the recursive CTE queries above and decodes each row into a
+/// `(fragment_id, dispatcher_type, dispatcher_id)` triple.
+async fn resolve_no_shuffle_query<C>(
+    txn: &C,
+    query: WithQuery,
+) -> MetaResult<Vec<(FragmentId, DispatcherType, FragmentId)>>
+where
+    C: ConnectionTrait,
+{
+    let (sql, values) = query.build_any(&*txn.get_database_backend().get_query_builder());
+
+    let result = txn
+        .query_all(Statement::from_sql_and_values(
+            txn.get_database_backend(),
+            sql,
+            values,
+        ))
+        .await?
+        .into_iter()
+        .map(|res| res.try_get_many_by_index())
+        .collect::<Result<Vec<_>, DbErr>>()
+        .map_err(MetaError::from)?;
+
+    Ok(result)
+}
+
+impl CatalogController {
+    pub async fn resolve_working_set_for_reschedule_fragments(
+        &self,
+        fragment_ids: Vec<FragmentId>,
+    ) -> MetaResult<RescheduleWorkingSet> {
+        let inner = self.inner.read().await;
+        self.resolve_working_set_for_reschedule_helper(&inner.db, fragment_ids)
+            .await
+    }
+
+    pub async fn resolve_working_set_for_reschedule_tables(
+        &self,
+        table_ids: Vec<ObjectId>,
+    ) -> MetaResult<RescheduleWorkingSet> {
+        let inner = self.inner.read().await;
+        let txn = inner.db.begin().await?;
+
+        let fragment_ids: Vec<FragmentId> = Fragment::find()
+            .filter(fragment::Column::JobId.is_in(table_ids))
+            .all(&txn)
+            .await?
+            .into_iter()
+            .map(|fragment| fragment.fragment_id)
+            .collect();
+
+        self.resolve_working_set_for_reschedule_helper(&txn, fragment_ids)
+            .await
+    }
+
+    pub async fn resolve_working_set_for_reschedule_helper<C>(
+        &self,
+        txn: &C,
+        fragment_ids: Vec<FragmentId>,
+    ) -> MetaResult<RescheduleWorkingSet>
+    where
+        C: ConnectionTrait,
+    {
+        // NO_SHUFFLE related multi-layer upstream fragments
+        let no_shuffle_related_upstream_fragment_ids = resolve_no_shuffle_query(
+            txn,
+            construct_no_shuffle_upstream_traverse_query(fragment_ids.clone()),
+        )
+        .await?;
+
+        // NO_SHUFFLE related multi-layer downstream fragments
+        let no_shuffle_related_downstream_fragment_ids = resolve_no_shuffle_query(
+            txn,
+            construct_no_shuffle_downstream_traverse_query(fragment_ids.clone()),
+        )
+        .await?;
+
+        // We need to identify all other types of dispatchers that are leaves in the
+        // NO_SHUFFLE dependency tree.
+        let extended_fragment_ids: HashSet<_> = no_shuffle_related_upstream_fragment_ids
+            .iter()
+            .chain(no_shuffle_related_downstream_fragment_ids.iter())
+            .flat_map(|(src, _, dst)| [*src, *dst])
+            .chain(fragment_ids.iter().cloned())
+            .collect();
+
+        let query = Actor::find()
+            .select_only()
+            .column(actor::Column::FragmentId)
+            .column(actor_dispatcher::Column::DispatcherType)
+            .column(actor_dispatcher::Column::DispatcherId)
+            .distinct()
+            .join(JoinType::InnerJoin, actor::Relation::ActorDispatcher.def());
+
+        // single-layer upstream fragment ids
+        let upstream_fragments: Vec<(FragmentId, DispatcherType, FragmentId)> = query
+            .clone()
+            .filter(actor_dispatcher::Column::DispatcherId.is_in(extended_fragment_ids.clone()))
+            .into_tuple()
+            .all(txn)
+            .await?;
+
+        // single-layer downstream fragment ids
+        let downstream_fragments: Vec<(FragmentId, DispatcherType, FragmentId)> = query
+            .clone()
+            .filter(actor::Column::FragmentId.is_in(extended_fragment_ids.clone()))
+            .into_tuple()
+            .all(txn)
+            .await?;
+
+        let all_fragment_relations: HashSet<_> = no_shuffle_related_upstream_fragment_ids
+            .into_iter()
+            .chain(no_shuffle_related_downstream_fragment_ids.into_iter())
+            .chain(upstream_fragments.into_iter())
+            .chain(downstream_fragments.into_iter())
+            .collect();
+
+        let mut fragment_upstreams: HashMap<FragmentId, Vec<(FragmentId, DispatcherType)>> =
+            HashMap::new();
+        let mut fragment_downstreams: HashMap<FragmentId, Vec<(FragmentId, DispatcherType)>> =
+            HashMap::new();
+
+        for (src, dispatcher_type, dst) in &all_fragment_relations {
+            fragment_upstreams
+                .entry(*dst)
+                .or_default()
+                .push((*src, *dispatcher_type));
+            fragment_downstreams
+                .entry(*src)
+                .or_default()
+                .push((*dst, *dispatcher_type));
+        }
+
+        let all_fragment_ids: HashSet<_> = all_fragment_relations
+            .iter()
+            .flat_map(|(src, _, dst)| [*src, *dst])
+            .chain(extended_fragment_ids.into_iter())
+            .collect();
+
+        let fragments: Vec<_> = Fragment::find()
+            .filter(fragment::Column::FragmentId.is_in(all_fragment_ids.clone()))
+            .all(txn)
+            .await?;
+
+        let actor_and_dispatchers: Vec<(_, _)> = Actor::find()
+            .filter(actor::Column::FragmentId.is_in(all_fragment_ids))
+            .find_with_related(ActorDispatcher)
+            .all(txn)
+            .await?;
+
+        let mut actors = HashMap::with_capacity(actor_and_dispatchers.len());
+        let mut actor_dispatchers = HashMap::with_capacity(actor_and_dispatchers.len());
+
+        for (actor, dispatchers) in actor_and_dispatchers {
+            let actor_id = actor.actor_id;
+            actors.insert(actor_id, actor);
+            actor_dispatchers.insert(actor_id, dispatchers);
+        }
+
+        let fragments = fragments
+            .into_iter()
+            .map(|fragment| (fragment.fragment_id, fragment))
+            .collect();
+
+        Ok(RescheduleWorkingSet {
+            fragments,
+            actors,
+            actor_dispatchers,
+            fragment_downstreams,
+            fragment_upstreams,
+        })
+    }
+}
+
+#[cfg(test)]
+#[cfg(not(madsim))]
+mod tests {
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_scale() -> MetaResult<()> {
+        // let srv_env = MetaSrvEnv::for_test().await;
+        // let mgr = CatalogController::new(srv_env)?;
+        // let inner = mgr.inner.read().await;
+        // let txn = inner.db.begin().await?;
+        //
+        // let working_set = mgr
+        //     .resolve_working_set_for_reschedule_fragments(&txn, vec![8, 7, 6])
+        //     .await
+        //     .unwrap();
+
+        // println!("working set {:#?}", working_set);
+
+        Ok(())
+    }
+}
diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs
index f61789c8f9372..10d1b10c6cd26 100644
--- a/src/meta/src/stream/scale.rs
+++ b/src/meta/src/stream/scale.rs
@@ -30,7 +30,8 @@ use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
 use risingwave_common::catalog::TableId;
 use risingwave_common::hash::{ActorMapping, VirtualNode};
 use risingwave_common::util::iter_util::ZipEqDebug;
-use risingwave_meta_model_v2::StreamingParallelism;
+use risingwave_common::util::value_encoding::DatumToProtoExt;
+use risingwave_meta_model_v2::{actor, fragment, ObjectId, StreamingParallelism};
 use risingwave_pb::common::{ActorInfo, Buffer, PbActorLocation, WorkerNode, WorkerType};
 use risingwave_pb::meta::subscribe_response::{Info, Operation};
 use risingwave_pb::meta::table_fragments::actor_status::ActorState;
@@ -41,7 +42,7 @@ use risingwave_pb::meta::table_fragments::{self, ActorStatus, PbFragment, State}
 use risingwave_pb::meta::FragmentWorkerSlotMappings;
 use risingwave_pb::stream_plan::stream_node::NodeBody;
 use risingwave_pb::stream_plan::{
-    Dispatcher, DispatcherType, FragmentTypeFlag, PbStreamActor, StreamNode,
+    Dispatcher, DispatcherType, FragmentTypeFlag, PbDispatcher, PbStreamActor, StreamNode,
 };
 use risingwave_pb::stream_service::build_actor_info::SubscriptionIds;
 use risingwave_pb::stream_service::BuildActorInfo;
@@ -52,6 +53,7 @@ use tokio::task::JoinHandle;
 use tokio::time::{Instant, MissedTickBehavior};
 
 use crate::barrier::{Command, Reschedule, StreamRpcManager};
+use crate::controller::scale::RescheduleWorkingSet;
 use crate::manager::{
     IdCategory, IdGenManagerImpl, LocalNotification, MetaSrvEnv, MetadataManager, WorkerId,
 };
@@ -120,7 +122,7 @@ pub struct CustomFragmentInfo {
     pub actors: Vec<CustomActorInfo>,
 }
 
-#[derive(Default)]
+#[derive(Default, Clone)]
 pub struct CustomActorInfo {
     pub actor_id: u32,
     pub fragment_id: u32,
@@ -555,18 +557,125 @@ impl ScaleController {
                     );
                 }
             }
-            MetadataManager::V2(_) => {
-                let all_table_fragments = self.list_all_table_fragments().await?;
+            MetadataManager::V2(mgr) => {
+                let fragment_ids = reschedule.keys().map(|id| *id as _).collect();
+
+                let RescheduleWorkingSet {
+                    fragments,
+                    actors,
+                    mut actor_dispatchers,
+                    fragment_downstreams: _,
+                    fragment_upstreams: _,
+                } = mgr
+                    .catalog_controller
+                    .resolve_working_set_for_reschedule_fragments(fragment_ids)
+                    .await?;
 
-                for table_fragments in &all_table_fragments {
-                    fulfill_index_by_table_fragments_ref(
-                        &mut actor_map,
-                        &mut fragment_map,
-                        &mut actor_status,
-                        &mut fragment_state,
-                        &mut fragment_to_table,
-                        table_fragments,
-                    );
+                let mut fragment_actors: HashMap<
+                    risingwave_meta_model_v2::FragmentId,
+                    Vec<CustomActorInfo>,
+                > = HashMap::new();
+
+                let mut expr_contexts = HashMap::new();
+                for (
+                    _,
+                    actor::Model {
+                        actor_id,
+                        fragment_id,
+                        status: _,
+                        splits: _,
+                        worker_id,
+                        upstream_actor_ids,
+                        vnode_bitmap,
+                        expr_context,
+                    },
+                ) in actors
+                {
+                    let dispatchers = actor_dispatchers
+                        .remove(&actor_id)
+                        .unwrap_or_default()
+                        .into_iter()
+                        .map(PbDispatcher::from)
+                        .collect();
+
+                    let actor_info = CustomActorInfo {
+                        actor_id: actor_id as _,
+                        fragment_id: fragment_id as _,
+                        dispatcher: dispatchers,
+                        upstream_actor_id: upstream_actor_ids
+                            .into_inner()
+                            .values()
+                            .flatten()
+                            .map(|id| *id as _)
+                            .collect(),
+                        vnode_bitmap: vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
+                    };
+
+                    actor_map.insert(actor_id as _, actor_info.clone());
+
+                    fragment_actors
+                        .entry(fragment_id as _)
+                        .or_default()
+                        .push(actor_info);
+
+                    actor_status.insert(actor_id as _, worker_id as WorkerId);
+
+                    expr_contexts.insert(actor_id as u32, expr_context);
+                }
+
+                for (
+                    _,
+                    fragment::Model {
+                        fragment_id,
+                        job_id,
+                        fragment_type_mask,
+                        distribution_type,
+                        stream_node,
+                        state_table_ids,
+                        upstream_fragment_id,
+                    },
+                ) in fragments
+                {
+                    let actors = fragment_actors
+                        .remove(&(fragment_id as _))
+                        .unwrap_or_default();
+
+                    // Use the fragment's first actor as the template actor.
+                    let CustomActorInfo {
+                        actor_id,
+                        fragment_id,
+                        dispatcher,
+                        upstream_actor_id,
+                        vnode_bitmap,
+                    } = actors.first().unwrap().clone();
+
+                    let fragment = CustomFragmentInfo {
+                        fragment_id: fragment_id as _,
+                        fragment_type_mask: fragment_type_mask as _,
+                        distribution_type: distribution_type.into(),
+                        state_table_ids: state_table_ids.into_u32_array(),
+                        upstream_fragment_ids: upstream_fragment_id.into_u32_array(),
+                        actor_template: PbStreamActor {
+                            nodes: Some(stream_node.to_protobuf()),
+                            actor_id,
+                            fragment_id: fragment_id as _,
+                            dispatcher,
+                            upstream_actor_id,
+                            vnode_bitmap,
+                            mview_definition: "".to_string(),
+                            expr_context: expr_contexts
+                                .get(&actor_id)
+                                .cloned()
+                                .map(|expr_context| expr_context.to_protobuf()),
+                        },
+                        actors,
+                    };
+
+                    fragment_map.insert(fragment_id as _, fragment);
+
+                    fragment_to_table.insert(fragment_id as _, TableId::from(job_id as u32));
+
+                    // TODO: fetch the actual job state instead of assuming `Created`.
+                    fragment_state.insert(fragment_id, table_fragments::State::Created);
                 }
             }
         };
@@ -1975,22 +2084,64 @@ impl ScaleController {
                     guard.table_fragments(),
                 )?;
             }
-            MetadataManager::V2(_) => {
-                let all_table_fragments = self.list_all_table_fragments().await?;
-                let all_table_fragments = all_table_fragments
-                    .into_iter()
-                    .map(|table_fragments| (table_fragments.table_id(), table_fragments))
-                    .collect::<HashMap<_, _>>();
+            MetadataManager::V2(mgr) => {
+                let table_ids = table_parallelisms
+                    .keys()
+                    .map(|id| *id as ObjectId)
+                    .collect();
 
-                build_index(
-                    &mut no_shuffle_source_fragment_ids,
-                    &mut no_shuffle_target_fragment_ids,
-                    &mut fragment_distribution_map,
-                    &mut actor_location,
-                    &mut table_fragment_id_map,
-                    &mut fragment_actor_id_map,
-                    &all_table_fragments,
-                )?;
+                let RescheduleWorkingSet {
+                    fragments,
+                    actors,
+                    actor_dispatchers: _,
+                    fragment_downstreams,
+                    fragment_upstreams: _,
+                } = mgr
+                    .catalog_controller
+                    .resolve_working_set_for_reschedule_tables(table_ids)
+                    .await?;
+
+                for (fragment_id, downstreams) in fragment_downstreams {
+                    for (downstream_fragment_id, dispatcher_type) in downstreams {
+                        tracing::debug!(
+                            "fragment_id {} downstream {} dispatcher {:?}",
+                            fragment_id, downstream_fragment_id, dispatcher_type
+                        );
+                        if let risingwave_meta_model_v2::actor_dispatcher::DispatcherType::NoShuffle = dispatcher_type {
+                            no_shuffle_source_fragment_ids.insert(fragment_id as FragmentId);
+                            no_shuffle_target_fragment_ids.insert(downstream_fragment_id as FragmentId);
+                        }
+                    }
+                }
+
+                tracing::debug!("no shuffle src {:?}", no_shuffle_source_fragment_ids);
+                tracing::debug!("no shuffle dst {:?}", no_shuffle_target_fragment_ids);
+
+                for (fragment_id, fragment) in fragments {
+                    fragment_distribution_map.insert(
+                        fragment_id as FragmentId,
+                        FragmentDistributionType::from(fragment.distribution_type),
+                    );
+
+                    table_fragment_id_map
+                        .entry(fragment.job_id as u32)
+                        .or_default()
+                        .insert(fragment_id as FragmentId);
+                }
+
+                tracing::debug!("frag dist map {:#?}", fragment_distribution_map);
+                tracing::debug!("table frag id map {:?}", table_fragment_id_map);
+
+                for (actor_id, actor) in actors {
+                    actor_location.insert(actor_id as ActorId, actor.worker_id as WorkerId);
+                    fragment_actor_id_map
+                        .entry(actor.fragment_id as FragmentId)
+                        .or_default()
+                        .insert(actor_id as ActorId);
+                }
+
+                tracing::debug!("frag actor {:#?}", fragment_actor_id_map);
             }
         }
 
@@ -2086,6 +2237,8 @@ impl ScaleController {
 
         target_plan.retain(|_, plan| !plan.worker_actor_diff.is_empty());
 
+        tracing::debug!("target plan {:#?}", target_plan);
+
         Ok(target_plan)
     }