diff --git a/src/meta/src/serving/mod.rs b/src/meta/src/serving/mod.rs index 69e17a978212..86277c0f2a50 100644 --- a/src/meta/src/serving/mod.rs +++ b/src/meta/src/serving/mod.rs @@ -192,7 +192,16 @@ pub async fn start_serving_vnode_mapping_worker( continue; } let (workers, streaming_parallelisms) = fetch_serving_infos(&metadata_manager).await; - let (upserted, failed) = serving_vnode_mapping.upsert(streaming_parallelisms, &workers); + let filtered_streaming_parallelisms = fragment_ids.iter().filter_map(|frag_id|{ + match streaming_parallelisms.get(frag_id) { + Some(parallelism) => Some((*frag_id, *parallelism)), + None => { + tracing::warn!(fragment_id = *frag_id, "streaming parallelism not found"); + None + } + } + }).collect(); + let (upserted, failed) = serving_vnode_mapping.upsert(filtered_streaming_parallelisms, &workers); if !upserted.is_empty() { tracing::debug!("Update serving vnode mapping for fragments {:?}.", upserted.keys()); notification_manager.notify_frontend_without_version(Operation::Update, Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings{ mappings: to_fragment_worker_slot_mapping(&upserted) }));