diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index 94f8cde01c6ec..f54045c6b6e2b 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -851,18 +851,22 @@ impl FragmentManager { .get_mut(table_id) .ok_or_else(|| MetaError::fragment_not_found(table_id))?; let mut fragment_to_apply = HashMap::new(); + for fragment in fragment.fragments.values_mut() { - let mut actor_to_apply = Vec::new(); - for actor in &mut fragment.actors { - if let Some(stream_node) = actor.nodes.as_mut() { - if let Some(NodeBody::StreamScan(ref mut node)) = stream_node.node_body.as_mut() - { - node.rate_limit = rate_limit; - actor_to_apply.push(actor.actor_id); - } + if (fragment.get_fragment_type_mask() & FragmentTypeFlag::StreamScan as u32) != 0 { + let mut actor_to_apply = Vec::new(); + for actor in &mut fragment.actors { + actor.nodes.as_mut().map(|node| { + visit_stream_node(node, |node_body| { + if let NodeBody::StreamScan(ref mut node) = node_body { + node.rate_limit = rate_limit; + actor_to_apply.push(actor.actor_id); + } + }) + }); } + fragment_to_apply.insert(fragment.fragment_id, actor_to_apply); } - fragment_to_apply.insert(fragment.fragment_id, actor_to_apply); } commit_meta!(self, table_fragments)?;