From a34375aa3f588ec938cec59983791b8fc88124e1 Mon Sep 17 00:00:00 2001
From: Shanicky Chen
Date: Wed, 10 Apr 2024 17:57:47 +0800
Subject: [PATCH] Refactor FragManager, cleanup proto fields & comments

---
 proto/meta.proto                              |  9 +-
 .../src/worker_manager/worker_node_manager.rs | 56 -------------
 .../common_service/src/observer_manager.rs    |  4 -
 src/frontend/src/observer/observer_manager.rs | 36 +-------
 src/frontend/src/scheduler/plan_fragmenter.rs | 15 ++--
 src/meta/service/src/notification_service.rs  | 28 -------
 src/meta/src/controller/fragment.rs           | 21 +----
 src/meta/src/controller/streaming_job.rs      | 20 +----
 src/meta/src/manager/catalog/fragment.rs      | 83 ++++++++-----------
 9 files changed, 51 insertions(+), 221 deletions(-)

diff --git a/proto/meta.proto b/proto/meta.proto
index c07c8a421b81f..3279984fbdfde 100644
--- a/proto/meta.proto
+++ b/proto/meta.proto
@@ -128,7 +128,6 @@ message ActorLocation {
 }
 
 message MigrationPlan {
-  // map<parallel_unit_id, parallel_unit>, the plan indicates that the actors will be migrated from old parallel unit to the new one.
   map<uint32, common.ParallelUnit> parallel_unit_migration_plan = 1;
 }
 
@@ -382,7 +381,7 @@ message SubscribeRequest {
 message MetaSnapshot {
   message SnapshotVersion {
     uint64 catalog_version = 1;
-    // uint64 parallel_unit_mapping_version = 2;
+    reserved 2; // for old parallel_unit_mapping_version
     uint64 worker_node_version = 3;
     uint64 streaming_worker_mapping_version = 4;
   }
@@ -397,15 +396,13 @@ message MetaSnapshot {
   repeated catalog.Connection connections = 17;
   repeated catalog.Subscription subscriptions = 19;
   repeated user.UserInfo users = 8;
-  // for streaming
-  // repeated FragmentParallelUnitMapping parallel_unit_mappings = 9;
+  reserved 9; // for old parallel_unit_mapping
   repeated common.WorkerNode nodes = 10;
   hummock.HummockSnapshot hummock_snapshot = 11;
   hummock.HummockVersion hummock_version = 12;
   backup_service.MetaBackupManifestId meta_backup_manifest_id = 14;
   hummock.WriteLimits hummock_write_limits = 16;
-  // for serving
-  // repeated FragmentParallelUnitMapping serving_parallel_unit_mappings = 18;
+  reserved 18; // for old serving_parallel_unit_mappings
 
   // for streaming
   repeated FragmentWorkerMapping streaming_worker_mappings = 20;

diff --git a/src/batch/src/worker_manager/worker_node_manager.rs b/src/batch/src/worker_manager/worker_node_manager.rs
index c91c47d9bad41..1334745aeb9b7 100644
--- a/src/batch/src/worker_manager/worker_node_manager.rs
+++ b/src/batch/src/worker_manager/worker_node_manager.rs
@@ -35,13 +35,9 @@ pub struct WorkerNodeManager {
 
 struct WorkerNodeManagerInner {
     worker_nodes: Vec<WorkerNode>,
-    /// A cache for parallel units to worker nodes. It should be consistent with `worker_nodes`.
- // pu_to_worker: HashMap, /// fragment vnode mapping info for streaming - // streaming_fragment_vnode_mapping: HashMap, streaming_fragment_vnode_mapping: HashMap, /// fragment vnode mapping info for serving - // serving_fragment_vnode_mapping: HashMap, serving_fragment_vnode_mapping: HashMap, } @@ -119,41 +115,13 @@ impl WorkerNodeManager { *w = node; } } - // Update `pu_to_worker` - // write_guard.pu_to_worker = get_pu_to_worker_mapping(&write_guard.worker_nodes); } pub fn remove_worker_node(&self, node: WorkerNode) { let mut write_guard = self.inner.write().unwrap(); write_guard.worker_nodes.retain(|x| x.id != node.id); - - // Update `pu_to_worker` - // write_guard.pu_to_worker = get_pu_to_worker_mapping(&write_guard.worker_nodes); } - // pub fn refresh( - // &self, - // nodes: Vec, - // streaming_mapping: HashMap, - // serving_mapping: HashMap, - // ) { - // let mut write_guard = self.inner.write().unwrap(); - // tracing::debug!("Refresh worker nodes {:?}.", nodes); - // tracing::debug!( - // "Refresh streaming vnode mapping for fragments {:?}.", - // streaming_mapping.keys() - // ); - // tracing::debug!( - // "Refresh serving vnode mapping for fragments {:?}.", - // serving_mapping.keys() - // ); - // write_guard.worker_nodes = nodes; - // // Update `pu_to_worker` - // write_guard.pu_to_worker = get_pu_to_worker_mapping(&write_guard.worker_nodes); - // write_guard.streaming_fragment_vnode_mapping = streaming_mapping; - // write_guard.serving_fragment_vnode_mapping = serving_mapping; - // } - pub fn refresh( &self, nodes: Vec, @@ -204,29 +172,6 @@ impl WorkerNodeManager { Ok(workers) } - // pub fn get_workers_by_parallel_unit_ids( - // &self, - // parallel_unit_ids: &[ParallelUnitId], - // ) -> Result> { - // if parallel_unit_ids.is_empty() { - // return Err(BatchError::EmptyWorkerNodes); - // } - // - // let guard = self.inner.read().unwrap(); - // - // let mut workers = Vec::with_capacity(parallel_unit_ids.len()); - // for parallel_unit_id in parallel_unit_ids { - // match guard.pu_to_worker.get(parallel_unit_id) { - // Some(worker) => workers.push(worker.clone()), - // None => bail!( - // "No worker node found for parallel unit id: {}", - // parallel_unit_id - // ), - // } - // } - // Ok(workers) - // } - pub fn get_streaming_fragment_mapping( &self, fragment_id: &FragmentId, @@ -283,7 +228,6 @@ impl WorkerNodeManager { } pub fn set_serving_fragment_mapping(&self, mappings: HashMap) { - // pub fn set_serving_fragment_mapping(&self, mappings: HashMap) { let mut guard = self.inner.write().unwrap(); tracing::debug!( "Set serving vnode mapping for fragments {:?}", diff --git a/src/common/common_service/src/observer_manager.rs b/src/common/common_service/src/observer_manager.rs index 68019582958b3..77bb34c54584e 100644 --- a/src/common/common_service/src/observer_manager.rs +++ b/src/common/common_service/src/observer_manager.rs @@ -145,9 +145,6 @@ where | Info::Function(_) => { notification.version > info.version.as_ref().unwrap().catalog_version } - // Info::ParallelUnitMapping(_) => { - // notification.version > info.version.as_ref().unwrap().parallel_unit_mapping_version - // } Info::Node(_) => { notification.version > info.version.as_ref().unwrap().worker_node_version } @@ -157,7 +154,6 @@ where Info::HummockSnapshot(_) => true, Info::MetaBackupManifestId(_) => true, Info::SystemParams(_) => true, - // Info::ServingParallelUnitMappings(_) => true, Info::Snapshot(_) | Info::HummockWriteLimits(_) => unreachable!(), Info::HummockStats(_) => true, Info::Recovery(_) => true, diff --git 
a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs index 108298d48da6c..45358e881483d 100644 --- a/src/frontend/src/observer/observer_manager.rs +++ b/src/frontend/src/observer/observer_manager.rs @@ -26,9 +26,7 @@ use risingwave_pb::common::WorkerNode; use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::meta::relation::RelationInfo; use risingwave_pb::meta::subscribe_response::{Info, Operation}; -use risingwave_pb::meta::{ - FragmentWorkerMapping, MetaSnapshot, SubscribeResponse, -}; +use risingwave_pb::meta::{FragmentWorkerMapping, MetaSnapshot, SubscribeResponse}; use risingwave_rpc_client::ComputeClientPoolRef; use tokio::sync::watch::Sender; @@ -96,12 +94,6 @@ impl ObserverState for FrontendObserverNode { Info::HummockStats(stats) => { self.handle_table_stats_notification(stats); } - - // Info::ParallelUnitMapping(_) => self.handle_fragment_mapping_notification(resp), - // - // Info::ServingParallelUnitMappings(m) => { - // // self.handle_fragment_serving_mapping_notification(m.mappings, resp.operation()); - // } Info::StreamingWorkerMapping(_) => self.handle_fragment_mapping_notification(resp), Info::ServingWorkerMappings(m) => { self.handle_fragment_serving_mapping_notification(m.mappings, resp.operation()) @@ -133,8 +125,6 @@ impl ObserverState for FrontendObserverNode { functions, connections, users, - // parallel_unit_mappings: _, - // serving_parallel_unit_mappings: _, nodes, hummock_snapshot, hummock_version: _, @@ -385,30 +375,6 @@ impl FrontendObserverNode { return; }; match info { - // Info::ParallelUnitMapping(parallel_unit_mapping) => { - // let fragment_id = parallel_unit_mapping.fragment_id; - // let mapping = || { - // ParallelUnitMapping::from_protobuf( - // parallel_unit_mapping.mapping.as_ref().unwrap(), - // ) - // }; - // - // match resp.operation() { - // Operation::Add => { - // self.worker_node_manager - // .insert_streaming_fragment_mapping(fragment_id, mapping()); - // } - // Operation::Delete => { - // self.worker_node_manager - // .remove_streaming_fragment_mapping(&fragment_id); - // } - // Operation::Update => { - // self.worker_node_manager - // .update_streaming_fragment_mapping(fragment_id, mapping()); - // } - // _ => panic!("receive an unsupported notify {:?}", resp), - // } - // } Info::StreamingWorkerMapping(streaming_worker_mapping) => { let fragment_id = streaming_worker_mapping.fragment_id; let mapping = || { diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index 4c2050c10374f..5fe7355c418d0 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -1149,24 +1149,25 @@ fn derive_partitions( match vnode { None => { // put this scan_range to all partitions - vnode_mapping.to_bitmaps().into_iter().for_each( - |(parallel_unit_id, vnode_bitmap)| { + vnode_mapping + .to_bitmaps() + .into_iter() + .for_each(|(worker_id, vnode_bitmap)| { let (bitmap, scan_ranges) = partitions - .entry(parallel_unit_id) + .entry(worker_id) .or_insert_with(|| (BitmapBuilder::zeroed(num_vnodes), vec![])); vnode_bitmap .iter() .enumerate() .for_each(|(vnode, b)| bitmap.set(vnode, b)); scan_ranges.push(scan_range.to_protobuf()); - }, - ); + }); } // scan a single partition Some(vnode) => { - let parallel_unit_id = vnode_mapping[vnode]; + let worker_id = vnode_mapping[vnode]; let (bitmap, scan_ranges) = partitions - .entry(parallel_unit_id) + .entry(worker_id) .or_insert_with(|| 
(BitmapBuilder::zeroed(num_vnodes), vec![])); bitmap.set(vnode.to_index(), true); scan_ranges.push(scan_range.to_protobuf()); diff --git a/src/meta/service/src/notification_service.rs b/src/meta/service/src/notification_service.rs index 31bc684b9e053..8e532c20cefff 100644 --- a/src/meta/service/src/notification_service.rs +++ b/src/meta/service/src/notification_service.rs @@ -134,29 +134,6 @@ impl NotificationServiceImpl { } } - // async fn get_parallel_unit_mapping_snapshot( - // &self, - // ) -> MetaResult<(Vec, NotificationVersion)> { - // match &self.metadata_manager { - // MetadataManager::V1(mgr) => { - // let fragment_guard = mgr.fragment_manager.get_fragment_read_guard().await; - // let parallel_unit_mappings = - // fragment_guard.all_running_fragment_mappings().collect_vec(); - // let notification_version = self.env.notification_manager().current_version().await; - // Ok((parallel_unit_mappings, notification_version)) - // } - // MetadataManager::V2(mgr) => { - // let fragment_guard = mgr.catalog_controller.get_inner_read_guard().await; - // let parallel_unit_mappings = fragment_guard - // .all_running_fragment_mappings() - // .await? - // .collect_vec(); - // let notification_version = self.env.notification_manager().current_version().await; - // Ok((parallel_unit_mappings, notification_version)) - // } - // } - // } - async fn get_worker_mapping_snapshot( &self, ) -> MetaResult<(Vec, NotificationVersion)> { @@ -260,8 +237,6 @@ impl NotificationServiceImpl { users, catalog_version, ) = self.get_catalog_snapshot().await?; - // let (parallel_unit_mappings, parallel_unit_mapping_version) = - // self.get_parallel_unit_mapping_snapshot().await?; let (streaming_worker_mappings, streaming_worker_mapping_version) = self.get_worker_mapping_snapshot().await?; @@ -284,13 +259,10 @@ impl NotificationServiceImpl { functions, connections, users, - // parallel_unit_mappings: vec![], nodes, hummock_snapshot, - // serving_parallel_unit_mappings: vec![], version: Some(SnapshotVersion { catalog_version, - // parallel_unit_mapping_version: 0, worker_node_version, streaming_worker_mapping_version, }), diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 86e339759e8e0..11cbb640ded9b 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -76,25 +76,8 @@ impl CatalogControllerInner { .all(&txn) .await?; - let worker_parallel_units = WorkerProperty::find() - .select_only() - .columns([ - worker_property::Column::WorkerId, - worker_property::Column::ParallelUnitIds, - ]) - .into_tuple::<(WorkerId, I32Array)>() - .all(&txn) - .await?; - - let parallel_unit_to_worker = worker_parallel_units - .into_iter() - .flat_map(|(worker_id, parallel_unit_ids)| { - parallel_unit_ids - .into_inner() - .into_iter() - .map(move |parallel_unit_id| (parallel_unit_id as u32, worker_id as u32)) - }) - .collect::>(); + let parallel_unit_to_worker = + CatalogController::get_parallel_unit_to_worker_map(&txn).await?; Ok(fragment_mappings .into_iter() diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 1cb48c7f7f19b..99e9f53305a8e 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -1023,25 +1023,7 @@ impl CatalogController { let txn = inner.db.begin().await?; - let worker_parallel_unit_ids: Vec<(WorkerId, I32Array)> = WorkerProperty::find() - .select_only() - .columns([ - worker_property::Column::WorkerId, - worker_property::Column::ParallelUnitIds, - 
]) - .into_tuple() - .all(&txn) - .await?; - - let parallel_unit_to_worker = worker_parallel_unit_ids - .into_iter() - .flat_map(|(worker_id, parallel_unit_ids)| { - parallel_unit_ids - .0 - .into_iter() - .map(move |parallel_unit_id| (parallel_unit_id as u32, worker_id as u32)) - }) - .collect::>(); + let parallel_unit_to_worker = Self::get_parallel_unit_to_worker_map(&txn).await?; let mut fragment_mapping_to_notify = vec![]; diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index f6fab0217900a..a554e7d6b7676 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -24,6 +24,7 @@ use risingwave_common::hash::{ActorMapping, ParallelUnitId, ParallelUnitMapping} use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_cont}; use risingwave_connector::source::SplitImpl; use risingwave_meta_model_v2::SourceId; +use risingwave_pb::common::{PbParallelUnitMapping, PbWorkerMapping}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::table_fragments::actor_status::ActorState; use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State}; @@ -60,25 +61,16 @@ impl FragmentManagerCore { .values() .filter(|tf| tf.state() != State::Initial) .flat_map(|table_fragments| { - let parallel_unit_to_worker: HashMap<_, _> = table_fragments - .actor_status + table_fragments + .fragments .values() - .map(|status| status.get_parallel_unit().unwrap()) - .map(|parallel_unit| (parallel_unit.id, parallel_unit.worker_node_id)) - .collect(); - - table_fragments.fragments.values().map(move |fragment| { - let parallel_unit_mapping = ParallelUnitMapping::from_protobuf( - fragment.vnode_mapping.as_ref().unwrap(), - ); - - let worker_mapping = parallel_unit_mapping.to_worker(¶llel_unit_to_worker); - - FragmentWorkerMapping { + .map(move |fragment| FragmentWorkerMapping { fragment_id: fragment.fragment_id, - mapping: Some(worker_mapping.to_protobuf()), - } - }) + mapping: Some(FragmentManager::convert_mapping( + &table_fragments.actor_status, + fragment.vnode_mapping.as_ref().unwrap(), + )), + }) }) } @@ -201,29 +193,17 @@ impl FragmentManager { } async fn notify_fragment_mapping(&self, table_fragment: &TableFragments, operation: Operation) { - let parallel_unit_to_worker = table_fragment - .actor_status - .values() - .map(|actor_status| { - let parallel_unit = actor_status.get_parallel_unit().unwrap(); - (parallel_unit.id, parallel_unit.worker_node_id) - }) - .collect(); - // Notify all fragment mapping to frontend nodes for fragment in table_fragment.fragments.values() { - let mapping = fragment - .vnode_mapping - .clone() - .expect("no data distribution found"); - - let worker_mapping = ParallelUnitMapping::from_protobuf(&mapping) - .to_worker(¶llel_unit_to_worker) - .to_protobuf(); - let fragment_mapping = FragmentWorkerMapping { fragment_id: fragment.fragment_id, - mapping: Some(worker_mapping), + mapping: Some(Self::convert_mapping( + &table_fragment.actor_status, + fragment + .vnode_mapping + .as_ref() + .expect("no data distribution found"), + )), }; self.env @@ -1287,17 +1267,7 @@ impl FragmentManager { *fragment.vnode_mapping.as_mut().unwrap() = vnode_mapping.clone(); - let parallel_unit_to_worker = actor_status - .values() - .map(|actor_status| { - let parallel_unit = actor_status.get_parallel_unit().unwrap(); - (parallel_unit.id, parallel_unit.worker_node_id) - }) - .collect(); - - let worker_mapping = 
ParallelUnitMapping::from_protobuf(&vnode_mapping)
-            .to_worker(&parallel_unit_to_worker)
-            .to_protobuf();
+        let worker_mapping = Self::convert_mapping(&actor_status, &vnode_mapping);
 
         // Notify fragment mapping to frontend nodes.
         let fragment_mapping = FragmentWorkerMapping {
             fragment_id: fragment.fragment_id,
@@ -1431,6 +1401,25 @@ impl FragmentManager {
         Ok(())
     }
 
+    fn convert_mapping(
+        actor_status: &BTreeMap<ActorId, ActorStatus>,
+        vnode_mapping: &PbParallelUnitMapping,
+    ) -> PbWorkerMapping {
+        let parallel_unit_to_worker = actor_status
+            .values()
+            .map(|actor_status| {
+                let parallel_unit = actor_status.get_parallel_unit().unwrap();
+                (parallel_unit.id, parallel_unit.worker_node_id)
+            })
+            .collect();
+
+        let worker_mapping = ParallelUnitMapping::from_protobuf(&vnode_mapping)
+            .to_worker(&parallel_unit_to_worker)
+            .to_protobuf();
+
+        worker_mapping
+    }
+
     pub async fn table_node_actors(
         &self,
         table_ids: &HashSet<TableId>,
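
The refactor above centralizes one recurring pattern: build a parallel-unit-to-worker map (from `TableFragments::actor_status` in meta v1, or from `WorkerProperty` rows via the shared `get_parallel_unit_to_worker_map` helper in meta v2) and use it to turn a fragment's `ParallelUnitMapping` into a per-worker mapping with `ParallelUnitMapping::from_protobuf(..).to_worker(..).to_protobuf()`, which is then broadcast to frontends as `FragmentWorkerMapping`. The standalone sketch below illustrates only that conversion step with plain standard-library types; the type aliases, the `convert_mapping` function shown here, and the sample data are simplified stand-ins, not the actual RisingWave definitions.

use std::collections::HashMap;

// Simplified stand-ins for the real identifier types (illustrative only).
type ParallelUnitId = u32;
type WorkerId = u32;

/// Turn a vnode -> parallel-unit assignment into a vnode -> worker assignment,
/// given which worker hosts each parallel unit. In the patch this is what
/// `FragmentManager::convert_mapping` does through `ParallelUnitMapping::to_worker`.
fn convert_mapping(
    vnode_to_parallel_unit: &[ParallelUnitId],
    parallel_unit_to_worker: &HashMap<ParallelUnitId, WorkerId>,
) -> Vec<WorkerId> {
    vnode_to_parallel_unit
        .iter()
        .map(|pu| {
            *parallel_unit_to_worker
                .get(pu)
                .expect("parallel unit is not hosted by any worker")
        })
        .collect()
}

fn main() {
    // Worker 1 hosts parallel units 0 and 1, worker 2 hosts 2 and 3 (made-up data).
    let parallel_unit_to_worker: HashMap<ParallelUnitId, WorkerId> =
        HashMap::from([(0, 1), (1, 1), (2, 2), (3, 2)]);

    // Eight virtual nodes spread round-robin over the four parallel units.
    let vnode_to_parallel_unit: Vec<ParallelUnitId> = (0..8).map(|v| v % 4).collect();

    let vnode_to_worker = convert_mapping(&vnode_to_parallel_unit, &parallel_unit_to_worker);
    assert_eq!(vnode_to_worker, vec![1, 1, 2, 2, 1, 1, 2, 2]);
    println!("vnode -> worker: {:?}", vnode_to_worker);
}

Once every call site funnels through this single derivation, frontends only ever see worker-level mappings, which is what lets the proto drop the old per-parallel-unit mapping fields and reserve their tags.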