Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Jul 29, 2024
1 parent e76c11c commit 76b1244
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/stream/src/task/barrier_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@ impl LocalBarrierWorker {
let root_err = self.try_find_root_failure().await.unwrap(); // always `Some` because we just added one

if let Some(actor_state) = self.state.actor_states.get(&actor_id)
&& !actor_state.inflight_barriers.is_empty()
&& (!actor_state.inflight_barriers.is_empty() || !actor_state.is_running())
{
self.control_stream_handle.reset_stream_with_err(
anyhow!(root_err)
Expand Down
40 changes: 33 additions & 7 deletions src/stream/src/task/barrier_manager/managed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,27 @@ impl Display for &'_ PartialGraphManagedBarrierState {
}
}

enum InflightActorStatus {
/// The actor is just spawned and not issued any barrier yet
NotStarted,
/// The actor has been issued some barriers, and not issued any stop barrier yet
Running,
/// The actor has been issued a stop barrier
Stopping,
}

impl InflightActorStatus {
pub(super) fn is_stopping(&self) -> bool {
matches!(self, InflightActorStatus::Stopping)
}
}

pub(crate) struct InflightActorState {
pending_subscribers: BTreeMap<u64, Vec<mpsc::UnboundedSender<SubscribeMutationItem>>>,
started_subscribers: Vec<mpsc::UnboundedSender<SubscribeMutationItem>>,
/// `prev_epoch` -> partial graph id
pub(super) inflight_barriers: BTreeMap<u64, (PartialGraphId, Option<Arc<Mutation>>)>,
/// Whether the actor has been issued a stop barrier
is_stopping: bool,
status: InflightActorStatus,
}

impl InflightActorState {
Expand All @@ -206,7 +220,7 @@ impl InflightActorState {
pending_subscribers: Default::default(),
started_subscribers: vec![],
inflight_barriers: BTreeMap::default(),
is_stopping: false,
status: InflightActorStatus::NotStarted,
}
}

Expand All @@ -216,7 +230,6 @@ impl InflightActorState {
barrier: &Barrier,
is_stop: bool,
) {
assert!(!self.is_stopping, "stopped actor should not issue barrier");
if let Some((first_epoch, _)) = self.pending_subscribers.first_key_value() {
assert!(
*first_epoch >= barrier.epoch.prev,
Expand All @@ -242,10 +255,16 @@ impl InflightActorState {
barrier.epoch.prev,
(partial_graph_id, barrier.mutation.clone()),
);
self.is_stopping = is_stop;
if is_stop {
assert!(self.pending_subscribers.is_empty());
self.started_subscribers.clear();
assert!(
!self.status.is_stopping(),
"stopped actor should not issue barrier"
);
self.status = InflightActorStatus::Stopping;
} else {
self.status = InflightActorStatus::Running;
}
}

Expand All @@ -255,9 +274,13 @@ impl InflightActorState {
assert_eq!(prev_epoch, epoch.prev);
(
prev_partial_graph_id,
self.inflight_barriers.is_empty() && self.is_stopping,
self.inflight_barriers.is_empty() && self.status.is_stopping(),
)
}

pub(super) fn is_running(&self) -> bool {
matches!(&self.status, InflightActorStatus::Running)
}
}

pub(super) struct PartialGraphManagedBarrierState {
Expand Down Expand Up @@ -368,7 +391,8 @@ impl InflightActorState {
return;
}
}
if !self.is_stopping {
if !self.status.is_stopping() {
// Only add the subscribers when the actor is not stopped yet.
self.started_subscribers.push(tx);
}
} else {
Expand All @@ -380,6 +404,8 @@ impl InflightActorState {
last_epoch,
start_prev_epoch
);
} else {
assert!(self.status.is_stopping(), "actor has been stopped and has not inflight barrier. unlikely to get further barrier");
}
self.pending_subscribers
.entry(start_prev_epoch)
Expand Down
2 changes: 2 additions & 0 deletions src/stream/src/task/barrier_manager/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ impl LocalBarrierWorker {
.entry(epoch.curr)
.or_default()
.insert(actor, state);
} else {
warn!(?epoch, actor, ?state, "ignore create mview progress");
}
}
}
Expand Down

0 comments on commit 76b1244

Please sign in to comment.