refactor(meta): merge periodic and scheduled barrier and non-async handle barrier
wenym1 committed Feb 6, 2024
1 parent 2a98c6e commit ce28905
Showing 9 changed files with 198 additions and 171 deletions.
4 changes: 2 additions & 2 deletions src/meta/service/src/scale_service.rs
@@ -141,7 +141,7 @@ impl ScaleService for ScaleServiceImpl {
&self,
request: Request<RescheduleRequest>,
) -> Result<Response<RescheduleResponse>, Status> {
- self.barrier_manager.check_status_running().await?;
+ self.barrier_manager.check_status_running()?;

let RescheduleRequest {
reschedules,
@@ -235,7 +235,7 @@ impl ScaleService for ScaleServiceImpl {
&self,
request: Request<GetReschedulePlanRequest>,
) -> Result<Response<GetReschedulePlanResponse>, Status> {
- self.barrier_manager.check_status_running().await?;
+ self.barrier_manager.check_status_running()?;

let req = request.into_inner();

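The .await disappears in both handlers because the barrier manager status (see src/meta/src/barrier/mod.rs below) moves from Arc<Mutex<BarrierManagerStatus>> to Arc<ArcSwap<BarrierManagerStatus>>, so the check becomes a plain synchronous load. A minimal sketch of that pattern, assuming the arc_swap crate and simplified types (the real code returns MetaResult and distinguishes bootstrap from failover recovery):

use std::sync::Arc;

use arc_swap::ArcSwap;

#[derive(Debug)]
enum BarrierManagerStatus {
    Starting,
    Recovering,
    Running,
}

struct Context {
    status: Arc<ArcSwap<BarrierManagerStatus>>,
}

impl Context {
    /// Set the status with a single atomic pointer swap; no lock, no `.await`.
    fn set_status(&self, new_status: BarrierManagerStatus) {
        self.status.store(Arc::new(new_status));
    }

    /// Read the status synchronously; `load()` returns a cheap guard.
    fn check_status_running(&self) -> Result<(), String> {
        let status = self.status.load();
        match &**status {
            BarrierManagerStatus::Running => Ok(()),
            other => Err(format!("barrier manager is not running: {other:?}")),
        }
    }
}

Because load() never blocks, gRPC handlers such as reschedule can call check_status_running() without holding an async lock across an await point.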
85 changes: 36 additions & 49 deletions src/meta/src/barrier/mod.rs
@@ -19,6 +19,7 @@ use std::mem::take;
use std::sync::Arc;
use std::time::Duration;

+ use arc_swap::ArcSwap;
use fail::fail_point;
use itertools::Itertools;
use prometheus::HistogramTimer;
@@ -32,6 +33,7 @@ use risingwave_hummock_sdk::table_watermark::{
};
use risingwave_hummock_sdk::{ExtendedSstableInfo, HummockSstableObjectId};
use risingwave_pb::catalog::table::TableType;
+ use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::PausedReason;
@@ -134,7 +136,7 @@ struct Scheduled {

#[derive(Clone)]
pub struct GlobalBarrierManagerContext {
- status: Arc<Mutex<BarrierManagerStatus>>,
+ status: Arc<ArcSwap<BarrierManagerStatus>>,

tracker: Arc<Mutex<CreateMviewProgressTracker>>,

@@ -406,7 +408,7 @@ impl GlobalBarrierManager {
));

let context = GlobalBarrierManagerContext {
- status: Arc::new(Mutex::new(BarrierManagerStatus::Starting)),
+ status: Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting))),
metadata_manager,
hummock_manager,
source_manager,
@@ -480,6 +482,7 @@ impl GlobalBarrierManager {
let interval = Duration::from_millis(
self.env.system_params_reader().await.barrier_interval_ms() as u64,
);
+ self.scheduled_barriers.set_min_interval(interval);
tracing::info!(
"Starting barrier manager with: interval={:?}, enable_recovery={}, in_flight_barrier_nums={}",
interval,
@@ -517,8 +520,7 @@ impl GlobalBarrierManager {
// Even if there's no actor to recover, we still go through the recovery process to
// inject the first `Initial` barrier.
self.context
- .set_status(BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap))
- .await;
+ .set_status(BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap));
let span = tracing::info_span!("bootstrap_recovery", prev_epoch = prev_epoch.value().0);

let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);
@@ -530,10 +532,8 @@
.await
};

- self.context.set_status(BarrierManagerStatus::Running).await;
+ self.context.set_status(BarrierManagerStatus::Running);

- let mut min_interval = tokio::time::interval(interval);
- min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
let (local_notification_tx, mut local_notification_rx) =
tokio::sync::mpsc::unbounded_channel();
self.env
@@ -556,11 +556,7 @@
let notification = notification.unwrap();
// Handle barrier interval and checkpoint frequency changes
if let LocalNotification::SystemParamsChange(p) = &notification {
- let new_interval = Duration::from_millis(p.barrier_interval_ms() as u64);
- if new_interval != min_interval.period() {
- min_interval = tokio::time::interval(new_interval);
- min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
- }
+ self.scheduled_barriers.set_min_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
self.scheduled_barriers
.set_checkpoint_frequency(p.checkpoint_frequency() as usize)
}
@@ -572,42 +568,38 @@
)
.await;
}

- // There's barrier scheduled.
- _ = self.scheduled_barriers.wait_one(), if self.checkpoint_control.can_inject_barrier(self.in_flight_barrier_nums) => {
- min_interval.reset(); // Reset the interval as we have a new barrier.
- self.handle_new_barrier().await;
- }
- // Minimum interval reached.
- _ = min_interval.tick(), if self.checkpoint_control.can_inject_barrier(self.in_flight_barrier_nums) => {
- self.handle_new_barrier().await;
+ scheduled = self.scheduled_barriers.next_barrier(),
+ if self
+ .checkpoint_control
+ .can_inject_barrier(self.in_flight_barrier_nums) => {
+ let all_nodes = self
+ .context
+ .metadata_manager
+ .list_active_streaming_compute_nodes()
+ .await
+ .unwrap();
+ self.handle_new_barrier(scheduled, all_nodes);
}
}
self.checkpoint_control.update_barrier_nums_metrics();
}
}

/// Handle the new barrier from the scheduled queue and inject it.
- async fn handle_new_barrier(&mut self) {
- assert!(self
- .checkpoint_control
- .can_inject_barrier(self.in_flight_barrier_nums));
-
+ fn handle_new_barrier(
+ &mut self,
+ scheduled: Scheduled,
+ nodes: impl IntoIterator<Item = WorkerNode>,
+ ) {
let Scheduled {
command,
mut notifiers,
send_latency_timer,
checkpoint,
span,
- } = self.scheduled_barriers.pop_or_default().await;
+ } = scheduled;

- let all_nodes = self
- .context
- .metadata_manager
- .list_active_streaming_compute_nodes()
- .await
- .unwrap();
- self.state.resolve_worker_nodes(all_nodes);
+ self.state.resolve_worker_nodes(nodes);
let info = self.state.apply_command(&command);

let (prev_epoch, curr_epoch) = self.state.next_epoch_pair();
@@ -629,15 +621,12 @@ impl GlobalBarrierManager {
command,
kind,
self.context.clone(),
- span.clone(),
+ span,
));

send_latency_timer.observe_duration();

- self.rpc_manager
- .inject_barrier(command_ctx.clone())
- .instrument(span)
- .await;
+ self.rpc_manager.inject_barrier(command_ctx.clone());

// Notify about the injection.
let prev_paused_reason = self.state.paused_reason();
@@ -649,7 +638,7 @@ impl GlobalBarrierManager {
prev_paused_reason,
curr_paused_reason,
};
- notifiers.iter_mut().for_each(|n| n.notify_injected(info));
+ notifiers.iter_mut().for_each(|n| n.notify_started(info));

// Update the paused state after the barrier is injected.
self.state.set_paused_reason(curr_paused_reason);
@@ -728,8 +717,7 @@ impl GlobalBarrierManager {
self.context
.set_status(BarrierManagerStatus::Recovering(RecoveryReason::Failover(
err.clone(),
- )))
- .await;
+ )));
let latest_snapshot = self.context.hummock_manager.latest_snapshot();
let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into()); // we can only recovery from the committed epoch
let span = tracing::info_span!(
@@ -745,7 +733,7 @@ impl GlobalBarrierManager {
.recovery(prev_epoch, None, &self.scheduled_barriers)
.instrument(span)
.await;
- self.context.set_status(BarrierManagerStatus::Running).await;
+ self.context.set_status(BarrierManagerStatus::Running);
} else {
panic!("failed to execute barrier: {}", err.as_report());
}
@@ -897,9 +885,9 @@ impl GlobalBarrierManager {

impl GlobalBarrierManagerContext {
/// Check the status of barrier manager, return error if it is not `Running`.
- pub async fn check_status_running(&self) -> MetaResult<()> {
- let status = self.status.lock().await;
- match &*status {
+ pub fn check_status_running(&self) -> MetaResult<()> {
+ let status = self.status.load();
+ match &**status {
BarrierManagerStatus::Starting
| BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap) => {
bail!("The cluster is bootstrapping")
@@ -912,9 +900,8 @@ impl GlobalBarrierManagerContext {
}

/// Set barrier manager status.
- async fn set_status(&self, new_status: BarrierManagerStatus) {
- let mut status = self.status.lock().await;
- *status = new_status;
+ fn set_status(&self, new_status: BarrierManagerStatus) {
+ self.status.store(Arc::new(new_status));
}

/// Resolve actor information from cluster, fragment manager and `ChangedTableId`.
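The select! arm above now awaits a single scheduled_barriers.next_barrier() call instead of racing a local min_interval ticker against wait_one(). The scheduler changes themselves are in a file not shown in this view, so the following is only a rough sketch, with invented names and a simplified single-consumer design, of how a scheduler can own both the periodic interval and the explicitly scheduled barriers (the real implementation also wakes up early when a new barrier is scheduled mid-wait, which this sketch omits):

use std::collections::VecDeque;
use std::time::Duration;

use tokio::time::{interval, Interval, MissedTickBehavior};

#[derive(Default)]
struct Scheduled; // stand-in for the real `Scheduled` (command, notifiers, span, ...)

struct ScheduledBarriers {
    min_interval: Interval,
    queue: VecDeque<Scheduled>,
}

impl ScheduledBarriers {
    fn new(period: Duration) -> Self {
        let mut min_interval = interval(period);
        min_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
        Self { min_interval, queue: VecDeque::new() }
    }

    /// Swap in a new minimum interval, e.g. when `barrier_interval_ms` changes.
    fn set_min_interval(&mut self, period: Duration) {
        let mut min_interval = interval(period);
        min_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
        self.min_interval = min_interval;
    }

    /// Enqueue an explicitly scheduled barrier (DDL, scaling, ...).
    fn push(&mut self, scheduled: Scheduled) {
        self.queue.push_back(scheduled);
    }

    /// Return the next barrier to handle: an explicitly scheduled one if
    /// available, otherwise a default periodic barrier once the interval elapses.
    async fn next_barrier(&mut self) -> Scheduled {
        if let Some(scheduled) = self.queue.pop_front() {
            // A scheduled barrier also satisfies the periodic requirement.
            self.min_interval.reset();
            return scheduled;
        }
        self.min_interval.tick().await;
        self.queue.pop_front().unwrap_or_default()
    }
}

With the interval owned by the scheduler, both the startup path and the SystemParamsChange notification above only need to call set_min_interval, which is exactly what the two added lines in this file do.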
8 changes: 4 additions & 4 deletions src/meta/src/barrier/notifier.rs
@@ -31,8 +31,8 @@ pub struct BarrierInfo {
/// Used for notifying the status of a scheduled command/barrier.
#[derive(Debug, Default)]
pub(crate) struct Notifier {
- /// Get notified when scheduled barrier is injected to compute nodes.
- pub injected: Option<oneshot::Sender<BarrierInfo>>,
+ /// Get notified when scheduled barrier has started to be handled.
+ pub started: Option<oneshot::Sender<BarrierInfo>>,

/// Get notified when scheduled barrier is collected or failed.
pub collected: Option<oneshot::Sender<MetaResult<()>>>,
@@ -43,8 +43,8 @@ pub(crate) struct Notifier {

impl Notifier {
/// Notify when we have injected a barrier to compute nodes.
- pub fn notify_injected(&mut self, info: BarrierInfo) {
- if let Some(tx) = self.injected.take() {
+ pub fn notify_started(&mut self, info: BarrierInfo) {
+ if let Some(tx) = self.started.take() {
tx.send(info).ok();
}
}
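The rename from injected to started reflects that the notification now fires when the barrier manager begins handling the command, since barrier injection above is now fire-and-forget rather than awaited. A hypothetical caller-side sketch (the channel plumbing and helper names are invented for illustration, not the actual scheduler API):

use tokio::sync::{mpsc, oneshot};

struct BarrierInfo; // stand-in for the real `BarrierInfo`

struct Notifier {
    /// Get notified when the scheduled barrier has started to be handled.
    started: Option<oneshot::Sender<BarrierInfo>>,
}

impl Notifier {
    fn notify_started(&mut self, info: BarrierInfo) {
        if let Some(tx) = self.started.take() {
            tx.send(info).ok();
        }
    }
}

/// A caller schedules a command and waits until the barrier manager has
/// picked it up; it can still wait separately for collection/completion.
async fn schedule_and_wait_started(queue: mpsc::UnboundedSender<Notifier>) {
    let (tx, rx) = oneshot::channel();
    queue
        .send(Notifier { started: Some(tx) })
        .expect("barrier scheduler dropped");
    let _info: BarrierInfo = rx.await.expect("notifier dropped before start");
}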
17 changes: 9 additions & 8 deletions src/meta/src/barrier/recovery.rs
@@ -301,8 +301,7 @@ impl GlobalBarrierManagerContext {
&self,
scheduled_barriers: &ScheduledBarriers,
) -> MetaResult<bool> {
- let (dropped_actors, cancelled) =
- scheduled_barriers.pre_apply_drop_cancel_scheduled().await;
+ let (dropped_actors, cancelled) = scheduled_barriers.pre_apply_drop_cancel_scheduled();
let applied = !dropped_actors.is_empty() || !cancelled.is_empty();
if !cancelled.is_empty() {
match &self.metadata_manager {
@@ -337,9 +336,7 @@ impl GlobalBarrierManagerContext {
scheduled_barriers: &ScheduledBarriers,
) -> BarrierManagerState {
// Mark blocked and abort buffered schedules, they might be dirty already.
- scheduled_barriers
- .abort_and_mark_blocked("cluster is under recovering")
- .await;
+ scheduled_barriers.abort_and_mark_blocked("cluster is under recovering");

tracing::info!("recovery start!");
self.clean_dirty_streaming_jobs()
@@ -437,8 +434,12 @@ impl GlobalBarrierManagerContext {
command_ctx.wait_epoch_commit(mce).await?;
}
};
- let await_barrier_complete = self.inject_barrier(command_ctx.clone()).await;
- let res = match await_barrier_complete.await.result {
+
+ let res = match self
+ .inject_barrier(command_ctx.clone(), None, None)
+ .await
+ .result
+ {
Ok(response) => {
if let Err(err) = command_ctx.post_collect().await {
warn!(error = %err.as_report(), "post_collect failed");
@@ -467,7 +468,7 @@ impl GlobalBarrierManagerContext {
.expect("Retry until recovery success.");

recovery_timer.observe_duration();
- scheduled_barriers.mark_ready().await;
+ scheduled_barriers.mark_ready();

tracing::info!(
epoch = state.in_flight_prev_epoch().value().0,

0 comments on commit ce28905
