From abb29337a14d2994176e50da4ba900f313b81394 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Wed, 17 Jul 2024 16:55:38 +0800 Subject: [PATCH 1/2] Refactor `all_running_fragment_mappings` to return `Result`, simplify call in `notification_service`. --- src/meta/service/src/notification_service.rs | 3 +- src/meta/src/manager/catalog/fragment.rs | 43 ++++++++++---------- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/src/meta/service/src/notification_service.rs b/src/meta/service/src/notification_service.rs index 5f0abbc8fe0e9..2e6df0d53509f 100644 --- a/src/meta/service/src/notification_service.rs +++ b/src/meta/service/src/notification_service.rs @@ -197,8 +197,7 @@ impl NotificationServiceImpl { match &self.metadata_manager { MetadataManager::V1(mgr) => { let fragment_guard = mgr.fragment_manager.get_fragment_read_guard().await; - let worker_slot_mappings = - fragment_guard.all_running_fragment_mappings().collect_vec(); + let worker_slot_mappings = fragment_guard.all_running_fragment_mappings()?; let notification_version = self.env.notification_manager().current_version().await; Ok((worker_slot_mappings, notification_version)) } diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index 1ac5aacef450a..068aed623cfca 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -58,27 +58,28 @@ pub struct FragmentManagerCore { impl FragmentManagerCore { /// List all fragment vnode mapping info that not in `State::Initial`. - pub fn all_running_fragment_mappings( - &self, - ) -> impl Iterator + '_ { - self.table_fragments - .values() - .filter(|tf| tf.state() != State::Initial) - .flat_map(|table_fragments| { - table_fragments - .fragments - .values() - .map(move |fragment| FragmentWorkerSlotMapping { - fragment_id: fragment.fragment_id, - mapping: Some( - FragmentManager::convert_mapping( - &table_fragments.actor_status, - fragment.vnode_mapping.as_ref().unwrap(), - ) - .unwrap(), - ), - }) - }) + pub fn all_running_fragment_mappings(&self) -> MetaResult> { + let mut mappings = vec![]; + + for table_fragments in self.table_fragments.values() { + if table_fragments.state() == State::Initial { + continue; + } + + for fragment in table_fragments.fragments() { + let mapping = FragmentWorkerSlotMapping { + fragment_id: fragment.fragment_id, + mapping: Some(FragmentManager::convert_mapping( + &table_fragments.actor_status, + fragment.vnode_mapping.as_ref().unwrap(), + )?), + }; + + mappings.push(mapping); + } + } + + Ok(mappings) } fn running_fragment_parallelisms( From 2d4fd2038431ae04994fe7cce3794e80a606bd4d Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Wed, 17 Jul 2024 17:32:12 +0800 Subject: [PATCH 2/2] Refactor FragmentWorkerSlotMapping creation, log errors on failure --- src/meta/src/manager/catalog/fragment.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index 068aed623cfca..606b44f6fdcbd 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -67,12 +67,16 @@ impl FragmentManagerCore { } for fragment in table_fragments.fragments() { + let worker_slot_mapping = FragmentManager::convert_mapping( + &table_fragments.actor_status, + fragment.vnode_mapping.as_ref().unwrap(), + ) + .inspect_err(|e| tracing::warn!("build mapping failed: {:?}, downgrade to None", e)) + .ok(); + let mapping = FragmentWorkerSlotMapping { fragment_id: fragment.fragment_id, - mapping: Some(FragmentManager::convert_mapping( - &table_fragments.actor_status, - fragment.vnode_mapping.as_ref().unwrap(), - )?), + mapping: worker_slot_mapping, }; mappings.push(mapping);