From 08f02bc0697017bcdd0b94b1b165d5479d7cc89e Mon Sep 17 00:00:00 2001 From: tabVersion Date: Wed, 15 Nov 2023 20:23:47 +0800 Subject: [PATCH] fix throttle source --- src/meta/src/manager/catalog/fragment.rs | 34 ++++++++++-------------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index f54045c6b6e2b..3ef2769124219 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -785,17 +785,10 @@ impl FragmentManager { ) -> MetaResult>> { let map = &mut self.core.write().await.table_fragments; let mut table_id_to_apply = HashSet::new(); - 'table: for (table_id, table_fragments) in map.iter() { + for (table_id, table_fragments) in map.iter() { for fragment in table_fragments.fragments.values() { - for actor in &fragment.actors { - if let Some(stream_node) = actor.nodes.as_ref() { - if let Some(NodeBody::Source(ref node)) = stream_node.node_body.as_ref() { - if let Some(node_inner) = &node.source_inner && node_inner.source_id == source_id as u32 { - table_id_to_apply.insert(*table_id); - continue 'table; - } - } - } + if (fragment.get_fragment_type_mask() & FragmentTypeFlag::Source as u32) != 0 { + table_id_to_apply.insert(*table_id); } } } @@ -813,15 +806,16 @@ impl FragmentManager { for fragment in table_fragment.fragments.values_mut() { let mut actor_to_apply = Vec::new(); for actor in &mut fragment.actors { - if let Some(stream_node) = actor.nodes.as_mut() { - if let Some(NodeBody::Source(ref mut node)) = stream_node.node_body.as_mut() - { - if let Some(source_inner) = &mut node.source_inner { - source_inner.rate_limit = rate_limit; + if let Some(node) = actor.nodes.as_mut() { + visit_stream_node(node, |node_body| { + if let NodeBody::Source(ref mut node) = node_body { + if let Some(ref mut node_inner) = node.source_inner && node_inner.source_id == source_id as u32 { + node_inner.rate_limit = rate_limit; + actor_to_apply.push(actor.actor_id); + } } - actor_to_apply.push(actor.actor_id); - } - } + }) + }; } to_apply_fragment.insert(fragment.fragment_id, actor_to_apply); } @@ -856,14 +850,14 @@ impl FragmentManager { if (fragment.get_fragment_type_mask() & FragmentTypeFlag::StreamScan as u32) != 0 { let mut actor_to_apply = Vec::new(); for actor in &mut fragment.actors { - actor.nodes.as_mut().map(|node| { + if let Some(node) = actor.nodes.as_mut() { visit_stream_node(node, |node_body| { if let NodeBody::StreamScan(ref mut node) = node_body { node.rate_limit = rate_limit; actor_to_apply.push(actor.actor_id); } }) - }); + }; } fragment_to_apply.insert(fragment.fragment_id, actor_to_apply); }