Skip to content

Commit

Permalink
fix throttle source
Browse files Browse the repository at this point in the history
  • Loading branch information
tabVersion committed Nov 15, 2023
1 parent 9bb7f6b commit 08f02bc
Showing 1 changed file with 14 additions and 20 deletions.
34 changes: 14 additions & 20 deletions src/meta/src/manager/catalog/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -785,17 +785,10 @@ impl FragmentManager {
) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
let map = &mut self.core.write().await.table_fragments;
let mut table_id_to_apply = HashSet::new();
'table: for (table_id, table_fragments) in map.iter() {
for (table_id, table_fragments) in map.iter() {
for fragment in table_fragments.fragments.values() {
for actor in &fragment.actors {
if let Some(stream_node) = actor.nodes.as_ref() {
if let Some(NodeBody::Source(ref node)) = stream_node.node_body.as_ref() {
if let Some(node_inner) = &node.source_inner && node_inner.source_id == source_id as u32 {
table_id_to_apply.insert(*table_id);
continue 'table;
}
}
}
if (fragment.get_fragment_type_mask() & FragmentTypeFlag::Source as u32) != 0 {
table_id_to_apply.insert(*table_id);
}
}
}
Expand All @@ -813,15 +806,16 @@ impl FragmentManager {
for fragment in table_fragment.fragments.values_mut() {
let mut actor_to_apply = Vec::new();
for actor in &mut fragment.actors {
if let Some(stream_node) = actor.nodes.as_mut() {
if let Some(NodeBody::Source(ref mut node)) = stream_node.node_body.as_mut()
{
if let Some(source_inner) = &mut node.source_inner {
source_inner.rate_limit = rate_limit;
if let Some(node) = actor.nodes.as_mut() {
visit_stream_node(node, |node_body| {
if let NodeBody::Source(ref mut node) = node_body {
if let Some(ref mut node_inner) = node.source_inner && node_inner.source_id == source_id as u32 {
node_inner.rate_limit = rate_limit;
actor_to_apply.push(actor.actor_id);
}
}
actor_to_apply.push(actor.actor_id);
}
}
})
};
}
to_apply_fragment.insert(fragment.fragment_id, actor_to_apply);
}
Expand Down Expand Up @@ -856,14 +850,14 @@ impl FragmentManager {
if (fragment.get_fragment_type_mask() & FragmentTypeFlag::StreamScan as u32) != 0 {
let mut actor_to_apply = Vec::new();
for actor in &mut fragment.actors {
actor.nodes.as_mut().map(|node| {
if let Some(node) = actor.nodes.as_mut() {
visit_stream_node(node, |node_body| {
if let NodeBody::StreamScan(ref mut node) = node_body {
node.rate_limit = rate_limit;
actor_to_apply.push(actor.actor_id);
}
})
});
};
}
fragment_to_apply.insert(fragment.fragment_id, actor_to_apply);
}
Expand Down

0 comments on commit 08f02bc

Please sign in to comment.