refactor(meta): merge periodic and scheduled barrier and non-async handle barrier #14977

Merged · 4 commits · Feb 18, 2024
Changes from all commits
4 changes: 2 additions & 2 deletions src/meta/service/src/scale_service.rs
@@ -134,7 +134,7 @@ impl ScaleService for ScaleServiceImpl {
&self,
request: Request<RescheduleRequest>,
) -> Result<Response<RescheduleResponse>, Status> {
self.barrier_manager.check_status_running().await?;
self.barrier_manager.check_status_running()?;

let RescheduleRequest {
reschedules,
@@ -228,7 +228,7 @@ impl ScaleService for ScaleServiceImpl {
&self,
request: Request<GetReschedulePlanRequest>,
) -> Result<Response<GetReschedulePlanResponse>, Status> {
self.barrier_manager.check_status_running().await?;
self.barrier_manager.check_status_running()?;

let req = request.into_inner();

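The two hunks above only drop an `.await`; the reason is in `src/meta/src/barrier/mod.rs` below, where the barrier manager status moves from a `tokio` mutex into an `arc_swap::ArcSwap`, so checking it becomes a plain lock-free load. A minimal sketch of that pattern, with a simplified `Status` standing in for `BarrierManagerStatus` and a `String` error in place of `MetaResult` (both are simplifications for illustration):

```rust
use std::sync::Arc;

use arc_swap::ArcSwap; // external crate: arc-swap

enum Status {
    Starting,
    Recovering,
    Running,
}

struct Context {
    // Readers take a lock-free snapshot; writers swap in a new Arc.
    status: Arc<ArcSwap<Status>>,
}

impl Context {
    // Synchronous: there is no async lock to await, so gRPC handlers call it directly.
    fn check_status_running(&self) -> Result<(), String> {
        let status = self.status.load();
        match &**status {
            Status::Starting | Status::Recovering => Err("the cluster is not running".into()),
            Status::Running => Ok(()),
        }
    }

    fn set_status(&self, new_status: Status) {
        self.status.store(Arc::new(new_status));
    }
}

fn main() {
    let ctx = Context {
        status: Arc::new(ArcSwap::new(Arc::new(Status::Starting))),
    };
    assert!(ctx.check_status_running().is_err());
    ctx.set_status(Status::Running);
    assert!(ctx.check_status_running().is_ok());
}
```

Because readers never block on a writer (and vice versa), the RPC handlers in `scale_service.rs` can perform the check without awaiting, which is exactly the change in the two hunks above.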
66 changes: 24 additions & 42 deletions src/meta/src/barrier/mod.rs
@@ -19,6 +19,7 @@ use std::mem::take;
use std::sync::Arc;
use std::time::Duration;

use arc_swap::ArcSwap;
use fail::fail_point;
use itertools::Itertools;
use prometheus::HistogramTimer;
@@ -137,7 +138,7 @@ struct Scheduled {

#[derive(Clone)]
pub struct GlobalBarrierManagerContext {
status: Arc<Mutex<BarrierManagerStatus>>,
status: Arc<ArcSwap<BarrierManagerStatus>>,

tracker: Arc<Mutex<CreateMviewProgressTracker>>,

@@ -407,7 +408,7 @@ impl GlobalBarrierManager {
let tracker = CreateMviewProgressTracker::new();

let context = GlobalBarrierManagerContext {
status: Arc::new(Mutex::new(BarrierManagerStatus::Starting)),
status: Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting))),
metadata_manager,
hummock_manager,
source_manager,
@@ -482,6 +483,7 @@ impl GlobalBarrierManager {
let interval = Duration::from_millis(
self.env.system_params_reader().await.barrier_interval_ms() as u64,
);
self.scheduled_barriers.set_min_interval(interval);
tracing::info!(
"Starting barrier manager with: interval={:?}, enable_recovery={}, in_flight_barrier_nums={}",
interval,
@@ -519,8 +521,7 @@ impl GlobalBarrierManager {
// Even if there's no actor to recover, we still go through the recovery process to
// inject the first `Initial` barrier.
self.context
.set_status(BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap))
.await;
.set_status(BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap));
let span = tracing::info_span!("bootstrap_recovery", prev_epoch = prev_epoch.value().0);

let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);
@@ -531,10 +532,8 @@
.await;
}

self.context.set_status(BarrierManagerStatus::Running).await;
self.context.set_status(BarrierManagerStatus::Running);

let mut min_interval = tokio::time::interval(interval);
min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
let (local_notification_tx, mut local_notification_rx) =
tokio::sync::mpsc::unbounded_channel();
self.env
@@ -610,11 +609,7 @@ impl GlobalBarrierManager {
let notification = notification.unwrap();
// Handle barrier interval and checkpoint frequency changes
if let LocalNotification::SystemParamsChange(p) = &notification {
let new_interval = Duration::from_millis(p.barrier_interval_ms() as u64);
if new_interval != min_interval.period() {
min_interval = tokio::time::interval(new_interval);
min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
}
self.scheduled_barriers.set_min_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
self.scheduled_barriers
.set_checkpoint_frequency(p.checkpoint_frequency() as usize)
}
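Instead of rebuilding a local `tokio::time::Interval` in the event loop whenever `barrier_interval_ms` changes, the new period is now handed to `ScheduledBarriers` through `set_min_interval`. `Interval` has no way to change its period in place, so whoever owns the timer has to recreate it; the sketch below shows one plausible way to publish a new duration to the scheduling loop. The `MinInterval` type and the atomic are assumptions made for illustration, not the real `ScheduledBarriers` internals, and the example assumes `tokio` with the `rt` and `time` features:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;

use tokio::time::{interval, MissedTickBehavior};

// Hypothetical: a single atomic is enough to hand a new period from the
// notification handler to the barrier loop.
struct MinInterval {
    millis: AtomicU64,
}

impl MinInterval {
    fn set(&self, d: Duration) {
        self.millis.store(d.as_millis() as u64, Ordering::Relaxed);
    }
    fn get(&self) -> Duration {
        Duration::from_millis(self.millis.load(Ordering::Relaxed))
    }
}

#[tokio::main]
async fn main() {
    let min = Arc::new(MinInterval { millis: AtomicU64::new(1000) });

    let min_for_loop = min.clone();
    tokio::spawn(async move {
        let mut period = min_for_loop.get();
        let mut ticker = interval(period);
        ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
        loop {
            ticker.tick().await;
            // `Interval` has no setter for its period, so the loop rebuilds the
            // ticker whenever the configured value has changed under it.
            let wanted = min_for_loop.get();
            if wanted != period {
                period = wanted;
                ticker = interval(period);
                ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
            }
            // ... a periodic barrier would be produced here ...
        }
    });

    // System-params notification path: just publish the new duration.
    min.set(Duration::from_millis(250));
}
```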
@@ -626,34 +621,26 @@
)
.await;
}

// There's barrier scheduled.
_ = self.scheduled_barriers.wait_one(), if self.checkpoint_control.can_inject_barrier(self.in_flight_barrier_nums) => {
min_interval.reset(); // Reset the interval as we have a new barrier.
self.handle_new_barrier().await;
}
// Minimum interval reached.
_ = min_interval.tick(), if self.checkpoint_control.can_inject_barrier(self.in_flight_barrier_nums) => {
self.handle_new_barrier().await;
scheduled = self.scheduled_barriers.next_barrier(),
if self
.checkpoint_control
.can_inject_barrier(self.in_flight_barrier_nums) => {
self.handle_new_barrier(scheduled);
}
}
self.checkpoint_control.update_barrier_nums_metrics();
}
}

/// Handle the new barrier from the scheduled queue and inject it.
async fn handle_new_barrier(&mut self) {
assert!(self
.checkpoint_control
.can_inject_barrier(self.in_flight_barrier_nums));

fn handle_new_barrier(&mut self, scheduled: Scheduled) {
let Scheduled {
command,
mut notifiers,
send_latency_timer,
checkpoint,
span,
} = self.scheduled_barriers.pop_or_default().await;
} = scheduled;

let info = self.state.apply_command(&command);
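This is the heart of the "merge periodic and scheduled barrier" half of the PR: the two old `select!` arms (an explicitly scheduled barrier vs. a `min_interval` tick) collapse into a single `scheduled_barriers.next_barrier()` future whose result is passed straight into `handle_new_barrier`. A rough sketch of such an API is below; the names and the exact policy (whether an explicitly scheduled barrier waits out the interval or resets it) are illustrative only, and the real type also tracks checkpoint frequency, notifiers and recovery blocking:

```rust
use std::collections::VecDeque;
use std::time::Duration;

use tokio::sync::Mutex;
use tokio::time::{interval, Interval, MissedTickBehavior};

// Illustrative stand-in; the real `Scheduled` carries the command, notifiers,
// checkpoint flag and tracing span.
#[derive(Default)]
struct Scheduled;

struct ScheduledBarriers {
    queue: Mutex<VecDeque<Scheduled>>,
    min_interval: Mutex<Interval>,
}

impl ScheduledBarriers {
    fn new(period: Duration) -> Self {
        let mut ticker = interval(period);
        ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
        Self {
            queue: Mutex::new(VecDeque::new()),
            min_interval: Mutex::new(ticker),
        }
    }

    async fn schedule(&self, scheduled: Scheduled) {
        self.queue.lock().await.push_back(scheduled);
    }

    /// Single entry point replacing the two old `select!` arms: wait out the
    /// minimum interval, then hand back an explicitly scheduled barrier if one
    /// is queued, or a plain periodic barrier otherwise.
    async fn next_barrier(&self) -> Scheduled {
        self.min_interval.lock().await.tick().await;
        self.queue.lock().await.pop_front().unwrap_or_default()
    }
}

#[tokio::main]
async fn main() {
    let sched = ScheduledBarriers::new(Duration::from_millis(100));
    sched.schedule(Scheduled).await;
    let _explicit = sched.next_barrier().await; // the scheduled one
    let _periodic = sched.next_barrier().await; // a default periodic barrier
}
```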

@@ -676,15 +663,12 @@
command,
kind,
self.context.clone(),
span.clone(),
span,
));

send_latency_timer.observe_duration();

self.rpc_manager
.inject_barrier(command_ctx.clone())
.instrument(span)
.await;
self.rpc_manager.inject_barrier(command_ctx.clone());

// Notify about the injection.
let prev_paused_reason = self.state.paused_reason();
@@ -696,7 +680,7 @@
prev_paused_reason,
curr_paused_reason,
};
notifiers.iter_mut().for_each(|n| n.notify_injected(info));
notifiers.iter_mut().for_each(|n| n.notify_started(info));

// Update the paused state after the barrier is injected.
self.state.set_paused_reason(curr_paused_reason);
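`handle_new_barrier` can stop being `async` because nothing in it awaits anymore: `rpc_manager.inject_barrier` now only spawns the inject-and-collect task (see `barrier/rpc.rs` below) and returns. A stripped-down sketch of that hand-off, with hypothetical types, shows why the event loop stays responsive:

```rust
use tokio::sync::mpsc;

// Hypothetical shapes, only to show why the handler no longer needs `async`.
struct Barrier(u64);
struct Completion(u64);

struct Manager {
    completion_tx: mpsc::UnboundedSender<Completion>,
}

impl Manager {
    /// Synchronous: the await-heavy work (inject RPCs, collection) lives in a
    /// spawned task, so the caller immediately returns to its `select!` loop.
    fn handle_new_barrier(&mut self, barrier: Barrier) {
        let tx = self.completion_tx.clone();
        tokio::spawn(async move {
            // ... inject the barrier on compute nodes, wait for collection ...
            let _ = tx.send(Completion(barrier.0));
        });
    }
}

#[tokio::main]
async fn main() {
    let (completion_tx, mut completion_rx) = mpsc::unbounded_channel();
    let mut manager = Manager { completion_tx };
    manager.handle_new_barrier(Barrier(1));
    println!("collected barrier {}", completion_rx.recv().await.unwrap().0);
}
```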
@@ -775,8 +759,7 @@ impl GlobalBarrierManager {
self.context
.set_status(BarrierManagerStatus::Recovering(RecoveryReason::Failover(
err.clone(),
)))
.await;
)));
let latest_snapshot = self.context.hummock_manager.latest_snapshot();
let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into()); // we can only recover from the committed epoch
let span = tracing::info_span!(
@@ -788,7 +771,7 @@
// No need to clean dirty tables for barrier recovery,
// The foreground stream job should cleanup their own tables.
self.recovery(prev_epoch, None).instrument(span).await;
self.context.set_status(BarrierManagerStatus::Running).await;
self.context.set_status(BarrierManagerStatus::Running);
} else {
panic!("failed to execute barrier: {}", err.as_report());
}
@@ -940,9 +923,9 @@ impl GlobalBarrierManager {

impl GlobalBarrierManagerContext {
/// Check the status of barrier manager, return error if it is not `Running`.
pub async fn check_status_running(&self) -> MetaResult<()> {
let status = self.status.lock().await;
match &*status {
pub fn check_status_running(&self) -> MetaResult<()> {
let status = self.status.load();
match &**status {
BarrierManagerStatus::Starting
| BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap) => {
bail!("The cluster is bootstrapping")
@@ -955,9 +938,8 @@ impl GlobalBarrierManagerContext {
}

/// Set barrier manager status.
async fn set_status(&self, new_status: BarrierManagerStatus) {
let mut status = self.status.lock().await;
*status = new_status;
fn set_status(&self, new_status: BarrierManagerStatus) {
self.status.store(Arc::new(new_status));
}

/// Resolve actor information from cluster, fragment manager and `ChangedTableId`.
8 changes: 4 additions & 4 deletions src/meta/src/barrier/notifier.rs
@@ -31,8 +31,8 @@ pub struct BarrierInfo {
/// Used for notifying the status of a scheduled command/barrier.
#[derive(Debug, Default)]
pub(crate) struct Notifier {
/// Get notified when scheduled barrier is injected to compute nodes.
pub injected: Option<oneshot::Sender<BarrierInfo>>,
/// Get notified when scheduled barrier has started to be handled.
pub started: Option<oneshot::Sender<BarrierInfo>>,

/// Get notified when scheduled barrier is collected or failed.
pub collected: Option<oneshot::Sender<MetaResult<()>>>,
@@ -43,8 +43,8 @@ pub(crate) struct Notifier {

impl Notifier {
/// Notify when we have injected a barrier to compute nodes.
pub fn notify_injected(&mut self, info: BarrierInfo) {
if let Some(tx) = self.injected.take() {
pub fn notify_started(&mut self, info: BarrierInfo) {
if let Some(tx) = self.started.take() {
tx.send(info).ok();
}
}
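The rename from `injected` to `started` matches the new timing: the channel now fires when the barrier starts being handled, not after the inject RPCs have returned. A self-contained sketch of how such a notifier is wired up and awaited is below; `notify_collected`, the epoch fields and the `String` error type are simplifications for the example rather than the exact RisingWave definitions:

```rust
use tokio::sync::oneshot;

#[derive(Debug)]
struct BarrierInfo {
    prev_epoch: u64,
    curr_epoch: u64,
}
type MetaResult<T> = Result<T, String>;

struct Notifier {
    started: Option<oneshot::Sender<BarrierInfo>>,
    collected: Option<oneshot::Sender<MetaResult<()>>>,
}

impl Notifier {
    fn notify_started(&mut self, info: BarrierInfo) {
        if let Some(tx) = self.started.take() {
            let _ = tx.send(info);
        }
    }
    fn notify_collected(&mut self, result: MetaResult<()>) {
        if let Some(tx) = self.collected.take() {
            let _ = tx.send(result);
        }
    }
}

#[tokio::main]
async fn main() {
    let (started_tx, started_rx) = oneshot::channel();
    let (collected_tx, collected_rx) = oneshot::channel();
    let mut notifier = Notifier {
        started: Some(started_tx),
        collected: Some(collected_tx),
    };

    // Barrier manager side: fire the notifications as the barrier progresses.
    notifier.notify_started(BarrierInfo { prev_epoch: 1, curr_epoch: 2 });
    notifier.notify_collected(Ok(()));

    // Scheduler side: the command's caller can await both stages separately.
    println!("started: {:?}", started_rx.await.unwrap());
    println!("collected: {:?}", collected_rx.await.unwrap());
}
```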
20 changes: 10 additions & 10 deletions src/meta/src/barrier/recovery.rs
@@ -301,10 +301,7 @@ impl GlobalBarrierManagerContext {
impl GlobalBarrierManager {
/// Pre-apply the buffered drop and cancel commands, return true if any were applied.
async fn pre_apply_drop_cancel(&self) -> MetaResult<bool> {
let (dropped_actors, cancelled) = self
.scheduled_barriers
.pre_apply_drop_cancel_scheduled()
.await;
let (dropped_actors, cancelled) = self.scheduled_barriers.pre_apply_drop_cancel_scheduled();
let applied = !dropped_actors.is_empty() || !cancelled.is_empty();
if !cancelled.is_empty() {
match &self.context.metadata_manager {
@@ -335,8 +332,7 @@ impl GlobalBarrierManager {
pub async fn recovery(&mut self, prev_epoch: TracedEpoch, paused_reason: Option<PausedReason>) {
// Mark blocked and abort buffered schedules, they might be dirty already.
self.scheduled_barriers
.abort_and_mark_blocked("cluster is under recovering")
.await;
.abort_and_mark_blocked("cluster is under recovering");

tracing::info!("recovery start!");
self.context
@@ -465,9 +461,13 @@ impl GlobalBarrierManager {
command_ctx.wait_epoch_commit(mce).await?;
}
};
let await_barrier_complete =
self.context.inject_barrier(command_ctx.clone()).await;
let res = match await_barrier_complete.await.result {

let res = match self
.context
.inject_barrier(command_ctx.clone(), None, None)
.await
.result
{
Ok(response) => {
if let Err(err) = command_ctx.post_collect().await {
warn!(error = %err.as_report(), "post_collect failed");
@@ -499,7 +499,7 @@
.expect("Retry until recovery success.");

recovery_timer.observe_duration();
self.scheduled_barriers.mark_ready().await;
self.scheduled_barriers.mark_ready();

tracing::info!(
epoch = state.in_flight_prev_epoch().value().0,
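`abort_and_mark_blocked`, `mark_ready` and `pre_apply_drop_cancel_scheduled` also lose their `.await`, which suggests the schedule queue now sits behind a synchronous lock rather than a `tokio` mutex. Below is a minimal sketch of that kind of blocked/ready gate; the `Inner` and `QueueStatus` names are assumptions, and the real type additionally drains notifiers and records dropped actors and cancelled jobs:

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

struct Scheduled; // stand-in for the real scheduled barrier

enum QueueStatus {
    Ready,
    Blocked(String),
}

struct Inner {
    queue: VecDeque<Scheduled>,
    status: QueueStatus,
}

struct ScheduledBarriers {
    inner: Mutex<Inner>,
}

impl ScheduledBarriers {
    /// Drop everything buffered and refuse new schedules while recovering.
    fn abort_and_mark_blocked(&self, reason: &str) {
        let mut inner = self.inner.lock().unwrap();
        inner.queue.clear(); // the real code also notifies the aborted waiters
        inner.status = QueueStatus::Blocked(reason.to_owned());
    }

    /// Accept schedules again once recovery has finished.
    fn mark_ready(&self) {
        self.inner.lock().unwrap().status = QueueStatus::Ready;
    }
}
```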
74 changes: 50 additions & 24 deletions src/meta/src/barrier/rpc.rs
@@ -35,6 +35,7 @@ use risingwave_rpc_client::error::RpcError;
use risingwave_rpc_client::StreamClient;
use rw_futures_util::pending_on_none;
use tokio::sync::oneshot;
use tracing::Instrument;
use uuid::Uuid;

use super::command::CommandContext;
@@ -47,22 +48,33 @@ pub(super) struct BarrierRpcManager {

/// Futures that await on the completion of barrier.
injected_in_progress_barrier: FuturesUnordered<BarrierCompletionFuture>,

prev_injecting_barrier: Option<oneshot::Receiver<()>>,
}

impl BarrierRpcManager {
pub(super) fn new(context: GlobalBarrierManagerContext) -> Self {
Self {
context,
injected_in_progress_barrier: FuturesUnordered::new(),
prev_injecting_barrier: None,
}
}

pub(super) fn clear(&mut self) {
self.injected_in_progress_barrier = FuturesUnordered::new();
self.prev_injecting_barrier = None;
}

pub(super) async fn inject_barrier(&mut self, command_context: Arc<CommandContext>) {
let await_complete_future = self.context.inject_barrier(command_context).await;
pub(super) fn inject_barrier(&mut self, command_context: Arc<CommandContext>) {
// this is to notify that the barrier has been injected so that the next
// barrier can be injected to avoid out of order barrier injection.
// TODO: can be removed when bidi-stream control is implemented.
let (inject_tx, inject_rx) = oneshot::channel();
let prev_inject_rx = self.prev_injecting_barrier.replace(inject_rx);
let await_complete_future =
self.context
.inject_barrier(command_context, Some(inject_tx), prev_inject_rx);
self.injected_in_progress_barrier
.push(await_complete_future);
}
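`prev_injecting_barrier` is the key to keeping order: each call to `inject_barrier` takes the previous barrier's `oneshot` receiver and registers its own sender, so the spawned tasks still inject in schedule order even though nothing is awaited inline. The chaining trick in isolation, independent of the barrier types:

```rust
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let mut prev_rx: Option<oneshot::Receiver<()>> = None;

    for i in 0..3 {
        let (tx, rx) = oneshot::channel();
        // Each task keeps the previous task's receiver and hands its own
        // sender to the next one, serializing the "inject" steps.
        let wait_for = prev_rx.replace(rx);
        tokio::spawn(async move {
            if let Some(prev) = wait_for {
                // If the previous injection failed, its sender was dropped and
                // we bail out here, just as the real code reports an error.
                if prev.await.is_err() {
                    return;
                }
            }
            println!("inject barrier {i}");
            let _ = tx.send(()); // unblock the next barrier's task
            // ... collection of this barrier may then finish in any order ...
        });
    }

    // Give the spawned tasks a moment to run in this toy example.
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
```

As the TODO in the diff notes, this chain is intended as a stopgap until bidi-stream control is implemented.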
@@ -76,35 +88,49 @@ pub(super) type BarrierCompletionFuture = impl Future<Output = BarrierCompletion

impl GlobalBarrierManagerContext {
/// Inject a barrier to all CNs and spawn a task to collect it
pub(super) async fn inject_barrier(
pub(super) fn inject_barrier(
&self,
command_context: Arc<CommandContext>,
inject_tx: Option<oneshot::Sender<()>>,
prev_inject_rx: Option<oneshot::Receiver<()>>,
) -> BarrierCompletionFuture {
let (tx, rx) = oneshot::channel();
let prev_epoch = command_context.prev_epoch.value().0;
let result = self
.stream_rpc_manager
.inject_barrier(command_context.clone())
.await;
match result {
Ok(node_need_collect) => {
// todo: the collect handler should be abort when recovery.
tokio::spawn({
let stream_rpc_manager = self.stream_rpc_manager.clone();
async move {
stream_rpc_manager
.collect_barrier(node_need_collect, command_context, tx)
.await
}
});
let stream_rpc_manager = self.stream_rpc_manager.clone();
// todo: the collect handler should be aborted during recovery.
let _join_handle = tokio::spawn(async move {
let span = command_context.span.clone();
if let Some(prev_inject_rx) = prev_inject_rx {
if prev_inject_rx.await.is_err() {
let _ = tx.send(BarrierCompletion {
prev_epoch,
result: Err(anyhow!("prev barrier failed to be injected").into()),
});
return;
}
}
Err(e) => {
let _ = tx.send(BarrierCompletion {
prev_epoch,
result: Err(e),
});
let result = stream_rpc_manager
.inject_barrier(command_context.clone())
.instrument(span.clone())
.await;
match result {
Ok(node_need_collect) => {
if let Some(inject_tx) = inject_tx {
let _ = inject_tx.send(());
}
stream_rpc_manager
.collect_barrier(node_need_collect, command_context, tx)
.instrument(span.clone())
.await;
}
Err(e) => {
let _ = tx.send(BarrierCompletion {
prev_epoch,
result: Err(e),
});
}
}
}
});
rx.map(move |result| match result {
Ok(completion) => completion,
Err(_e) => BarrierCompletion {
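With injection and collection folded into a single spawned task, the command's tracing span is attached to that task via `tracing::Instrument` (hence the new import at the top of this file) instead of instrumenting a future awaited by the main loop. The general pattern looks like this; it is a generic sketch rather than the exact RisingWave code, and it assumes `tokio`, `tracing` and `tracing-subscriber` as dependencies:

```rust
use tracing::{info, info_span, Instrument};

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();

    let span = info_span!("barrier", prev_epoch = 100u64, curr_epoch = 101u64);

    // Keep the whole inject-then-collect flow under the barrier's span, even
    // though it runs on a different task than the event loop.
    let handle = tokio::spawn(
        async move {
            info!("injecting");
            // ... RPCs to compute nodes, then wait for collection ...
            info!("collecting");
        }
        .instrument(span),
    );

    handle.await.unwrap();
}
```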