Skip to content

Commit

Permalink
simplify actor state
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Jul 25, 2024
1 parent 6dbb67f commit 043f77d
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 298 deletions.
5 changes: 5 additions & 0 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,14 @@ message StreamingControlStreamRequest {
uint64 prev_epoch = 2;
}

message RemovePartialGraphRequest {
repeated uint32 partial_graph_ids = 1;
}

oneof request {
InitRequest init = 1;
InjectBarrierRequest inject_barrier = 2;
RemovePartialGraphRequest remove_partial_graph = 3;
}
}

Expand Down
1 change: 1 addition & 0 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -964,6 +964,7 @@ mod tests {
),
}));
}
streaming_control_stream_request::Request::RemovePartialGraph(..) => {}
}
}
});
Expand Down
26 changes: 24 additions & 2 deletions src/stream/src/task/barrier_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,12 @@ impl LocalBarrierWorker {
self.send_barrier(barrier, req.graph_info)?;
Ok(())
}
Request::RemovePartialGraph(req) => {
self.remove_partial_graphs(
req.partial_graph_ids.into_iter().map(PartialGraphId::new),
);
Ok(())
}
Request::Init(_) => {
unreachable!()
}
Expand Down Expand Up @@ -767,6 +773,23 @@ impl LocalBarrierWorker {
Ok(())
}

fn remove_partial_graphs(&mut self, partial_graph_ids: impl Iterator<Item = PartialGraphId>) {
for partial_graph_id in partial_graph_ids {
if let Some(graph) = self.state.graph_states.remove(&partial_graph_id) {
assert!(
graph.is_empty(),
"non empty graph to be removed: {}",
&graph
);
} else {
warn!(
partial_graph_id = partial_graph_id.0,
"no partial graph to remove"
);
}
}
}

/// Reset all internal states.
pub(super) fn reset_state(&mut self) {
*self = Self::new(self.actor_manager.clone());
Expand All @@ -785,8 +808,7 @@ impl LocalBarrierWorker {
let root_err = self.try_find_root_failure().await.unwrap(); // always `Some` because we just added one

if let Some(actor_state) = self.state.actor_states.get(&actor_id)
&& let Some(inflight_barriers) = actor_state.inflight_barriers()
&& !inflight_barriers.is_empty()
&& !actor_state.inflight_barriers.is_empty()
{
self.control_stream_handle.reset_stream_with_err(
anyhow!(root_err)
Expand Down
Loading

0 comments on commit 043f77d

Please sign in to comment.