
refactor: local barrier manager use prev epoch to track barrier state #14436

Merged 2 commits on Jan 10, 2024
Changes from 1 commit
81 changes: 47 additions & 34 deletions src/stream/src/task/barrier_manager.rs
@@ -16,15 +16,14 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use anyhow::anyhow;
-use prometheus::HistogramTimer;
-use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;
+use thiserror_ext::AsReport;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot;
use tokio::sync::oneshot::Receiver;

use self::managed_state::ManagedBarrierState;
-use crate::error::{StreamError, StreamResult};
+use crate::error::{IntoUnexpectedExit, StreamError, StreamResult};
use crate::task::ActorId;

mod managed_state;
@@ -33,6 +32,7 @@ mod progress;
mod tests;

pub use progress::CreateMviewProgress;
+use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_storage::StateStoreImpl;

use crate::executor::monitor::StreamingMetrics;
@@ -61,8 +61,8 @@ enum LocalBarrierEvent {
},
InjectBarrier {
barrier: Barrier,
-actor_ids_to_send: Vec<ActorId>,
-actor_ids_to_collect: Vec<ActorId>,
+actor_ids_to_send: HashSet<ActorId>,
+actor_ids_to_collect: HashSet<ActorId>,
result_sender: oneshot::Sender<StreamResult<()>>,
},
Reset,
@@ -92,34 +92,33 @@ enum LocalBarrierEvent {
/// barriers to and collect them from all actors, and finally report the progress.
struct LocalBarrierWorker {
/// Stores all streaming job source sender.
-senders: HashMap<ActorId, Vec<UnboundedSender<Barrier>>>,
+barrier_senders: HashMap<ActorId, Vec<UnboundedSender<Barrier>>>,

/// Current barrier collection state.
state: ManagedBarrierState,

/// Save collect `CompleteReceiver`.
collect_complete_receiver: HashMap<u64, CompleteReceiver>,

-streaming_metrics: Arc<StreamingMetrics>,
+/// Record all unexpected exited actors.
+failure_actors: HashMap<ActorId, StreamError>,
Review thread on `failure_actors`:

Contributor: any reason we wanna keep this?

Contributor (Author): Just to be consistent with the original logic. We can save the error and return it early when handling inject_barrier.
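A minimal standalone sketch of the fail-fast pattern discussed in this thread (the `Worker` and `StreamError` types below are simplified stand-ins, not the PR's actual definitions):

use std::collections::{HashMap, HashSet};

type ActorId = u32;

#[derive(Clone, Debug)]
struct StreamError(String);

struct Worker {
    /// Errors of actors that exited unexpectedly, kept until the next reset.
    failure_actors: HashMap<ActorId, StreamError>,
}

impl Worker {
    /// Before issuing a barrier, check every actor we must collect from;
    /// if one has already failed, return its error immediately instead of
    /// waiting on a collection that can never complete.
    fn check_failures(&self, to_collect: &HashSet<ActorId>) -> Result<(), StreamError> {
        for actor_id in to_collect {
            if let Some(e) = self.failure_actors.get(actor_id) {
                return Err(e.clone());
            }
        }
        Ok(())
    }
}

fn main() {
    let mut worker = Worker { failure_actors: HashMap::new() };
    worker.failure_actors.insert(1, StreamError("actor 1 exited".into()));
    let to_collect: HashSet<ActorId> = [1, 2].into_iter().collect();
    assert!(worker.check_failures(&to_collect).is_err());
}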

}

/// Information used after collection.
pub struct CompleteReceiver {
/// Notify all actors of completion of collection.
pub complete_receiver: Option<Receiver<StreamResult<CollectResult>>>,
-/// `barrier_inflight_timer`'s metrics.
-pub barrier_inflight_timer: HistogramTimer,
/// The kind of barrier.
pub kind: BarrierKind,
}

impl LocalBarrierWorker {
fn new(state_store: StateStoreImpl, streaming_metrics: Arc<StreamingMetrics>) -> Self {
Self {
-senders: HashMap::new(),
-state: ManagedBarrierState::new(state_store),
+barrier_senders: HashMap::new(),
+failure_actors: HashMap::default(),
+state: ManagedBarrierState::new(state_store, streaming_metrics),
collect_complete_receiver: HashMap::default(),
-streaming_metrics,
}
}

@@ -135,12 +134,8 @@ impl LocalBarrierWorker {
actor_ids_to_collect,
result_sender,
} => {
-let timer = self
-.streaming_metrics
-.barrier_inflight_latency
-.start_timer();
let result =
-self.send_barrier(&barrier, actor_ids_to_send, actor_ids_to_collect, timer);
+self.send_barrier(&barrier, actor_ids_to_send, actor_ids_to_collect);
let _ = result_sender.send(result).inspect_err(|e| {
warn!(err=?e, "fail to send inject barrier result");
});
@@ -184,20 +179,20 @@ impl LocalBarrierWorker {
actor_id = actor_id,
"register sender"
);
-self.senders.entry(actor_id).or_default().push(sender);
+self.barrier_senders
+.entry(actor_id)
+.or_default()
+.push(sender);
}

/// Broadcast a barrier to all senders. Save a receiver which will get notified when this
/// barrier is finished, in managed mode.
fn send_barrier(
&mut self,
barrier: &Barrier,
-actor_ids_to_send: impl IntoIterator<Item = ActorId>,
-actor_ids_to_collect: impl IntoIterator<Item = ActorId>,
-timer: HistogramTimer,
+to_send: HashSet<ActorId>,
+to_collect: HashSet<ActorId>,
) -> StreamResult<()> {
-let to_send: HashSet<ActorId> = actor_ids_to_send.into_iter().collect();
-let to_collect: HashSet<ActorId> = actor_ids_to_collect.into_iter().collect();
debug!(
target: "events::stream::barrier::manager::send",
"send barrier {:?}, senders = {:?}, actor_ids_to_collect = {:?}",
@@ -209,11 +204,20 @@ impl LocalBarrierWorker {
// There must be some actors to collect from.
assert!(!to_collect.is_empty());

+for actor_id in &to_collect {
+if let Some(e) = self.failure_actors.get(actor_id) {
+// The failure actors could exit before the barrier is issued, while their
+// up-downstream actors could be stuck somehow. Return error directly to trigger the
+// recovery.
+return Err(e.clone());
+}
+}
+
let (tx, rx) = oneshot::channel();
-self.state.transform_to_issued(barrier, to_collect, tx)?;
+self.state.transform_to_issued(barrier, to_collect, tx);

for actor_id in to_send {
-match self.senders.get(&actor_id) {
+match self.barrier_senders.get(&actor_id) {
Some(senders) => {
for sender in senders {
if let Err(_err) = sender.send(barrier.clone()) {
@@ -244,15 +248,14 @@
actors
);
for actor in actors {
-self.senders.remove(actor);
+self.barrier_senders.remove(actor);
}
}

self.collect_complete_receiver.insert(
barrier.epoch.prev,
CompleteReceiver {
complete_receiver: Some(rx),
-barrier_inflight_timer: timer,
kind: barrier.kind,
},
);
@@ -277,10 +280,10 @@

/// Reset all internal states.
fn reset(&mut self) {
-self.senders.clear();
-self.collect_complete_receiver.clear();
-
-self.state.clear_all_states();
+*self = Self::new(
+self.state.state_store.clone(),
+self.state.streaming_metrics.clone(),
+);
}

/// When a [`crate::executor::StreamConsumer`] (typically [`crate::executor::DispatchExecutor`]) get a barrier, it should report
@@ -292,7 +295,15 @@ impl LocalBarrierWorker {
/// When a actor exit unexpectedly, it should report this event using this function, so meta
/// will notice actor's exit while collecting.
fn notify_failure(&mut self, actor_id: ActorId, err: StreamError) {
-self.state.notify_failure(actor_id, err)
+let err = err.into_unexpected_exit(actor_id);
+if let Some(prev_err) = self.failure_actors.insert(actor_id, err.clone()) {
+warn!(?actor_id, prev_err = %prev_err.as_report(), "actor error overwritten");
+}
+for (fail_epoch, notifier) in self.state.notifiers_await_on_actor(actor_id) {
+if notifier.send(Err(err.clone())).is_err() {
+warn!(?fail_epoch, ?actor_id, err = ?err.as_report(), "fail to notify actor failure");
+}
+}
}
}

@@ -340,7 +351,8 @@ impl LocalBarrierManager {
actor_ids_to_collect: actor_ids_to_collect.into_iter().collect(),
result_sender: tx,
});
-rx.await.expect("should receive response")
+rx.await
+.map_err(|_| anyhow!("barrier manager maybe reset"))?
}

/// Use `prev_epoch` to remove collect rx and return rx.
@@ -350,7 +362,8 @@
epoch: prev_epoch,
result_sender: tx,
});
-rx.await.expect("should receive response")
+rx.await
+.map_err(|_| anyhow!("barrier manager maybe reset"))?
}

/// Reset all internal states.
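A note on the two `rx.await` changes above: once `reset` replaces the worker wholesale, any pending `result_sender` is dropped, so the paired `rx.await` resolves to a `RecvError` instead of hanging, and mapping that into an error makes a reset recoverable where the old `expect` would panic. A minimal standalone sketch of this oneshot behavior, assuming only the `tokio` and `anyhow` crates:

use anyhow::anyhow;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (tx, rx) = oneshot::channel::<()>();
    // Dropping the sender simulates the barrier worker being reset before
    // it responds to a pending request.
    drop(tx);
    let result = rx.await.map_err(|_| anyhow!("barrier manager maybe reset"));
    assert!(result.is_err());
    Ok(())
}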