Skip to content

Commit

Permalink
fix throttle mv
Browse files Browse the repository at this point in the history
  • Loading branch information
tabVersion committed Nov 15, 2023
1 parent 731d5dd commit 9bb7f6b
Showing 1 changed file with 13 additions and 9 deletions.
22 changes: 13 additions & 9 deletions src/meta/src/manager/catalog/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -851,18 +851,22 @@ impl FragmentManager {
.get_mut(table_id)
.ok_or_else(|| MetaError::fragment_not_found(table_id))?;
let mut fragment_to_apply = HashMap::new();

for fragment in fragment.fragments.values_mut() {
let mut actor_to_apply = Vec::new();
for actor in &mut fragment.actors {
if let Some(stream_node) = actor.nodes.as_mut() {
if let Some(NodeBody::StreamScan(ref mut node)) = stream_node.node_body.as_mut()
{
node.rate_limit = rate_limit;
actor_to_apply.push(actor.actor_id);
}
if (fragment.get_fragment_type_mask() & FragmentTypeFlag::StreamScan as u32) != 0 {
let mut actor_to_apply = Vec::new();
for actor in &mut fragment.actors {
actor.nodes.as_mut().map(|node| {
visit_stream_node(node, |node_body| {
if let NodeBody::StreamScan(ref mut node) = node_body {
node.rate_limit = rate_limit;
actor_to_apply.push(actor.actor_id);
}
})
});
}
fragment_to_apply.insert(fragment.fragment_id, actor_to_apply);
}
fragment_to_apply.insert(fragment.fragment_id, actor_to_apply);
}

commit_meta!(self, table_fragments)?;
Expand Down

0 comments on commit 9bb7f6b

Please sign in to comment.