Refactor FragManager, cleanup proto fields & comments
shanicky committed Apr 10, 2024
1 parent 2bf6d37 commit a34375a
Showing 9 changed files with 51 additions and 221 deletions.
proto/meta.proto (9 changes: 3 additions & 6 deletions)
@@ -128,7 +128,6 @@ message ActorLocation {
}

message MigrationPlan {
// map<parallel_unit_id, parallel_unit>, the plan indicates that the actors will be migrated from old parallel unit to the new one.
map<uint32, common.ParallelUnit> parallel_unit_migration_plan = 1;
}

@@ -382,7 +381,7 @@ message SubscribeRequest {
message MetaSnapshot {
message SnapshotVersion {
uint64 catalog_version = 1;
// uint64 parallel_unit_mapping_version = 2;
reserved 2; // for old parallel_unit_mapping_version
uint64 worker_node_version = 3;
uint64 streaming_worker_mapping_version = 4;
}
@@ -397,15 +396,13 @@ message MetaSnapshot {
repeated catalog.Connection connections = 17;
repeated catalog.Subscription subscriptions = 19;
repeated user.UserInfo users = 8;
// for streaming
// repeated FragmentParallelUnitMapping parallel_unit_mappings = 9;
reserved 9; // for old parallel_unit_mapping
repeated common.WorkerNode nodes = 10;
hummock.HummockSnapshot hummock_snapshot = 11;
hummock.HummockVersion hummock_version = 12;
backup_service.MetaBackupManifestId meta_backup_manifest_id = 14;
hummock.WriteLimits hummock_write_limits = 16;
// for serving
// repeated FragmentParallelUnitMapping serving_parallel_unit_mappings = 18;
reserved 18; // for old serving_parallel_unit_mappings

// for streaming
repeated FragmentWorkerMapping streaming_worker_mappings = 20;
src/batch/src/worker_manager/worker_node_manager.rs (56 changes: 0 additions & 56 deletions)
@@ -35,13 +35,9 @@ pub struct WorkerNodeManager {

struct WorkerNodeManagerInner {
worker_nodes: Vec<WorkerNode>,
/// A cache for parallel units to worker nodes. It should be consistent with `worker_nodes`.
// pu_to_worker: HashMap<ParallelUnitId, WorkerNode>,
/// fragment vnode mapping info for streaming
// streaming_fragment_vnode_mapping: HashMap<FragmentId, ParallelUnitMapping>,
streaming_fragment_vnode_mapping: HashMap<FragmentId, WorkerMapping>,
/// fragment vnode mapping info for serving
// serving_fragment_vnode_mapping: HashMap<FragmentId, ParallelUnitMapping>,
serving_fragment_vnode_mapping: HashMap<FragmentId, WorkerMapping>,
}

@@ -119,41 +115,13 @@ impl WorkerNodeManager {
*w = node;
}
}
// Update `pu_to_worker`
// write_guard.pu_to_worker = get_pu_to_worker_mapping(&write_guard.worker_nodes);
}

pub fn remove_worker_node(&self, node: WorkerNode) {
let mut write_guard = self.inner.write().unwrap();
write_guard.worker_nodes.retain(|x| x.id != node.id);

// Update `pu_to_worker`
// write_guard.pu_to_worker = get_pu_to_worker_mapping(&write_guard.worker_nodes);
}

// pub fn refresh(
// &self,
// nodes: Vec<WorkerNode>,
// streaming_mapping: HashMap<FragmentId, WorkerMapping>,
// serving_mapping: HashMap<FragmentId, ParallelUnitMapping>,
// ) {
// let mut write_guard = self.inner.write().unwrap();
// tracing::debug!("Refresh worker nodes {:?}.", nodes);
// tracing::debug!(
// "Refresh streaming vnode mapping for fragments {:?}.",
// streaming_mapping.keys()
// );
// tracing::debug!(
// "Refresh serving vnode mapping for fragments {:?}.",
// serving_mapping.keys()
// );
// write_guard.worker_nodes = nodes;
// // Update `pu_to_worker`
// write_guard.pu_to_worker = get_pu_to_worker_mapping(&write_guard.worker_nodes);
// write_guard.streaming_fragment_vnode_mapping = streaming_mapping;
// write_guard.serving_fragment_vnode_mapping = serving_mapping;
// }

pub fn refresh(
&self,
nodes: Vec<WorkerNode>,
@@ -204,29 +172,6 @@ impl WorkerNodeManager {
Ok(workers)
}

// pub fn get_workers_by_parallel_unit_ids(
// &self,
// parallel_unit_ids: &[ParallelUnitId],
// ) -> Result<Vec<WorkerNode>> {
// if parallel_unit_ids.is_empty() {
// return Err(BatchError::EmptyWorkerNodes);
// }
//
// let guard = self.inner.read().unwrap();
//
// let mut workers = Vec::with_capacity(parallel_unit_ids.len());
// for parallel_unit_id in parallel_unit_ids {
// match guard.pu_to_worker.get(parallel_unit_id) {
// Some(worker) => workers.push(worker.clone()),
// None => bail!(
// "No worker node found for parallel unit id: {}",
// parallel_unit_id
// ),
// }
// }
// Ok(workers)
// }

pub fn get_streaming_fragment_mapping(
&self,
fragment_id: &FragmentId,
@@ -283,7 +228,6 @@ impl WorkerNodeManager {
}

pub fn set_serving_fragment_mapping(&self, mappings: HashMap<FragmentId, WorkerMapping>) {
// pub fn set_serving_fragment_mapping(&self, mappings: HashMap<FragmentId, ParallelUnitMapping>) {
let mut guard = self.inner.write().unwrap();
tracing::debug!(
"Set serving vnode mapping for fragments {:?}",
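Note: after this cleanup, `WorkerNodeManagerInner` no longer carries the commented-out `pu_to_worker` cache; it only tracks the worker list plus the streaming and serving fragment-to-worker vnode mappings behind an `RwLock`. A minimal, self-contained sketch of that shape; the `FragmentId`, `WorkerId`, `WorkerNode`, and `WorkerMapping` stand-ins below are simplifications, not the real RisingWave types:

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Simplified stand-ins for the real RisingWave types (`WorkerNode` comes from
// risingwave_pb and `WorkerMapping` from risingwave_common in the actual code).
type FragmentId = u32;
type WorkerId = u32;
type WorkerMapping = Vec<WorkerId>; // vnode index -> worker id

#[derive(Clone, Debug)]
struct WorkerNode {
    id: WorkerId,
}

#[derive(Default)]
struct WorkerNodeManagerInner {
    worker_nodes: Vec<WorkerNode>,
    /// Fragment vnode mapping info for streaming, keyed directly by worker.
    streaming_fragment_vnode_mapping: HashMap<FragmentId, WorkerMapping>,
    /// Fragment vnode mapping info for serving.
    serving_fragment_vnode_mapping: HashMap<FragmentId, WorkerMapping>,
}

#[derive(Default, Clone)]
struct WorkerNodeManager {
    inner: Arc<RwLock<WorkerNodeManagerInner>>,
}

impl WorkerNodeManager {
    fn remove_worker_node(&self, node: WorkerNode) {
        let mut write_guard = self.inner.write().unwrap();
        // No separate `pu_to_worker` cache is left to keep in sync here.
        write_guard.worker_nodes.retain(|x| x.id != node.id);
    }

    fn set_serving_fragment_mapping(&self, mappings: HashMap<FragmentId, WorkerMapping>) {
        self.inner.write().unwrap().serving_fragment_vnode_mapping = mappings;
    }

    fn get_streaming_fragment_mapping(&self, fragment_id: &FragmentId) -> Option<WorkerMapping> {
        self.inner
            .read()
            .unwrap()
            .streaming_fragment_vnode_mapping
            .get(fragment_id)
            .cloned()
    }
}

fn main() {
    let mgr = WorkerNodeManager::default();
    mgr.set_serving_fragment_mapping(HashMap::from([(1, vec![100, 101])]));
    mgr.remove_worker_node(WorkerNode { id: 100 });
    assert!(mgr.get_streaming_fragment_mapping(&1).is_none());
}
```

Dropping the `pu_to_worker` cache also removes the "Update `pu_to_worker`" bookkeeping that the add/remove worker paths previously had to remember.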
src/common/common_service/src/observer_manager.rs (4 changes: 0 additions & 4 deletions)
@@ -145,9 +145,6 @@ where
| Info::Function(_) => {
notification.version > info.version.as_ref().unwrap().catalog_version
}
// Info::ParallelUnitMapping(_) => {
// notification.version > info.version.as_ref().unwrap().parallel_unit_mapping_version
// }
Info::Node(_) => {
notification.version > info.version.as_ref().unwrap().worker_node_version
}
@@ -157,7 +154,6 @@
Info::HummockSnapshot(_) => true,
Info::MetaBackupManifestId(_) => true,
Info::SystemParams(_) => true,
// Info::ServingParallelUnitMappings(_) => true,
Info::Snapshot(_) | Info::HummockWriteLimits(_) => unreachable!(),
Info::HummockStats(_) => true,
Info::Recovery(_) => true,
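Note: the retained match above gates buffered notifications against the per-category versions in `SnapshotVersion`; with `parallel_unit_mapping_version` gone (field 2 is now reserved in proto/meta.proto), only the catalog, worker-node, and streaming-worker-mapping versions remain. A hedged sketch of that gating idea with simplified stand-in types (the enum and struct below are illustrative, not the protobuf-generated ones):

```rust
// Simplified stand-ins for the protobuf-generated types; the version field
// names mirror `SnapshotVersion` in proto/meta.proto after this commit.
#[derive(Clone, Copy)]
struct SnapshotVersion {
    catalog_version: u64,
    worker_node_version: u64,
    streaming_worker_mapping_version: u64,
}

// A coarse stand-in for `subscribe_response::Info`.
enum Info {
    Catalog,
    Node,
    StreamingWorkerMapping,
}

struct Notification {
    version: u64,
    info: Info,
}

/// Keep a buffered notification only if it is newer than the matching
/// per-category version recorded in the initial snapshot.
fn is_newer_than_snapshot(n: &Notification, v: SnapshotVersion) -> bool {
    match n.info {
        Info::Catalog => n.version > v.catalog_version,
        Info::Node => n.version > v.worker_node_version,
        Info::StreamingWorkerMapping => n.version > v.streaming_worker_mapping_version,
    }
}

fn main() {
    let snapshot = SnapshotVersion {
        catalog_version: 10,
        worker_node_version: 7,
        streaming_worker_mapping_version: 12,
    };
    let buffered = vec![
        Notification { version: 9, info: Info::Catalog },
        Notification { version: 8, info: Info::Node },
        Notification { version: 11, info: Info::StreamingWorkerMapping },
    ];
    // Only the worker-node notification (8 > 7) passes the gate.
    let kept: Vec<_> = buffered
        .into_iter()
        .filter(|n| is_newer_than_snapshot(n, snapshot))
        .collect();
    assert_eq!(kept.len(), 1);
}
```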
src/frontend/src/observer/observer_manager.rs (36 changes: 1 addition & 35 deletions)
@@ -26,9 +26,7 @@ use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::relation::RelationInfo;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::{
FragmentWorkerMapping, MetaSnapshot, SubscribeResponse,
};
use risingwave_pb::meta::{FragmentWorkerMapping, MetaSnapshot, SubscribeResponse};
use risingwave_rpc_client::ComputeClientPoolRef;
use tokio::sync::watch::Sender;

@@ -96,12 +94,6 @@ impl ObserverState for FrontendObserverNode {
Info::HummockStats(stats) => {
self.handle_table_stats_notification(stats);
}

// Info::ParallelUnitMapping(_) => self.handle_fragment_mapping_notification(resp),
//
// Info::ServingParallelUnitMappings(m) => {
// // self.handle_fragment_serving_mapping_notification(m.mappings, resp.operation());
// }
Info::StreamingWorkerMapping(_) => self.handle_fragment_mapping_notification(resp),
Info::ServingWorkerMappings(m) => {
self.handle_fragment_serving_mapping_notification(m.mappings, resp.operation())
@@ -133,8 +125,6 @@ impl ObserverState for FrontendObserverNode {
functions,
connections,
users,
// parallel_unit_mappings: _,
// serving_parallel_unit_mappings: _,
nodes,
hummock_snapshot,
hummock_version: _,
@@ -385,30 +375,6 @@ impl FrontendObserverNode {
return;
};
match info {
// Info::ParallelUnitMapping(parallel_unit_mapping) => {
// let fragment_id = parallel_unit_mapping.fragment_id;
// let mapping = || {
// ParallelUnitMapping::from_protobuf(
// parallel_unit_mapping.mapping.as_ref().unwrap(),
// )
// };
//
// match resp.operation() {
// Operation::Add => {
// self.worker_node_manager
// .insert_streaming_fragment_mapping(fragment_id, mapping());
// }
// Operation::Delete => {
// self.worker_node_manager
// .remove_streaming_fragment_mapping(&fragment_id);
// }
// Operation::Update => {
// self.worker_node_manager
// .update_streaming_fragment_mapping(fragment_id, mapping());
// }
// _ => panic!("receive an unsupported notify {:?}", resp),
// }
// }
Info::StreamingWorkerMapping(streaming_worker_mapping) => {
let fragment_id = streaming_worker_mapping.fragment_id;
let mapping = || {
src/frontend/src/scheduler/plan_fragmenter.rs (15 changes: 8 additions & 7 deletions)
@@ -1149,24 +1149,25 @@ fn derive_partitions(
match vnode {
None => {
// put this scan_range to all partitions
vnode_mapping.to_bitmaps().into_iter().for_each(
|(parallel_unit_id, vnode_bitmap)| {
vnode_mapping
.to_bitmaps()
.into_iter()
.for_each(|(worker_id, vnode_bitmap)| {
let (bitmap, scan_ranges) = partitions
.entry(parallel_unit_id)
.entry(worker_id)
.or_insert_with(|| (BitmapBuilder::zeroed(num_vnodes), vec![]));
vnode_bitmap
.iter()
.enumerate()
.for_each(|(vnode, b)| bitmap.set(vnode, b));
scan_ranges.push(scan_range.to_protobuf());
},
);
});
}
// scan a single partition
Some(vnode) => {
let parallel_unit_id = vnode_mapping[vnode];
let worker_id = vnode_mapping[vnode];
let (bitmap, scan_ranges) = partitions
.entry(parallel_unit_id)
.entry(worker_id)
.or_insert_with(|| (BitmapBuilder::zeroed(num_vnodes), vec![]));
bitmap.set(vnode.to_index(), true);
scan_ranges.push(scan_range.to_protobuf());
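Note: `derive_partitions` now keys its partitions map by worker id rather than parallel unit id. A self-contained sketch of the same partitioning idea, using a plain `Vec<WorkerId>` indexed by vnode in place of the real `WorkerMapping` and a string placeholder for `ScanRange` (both are assumptions for illustration):

```rust
use std::collections::HashMap;

type WorkerId = u32;
// Stand-in for the real `WorkerMapping`: worker id per virtual node.
type VnodeMapping = Vec<WorkerId>;

/// For each worker, collect the bitmap of vnodes it should read plus the scan
/// ranges it should execute. `point_vnode == None` means the scan range is not
/// bound to a single vnode and goes to every worker.
fn derive_partitions(
    vnode_mapping: &VnodeMapping,
    point_vnode: Option<usize>,
    scan_range: &str,
) -> HashMap<WorkerId, (Vec<bool>, Vec<String>)> {
    let num_vnodes = vnode_mapping.len();
    let mut partitions: HashMap<WorkerId, (Vec<bool>, Vec<String>)> = HashMap::new();

    match point_vnode {
        None => {
            // Group vnodes by owning worker first (the role of `to_bitmaps()`),
            // then merge each worker's bitmap and record the scan range once.
            let mut per_worker: HashMap<WorkerId, Vec<bool>> = HashMap::new();
            for (vnode, &worker_id) in vnode_mapping.iter().enumerate() {
                per_worker
                    .entry(worker_id)
                    .or_insert_with(|| vec![false; num_vnodes])[vnode] = true;
            }
            for (worker_id, vnode_bitmap) in per_worker {
                let (bitmap, scan_ranges) = partitions
                    .entry(worker_id)
                    .or_insert_with(|| (vec![false; num_vnodes], vec![]));
                for (vnode, owned) in vnode_bitmap.into_iter().enumerate() {
                    if owned {
                        bitmap[vnode] = true;
                    }
                }
                scan_ranges.push(scan_range.to_string());
            }
        }
        // A point lookup only touches the single worker owning that vnode.
        Some(vnode) => {
            let worker_id = vnode_mapping[vnode];
            let (bitmap, scan_ranges) = partitions
                .entry(worker_id)
                .or_insert_with(|| (vec![false; num_vnodes], vec![]));
            bitmap[vnode] = true;
            scan_ranges.push(scan_range.to_string());
        }
    }
    partitions
}

fn main() {
    // Two workers (7 and 8) splitting four vnodes between them.
    let mapping: VnodeMapping = vec![7, 7, 8, 8];
    let all = derive_partitions(&mapping, None, "full scan");
    let point = derive_partitions(&mapping, Some(2), "pk = 42");
    println!("{:?}", all);
    println!("{:?}", point);
}
```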
src/meta/service/src/notification_service.rs (28 changes: 0 additions & 28 deletions)
@@ -134,29 +134,6 @@ impl NotificationServiceImpl {
}
}

// async fn get_parallel_unit_mapping_snapshot(
// &self,
// ) -> MetaResult<(Vec<FragmentParallelUnitMapping>, NotificationVersion)> {
// match &self.metadata_manager {
// MetadataManager::V1(mgr) => {
// let fragment_guard = mgr.fragment_manager.get_fragment_read_guard().await;
// let parallel_unit_mappings =
// fragment_guard.all_running_fragment_mappings().collect_vec();
// let notification_version = self.env.notification_manager().current_version().await;
// Ok((parallel_unit_mappings, notification_version))
// }
// MetadataManager::V2(mgr) => {
// let fragment_guard = mgr.catalog_controller.get_inner_read_guard().await;
// let parallel_unit_mappings = fragment_guard
// .all_running_fragment_mappings()
// .await?
// .collect_vec();
// let notification_version = self.env.notification_manager().current_version().await;
// Ok((parallel_unit_mappings, notification_version))
// }
// }
// }

async fn get_worker_mapping_snapshot(
&self,
) -> MetaResult<(Vec<FragmentWorkerMapping>, NotificationVersion)> {
@@ -260,8 +237,6 @@ impl NotificationServiceImpl {
users,
catalog_version,
) = self.get_catalog_snapshot().await?;
// let (parallel_unit_mappings, parallel_unit_mapping_version) =
// self.get_parallel_unit_mapping_snapshot().await?;

let (streaming_worker_mappings, streaming_worker_mapping_version) =
self.get_worker_mapping_snapshot().await?;
@@ -284,13 +259,10 @@
functions,
connections,
users,
// parallel_unit_mappings: vec![],
nodes,
hummock_snapshot,
// serving_parallel_unit_mappings: vec![],
version: Some(SnapshotVersion {
catalog_version,
// parallel_unit_mapping_version: 0,
worker_node_version,
streaming_worker_mapping_version,
}),
src/meta/src/controller/fragment.rs (21 changes: 2 additions & 19 deletions)
@@ -76,25 +76,8 @@ impl CatalogControllerInner {
.all(&txn)
.await?;

let worker_parallel_units = WorkerProperty::find()
.select_only()
.columns([
worker_property::Column::WorkerId,
worker_property::Column::ParallelUnitIds,
])
.into_tuple::<(WorkerId, I32Array)>()
.all(&txn)
.await?;

let parallel_unit_to_worker = worker_parallel_units
.into_iter()
.flat_map(|(worker_id, parallel_unit_ids)| {
parallel_unit_ids
.into_inner()
.into_iter()
.map(move |parallel_unit_id| (parallel_unit_id as u32, worker_id as u32))
})
.collect::<HashMap<_, _>>();
let parallel_unit_to_worker =
CatalogController::get_parallel_unit_to_worker_map(&txn).await?;

Ok(fragment_mappings
.into_iter()
src/meta/src/controller/streaming_job.rs (20 changes: 1 addition & 19 deletions)
@@ -1023,25 +1023,7 @@ impl CatalogController {

let txn = inner.db.begin().await?;

let worker_parallel_unit_ids: Vec<(WorkerId, I32Array)> = WorkerProperty::find()
.select_only()
.columns([
worker_property::Column::WorkerId,
worker_property::Column::ParallelUnitIds,
])
.into_tuple()
.all(&txn)
.await?;

let parallel_unit_to_worker = worker_parallel_unit_ids
.into_iter()
.flat_map(|(worker_id, parallel_unit_ids)| {
parallel_unit_ids
.0
.into_iter()
.map(move |parallel_unit_id| (parallel_unit_id as u32, worker_id as u32))
})
.collect::<HashMap<_, _>>();
let parallel_unit_to_worker = Self::get_parallel_unit_to_worker_map(&txn).await?;

let mut fragment_mapping_to_notify = vec![];

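Note: both this hunk and the one in src/meta/src/controller/fragment.rs replace duplicated inline code with a shared `CatalogController::get_parallel_unit_to_worker_map(&txn)` helper whose body is not shown in this commit. The removed inline code pins down the transformation it performs; a self-contained sketch of just that flattening step (the SeaORM query against `worker_property` is omitted, and the helper's exact signature is an assumption):

```rust
use std::collections::HashMap;

type WorkerId = i32;
type ParallelUnitId = u32;

/// Flatten per-worker parallel-unit lists into a parallel-unit -> worker map.
/// In the real helper the input presumably comes from a `WorkerProperty::find()`
/// query selecting `(WorkerId, ParallelUnitIds)`; here it is passed in directly.
fn parallel_unit_to_worker_map(
    worker_parallel_units: Vec<(WorkerId, Vec<i32>)>,
) -> HashMap<ParallelUnitId, u32> {
    worker_parallel_units
        .into_iter()
        .flat_map(|(worker_id, parallel_unit_ids)| {
            parallel_unit_ids
                .into_iter()
                .map(move |parallel_unit_id| (parallel_unit_id as u32, worker_id as u32))
        })
        .collect()
}

fn main() {
    // Worker 1 owns parallel units 0..=2, worker 2 owns 3..=5.
    let map = parallel_unit_to_worker_map(vec![(1, vec![0, 1, 2]), (2, vec![3, 4, 5])]);
    assert_eq!(map.get(&4), Some(&2));
    println!("{:?}", map);
}
```

Deduplicating the map construction means any future change to how parallel units are attributed to workers only needs to touch one place.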
