diff --git a/proto/meta.proto b/proto/meta.proto index 48eb72f7f3f97..4570e3749e7e3 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -98,6 +98,7 @@ message TableFragments { map actor_splits = 5; stream_plan.StreamContext ctx = 6; + TableParallelism parallelism = 7; } /// Parallel unit mapping with fragment id, used for notification. @@ -461,6 +462,22 @@ message RescheduleResponse { uint64 revision = 2; } +message TableParallelism { + message FixedParallelism { + uint32 parallelism = 1; + } + + message AutoParallelism {} + + message CustomParallelism {} + + oneof parallelism { + FixedParallelism fixed = 1; + AutoParallelism auto = 2; + CustomParallelism custom = 3; + } +} + message GetReschedulePlanRequest { uint64 revision = 1; diff --git a/src/common/src/hash/consistent_hash/mapping.rs b/src/common/src/hash/consistent_hash/mapping.rs index 333b94cd6b620..92b97a8f1fe0a 100644 --- a/src/common/src/hash/consistent_hash/mapping.rs +++ b/src/common/src/hash/consistent_hash/mapping.rs @@ -300,6 +300,11 @@ impl ParallelUnitMapping { Self::new_uniform(parallel_units.iter().map(|pu| pu.id)) } + /// Create a uniform parallel unit mapping from the given parallel units ids + pub fn build_from_ids(parallel_unit_ids: &[ParallelUnitId]) -> Self { + Self::new_uniform(parallel_unit_ids.iter().cloned()) + } + /// Transform this parallel unit mapping to an actor mapping, essentially `transform`. pub fn to_actor(&self, to_map: &HashMap) -> ActorMapping { self.transform(to_map) diff --git a/src/meta/service/src/scale_service.rs b/src/meta/service/src/scale_service.rs index 0fecc15c2e4db..efe310d0e9d58 100644 --- a/src/meta/service/src/scale_service.rs +++ b/src/meta/service/src/scale_service.rs @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::collections::HashMap; use std::sync::Arc; +use risingwave_meta::model::TableParallelism; use risingwave_meta::stream::{ScaleController, ScaleControllerRef}; use risingwave_pb::common::WorkerType; use risingwave_pb::meta::scale_service_server::ScaleService; @@ -145,6 +147,22 @@ impl ScaleService for ScaleServiceImpl { })); } + let table_parallelisms = { + let guard = self.fragment_manager.get_fragment_read_guard().await; + + let mut table_parallelisms = HashMap::new(); + for (table_id, table) in guard.table_fragments() { + if table + .fragment_ids() + .any(|fragment_id| reschedules.contains_key(&fragment_id)) + { + table_parallelisms.insert(*table_id, TableParallelism::Custom); + } + } + + table_parallelisms + }; + self.stream_manager .reschedule_actors( reschedules @@ -170,6 +188,7 @@ impl ScaleService for ScaleServiceImpl { RescheduleOptions { resolve_no_shuffle_upstream, }, + Some(table_parallelisms), ) .await?; diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 59f54471d5206..b0bfdccebafc4 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -41,7 +41,7 @@ use super::trace::TracedEpoch; use crate::barrier::CommandChanges; use crate::hummock::HummockManagerRef; use crate::manager::{CatalogManagerRef, DdlType, FragmentManagerRef, WorkerId}; -use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments}; +use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism}; use crate::stream::{ build_actor_connector_splits, ScaleControllerRef, SourceManagerRef, SplitAssignment, ThrottleConfig, @@ -146,6 +146,7 @@ pub enum Command { /// very similar to `Create` and `Drop` commands, for added and removed actors, respectively. RescheduleFragment { reschedules: HashMap, + table_parallelism: HashMap, }, /// `ReplaceTable` command generates a `Update` barrier with the given `merge_updates`. 
This is @@ -891,10 +892,13 @@ impl CommandContext { .await; } - Command::RescheduleFragment { reschedules } => { + Command::RescheduleFragment { + reschedules, + table_parallelism, + } => { let node_dropped_actors = self .scale_controller - .post_apply_reschedule(reschedules) + .post_apply_reschedule(reschedules, table_parallelism) .await?; self.clean_up(node_dropped_actors).await?; } diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 313dfce976410..ce8f8550f5611 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -23,7 +23,6 @@ use futures::TryStreamExt; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_pb::common::ActorInfo; -use risingwave_pb::meta::get_reschedule_plan_request::{PbWorkerChanges, StableResizePolicy}; use risingwave_pb::meta::PausedReason; use risingwave_pb::stream_plan::barrier::BarrierKind; use risingwave_pb::stream_plan::barrier_mutation::Mutation; @@ -44,7 +43,7 @@ use crate::barrier::progress::CreateMviewProgressTracker; use crate::barrier::{CheckpointControl, Command, GlobalBarrierManager}; use crate::manager::WorkerId; use crate::model::{BarrierManagerState, MigrationPlan}; -use crate::stream::{build_actor_connector_splits, RescheduleOptions}; +use crate::stream::{build_actor_connector_splits, RescheduleOptions, TableResizePolicy}; use crate::MetaResult; impl GlobalBarrierManager { @@ -416,38 +415,39 @@ impl GlobalBarrierManager { return Ok(false); } - let all_worker_parallel_units = self.fragment_manager.all_worker_parallel_units().await; - - let expired_worker_parallel_units: HashMap<_, _> = all_worker_parallel_units - .into_iter() - .filter(|(worker, _)| expired_workers.contains(worker)) - .collect(); - - let fragment_worker_changes = { + let table_parallelisms = { let guard = self.fragment_manager.get_fragment_read_guard().await; - let mut policy = HashMap::new(); - for table_fragments in guard.table_fragments().values() { - for fragment_id in table_fragments.fragment_ids() { - policy.insert( - fragment_id, - PbWorkerChanges { - exclude_worker_ids: expired_workers.iter().cloned().collect(), - ..Default::default() - }, - ); - } - } - policy + + guard + .table_fragments() + .iter() + .map(|(table_id, table)| (table_id.table_id, table.assigned_parallelism)) + .collect() }; + let workers = self + .cluster_manager + .list_active_streaming_compute_nodes() + .await; + + let schedulable_worker_ids = workers + .iter() + .filter(|worker| { + !worker + .property + .as_ref() + .map(|p| p.is_unschedulable) + .unwrap_or(false) + }) + .map(|worker| worker.id) + .collect(); + let plan = self .scale_controller - .generate_stable_resize_plan( - StableResizePolicy { - fragment_worker_changes, - }, - Some(expired_worker_parallel_units), - ) + .generate_table_resize_plan(TableResizePolicy { + worker_ids: schedulable_worker_ids, + table_parallelisms, + }) .await?; let (reschedule_fragment, applied_reschedules) = self @@ -457,12 +457,13 @@ impl GlobalBarrierManager { RescheduleOptions { resolve_no_shuffle_upstream: true, }, + None, ) .await?; if let Err(e) = self .scale_controller - .post_apply_reschedule(&reschedule_fragment) + .post_apply_reschedule(&reschedule_fragment, &Default::default()) .await { tracing::error!( diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 1ddb8d90213c3..fbc23f58f1b2d 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -331,6 +331,8 @@ impl CatalogController { 
actor_status: pb_actor_status, actor_splits: pb_actor_splits, ctx: Some(ctx.unwrap_or_default()), + // TODO(peng): fix this for model v2 + parallelism: None, }; Ok(table_fragments) diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index 3d88174da3242..c2ed97fc07fd6 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -42,7 +42,7 @@ use crate::manager::cluster::WorkerId; use crate::manager::{commit_meta, commit_meta_with_trx, LocalNotification, MetaSrvEnv}; use crate::model::{ ActorId, BTreeMapTransaction, FragmentId, MetadataModel, MigrationPlan, TableFragments, - ValTransaction, + TableParallelism, ValTransaction, }; use crate::storage::Transaction; use crate::stream::{SplitAssignment, TableRevision}; @@ -1111,6 +1111,7 @@ impl FragmentManager { pub async fn post_apply_reschedules( &self, mut reschedules: HashMap, + table_parallelism_assignment: HashMap, ) -> MetaResult<()> { let mut guard = self.core.write().await; let current_version = guard.table_revision; @@ -1178,6 +1179,8 @@ impl FragmentManager { }) .collect_vec(); + let mut table_fragment = table_fragments.get_mut(table_id).unwrap(); + for (fragment_id, reschedule) in reschedules { let Reschedule { added_actors, @@ -1189,8 +1192,6 @@ impl FragmentManager { actor_splits, } = reschedule; - let mut table_fragment = table_fragments.get_mut(table_id).unwrap(); - // First step, update self fragment // Add actors to this fragment: set the state to `Running`. for actor_id in &added_actors { @@ -1328,6 +1329,12 @@ impl FragmentManager { } } + for (table_id, parallelism) in table_parallelism_assignment { + if let Some(mut table) = table_fragments.get_mut(table_id) { + table.assigned_parallelism = parallelism; + } + } + assert!(reschedules.is_empty(), "all reschedules must be applied"); // new empty transaction diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index 51f8cff08d39b..faa6c4539c46c 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -22,7 +22,11 @@ use risingwave_connector::source::SplitImpl; use risingwave_pb::common::{ParallelUnit, ParallelUnitMapping}; use risingwave_pb::meta::table_fragments::actor_status::ActorState; use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State}; -use risingwave_pb::meta::PbTableFragments; +use risingwave_pb::meta::table_parallelism::{ + FixedParallelism, Parallelism, PbAutoParallelism, PbCustomParallelism, PbFixedParallelism, + PbParallelism, +}; +use risingwave_pb::meta::{PbTableFragments, PbTableParallelism}; use risingwave_pb::plan_common::PbExprContext; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{ @@ -37,6 +41,43 @@ use crate::stream::{build_actor_connector_splits, build_actor_split_impls, Split /// Column family name for table fragments. 
const TABLE_FRAGMENTS_CF_NAME: &str = "cf/table_fragments"; +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub enum TableParallelism { + Auto, + Fixed(usize), + Custom, +} + +impl From for TableParallelism { + fn from(value: PbTableParallelism) -> Self { + use Parallelism::*; + match &value.parallelism { + Some(Fixed(FixedParallelism { parallelism: n })) => Self::Fixed(*n as usize), + Some(Auto(_)) => Self::Auto, + Some(Custom(_)) => Self::Custom, + _ => Self::Auto, + } + } +} + +impl From for PbTableParallelism { + fn from(value: TableParallelism) -> Self { + use TableParallelism::*; + + let parallelism = match value { + Auto => PbParallelism::Auto(PbAutoParallelism {}), + Fixed(n) => PbParallelism::Fixed(PbFixedParallelism { + parallelism: n as u32, + }), + Custom => PbParallelism::Custom(PbCustomParallelism {}), + }; + + Self { + parallelism: Some(parallelism), + } + } +} + /// Fragments of a streaming job. /// /// We store whole fragments in a single column family as follow: @@ -60,6 +101,9 @@ pub struct TableFragments { /// The streaming context associated with this stream plan and its fragments pub ctx: StreamContext, + + /// The parallelism assigned to this table fragments + pub assigned_parallelism: TableParallelism, } #[derive(Debug, Clone, Default)] @@ -109,11 +153,17 @@ impl MetadataModel for TableFragments { actor_status: self.actor_status.clone().into_iter().collect(), actor_splits: build_actor_connector_splits(&self.actor_splits), ctx: Some(self.ctx.to_protobuf()), + parallelism: Some(self.assigned_parallelism.into()), } } fn from_protobuf(prost: Self::PbType) -> Self { let ctx = StreamContext::from_protobuf(prost.get_ctx().unwrap()); + + let default_parallelism = PbTableParallelism { + parallelism: Some(Parallelism::Custom(PbCustomParallelism {})), + }; + Self { table_id: TableId::new(prost.table_id), state: prost.state(), @@ -121,6 +171,7 @@ impl MetadataModel for TableFragments { actor_status: prost.actor_status.into_iter().collect(), actor_splits: build_actor_split_impls(&prost.actor_splits), ctx, + assigned_parallelism: prost.parallelism.unwrap_or(default_parallelism).into(), } } @@ -137,6 +188,7 @@ impl TableFragments { fragments, &BTreeMap::new(), StreamContext::default(), + TableParallelism::Auto, ) } @@ -147,6 +199,7 @@ impl TableFragments { fragments: BTreeMap, actor_locations: &BTreeMap, ctx: StreamContext, + table_parallelism: TableParallelism, ) -> Self { let actor_status = actor_locations .iter() @@ -168,6 +221,7 @@ impl TableFragments { actor_status, actor_splits: HashMap::default(), ctx, + assigned_parallelism: table_parallelism, } } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 1c70efa2c6d76..0fd3c9fe40b99 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -57,7 +57,7 @@ use crate::manager::{ SchemaId, SinkId, SourceId, StreamingClusterInfo, StreamingJob, TableId, UserId, ViewId, IGNORED_NOTIFICATION_VERSION, }; -use crate::model::{FragmentId, StreamContext, TableFragments}; +use crate::model::{FragmentId, StreamContext, TableFragments, TableParallelism}; use crate::rpc::cloud_provider::AwsEc2Client; use crate::stream::{ validate_sink, ActorGraphBuildResult, ActorGraphBuilder, CompleteStreamFragmentGraph, @@ -1085,11 +1085,11 @@ impl DdlController { // 2. Build the actor graph. 
let cluster_info = self.cluster_manager.get_streaming_cluster_info().await; - let default_parallelism = - self.resolve_stream_parallelism(default_parallelism, &cluster_info)?; + + let parallelism = self.resolve_stream_parallelism(default_parallelism, &cluster_info)?; let actor_graph_builder = - ActorGraphBuilder::new(complete_graph, cluster_info, default_parallelism)?; + ActorGraphBuilder::new(id, complete_graph, cluster_info, parallelism)?; let ActorGraphBuildResult { graph, @@ -1105,11 +1105,18 @@ impl DdlController { // 3. Build the table fragments structure that will be persisted in the stream manager, // and the context that contains all information needed for building the // actors on the compute nodes. + + let table_parallelism = match default_parallelism { + None => TableParallelism::Auto, + Some(parallelism) => TableParallelism::Fixed(parallelism.get()), + }; + let table_fragments = TableFragments::new( id.into(), graph, &building_locations.actor_locations, stream_ctx.clone(), + table_parallelism, ); let replace_table_job_info = match affected_table_replace_info { @@ -1464,10 +1471,10 @@ impl DdlController { // 2. Build the actor graph. let cluster_info = self.cluster_manager.get_streaming_cluster_info().await; - let default_parallelism = - self.resolve_stream_parallelism(default_parallelism, &cluster_info)?; + + let parallelism = self.resolve_stream_parallelism(default_parallelism, &cluster_info)?; let actor_graph_builder = - ActorGraphBuilder::new(complete_graph, cluster_info, default_parallelism)?; + ActorGraphBuilder::new(id, complete_graph, cluster_info, parallelism)?; let ActorGraphBuildResult { graph, @@ -1491,6 +1498,11 @@ impl DdlController { .generate::<{ IdCategory::Table }>() .await? as u32; + let table_parallelism = match default_parallelism { + None => TableParallelism::Auto, + Some(parallelism) => TableParallelism::Fixed(parallelism.get()), + }; + // 4. Build the table fragments structure that will be persisted in the stream manager, and // the context that contains all information needed for building the actors on the compute // nodes. @@ -1499,6 +1511,8 @@ impl DdlController { graph, &building_locations.actor_locations, stream_ctx, + // todo: shall we use the old table fragments' parallelism + table_parallelism, ); let ctx = ReplaceTableContext { diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 0dae4206cc4a8..87cdd78a59d66 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -13,7 +13,9 @@ // limitations under the License. 
use std::cmp::{min, Ordering}; +use std::collections::hash_map::DefaultHasher; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque}; +use std::hash::{Hash, Hasher}; use std::iter::repeat; use std::sync::Arc; use std::time::Duration; @@ -25,12 +27,11 @@ use num_integer::Integer; use num_traits::abs; use risingwave_common::bail; use risingwave_common::buffer::{Bitmap, BitmapBuilder}; +use risingwave_common::catalog::TableId; use risingwave_common::hash::{ActorMapping, ParallelUnitId, VirtualNode}; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_pb::common::{ActorInfo, ParallelUnit, WorkerNode}; -use risingwave_pb::meta::get_reschedule_plan_request::{ - PbWorkerChanges, Policy, StableResizePolicy, -}; +use risingwave_pb::meta::get_reschedule_plan_request::{Policy, StableResizePolicy}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::table_fragments::actor_status::ActorState; use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType; @@ -51,7 +52,7 @@ use crate::barrier::{Command, Reschedule}; use crate::manager::{ ClusterManagerRef, FragmentManagerRef, IdCategory, LocalNotification, MetaSrvEnv, WorkerId, }; -use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments}; +use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism}; use crate::serving::{ to_deleted_fragment_parallel_unit_mapping, to_fragment_parallel_unit_mapping, ServingVnodeMapping, @@ -395,6 +396,7 @@ impl ScaleController { &self, reschedule: &mut HashMap, options: RescheduleOptions, + table_parallelisms: Option<&mut HashMap>, ) -> MetaResult { // Index worker node, used to create actor let worker_nodes: HashMap = self @@ -457,6 +459,7 @@ impl ScaleController { // Index for actor status, including actor's parallel unit let mut actor_status = BTreeMap::new(); let mut fragment_state = HashMap::new(); + let mut fragment_to_table = HashMap::new(); for table_fragments in self.fragment_manager.list_table_fragments().await { fragment_state.extend( table_fragments @@ -466,6 +469,12 @@ impl ScaleController { fragment_map.extend(table_fragments.fragments.clone()); actor_map.extend(table_fragments.actor_map()); actor_status.extend(table_fragments.actor_status.clone()); + + fragment_to_table.extend( + table_fragments + .fragment_ids() + .map(|f| (f, table_fragments.table_id())), + ); } // NoShuffle relation index @@ -479,12 +488,26 @@ impl ScaleController { ); if options.resolve_no_shuffle_upstream { - Self::resolve_no_shuffle_upstream( + let original_reschedule_keys = reschedule.keys().cloned().collect(); + + Self::resolve_no_shuffle_upstream_fragments( reschedule, &fragment_map, &no_shuffle_source_fragment_ids, &no_shuffle_target_fragment_ids, )?; + + if let Some(table_parallelisms) = table_parallelisms { + // We need to reiterate through the NO_SHUFFLE dependencies in order to ascertain which downstream table the custom modifications of the table have been propagated from. 
+ Self::resolve_no_shuffle_upstream_tables( + original_reschedule_keys, + &fragment_map, + &no_shuffle_source_fragment_ids, + &no_shuffle_target_fragment_ids, + &fragment_to_table, + table_parallelisms, + )?; + } } let mut fragment_dispatcher_map = HashMap::new(); @@ -723,12 +746,13 @@ impl ScaleController { &self, mut reschedules: HashMap, options: RescheduleOptions, + table_parallelisms: Option<&mut HashMap>, ) -> MetaResult<( HashMap, HashMap>, )> { let ctx = self - .build_reschedule_context(&mut reschedules, options) + .build_reschedule_context(&mut reschedules, options, table_parallelisms) .await?; // Index of actors to create/remove // Fragment Id => ( Actor Id => Parallel Unit Id ) @@ -1508,6 +1532,7 @@ impl ScaleController { pub async fn post_apply_reschedule( &self, reschedules: &HashMap, + table_parallelism: &HashMap, ) -> MetaResult>> { let mut node_dropped_actors = HashMap::new(); for table_fragments in self @@ -1539,7 +1564,7 @@ impl ScaleController { // Update fragment info after rescheduling in meta store. self.fragment_manager - .post_apply_reschedules(reschedules.clone()) + .post_apply_reschedules(reschedules.clone(), table_parallelism.clone()) .await?; // Update serving fragment info after rescheduling in meta store. @@ -1608,6 +1633,200 @@ impl ScaleController { Ok(node_dropped_actors) } + pub async fn generate_table_resize_plan( + &self, + policy: TableResizePolicy, + ) -> MetaResult> { + let TableResizePolicy { + worker_ids, + table_parallelisms, + } = policy; + + let workers = self + .cluster_manager + .list_active_streaming_compute_nodes() + .await; + + let unschedulable_worker_ids = Self::filter_unschedulable_workers(&workers); + + for worker_id in &worker_ids { + if unschedulable_worker_ids.contains(worker_id) { + bail!("Cannot include unscheduable worker {}", worker_id) + } + } + + let workers = workers + .into_iter() + .filter(|worker| worker_ids.contains(&worker.id)) + .collect::>(); + + let worker_parallel_units = workers + .iter() + .map(|worker| { + ( + worker.id, + worker + .parallel_units + .iter() + .map(|parallel_unit| parallel_unit.id as ParallelUnitId) + .collect::>(), + ) + }) + .collect::>(); + + let all_table_fragments = self.fragment_manager.list_table_fragments().await; + + // FIXME: only need actor id and dispatcher info, avoid clone it. + let mut actor_map = HashMap::new(); + let mut actor_status = HashMap::new(); + // FIXME: only need fragment distribution info, should avoid clone it. 
+ + let mut table_fragment_map = HashMap::new(); + for table_fragments in all_table_fragments { + let table_id = table_fragments.table_id().table_id; + for (fragment_id, fragment) in table_fragments.fragments { + fragment + .actors + .iter() + .map(|actor| (actor.actor_id, actor)) + .for_each(|(id, actor)| { + actor_map.insert(id as ActorId, actor.clone()); + }); + + table_fragment_map + .entry(table_id) + .or_insert(HashMap::new()) + .insert(fragment_id, fragment); + } + + actor_status.extend(table_fragments.actor_status); + } + + let mut no_shuffle_source_fragment_ids = HashSet::new(); + let mut no_shuffle_target_fragment_ids = HashSet::new(); + + Self::build_no_shuffle_relation_index( + &actor_map, + &mut no_shuffle_source_fragment_ids, + &mut no_shuffle_target_fragment_ids, + ); + + let mut target_plan = HashMap::new(); + + for (table_id, parallelism) in table_parallelisms { + let fragment_map = table_fragment_map.remove(&table_id).unwrap(); + + for (fragment_id, fragment) in fragment_map { + // Currently, all of our NO_SHUFFLE relation propagations are only transmitted from upstream to downstream. + if no_shuffle_target_fragment_ids.contains(&fragment_id) { + continue; + } + + let fragment_parallel_unit_ids: BTreeSet<_> = fragment + .actors + .iter() + .map(|actor| { + actor_status + .get(&actor.actor_id) + .and_then(|status| status.parallel_unit.clone()) + .unwrap() + .id as ParallelUnitId + }) + .collect(); + + let all_available_parallel_unit_ids: BTreeSet<_> = + worker_parallel_units.values().flatten().cloned().collect(); + + if all_available_parallel_unit_ids.is_empty() { + bail!( + "No schedulable ParallelUnits available for fragment {}", + fragment_id + ); + } + + match fragment.get_distribution_type().unwrap() { + FragmentDistributionType::Unspecified => unreachable!(), + FragmentDistributionType::Single => { + let single_parallel_unit_id = + fragment_parallel_unit_ids.iter().exactly_one().unwrap(); + + if all_available_parallel_unit_ids.contains(single_parallel_unit_id) { + // NOTE: shall we continue? 
+ continue; + } + + let units = schedule_units_for_slots(&worker_parallel_units, 1, table_id)?; + + let chosen_target_parallel_unit_id = units + .values() + .flatten() + .cloned() + .exactly_one() + .map_err(|e| { + anyhow!( + "Cannot find a single target ParallelUnit for fragment {}: {}", + fragment_id, + e + ) + })?; + + target_plan.insert( + fragment_id, + ParallelUnitReschedule { + added_parallel_units: BTreeSet::from([ + chosen_target_parallel_unit_id, + ]), + removed_parallel_units: BTreeSet::from([*single_parallel_unit_id]), + }, + ); + } + FragmentDistributionType::Hash => match parallelism { + TableParallelism::Auto => { + target_plan.insert( + fragment_id, + Self::diff_parallel_unit_change( + &fragment_parallel_unit_ids, + &all_available_parallel_unit_ids, + ), + ); + } + TableParallelism::Fixed(n) => { + if n > all_available_parallel_unit_ids.len() { + bail!( + "Not enough ParallelUnits available for fragment {}", + fragment_id + ); + } + + let rebalance_result = + schedule_units_for_slots(&worker_parallel_units, n, table_id)?; + + let target_parallel_unit_ids = + rebalance_result.into_values().flatten().collect(); + + target_plan.insert( + fragment_id, + Self::diff_parallel_unit_change( + &fragment_parallel_unit_ids, + &target_parallel_unit_ids, + ), + ); + } + TableParallelism::Custom => { + // skipping for custom + } + }, + } + } + } + + target_plan.retain(|_, plan| { + !(plan.added_parallel_units.is_empty() && plan.removed_parallel_units.is_empty()) + }); + + Ok(target_plan) + } + pub async fn generate_stable_resize_plan( &self, policy: StableResizePolicy, @@ -1624,17 +1843,7 @@ impl ScaleController { .list_active_streaming_compute_nodes() .await; - let unschedulable_worker_ids: HashSet<_> = workers - .iter() - .filter(|worker| { - worker - .property - .as_ref() - .map(|p| p.is_unschedulable) - .unwrap_or(false) - }) - .map(|worker| worker.id as WorkerId) - .collect(); + let unschedulable_worker_ids = Self::filter_unschedulable_workers(&workers); for changes in fragment_worker_changes.values() { for worker_id in &changes.include_worker_ids { @@ -1665,6 +1874,7 @@ impl ScaleController { let mut actor_status = HashMap::new(); // FIXME: only need fragment distribution info, should avoid clone it. 
let mut fragment_map = HashMap::new(); + let mut fragment_parallelism = HashMap::new(); for table_fragments in all_table_fragments { for (fragment_id, fragment) in table_fragments.fragments { @@ -1677,6 +1887,8 @@ impl ScaleController { }); fragment_map.insert(fragment_id, fragment); + + fragment_parallelism.insert(fragment_id, table_fragments.assigned_parallelism); } actor_status.extend(table_fragments.actor_status); @@ -1719,7 +1931,7 @@ impl ScaleController { }) .collect(); - Self::resolve_no_shuffle_upstream( + Self::resolve_no_shuffle_upstream_fragments( &mut fragment_worker_changes, &fragment_map, &no_shuffle_source_fragment_ids, @@ -1824,7 +2036,6 @@ impl ScaleController { // then we re-add the limited parallel units from the limited workers target_parallel_unit_ids.extend(limited_worker_parallel_unit_ids.into_iter()); } - match fragment.get_distribution_type().unwrap() { FragmentDistributionType::Unspecified => unreachable!(), FragmentDistributionType::Single => { @@ -1913,22 +2124,12 @@ impl ScaleController { _ => {} } - let to_expand_parallel_units = target_parallel_unit_ids - .difference(&fragment_parallel_unit_ids) - .cloned() - .collect(); - - let to_shrink_parallel_units = fragment_parallel_unit_ids - .difference(&target_parallel_unit_ids) - .cloned() - .collect(); - target_plan.insert( fragment_id, - ParallelUnitReschedule { - added_parallel_units: to_expand_parallel_units, - removed_parallel_units: to_shrink_parallel_units, - }, + Self::diff_parallel_unit_change( + &fragment_parallel_unit_ids, + &target_parallel_unit_ids, + ), ); } } @@ -1941,6 +2142,40 @@ impl ScaleController { Ok(target_plan) } + fn filter_unschedulable_workers(workers: &[WorkerNode]) -> HashSet { + workers + .iter() + .filter(|worker| { + worker + .property + .as_ref() + .map(|p| p.is_unschedulable) + .unwrap_or(false) + }) + .map(|worker| worker.id as WorkerId) + .collect() + } + + fn diff_parallel_unit_change( + fragment_parallel_unit_ids: &BTreeSet, + target_parallel_unit_ids: &BTreeSet, + ) -> ParallelUnitReschedule { + let to_expand_parallel_units = target_parallel_unit_ids + .difference(fragment_parallel_unit_ids) + .cloned() + .collect(); + + let to_shrink_parallel_units = fragment_parallel_unit_ids + .difference(target_parallel_unit_ids) + .cloned() + .collect(); + + ParallelUnitReschedule { + added_parallel_units: to_expand_parallel_units, + removed_parallel_units: to_shrink_parallel_units, + } + } + pub async fn get_reschedule_plan( &self, policy: Policy, @@ -1957,7 +2192,12 @@ impl ScaleController { no_shuffle_source_fragment_ids: &mut HashSet, no_shuffle_target_fragment_ids: &mut HashSet, ) { + let mut fragment_cache = HashSet::new(); for actor in actor_map.values() { + if fragment_cache.contains(&actor.fragment_id) { + continue; + } + for dispatcher in &actor.dispatcher { for downstream_actor_id in &dispatcher.downstream_actor_id { if let Some(downstream_actor) = actor_map.get(downstream_actor_id) { @@ -1970,6 +2210,8 @@ impl ScaleController { } } } + + fragment_cache.insert(actor.fragment_id); } } @@ -1994,7 +2236,77 @@ impl ScaleController { } } - pub fn resolve_no_shuffle_upstream( + pub fn resolve_no_shuffle_upstream_tables( + fragment_ids: HashSet, + fragment_map: &HashMap, + no_shuffle_source_fragment_ids: &HashSet, + no_shuffle_target_fragment_ids: &HashSet, + fragment_to_table: &HashMap, + table_parallelisms: &mut HashMap, + ) -> MetaResult<()> { + let mut queue: VecDeque = fragment_ids.iter().cloned().collect(); + + let mut fragment_ids = fragment_ids; + + // We trace the upstreams 
of each downstream under the hierarchy until we reach the top + // for every no_shuffle relation. + while let Some(fragment_id) = queue.pop_front() { + if !no_shuffle_target_fragment_ids.contains(&fragment_id) + && !no_shuffle_source_fragment_ids.contains(&fragment_id) + { + continue; + } + + // for upstream + for upstream_fragment_id in &fragment_map + .get(&fragment_id) + .unwrap() + .upstream_fragment_ids + { + if !no_shuffle_source_fragment_ids.contains(upstream_fragment_id) { + continue; + } + + let table_id = fragment_to_table.get(&fragment_id).unwrap(); + let upstream_table_id = fragment_to_table.get(upstream_fragment_id).unwrap(); + + // Only custom parallelism will be propagated to the no shuffle upstream. + if let Some(TableParallelism::Custom) = table_parallelisms.get(table_id) { + if let Some(upstream_table_parallelism) = + table_parallelisms.get(upstream_table_id) + { + if upstream_table_parallelism != &TableParallelism::Custom { + bail!( + "Cannot change upstream table {} from {:?} to {:?}", + upstream_table_id, + upstream_table_parallelism, + TableParallelism::Custom + ) + } + } else { + table_parallelisms.insert(*upstream_table_id, TableParallelism::Custom); + } + } + + fragment_ids.insert(*upstream_fragment_id); + queue.push_back(*upstream_fragment_id); + } + } + + let downstream_fragment_ids = fragment_ids + .iter() + .filter(|fragment_id| no_shuffle_target_fragment_ids.contains(fragment_id)); + + let downstream_table_ids = downstream_fragment_ids + .map(|fragment_id| fragment_to_table.get(fragment_id).unwrap()) + .collect::>(); + + table_parallelisms.retain(|table_id, _| !downstream_table_ids.contains(table_id)); + + Ok(()) + } + + pub fn resolve_no_shuffle_upstream_fragments( reschedule: &mut HashMap, fragment_map: &HashMap, no_shuffle_source_fragment_ids: &HashSet, @@ -2035,6 +2347,7 @@ impl ScaleController { } reschedule.insert(*upstream_fragment_id, reschedule_plan.clone()); + queue.push_back(*upstream_fragment_id); } } @@ -2045,15 +2358,23 @@ impl ScaleController { } } +// At present, for table level scaling, we use the strategy TableResizePolicy. +// Currently, this is used as an internal interface, so it won’t be included in Protobuf for the time being. 
+pub struct TableResizePolicy { + pub(crate) worker_ids: BTreeSet, + pub(crate) table_parallelisms: HashMap, +} + impl GlobalStreamManager { pub async fn reschedule_actors( &self, reschedules: HashMap, options: RescheduleOptions, + table_parallelism: Option>, ) -> MetaResult<()> { let mut revert_funcs = vec![]; if let Err(e) = self - .reschedule_actors_impl(&mut revert_funcs, reschedules, options) + .reschedule_actors_impl(&mut revert_funcs, reschedules, options, table_parallelism) .await { for revert_func in revert_funcs.into_iter().rev() { @@ -2070,16 +2391,20 @@ impl GlobalStreamManager { revert_funcs: &mut Vec>, reschedules: HashMap, options: RescheduleOptions, + table_parallelism: Option>, ) -> MetaResult<()> { + let mut table_parallelism = table_parallelism; + let (reschedule_fragment, applied_reschedules) = self .scale_controller - .prepare_reschedule_command(reschedules, options) + .prepare_reschedule_command(reschedules, options, table_parallelism.as_mut()) .await?; tracing::debug!("reschedule plan: {:#?}", reschedule_fragment); let command = Command::RescheduleFragment { reschedules: reschedule_fragment, + table_parallelism: table_parallelism.unwrap_or_default(), }; let fragment_manager_ref = self.fragment_manager.clone(); @@ -2099,34 +2424,42 @@ impl GlobalStreamManager { Ok(()) } - async fn trigger_scale_out(&self, workers: Vec) -> MetaResult<()> { + async fn trigger_parallelism_control(&self) -> MetaResult<()> { let _reschedule_job_lock = self.reschedule_lock.write().await; - let fragment_worker_changes = { + let table_parallelisms = { let guard = self.fragment_manager.get_fragment_read_guard().await; - let mut fragment_worker_changes = HashMap::new(); - for table_fragments in guard.table_fragments().values() { - for fragment_id in table_fragments.fragment_ids() { - fragment_worker_changes.insert( - fragment_id, - PbWorkerChanges { - include_worker_ids: workers.clone(), - ..Default::default() - }, - ); - } - } - fragment_worker_changes + + guard + .table_fragments() + .iter() + .map(|(table_id, table)| (table_id.table_id, table.assigned_parallelism)) + .collect() }; + let workers = self + .cluster_manager + .list_active_streaming_compute_nodes() + .await; + + let schedulable_worker_ids = workers + .iter() + .filter(|worker| { + !worker + .property + .as_ref() + .map(|p| p.is_unschedulable) + .unwrap_or(false) + }) + .map(|worker| worker.id) + .collect(); + let reschedules = self .scale_controller - .generate_stable_resize_plan( - StableResizePolicy { - fragment_worker_changes, - }, - None, - ) + .generate_table_resize_plan(TableResizePolicy { + worker_ids: schedulable_worker_ids, + table_parallelisms, + }) .await?; if reschedules.is_empty() { @@ -2138,6 +2471,7 @@ impl GlobalStreamManager { RescheduleOptions { resolve_no_shuffle_upstream: true, }, + None, ) .await?; @@ -2188,9 +2522,8 @@ impl GlobalStreamManager { continue; } - match self.trigger_scale_out(include_workers).await { + match self.trigger_parallelism_control().await { Ok(_) => { - worker_cache.clear(); changed = false; } Err(e) => { @@ -2247,3 +2580,221 @@ impl GlobalStreamManager { (join_handle, shutdown_tx) } } + +// We redistribute parallel units (which will be ensembles in the future) through a simple consistent hashing ring. +// Note that we have added some simple logic here to ensure the consistency of the ratio between each slot, +// especially when equal division is needed. 
+pub fn schedule_units_for_slots( + slots: &BTreeMap>, + total_unit_size: usize, + salt: u32, +) -> MetaResult>> { + let mut ch = ConsistentHashRing::new(salt); + + for (worker_id, parallel_unit_ids) in slots { + ch.add_worker(*worker_id, parallel_unit_ids.len() as u32); + } + + let target_distribution = ch.distribute_tasks(total_unit_size as u32)?; + + Ok(slots + .iter() + .map(|(worker_id, parallel_unit_ids)| { + ( + *worker_id, + parallel_unit_ids + .iter() + .take( + target_distribution + .get(worker_id) + .cloned() + .unwrap_or_default() as usize, + ) + .cloned() + .collect::>(), + ) + }) + .collect()) +} + +pub struct ConsistentHashRing { + ring: BTreeMap, + capacities: BTreeMap, + virtual_nodes: u32, + salt: u32, +} + +impl ConsistentHashRing { + fn new(salt: u32) -> Self { + ConsistentHashRing { + ring: BTreeMap::new(), + capacities: BTreeMap::new(), + virtual_nodes: 1024, + salt, + } + } + + fn hash(key: T, salt: S) -> u64 { + let mut hasher = DefaultHasher::new(); + salt.hash(&mut hasher); + key.hash(&mut hasher); + hasher.finish() + } + + fn add_worker(&mut self, id: u32, capacity: u32) { + let virtual_nodes_count = self.virtual_nodes; + + for i in 0..virtual_nodes_count { + let virtual_node_key = (id, i); + let hash = Self::hash(virtual_node_key, self.salt); + self.ring.insert(hash, id); + } + + self.capacities.insert(id, capacity); + } + + fn distribute_tasks(&self, total_tasks: u32) -> MetaResult> { + let total_capacity = self.capacities.values().sum::(); + + if total_capacity < total_tasks { + bail!("Total tasks exceed the total weight of all workers."); + } + + let mut soft_limits = HashMap::new(); + for (worker_id, worker_capacity) in &self.capacities { + soft_limits.insert( + *worker_id, + (total_tasks as f64 * (*worker_capacity as f64 / total_capacity as f64)).ceil() + as u32, + ); + } + + let mut task_distribution: BTreeMap = BTreeMap::new(); + let mut task_hashes = (0..total_tasks) + .map(|task_idx| Self::hash(task_idx, self.salt)) + .collect_vec(); + + // Sort task hashes to disperse them around the hash ring + task_hashes.sort(); + + for task_hash in task_hashes { + let mut assigned = false; + + // Iterator that starts from the current task_hash or the next node in the ring + let ring_range = self.ring.range(task_hash..).chain(self.ring.iter()); + + for (_, &worker_id) in ring_range { + let worker_capacity = self.capacities.get(&worker_id).unwrap(); + let worker_soft_limit = soft_limits.get(&worker_id).unwrap(); + + let task_limit = min(*worker_capacity, *worker_soft_limit); + + let worker_task_count = task_distribution.entry(worker_id).or_insert(0); + + if *worker_task_count < task_limit { + *worker_task_count += 1; + assigned = true; + break; + } + } + + if !assigned { + bail!("Could not distribute tasks due to capacity constraints."); + } + } + + Ok(task_distribution) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + const DEFAULT_SALT: u32 = 42; + + #[test] + fn test_single_worker_capacity() { + let mut ch = ConsistentHashRing::new(DEFAULT_SALT); + ch.add_worker(1, 10); + + let total_tasks = 5; + let task_distribution = ch.distribute_tasks(total_tasks).unwrap(); + + assert_eq!(task_distribution.get(&1).cloned().unwrap_or(0), 5); + } + + #[test] + fn test_multiple_workers_even_distribution() { + let mut ch = ConsistentHashRing::new(DEFAULT_SALT); + + ch.add_worker(1, 1); + ch.add_worker(2, 1); + ch.add_worker(3, 1); + + let total_tasks = 3; + let task_distribution = ch.distribute_tasks(total_tasks).unwrap(); + + for id in 1..=3 { + 
assert_eq!(task_distribution.get(&id).cloned().unwrap_or(0), 1); + } + } + + #[test] + fn test_weighted_distribution() { + let mut ch = ConsistentHashRing::new(DEFAULT_SALT); + + ch.add_worker(1, 2); + ch.add_worker(2, 3); + ch.add_worker(3, 5); + + let total_tasks = 10; + let task_distribution = ch.distribute_tasks(total_tasks).unwrap(); + + assert_eq!(task_distribution.get(&1).cloned().unwrap_or(0), 2); + assert_eq!(task_distribution.get(&2).cloned().unwrap_or(0), 3); + assert_eq!(task_distribution.get(&3).cloned().unwrap_or(0), 5); + } + + #[test] + fn test_over_capacity() { + let mut ch = ConsistentHashRing::new(DEFAULT_SALT); + + ch.add_worker(1, 1); + ch.add_worker(2, 2); + ch.add_worker(3, 3); + + let total_tasks = 10; // More tasks than the total weight + let task_distribution = ch.distribute_tasks(total_tasks); + + assert!(task_distribution.is_err()); + } + + #[test] + fn test_balance_distribution() { + for mut worker_capacity in 1..10 { + for workers in 3..10 { + let mut ring = ConsistentHashRing::new(DEFAULT_SALT); + + for worker_id in 0..workers { + ring.add_worker(worker_id, worker_capacity); + } + + // Here we simulate a real situation where the actual parallelism cannot fill all the capacity. + // This is to ensure an average distribution, for example, when three workers with 6 parallelism are assigned 9 tasks, + // they should ideally get an exact distribution of 3, 3, 3 respectively. + if worker_capacity % 2 == 0 { + worker_capacity /= 2; + } + + let total_tasks = worker_capacity * workers; + + let task_distribution = ring.distribute_tasks(total_tasks).unwrap(); + + for (_, v) in task_distribution { + assert_eq!(v, worker_capacity); + } + } + } + } +} diff --git a/src/meta/src/stream/stream_graph/actor.rs b/src/meta/src/stream/stream_graph/actor.rs index 8c0472aae14f1..c4c7718813ff1 100644 --- a/src/meta/src/stream/stream_graph/actor.rs +++ b/src/meta/src/stream/stream_graph/actor.rs @@ -650,6 +650,7 @@ impl ActorGraphBuilder { /// Create a new actor graph builder with the given "complete" graph. Returns an error if the /// graph is failed to be scheduled. pub fn new( + streaming_job_id: u32, fragment_graph: CompleteStreamFragmentGraph, cluster_info: StreamingClusterInfo, default_parallelism: NonZeroUsize, @@ -657,11 +658,12 @@ impl ActorGraphBuilder { let existing_distributions = fragment_graph.existing_distribution(); // Schedule the distribution of all building fragments. 
- let distributions = schedule::Scheduler::new( + let scheduler = schedule::Scheduler::new( + streaming_job_id, cluster_info.parallel_units.values().cloned(), default_parallelism, - ) - .schedule(&fragment_graph)?; + )?; + let distributions = scheduler.schedule(&fragment_graph)?; Ok(Self { distributions, diff --git a/src/meta/src/stream/stream_graph/schedule.rs b/src/meta/src/stream/stream_graph/schedule.rs index 823e9fb9fd204..dc3c85ef521e5 100644 --- a/src/meta/src/stream/stream_graph/schedule.rs +++ b/src/meta/src/stream/stream_graph/schedule.rs @@ -18,14 +18,12 @@ reason = "generated by crepe" )] -use std::collections::{BTreeMap, HashMap, LinkedList}; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::num::NonZeroUsize; use either::Either; use enum_as_inner::EnumAsInner; use itertools::Itertools; -use rand::seq::SliceRandom; -use rand::thread_rng; use risingwave_common::bail; use risingwave_common::hash::{ParallelUnitId, ParallelUnitMapping}; use risingwave_pb::common::{ActorInfo, ParallelUnit}; @@ -36,6 +34,7 @@ use risingwave_pb::stream_plan::DispatcherType::{self, *}; use crate::manager::{WorkerId, WorkerLocations}; use crate::model::ActorId; +use crate::stream::schedule_units_for_slots; use crate::stream::stream_graph::fragment::CompleteStreamFragmentGraph; use crate::stream::stream_graph::id::GlobalFragmentId as Id; use crate::MetaResult; @@ -200,53 +199,44 @@ impl Scheduler { /// Each hash-distributed fragment will be scheduled to at most `default_parallelism` parallel /// units, in a round-robin fashion on all compute nodes. If the `default_parallelism` is /// `None`, all parallel units will be used. + /// + /// For different streaming jobs, we even out possible scheduling skew by using the streaming job id as the salt for the scheduling algorithm. pub fn new( + streaming_job_id: u32, parallel_units: impl IntoIterator, default_parallelism: NonZeroUsize, - ) -> Self { + ) -> MetaResult { // Group parallel units with worker node. - let mut parallel_units_map = BTreeMap::new(); - for p in parallel_units { - parallel_units_map - .entry(p.worker_node_id) - .or_insert_with(Vec::new) - .push(p); + let mut slots = BTreeMap::new(); + for parallel_unit in parallel_units { + slots + .entry(parallel_unit.worker_node_id as WorkerId) + .or_insert_with(BTreeSet::new) + .insert(parallel_unit.id as ParallelUnitId); } - let mut parallel_units: LinkedList<_> = parallel_units_map - .into_values() - .map(|v| v.into_iter().sorted_by_key(|p| p.id)) - .collect(); - - // Visit the parallel units in a round-robin manner on each worker. - let mut round_robin = Vec::new(); - while !parallel_units.is_empty() { - parallel_units - .extract_if(|ps| { - if let Some(p) = ps.next() { - round_robin.push(p); - false - } else { - true - } - }) - .for_each(drop); - } - round_robin.truncate(default_parallelism.get()); - assert_eq!(round_robin.len(), default_parallelism.get()); + let parallelism = default_parallelism.get(); + let scheduled = schedule_units_for_slots(&slots, parallelism, streaming_job_id)?; - // Sort all parallel units by ID to achieve better vnode locality. - round_robin.sort_unstable_by_key(|p| p.id); + let scheduled_parallel_units = scheduled.values().flatten().cloned().sorted().collect_vec(); + assert_eq!(scheduled_parallel_units.len(), parallelism); // Build the default hash mapping uniformly. - let default_hash_mapping = ParallelUnitMapping::build(&round_robin); - // Randomly choose a parallel unit as the default singleton parallel unit. 
- let default_singleton_parallel_unit = round_robin.choose(&mut thread_rng()).unwrap().id; + let default_hash_mapping = ParallelUnitMapping::build_from_ids(&scheduled_parallel_units); - Self { + let single_scheduled = schedule_units_for_slots(&slots, 1, streaming_job_id)?; + + let default_singleton_parallel_unit = single_scheduled + .values() + .flatten() + .exactly_one() + .cloned() + .unwrap(); + + Ok(Self { default_hash_mapping, default_singleton_parallel_unit, - } + }) } /// Schedule the given complete graph and returns the distribution of each **building diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index c60db7a0f8d52..0dc9fbce813b2 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -744,7 +744,7 @@ mod tests { CatalogManager, CatalogManagerRef, ClusterManager, FragmentManager, MetaSrvEnv, RelationIdEnum, StreamingClusterInfo, }; - use crate::model::{ActorId, FragmentId}; + use crate::model::{ActorId, FragmentId, TableParallelism}; use crate::rpc::ddl_controller::DropMode; use crate::rpc::metrics::MetaMetrics; use crate::stream::SourceManager; @@ -1024,6 +1024,7 @@ mod tests { fragments, &locations.actor_locations, Default::default(), + TableParallelism::Auto, ); let ctx = CreateStreamingJobContext { building_locations: locations, diff --git a/src/meta/src/stream/test_fragmenter.rs b/src/meta/src/stream/test_fragmenter.rs index dd9defecb8ca5..e5cda10fb374d 100644 --- a/src/meta/src/stream/test_fragmenter.rs +++ b/src/meta/src/stream/test_fragmenter.rs @@ -463,6 +463,7 @@ async fn test_graph_builder() -> MetaResult<()> { let internal_tables = fragment_graph.internal_tables(); let actor_graph_builder = ActorGraphBuilder::new( + job.id(), CompleteStreamFragmentGraph::for_test(fragment_graph), make_cluster_info(), NonZeroUsize::new(parallel_degree).unwrap(),
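To make the intent of the new `generate_table_resize_plan` easier to follow outside the full meta-service context, here is a minimal, self-contained Rust sketch of its decision rule for hash-distributed fragments. The types below (`ParallelUnitId`, `TableParallelism`, `ParallelUnitReschedule`) are simplified stand-ins for the ones defined in this diff, and the naive `take(n)` selection is only a placeholder for the salted `schedule_units_for_slots` call; this illustrates the add/remove set arithmetic of `diff_parallel_unit_change`, not the actual implementation.

use std::collections::BTreeSet;

// Simplified stand-ins for the meta-service types in this diff; the real
// definitions live in `src/meta/src/model/stream.rs` and `src/meta/src/stream/scale.rs`.
type ParallelUnitId = u32;

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum TableParallelism {
    Auto,
    Fixed(usize),
    Custom,
}

#[derive(Debug, Default)]
struct ParallelUnitReschedule {
    added_parallel_units: BTreeSet<ParallelUnitId>,
    removed_parallel_units: BTreeSet<ParallelUnitId>,
}

// Mirrors `diff_parallel_unit_change`: the reschedule is the pair of set
// differences between the units a fragment currently occupies and the units
// it should occupy under the target parallelism.
fn diff_parallel_unit_change(
    current: &BTreeSet<ParallelUnitId>,
    target: &BTreeSet<ParallelUnitId>,
) -> ParallelUnitReschedule {
    ParallelUnitReschedule {
        added_parallel_units: target.difference(current).cloned().collect(),
        removed_parallel_units: current.difference(target).cloned().collect(),
    }
}

// Sketch of the hash-distributed branch of `generate_table_resize_plan`:
// `Auto` spreads onto every schedulable parallel unit, `Fixed(n)` targets `n`
// units (here naively the first `n`; the real code delegates to the salted
// `schedule_units_for_slots`), and `Custom` leaves the fragment untouched.
fn plan_for_hash_fragment(
    parallelism: TableParallelism,
    current: &BTreeSet<ParallelUnitId>,
    available: &BTreeSet<ParallelUnitId>,
) -> Option<ParallelUnitReschedule> {
    let target: BTreeSet<_> = match parallelism {
        TableParallelism::Auto => available.clone(),
        TableParallelism::Fixed(n) => available.iter().take(n).cloned().collect(),
        TableParallelism::Custom => return None,
    };
    Some(diff_parallel_unit_change(current, &target))
}

fn main() {
    let current: BTreeSet<ParallelUnitId> = [1, 2].into_iter().collect();
    let available: BTreeSet<ParallelUnitId> = [1, 2, 3, 4].into_iter().collect();

    // Auto grows the fragment onto units 3 and 4; nothing is removed.
    println!("{:?}", plan_for_hash_fragment(TableParallelism::Auto, &current, &available));
    // Fixed(1) shrinks the fragment down to unit 1; unit 2 is removed.
    println!("{:?}", plan_for_hash_fragment(TableParallelism::Fixed(1), &current, &available));
    // Custom tables are skipped entirely, matching the `TableParallelism::Custom => {}` arm.
    println!("{:?}", plan_for_hash_fragment(TableParallelism::Custom, &current, &available));
}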