Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Jul 21, 2024
1 parent 9e5f205 commit ea42c29
Show file tree
Hide file tree
Showing 3 changed files with 334 additions and 131 deletions.
3 changes: 2 additions & 1 deletion src/stream/src/task/barrier_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,8 @@ impl LocalBarrierWorker {
let root_err = self.try_find_root_failure().await.unwrap(); // always `Some` because we just added one

if let Some(actor_state) = self.state.actor_states.get(&actor_id)
&& !actor_state.inflight_barriers.is_empty()
&& let Some(inflight_barriers) = actor_state.inflight_barriers()
&& !inflight_barriers.is_empty()
{
self.control_stream_handle.reset_stream_with_err(
anyhow!(root_err)
Expand Down
Loading

0 comments on commit ea42c29

Please sign in to comment.