diff --git a/proto/common.proto b/proto/common.proto
index 8a4b7006217bf..b6dc349cc3e7f 100644
--- a/proto/common.proto
+++ b/proto/common.proto
@@ -60,6 +60,8 @@ message WorkerNode {
   repeated ParallelUnit parallel_units = 5;
   Property property = 6;
 
+  // uint32 parallelism = 8;
+
   // Ranges from 0 to 1023, used to generate the machine ID field in the global unique ID.
   optional uint32 transactional_id = 7;
 }
diff --git a/proto/meta.proto b/proto/meta.proto
index 95e70ba490bda..e75ac110edcab 100644
--- a/proto/meta.proto
+++ b/proto/meta.proto
@@ -86,7 +86,7 @@ message TableFragments {
     common.ParallelUnitMapping vnode_mapping = 5;
 
     uint32 fragment_group_id = 8;
-    stream_plan.ActorMapping actor_group_mapping = 9;
+    stream_plan.ActorMapping actor_group_mapping = 9; // TODO: this can be recorded in other places
 
     repeated uint32 state_table_ids = 6;
     // Note that this can be derived backwards from the upstream actors of the Actor held by the Fragment,
diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs
index 6c4ad5fc61813..1b4785832b144 100644
--- a/src/meta/src/barrier/mod.rs
+++ b/src/meta/src/barrier/mod.rs
@@ -1085,7 +1085,15 @@ impl GlobalBarrierManager {
             .cluster_manager
             .list_active_streaming_compute_nodes()
             .await;
-        let all_actor_infos = self.fragment_manager.load_all_actors(check_state).await;
+        let assignments = self
+            .cluster_manager
+            .get_streaming_cluster_info()
+            .await
+            .assignments;
+        let all_actor_infos = self
+            .fragment_manager
+            .load_all_actors_2(&assignments, check_state)
+            .await;
 
         let info = BarrierActorInfo::resolve(all_nodes, all_actor_infos);
diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs
index 8b725602ae93c..b1213506242d5 100644
--- a/src/meta/src/manager/catalog/fragment.rs
+++ b/src/meta/src/manager/catalog/fragment.rs
@@ -20,7 +20,7 @@ use itertools::Itertools;
 use risingwave_common::bail;
 use risingwave_common::buffer::Bitmap;
 use risingwave_common::catalog::TableId;
-use risingwave_common::hash::{ActorMapping, ParallelUnitId, ParallelUnitMapping};
+use risingwave_common::hash::{ActorGroupId, ActorMapping, ParallelUnitId, ParallelUnitMapping};
 use risingwave_common::util::stream_graph_visitor::visit_stream_node;
 use risingwave_connector::source::SplitImpl;
 use risingwave_pb::meta::subscribe_response::{Info, Operation};
@@ -183,6 +183,11 @@ impl FragmentManager {
         !self.core.read().await.table_fragments.is_empty()
     }
 
+    async fn notify_fragment_mapping(&self, table_fragment: &TableFragments, operation: Operation) {
+        // TODO: notify actor group mapping
+    }
+
+    #[cfg(any())]
     async fn notify_fragment_mapping(&self, table_fragment: &TableFragments, operation: Operation) {
         // Notify all fragment mapping to frontend nodes
         for fragment in table_fragment.fragments.values() {
@@ -564,6 +569,7 @@ impl FragmentManager {
     /// Used in [`crate::barrier::GlobalBarrierManager`], load all actor that need to be sent or
     /// collected
+    #[deprecated]
     pub async fn load_all_actors(
         &self,
         check_state: impl Fn(ActorState, TableId, ActorId) -> bool,
     ) -> ActorInfos {
@@ -603,6 +609,47 @@ impl FragmentManager {
         }
     }
 
+    /// Used in [`crate::barrier::GlobalBarrierManager`], load all actors that need to be sent or
+    /// collected
+    pub async fn load_all_actors_2(
+        &self,
+        assignments: &HashMap<ActorGroupId, WorkerId>,
+        check_state: impl Fn(ActorState, TableId, ActorId) -> bool,
+    ) -> ActorInfos {
+        let mut actor_maps = HashMap::new();
+        let mut barrier_inject_actor_maps = HashMap::new();
+
+        let map = &self.core.read().await.table_fragments;
+        for fragments in map.values() {
+            let table_id = fragments.table_id();
+            let barrier_inject_actors = fragments.barrier_inject_actor_ids();
+
+            for actor in fragments.fragments.values().flat_map(|f| &f.actors) {
+                let actor_state = fragments.actor_status[&actor.actor_id].state();
+                if check_state(actor_state, table_id, actor.actor_id) {
+                    let worker_id = assignments[&actor.actor_group_id];
+
+                    actor_maps
+                        .entry(worker_id)
+                        .or_insert_with(Vec::new)
+                        .push(actor.actor_id);
+
+                    if barrier_inject_actors.contains(&actor.actor_id) {
+                        barrier_inject_actor_maps
+                            .entry(worker_id)
+                            .or_insert_with(Vec::new)
+                            .push(actor.actor_id);
+                    }
+                }
+            }
+        }
+
+        ActorInfos {
+            actor_maps,
+            barrier_inject_actor_maps,
+        }
+    }
+
     async fn migrate_fragment_actors_inner(
         &self,
         migration_plan: &MigrationPlan,
diff --git a/src/meta/src/manager/cluster.rs b/src/meta/src/manager/cluster.rs
index e3de9b18aa6da..d66c72dfd6fa0 100644
--- a/src/meta/src/manager/cluster.rs
+++ b/src/meta/src/manager/cluster.rs
@@ -19,7 +19,7 @@ use std::sync::Arc;
 use std::time::{Duration, SystemTime};
 
 use itertools::Itertools;
-use risingwave_common::hash::ParallelUnitId;
+use risingwave_common::hash::{ActorGroupId, ParallelUnitId};
 use risingwave_pb::common::worker_node::{Property, State};
 use risingwave_pb::common::{HostAddress, ParallelUnit, WorkerNode, WorkerType};
 use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty;
@@ -277,6 +277,16 @@ impl ClusterManager {
         Ok(())
     }
 
+    pub async fn record_new_assignments(&self, assignments: &HashMap<ActorGroupId, WorkerId>) {
+        let mut core = self.core.write().await;
+
+        for (&k, &v) in assignments {
+            if let Some(old_v) = core.assignments.insert(k, v) {
+                assert_eq!(old_v, v);
+            }
+        }
+    }
+
     pub async fn delete_worker_node(&self, host_address: HostAddress) -> MetaResult<WorkerType> {
         let mut core = self.core.write().await;
         let worker = core.get_worker_by_host_checked(host_address.clone())?;
@@ -494,7 +504,9 @@ impl ClusterManager {
 #[derive(Debug, Clone)]
 pub struct StreamingClusterInfo {
     /// All **active** compute nodes in the cluster.
-    pub worker_nodes: HashMap<u32, WorkerNode>,
+    pub worker_nodes: HashMap<WorkerId, WorkerNode>,
+
+    pub assignments: HashMap<ActorGroupId, WorkerId>,
 
     /// All parallel units of the **active** compute nodes in the cluster.
     pub parallel_units: HashMap<ParallelUnitId, ParallelUnit>,
@@ -507,6 +519,8 @@ pub struct ClusterManagerCore {
     /// Record for workers in the cluster.
     workers: HashMap<WorkerKey, Worker>,
 
+    assignments: HashMap<ActorGroupId, WorkerId>, // TODO: should we persist this?
+
     /// Record for tracking available machine ids, one is available.
     available_transactional_ids: VecDeque<u32>,
 }
@@ -573,6 +587,7 @@ impl ClusterManagerCore {
                 .into_iter()
                 .map(|w| (WorkerKey(w.key().unwrap()), w))
                 .collect(),
+            assignments: Default::default(), // TODO: recover
            available_transactional_ids,
         })
     }
@@ -693,6 +708,7 @@ impl ClusterManagerCore {
             worker_nodes: active_workers,
             parallel_units: active_parallel_units,
             unschedulable_parallel_units,
+            assignments: self.assignments.clone(),
         }
     }
diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs
index 5dd8f53e249b0..04bc9cfa5f95e 100644
--- a/src/meta/src/model/stream.rs
+++ b/src/meta/src/model/stream.rs
@@ -123,12 +123,7 @@ impl MetadataModel for TableFragments {
 
 impl TableFragments {
     /// Create a new `TableFragments` with state of `Initial`, with other fields empty.
     pub fn for_test(table_id: TableId, fragments: BTreeMap<FragmentId, Fragment>) -> Self {
-        Self::new(
-            table_id,
-            fragments,
-            &BTreeMap::new(),
-            StreamEnvironment::default(),
-        )
+        Self::new(table_id, fragments, StreamEnvironment::default())
     }
 
     /// Create a new `TableFragments` with state of `Initial`, with the status of actors set to
@@ -136,16 +131,30 @@ impl TableFragments {
     pub fn new(
         table_id: TableId,
         fragments: BTreeMap<FragmentId, Fragment>,
-        actor_locations: &BTreeMap<ActorId, ParallelUnit>,
+        // actor_locations: &BTreeMap<ActorId, ParallelUnit>,
         env: StreamEnvironment,
     ) -> Self {
-        let actor_status = actor_locations
-            .iter()
-            .map(|(&actor_id, parallel_unit)| {
+        // let actor_status = actor_locations
+        //     .iter()
+        //     .map(|(&actor_id, parallel_unit)| {
+        //         (
+        //             actor_id,
+        //             ActorStatus {
+        //                 parallel_unit: Some(parallel_unit.clone()),
+        //                 state: ActorState::Inactive as i32,
+        //             },
+        //         )
+        //     })
+        //     .collect();
+
+        let actor_status = fragments
+            .values()
+            .flat_map(|f| &f.actors)
+            .map(|a| {
                 (
-                    actor_id,
+                    a.actor_id,
                     ActorStatus {
-                        parallel_unit: Some(parallel_unit.clone()),
+                        parallel_unit: None,
                         state: ActorState::Inactive as i32,
                     },
                 )
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 2b2bd10f42894..3ea525dd712e1 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -13,12 +13,13 @@
 // limitations under the License.
 
 use std::cmp::Ordering;
+use std::collections::BTreeMap;
 use std::num::NonZeroUsize;
 use std::sync::Arc;
 
 use itertools::Itertools;
 use risingwave_common::config::DefaultParallelism;
-use risingwave_common::hash::VirtualNode;
+use risingwave_common::hash::{ActorGroupId, VirtualNode};
 use risingwave_common::util::column_index_mapping::ColIndexMapping;
 use risingwave_common::util::epoch::Epoch;
 use risingwave_pb::catalog::connection::private_link_service::PbPrivateLinkProvider;
@@ -36,7 +37,8 @@ use crate::barrier::BarrierManagerRef;
 use crate::manager::{
     CatalogManagerRef, ClusterManagerRef, ConnectionId, DatabaseId, FragmentManagerRef, FunctionId,
     IdCategory, IndexId, LocalNotification, MetaSrvEnv, NotificationVersion, RelationIdEnum,
-    SchemaId, SinkId, SourceId, StreamingClusterInfo, StreamingJob, TableId, ViewId,
+    SchemaId, SinkId, SourceId, StreamingClusterInfo, StreamingJob, TableId, ViewId, WorkerId,
+    WorkerLocations,
 };
 use crate::model::{StreamEnvironment, TableFragments};
 use crate::rpc::cloud_provider::AwsEc2Client;
@@ -101,6 +103,12 @@ pub enum DdlCommand {
     DropConnection(ConnectionId),
 }
 
+pub struct NewClusterStreamingInfo {
+    pub assignments: BTreeMap<ActorGroupId, WorkerId>,
+
+    pub worker_locations: WorkerLocations,
+}
+
 #[derive(Clone)]
 pub struct DdlController {
     env: MetaSrvEnv,
@@ -624,8 +632,7 @@ impl DdlController {
         // 3. Build the table fragments structure that will be persisted in the stream manager,
         // and the context that contains all information needed for building the
         // actors on the compute nodes.
-        let table_fragments =
-            TableFragments::new(id.into(), graph, &building_locations.actor_locations, env);
+        let table_fragments = TableFragments::new(id.into(), graph, env);
 
         let ctx = CreateStreamingJobContext {
             dispatchers,
@@ -929,12 +936,7 @@ impl DdlController {
         // 4. Build the table fragments structure that will be persisted in the stream manager, and
         // the context that contains all information needed for building the actors on the compute
         // nodes.
-        let table_fragments = TableFragments::new(
-            dummy_id.into(),
-            graph,
-            &building_locations.actor_locations,
-            env,
-        );
+        let table_fragments = TableFragments::new(dummy_id.into(), graph, env);
 
         let ctx = ReplaceTableContext {
             old_table_fragments,
diff --git a/src/meta/src/stream/stream_graph.rs b/src/meta/src/stream/stream_graph.rs
index c833b74cf8c76..340d0f5d0281c 100644
--- a/src/meta/src/stream/stream_graph.rs
+++ b/src/meta/src/stream/stream_graph.rs
@@ -20,4 +20,5 @@ mod schedule;
 
 pub use actor::{ActorGraphBuildResult, ActorGraphBuilder};
 pub use fragment::{CompleteStreamFragmentGraph, StreamFragmentGraph};
+pub use group_schedule::LocationsV2;
 pub use schedule::Locations;
diff --git a/src/meta/src/stream/stream_graph/actor.rs b/src/meta/src/stream/stream_graph/actor.rs
index 338a935e13700..ebb3c84cd520c 100644
--- a/src/meta/src/stream/stream_graph/actor.rs
+++ b/src/meta/src/stream/stream_graph/actor.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{BTreeMap, HashMap};
+use std::collections::{BTreeMap, BTreeSet, HashMap};
 use std::num::NonZeroUsize;
 use std::sync::Arc;
 
@@ -29,10 +29,12 @@ use risingwave_pb::stream_plan::{
     DispatchStrategy, Dispatcher, DispatcherType, MergeNode, StreamActor, StreamNode,
 };
 
-use super::group_schedule::{self, FragmentSchedulings};
+use super::group_schedule::{self, FragmentSchedulings, LocationsV2, Scheduling};
 use super::id::GlobalFragmentIdsExt;
 use super::Locations;
-use crate::manager::{IdGeneratorManagerRef, StreamingClusterInfo, StreamingJob};
+use crate::manager::{
+    FragmentManagerRef, IdGeneratorManagerRef, StreamingClusterInfo, StreamingJob, WorkerId,
+};
 use crate::model::{DispatcherId, FragmentId};
 use crate::stream::stream_graph::fragment::{
     CompleteStreamFragmentGraph, EdgeId, EitherFragment, StreamFragmentEdge,
@@ -228,11 +230,13 @@ impl ActorBuilder {
             .into_values()
             .flat_map(|ActorUpstream { actors, .. }| actors.as_global_ids())
             .collect();
+
         // Only fill the definition when debug assertions enabled, otherwise use name instead.
-        #[cfg(not(debug_assertions))]
-        let mview_definition = job.name();
-        #[cfg(debug_assertions)]
-        let mview_definition = job.definition();
+        let mview_definition = if cfg!(debug_assertions) {
+            job.definition()
+        } else {
+            job.name()
+        };
 
         Ok(StreamActor {
             actor_id: self.actor_id.as_global_id(),
@@ -279,6 +283,8 @@ impl ExternalChange {
 /// The parallel unit location of actors.
 type ActorLocations = BTreeMap<GlobalActorId, ParallelUnitId>;
 
+type ActorGroupLocations = BTreeMap<GlobalActorId, ActorGroupId>;
+
 /// The actual mutable state of building an actor graph.
 ///
 /// When the fragments are visited in a topological order, actor builders will be added to this
@@ -293,18 +299,22 @@ struct ActorGraphBuildStateInner {
     /// The scheduled locations of the actors to be built.
     building_locations: ActorLocations,
 
+    // building_scheduling_locations: ActorGroupLocations,
+
     /// The required changes to the external actors. See [`ExternalChange`].
     external_changes: BTreeMap<GlobalActorId, ExternalChange>,
 
     /// The actual locations of the external actors.
     external_locations: ActorLocations,
+
+    external_scheduling_locations: ActorGroupLocations,
 }
 
 /// The information of a fragment, used for parameter passing for `Inner::add_link`.
 struct FragmentLinkNode<'a> {
     fragment_id: GlobalFragmentId,
     actor_ids: &'a [GlobalActorId],
-    distribution: &'a Distribution,
+    // distribution: &'a Distribution,
+    scheduling: &'a Scheduling,
 }
 
 impl ActorGraphBuildStateInner {
@@ -315,7 +325,7 @@ impl ActorGraphBuildStateInner {
         &mut self,
         actor_id: GlobalActorId,
         fragment_id: GlobalFragmentId,
-        parallel_unit_id: ParallelUnitId,
+        // parallel_unit_id: ParallelUnitId,
         actor_group_id: ActorGroupId,
         vnode_bitmap: Option<Bitmap>,
         node: Arc<StreamNode>,
@@ -327,9 +337,9 @@ impl ActorGraphBuildStateInner {
             )
             .unwrap();
 
-        self.building_locations
-            .try_insert(actor_id, parallel_unit_id)
-            .unwrap();
+        // self.building_locations
+        //     .try_insert(actor_id, parallel_unit_id)
+        //     .unwrap();
     }
 
     /// Record the location of an external actor.
@@ -343,6 +353,17 @@ impl ActorGraphBuildStateInner {
             .unwrap();
     }
 
+    /// Record the location of an external actor.
+    fn record_external_scheduling_location(
+        &mut self,
+        actor_id: GlobalActorId,
+        actor_group_id: ActorGroupId,
+    ) {
+        self.external_scheduling_locations
+            .try_insert(actor_id, actor_group_id)
+            .unwrap();
+    }
+
     /// Create a new hash dispatcher.
     fn new_hash_dispatcher(
         strategy: &DispatchStrategy,
@@ -413,6 +434,7 @@ impl ActorGraphBuildStateInner {
     /// Get the location of an actor. Will look up the location map of both the actors to be built
     /// and the external actors.
+    #[deprecated]
     fn get_location(&self, actor_id: GlobalActorId) -> ParallelUnitId {
         self.building_locations
             .get(&actor_id)
@@ -421,6 +443,14 @@ impl ActorGraphBuildStateInner {
             .unwrap()
     }
 
+    fn get_scheduling_location(&self, actor_id: GlobalActorId) -> ActorGroupId {
+        self.actor_builders
+            .get(&actor_id)
+            .map(|b| b.actor_group_id)
+            .or_else(|| self.external_scheduling_locations.get(&actor_id).copied())
+            .unwrap()
+    }
+
     /// Add a "link" between two fragments in the graph.
     ///
     /// The `edge` will be expanded into multiple (downstream - upstream) pairs for the actors in
@@ -441,15 +471,16 @@ impl ActorGraphBuildStateInner {
             // For `NoShuffle`, make n "1-1" links between the actors.
             DispatcherType::NoShuffle => {
                 assert_eq!(upstream.actor_ids.len(), downstream.actor_ids.len());
-                let upstream_locations: HashMap<_, _> = upstream
+
+                let upstream_locations: HashMap<ActorGroupId, GlobalActorId> = upstream
                     .actor_ids
                     .iter()
-                    .map(|id| (self.get_location(*id), *id))
+                    .map(|id| (self.get_scheduling_location(*id), *id))
                     .collect();
-                let downstream_locations: HashMap<_, _> = downstream
+                let downstream_locations: HashMap<ActorGroupId, GlobalActorId> = downstream
                     .actor_ids
                     .iter()
-                    .map(|id| (self.get_location(*id), *id))
+                    .map(|id| (self.get_scheduling_location(*id), *id))
                     .collect();
 
                 for (location, upstream_id) in upstream_locations {
@@ -483,16 +514,18 @@ impl ActorGraphBuildStateInner {
         let dispatcher = if let DispatcherType::Hash = dt {
             // Transform the `ParallelUnitMapping` from the downstream distribution to the
             // `ActorMapping`, used for the `HashDispatcher` for the upstream actors.
-            let downstream_locations: HashMap<ParallelUnitId, ActorId> = downstream
+            let downstream_locations: HashMap<ActorGroupId, ActorId> = downstream
                 .actor_ids
                 .iter()
-                .map(|&actor_id| (self.get_location(actor_id), actor_id.as_global_id()))
+                .map(|&actor_id| {
+                    (
+                        self.get_scheduling_location(actor_id),
+                        actor_id.as_global_id(),
+                    )
+                })
                 .collect();
-            let actor_mapping = downstream
-                .distribution
-                .as_hash()
-                .unwrap()
-                .to_actor(&downstream_locations);
+            let actor_mapping = (downstream.scheduling.actor_group_mapping)
+                .transform(&downstream_locations);
 
             Self::new_hash_dispatcher(
                 &edge.dispatch_strategy,
@@ -577,10 +610,10 @@ pub struct ActorGraphBuildResult {
     pub graph: BTreeMap<FragmentId, Fragment>,
 
     /// The scheduled locations of the actors to be built.
-    pub building_locations: Locations,
+    pub building_locations: LocationsV2,
 
     /// The actual locations of the external actors.
-    pub existing_locations: Locations,
+    pub existing_locations: LocationsV2,
 
     /// The new dispatchers to be added to the upstream mview actors. Used for MV on MV.
     pub dispatchers: HashMap<ActorId, Vec<Dispatcher>>,
@@ -632,6 +665,7 @@ impl ActorGraphBuilder {
             &fragment_graph,
             id_gen_manager.clone(),
             default_parallelism.get(),
+            cluster_info.worker_nodes.clone(),
         )
         .schedule()
         .await?;
@@ -682,22 +716,46 @@ impl ActorGraphBuilder {
         }
     }
 
+    /// Convert the actor location map to the [`Locations`] struct.
+    fn build_locations_2<'a>(
+        &self,
+        fragments: impl IntoIterator<Item = &'a Fragment>,
+        new_assignments: HashMap<ActorGroupId, WorkerId>,
+    ) -> LocationsV2 {
+        let actor_groups = fragments
+            .into_iter()
+            .flat_map(|f| &f.actors)
+            .map(|a| (a.actor_id, a.actor_group_id))
+            .collect();
+
+        let mut assignments = self.cluster_info.assignments.clone();
+        assignments.extend(new_assignments);
+
+        let worker_locations = self.cluster_info.worker_nodes.clone();
+
+        LocationsV2 {
+            actor_groups,
+            assignments,
+            worker_locations,
+        }
+    }
+
     /// Build a stream graph by duplicating each fragment as parallel actors. Returns
     /// [`ActorGraphBuildResult`] that will be further used to build actors on the compute nodes.
     pub async fn generate_graph(self, job: &StreamingJob) -> MetaResult<ActorGraphBuildResult> {
-        // Pre-generate IDs for all actors.
-        let actor_len = self
-            .distributions
-            .values()
-            .map(|d| d.parallelism())
-            .sum::<usize>() as u64;
-
-        let actor_len_2 = (self.schedulings.building)
+        // // Pre-generate IDs for all actors.
+        // let actor_len = self
+        //     .distributions
+        //     .values()
+        //     .map(|d| d.parallelism())
+        //     .sum::<usize>() as u64;
+
+        let actor_len = (self.schedulings.building)
             .values()
             .map(|s| s.parallelism())
             .sum::<usize>() as u64;
-        assert_eq!(actor_len, actor_len_2);
+        // assert_eq!(actor_len, actor_len_2);
 
         let id_gen = GlobalActorIdGen::new(&self.id_gen_manager, actor_len).await?;
 
@@ -707,6 +765,7 @@ impl ActorGraphBuilder {
             building_locations,
             external_changes,
             external_locations,
+            external_scheduling_locations,
         } = self.build_actor_graph(id_gen)?;
 
         for parallel_unit_id in external_locations.values() {
@@ -723,7 +782,7 @@ impl ActorGraphBuilder {
         }
 
         // Serialize the graph into a map of sealed fragments.
-        let graph = {
+        let graph: BTreeMap<FragmentId, Fragment> = {
             let mut actors: HashMap<GlobalFragmentId, Vec<StreamActor>> = HashMap::new();
 
             // As all fragments are processed, we can now `build` the actors where the `Exchange`
@@ -737,12 +796,12 @@ impl ActorGraphBuilder {
             actors
                 .into_iter()
                 .map(|(fragment_id, actors)| {
-                    let distribution = self.distributions[&fragment_id].clone();
+                    // let distribution = self.distributions[&fragment_id].clone();
                     let scheduling = self.get_scheduling(fragment_id).clone();
                     let fragment = self.fragment_graph.seal_fragment(
                         fragment_id,
                         actors,
-                        distribution,
+                        // distribution,
                         scheduling,
                     );
                     let fragment_id = fragment_id.as_global_id();
@@ -752,8 +811,18 @@ impl ActorGraphBuilder {
         };
 
         // Convert the actor location map to the `Locations` struct.
-        let building_locations = self.build_locations(building_locations);
-        let existing_locations = self.build_locations(external_locations);
+
+        let existing_locations = self.build_locations_2(
+            self.fragment_graph.existing_fragments().values(),
+            Default::default(),
+        );
+
+        let building_locations =
+            self.build_locations_2(graph.values(), self.schedulings.new_assignments.clone());
+
+        // Convert the actor location map to the `Locations` struct.
+        // let building_locations = self.build_locations(building_locations);
+        // let existing_locations = self.build_locations(external_locations);
 
         // Extract the new dispatchers from the external changes.
         let dispatchers = external_changes
@@ -820,7 +889,7 @@ impl ActorGraphBuilder {
         state: &mut ActorGraphBuildState,
     ) -> MetaResult<()> {
         let current_fragment = self.fragment_graph.get_fragment(fragment_id);
-        let distribution = self.get_distribution(fragment_id);
+        // let distribution = self.get_distribution(fragment_id);
         let scheduling = self.get_scheduling(fragment_id);
 
         // First, add or record the actors for the current fragment into the state.
@@ -828,19 +897,21 @@ impl ActorGraphBuilder {
             // For building fragments, we need to generate the actor builders.
             EitherFragment::Building(current_fragment) => {
                 let node = Arc::new(current_fragment.node.clone().unwrap());
-                let bitmaps = distribution.as_hash().map(|m| m.to_bitmaps());
+                // let bitmaps = distribution.as_hash().map(|m| m.to_bitmaps());
+                let bitmaps = scheduling.to_bitmaps();
 
-                (distribution.parallel_units())
-                    .zip_eq_debug(scheduling.actor_groups())
-                    .map(|(parallel_unit_id, actor_group_id)| {
+                scheduling
+                    .actor_groups()
+                    .map(|actor_group_id| {
                         let actor_id = state.next_actor_id();
-                        let vnode_bitmap = bitmaps.as_ref().map(|m| &m[&parallel_unit_id]).cloned();
-                        // TODO: use bitmap from `scheduling`
+                        // let vnode_bitmap = bitmaps.as_ref().map(|m|
+                        //     &m[&parallel_unit_id]).cloned();
+                        let vnode_bitmap = bitmaps.as_ref().map(|m| &m[&actor_group_id]).cloned();
 
                         state.inner.add_actor(
                             actor_id,
                             fragment_id,
-                            parallel_unit_id,
+                            // parallel_unit_id,
                            actor_group_id,
                             vnode_bitmap,
                             node.clone(),
@@ -857,16 +928,20 @@ impl ActorGraphBuilder {
                 .iter()
                 .map(|a| {
                     let actor_id = GlobalActorId::new(a.actor_id);
-                    let parallel_unit_id = match &distribution {
-                        Distribution::Singleton(parallel_unit_id) => *parallel_unit_id,
-                        Distribution::Hash(mapping) => mapping
-                            .get_matched(&Bitmap::from(a.get_vnode_bitmap().unwrap()))
-                            .unwrap(),
-                    };
+                    // let parallel_unit_id = match &distribution {
+                    //     Distribution::Singleton(parallel_unit_id) => *parallel_unit_id,
+                    //     Distribution::Hash(mapping) => mapping
+                    //         .get_matched(&Bitmap::from(a.get_vnode_bitmap().unwrap()))
+                    //         .unwrap(),
+                    // };
+
+                    // state
+                    //     .inner
+                    //     .record_external_location(actor_id, parallel_unit_id);
 
                     state
                         .inner
-                        .record_external_location(actor_id, parallel_unit_id);
+                        .record_external_scheduling_location(actor_id, a.actor_group_id);
 
                     actor_id
                 })
@@ -880,18 +955,20 @@ impl ActorGraphBuilder {
                 .get(&downstream_fragment_id)
                 .expect("downstream fragment not processed yet");
 
-            let downstream_distribution = self.get_distribution(downstream_fragment_id);
+            let downstream_scheduling = self.get_scheduling(downstream_fragment_id);
 
             state.inner.add_link(
                 FragmentLinkNode {
                     fragment_id,
                     actor_ids: &actor_ids,
-                    distribution,
+                    // distribution,
+                    scheduling,
                 },
                 FragmentLinkNode {
                     fragment_id: downstream_fragment_id,
                     actor_ids: downstream_actors,
-                    distribution: downstream_distribution,
+                    // distribution: downstream_distribution,
+                    scheduling: downstream_scheduling,
                 },
                 edge,
             );
diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs
index 52dcc62307748..f8940ff258e4d 100644
--- a/src/meta/src/stream/stream_graph/fragment.rs
+++ b/src/meta/src/stream/stream_graph/fragment.rs
@@ -735,7 +735,7 @@ impl CompleteStreamFragmentGraph {
         &self,
         id: GlobalFragmentId,
         actors: Vec<StreamActor>,
-        distribution: Distribution,
+        // distribution: Distribution,
         scheduling: Scheduling,
     ) -> Fragment {
         let building_fragment = self.get_fragment(id).into_building().unwrap();
@@ -746,9 +746,9 @@ impl CompleteStreamFragmentGraph {
             upstream_table_columns: _,
         } = building_fragment;
 
-        let distribution_type = distribution.to_distribution_type();
-        let distribution_type_2 = scheduling.distribution_type;
-        assert_eq!(distribution_type, distribution_type_2);
+        // let distribution_type = distribution.to_distribution_type();
+        let distribution_type = scheduling.distribution_type;
+        // assert_eq!(distribution_type, distribution_type_2);
 
         let state_table_ids = internal_tables
             .iter()
@@ -768,7 +768,7 @@ impl CompleteStreamFragmentGraph {
             fragment_type_mask: inner.fragment_type_mask,
             distribution_type: distribution_type as _,
             actors,
-            vnode_mapping: Some(distribution.into_mapping().to_protobuf()),
+            vnode_mapping: None, // TODO: remove
             state_table_ids,
             upstream_fragment_ids,
         }
diff --git a/src/meta/src/stream/stream_graph/group_schedule.rs b/src/meta/src/stream/stream_graph/group_schedule.rs
index 0f792694268a3..da47d52774cc4 100644
--- a/src/meta/src/stream/stream_graph/group_schedule.rs
+++ b/src/meta/src/stream/stream_graph/group_schedule.rs
@@ -12,18 +12,20 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{BTreeSet, HashMap, HashSet};
+use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
 use std::sync::Arc;
 
 use itertools::Itertools;
 use risingwave_common::bail;
-use risingwave_common::hash::{ActorGroupId, ActorGroupMapping};
+use risingwave_common::buffer::Bitmap;
+use risingwave_common::hash::{ActorGroupId, ActorGroupMapping, ActorId};
+use risingwave_pb::common::{ActorInfo, WorkerNode};
 use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
 use risingwave_pb::stream_plan::DispatcherType;
 
 use super::id::GlobalFragmentId;
 use super::CompleteStreamFragmentGraph;
-use crate::manager::{IdCategory, IdGeneratorManagerRef};
+use crate::manager::{IdCategory, IdGeneratorManagerRef, WorkerId, WorkerKey, WorkerLocations};
 use crate::MetaResult;
 
 #[derive(Clone, Debug)]
@@ -41,12 +43,22 @@ impl Scheduling {
     pub fn actor_groups(&self) -> impl Iterator<Item = ActorGroupId> + '_ {
         self.actor_group_mapping.iter_unique()
     }
+
+    pub fn to_bitmaps(&self) -> Option<HashMap<ActorGroupId, Bitmap>> {
+        match self.distribution_type {
+            FragmentDistributionType::Unspecified => unreachable!(),
+            FragmentDistributionType::Single => None,
+            FragmentDistributionType::Hash => Some(self.actor_group_mapping.to_bitmaps()),
+        }
+    }
 }
 
 #[derive(Clone, Debug)]
 pub(super) struct FragmentSchedulings {
     pub building: HashMap<GlobalFragmentId, Scheduling>,
     pub existing: HashMap<GlobalFragmentId, Scheduling>,
+
+    pub new_assignments: HashMap<ActorGroupId, WorkerId>,
 }
 
 pub(super) struct Scheduler<'a> {
@@ -55,6 +67,8 @@ pub(super) struct Scheduler<'a> {
     id_gen_manager: IdGeneratorManagerRef,
 
     default_parallelism: usize,
+
+    worker_nodes: HashMap<WorkerId, WorkerNode>,
 }
 
 impl<'a> Scheduler<'a> {
@@ -62,11 +76,13 @@ impl<'a> Scheduler<'a> {
         graph: &'a CompleteStreamFragmentGraph,
         id_gen_manager: IdGeneratorManagerRef,
         default_parallelism: usize,
+        worker_nodes: HashMap<WorkerId, WorkerNode>,
     ) -> Self {
         Self {
             graph,
             id_gen_manager,
             default_parallelism,
+            worker_nodes,
         }
     }
 
@@ -111,8 +127,12 @@ impl<'a> Scheduler<'a> {
     }
 
     pub async fn schedule(&self) -> MetaResult<FragmentSchedulings> {
+        let mut round_robin_assignment =
+            RoundRobinAssignment::new(self.worker_nodes.values().map(|w| w.id).collect_vec());
+
         let mut building = HashMap::new();
         let mut existing = HashMap::new();
+        let mut new_actor_group_ids = Vec::new();
 
         for group in self.split_into_groups() {
             let scheduling = match *group
@@ -140,11 +160,13 @@ impl<'a> Scheduler<'a> {
                         .id_gen_manager
                         .generate_interval::<{ IdCategory::ActorGroup }>(parallelism as _)
                        .await? as ActorGroupId;
+                    let actor_group_ids =
+                        start_actor_group_id..(start_actor_group_id + parallelism as ActorGroupId);
 
-                    let actor_group_mapping = ActorGroupMapping::new_uniform(
-                        start_actor_group_id..(start_actor_group_id + parallelism as u32),
-                    )
-                    .into();
+                    new_actor_group_ids.extend(actor_group_ids.clone());
+
+                    let actor_group_mapping =
+                        ActorGroupMapping::new_uniform(actor_group_ids).into();
 
                     Scheduling {
                         fragment_group_id: new_group_id,
@@ -184,6 +206,80 @@ impl<'a> Scheduler<'a> {
             }
         }
 
-        Ok(FragmentSchedulings { building, existing })
+        let new_assignments: HashMap<_, _> = new_actor_group_ids
+            .into_iter()
+            .zip(round_robin_assignment)
+            .collect();
+
+        if new_assignments.is_empty() {
+            bail!("empty worker nodes");
+        }
+
+        Ok(FragmentSchedulings {
+            building,
+            existing,
+            new_assignments,
+        })
+    }
+}
+
+// TODO: weighted round robin based on parallelism
+pub struct RoundRobinAssignment {
+    worker_nodes: Vec<WorkerId>,
+    i: usize,
+}
+
+impl RoundRobinAssignment {
+    pub fn new(worker_nodes: impl IntoIterator<Item = WorkerId>) -> Self {
+        Self {
+            worker_nodes: worker_nodes.into_iter().collect(),
+            i: 0,
+        }
+    }
+}
+
+impl Iterator for RoundRobinAssignment {
+    type Item = WorkerId;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let worker_node = self.worker_nodes.get(self.i)?;
+        self.i = (self.i + 1) % self.worker_nodes.len();
+        Some(*worker_node)
+    }
+}
+
+// pub struct LocationsV2 {}
+
+/// [`LocationsV2`] represents the actor group assignments and worker locations of the actors.
+#[cfg_attr(test, derive(Default))]
+pub struct LocationsV2 {
+    /// Actor to actor group map.
+    pub actor_groups: BTreeMap<ActorId, ActorGroupId>,
+
+    pub assignments: HashMap<ActorGroupId, WorkerId>,
+
+    /// worker location map.
+    pub worker_locations: WorkerLocations,
+}
+
+impl LocationsV2 {
+    /// Returns all actors for every worker node.
+    pub fn worker_actors(&self) -> HashMap<WorkerId, Vec<ActorId>> {
+        self.actor_groups
+            .iter()
+            .map(|(&actor_id, actor_group_id)| (self.assignments[actor_group_id], actor_id))
+            .into_group_map()
+    }
+
+    /// Returns an iterator of `ActorInfo`.
+    pub fn actor_infos(&self) -> impl Iterator<Item = ActorInfo> + '_ {
+        self.actor_groups
+            .iter()
+            .map(|(&actor_id, actor_group_id)| ActorInfo {
+                actor_id,
+                host: self.worker_locations[&self.assignments[actor_group_id]]
+                    .host
+                    .clone(),
+            })
+    }
 }
diff --git a/src/meta/src/stream/stream_graph/schedule.rs b/src/meta/src/stream/stream_graph/schedule.rs
index 72012a816f109..59560dd984c89 100644
--- a/src/meta/src/stream/stream_graph/schedule.rs
+++ b/src/meta/src/stream/stream_graph/schedule.rs
@@ -17,6 +17,8 @@
     clippy::explicit_iter_loop,
     reason = "generated by crepe"
 )]
+#![allow(unused_imports, dead_code, unused_variables)]
+#![deprecated]
 
 use std::collections::{BTreeMap, HashMap, LinkedList};
 use std::num::NonZeroUsize;
@@ -80,6 +82,7 @@ enum Result {
     DefaultHash,
 }
 
+#[cfg(any())]
 crepe::crepe! {
     @input
     struct Input(Fact);
@@ -246,6 +249,7 @@ impl Scheduler {
 
     /// Schedule the given complete graph and returns the distribution of each **building
     /// fragment**.
+    #[cfg(any())]
     pub fn schedule(
         &self,
         graph: &CompleteStreamFragmentGraph,
     ) -> MetaResult<HashMap<GlobalFragmentId, Distribution>> {
@@ -326,6 +330,13 @@ impl Scheduler {
 
         Ok(distributions)
     }
+
+    pub fn schedule(
+        &self,
+        _graph: &CompleteStreamFragmentGraph,
+    ) -> MetaResult<HashMap<GlobalFragmentId, Distribution>> {
+        Ok(Default::default())
+    }
 }
 
 /// [`Locations`] represents the parallel unit and worker locations of the actors.
@@ -360,6 +371,7 @@ impl Locations {
 }
 
 #[cfg(test)]
+#[cfg(any())]
 mod tests {
     use super::*;
 
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index 1be53330ac735..cddeb61fe31ae 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -29,7 +29,7 @@ use tokio::sync::{oneshot, Mutex, RwLock};
 use tracing::Instrument;
 use uuid::Uuid;
 
-use super::Locations;
+use super::LocationsV2;
 use crate::barrier::{BarrierScheduler, Command};
 use crate::hummock::HummockManagerRef;
 use crate::manager::{ClusterManagerRef, FragmentManagerRef, MetaSrvEnv};
@@ -54,10 +54,10 @@ pub struct CreateStreamingJobContext {
     pub internal_tables: HashMap<u32, Table>,
 
     /// The locations of the actors to build in the streaming job.
-    pub building_locations: Locations,
+    pub building_locations: LocationsV2,
 
     /// The locations of the existing actors, essentially the upstream mview actors to update.
-    pub existing_locations: Locations,
+    pub existing_locations: LocationsV2,
 
     /// The properties of the streaming job.
     // TODO: directly store `StreamingJob` here.
@@ -147,10 +147,10 @@ pub struct ReplaceTableContext {
     pub dispatchers: HashMap<ActorId, Vec<Dispatcher>>,
 
     /// The locations of the actors to build in the new table to replace.
-    pub building_locations: Locations,
+    pub building_locations: LocationsV2,
 
     /// The locations of the existing actors, essentially the downstream chain actors to update.
-    pub existing_locations: Locations,
+    pub existing_locations: LocationsV2,
 
     /// The properties of the streaming job.
     // TODO: directly store `StreamingJob here.
@@ -330,8 +330,8 @@ impl GlobalStreamManager {
     async fn build_actors(
         &self,
         table_fragments: &TableFragments,
-        building_locations: &Locations,
-        existing_locations: &Locations,
+        building_locations: &LocationsV2,
+        existing_locations: &LocationsV2,
     ) -> MetaResult<()> {
         let actor_map = table_fragments.actor_map();
 
@@ -407,7 +407,6 @@ impl GlobalStreamManager {
             definition,
             mv_table_id,
             internal_tables,
-            ..
         }: CreateStreamingJobContext,
     ) -> MetaResult<()> {
         // Register to compaction group beforehand.
@@ -432,6 +431,11 @@ impl GlobalStreamManager {
         self.build_actors(&table_fragments, &building_locations, &existing_locations)
             .await?;
 
+        // TODO: notify about new assignments.
+        self.cluster_manager
+            .record_new_assignments(&building_locations.assignments)
+            .await;
+
         // Add table fragments to meta store with state: `State::Initial`.
         self.fragment_manager
             .start_create_table_fragments(table_fragments.clone())
@@ -569,6 +573,7 @@ impl GlobalStreamManager {
 }
 
 #[cfg(test)]
+#[cfg(any())]
 mod tests {
     use std::collections::{BTreeMap, HashMap, HashSet};
     use std::net::SocketAddr;
diff --git a/src/meta/src/stream/test_fragmenter.rs b/src/meta/src/stream/test_fragmenter.rs
index c84e8d191635d..f5fdeb4eb37c3 100644
--- a/src/meta/src/stream/test_fragmenter.rs
+++ b/src/meta/src/stream/test_fragmenter.rs
@@ -12,6 +12,8 @@
 // limitations under the License.
 
+#![cfg(any())]
+
 use std::collections::{HashMap, HashSet};
 use std::num::NonZeroUsize;
 use std::vec;
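
Appendix (reviewer note, not part of the patch): the scheduling flow introduced above boils down to zipping freshly generated actor group IDs with a round-robin stream of worker IDs, and then locating each actor through its group. The sketch below mirrors `RoundRobinAssignment`, the `new_assignments` map built in `Scheduler::schedule`, and `LocationsV2::worker_actors` in plain, self-contained Rust; the `u32` type aliases and the sample IDs are illustrative assumptions, not the meta crate's real types.

// Standalone sketch of the actor-group-to-worker assignment, under the
// assumption that WorkerId / ActorId / ActorGroupId are plain u32 values.
use std::collections::{BTreeMap, HashMap};

type WorkerId = u32;
type ActorId = u32;
type ActorGroupId = u32;

/// Cycles through the known workers forever, like `RoundRobinAssignment` in the patch.
fn round_robin(workers: Vec<WorkerId>) -> impl Iterator<Item = WorkerId> {
    let len = workers.len();
    (0usize..).map(move |i| workers[i % len])
}

fn main() {
    let workers = vec![1001, 1002, 1003];

    // Newly generated actor group IDs are zipped with the round-robin worker
    // stream, which is how `Scheduler::schedule` builds `new_assignments`.
    let new_actor_group_ids: Vec<ActorGroupId> = (10..16).collect();
    let assignments: HashMap<ActorGroupId, WorkerId> = new_actor_group_ids
        .iter()
        .copied()
        .zip(round_robin(workers))
        .collect();

    // Each actor belongs to exactly one actor group; locating an actor is a
    // two-step lookup (actor -> group -> worker), as in `LocationsV2::worker_actors`.
    let actor_groups: BTreeMap<ActorId, ActorGroupId> =
        (100..112).map(|actor_id| (actor_id, 10 + actor_id % 6)).collect();

    let mut worker_actors: HashMap<WorkerId, Vec<ActorId>> = HashMap::new();
    for (actor_id, group_id) in &actor_groups {
        worker_actors
            .entry(assignments[group_id])
            .or_default()
            .push(*actor_id);
    }

    println!("{worker_actors:?}");
}

In the patch itself, the resulting map is recorded in `ClusterManagerCore::assignments` via `record_new_assignments` and later handed to `load_all_actors_2` when the barrier manager resolves actor locations.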