Skip to content

Commit

Permalink
Refactor all_running_fragment_mappings to return Result, simplify…
Browse files Browse the repository at this point in the history
… call in `notification_service`.
  • Loading branch information
shanicky committed Jul 17, 2024
1 parent 30fd4d8 commit abb2933
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 23 deletions.
3 changes: 1 addition & 2 deletions src/meta/service/src/notification_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,7 @@ impl NotificationServiceImpl {
match &self.metadata_manager {
MetadataManager::V1(mgr) => {
let fragment_guard = mgr.fragment_manager.get_fragment_read_guard().await;
let worker_slot_mappings =
fragment_guard.all_running_fragment_mappings().collect_vec();
let worker_slot_mappings = fragment_guard.all_running_fragment_mappings()?;
let notification_version = self.env.notification_manager().current_version().await;
Ok((worker_slot_mappings, notification_version))
}
Expand Down
43 changes: 22 additions & 21 deletions src/meta/src/manager/catalog/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,27 +58,28 @@ pub struct FragmentManagerCore {

impl FragmentManagerCore {
/// List all fragment vnode mapping info that not in `State::Initial`.
pub fn all_running_fragment_mappings(
&self,
) -> impl Iterator<Item = FragmentWorkerSlotMapping> + '_ {
self.table_fragments
.values()
.filter(|tf| tf.state() != State::Initial)
.flat_map(|table_fragments| {
table_fragments
.fragments
.values()
.map(move |fragment| FragmentWorkerSlotMapping {
fragment_id: fragment.fragment_id,
mapping: Some(
FragmentManager::convert_mapping(
&table_fragments.actor_status,
fragment.vnode_mapping.as_ref().unwrap(),
)
.unwrap(),
),
})
})
pub fn all_running_fragment_mappings(&self) -> MetaResult<Vec<FragmentWorkerSlotMapping>> {
let mut mappings = vec![];

for table_fragments in self.table_fragments.values() {
if table_fragments.state() == State::Initial {
continue;
}

for fragment in table_fragments.fragments() {
let mapping = FragmentWorkerSlotMapping {
fragment_id: fragment.fragment_id,
mapping: Some(FragmentManager::convert_mapping(
&table_fragments.actor_status,
fragment.vnode_mapping.as_ref().unwrap(),
)?),
};

mappings.push(mapping);
}
}

Ok(mappings)
}

fn running_fragment_parallelisms(
Expand Down

0 comments on commit abb2933

Please sign in to comment.