Skip to content

Commit

Permalink
reset scheduled barrier
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Feb 4, 2024
1 parent ac40aa0 commit 7341863
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 58 deletions.
25 changes: 18 additions & 7 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,6 @@ impl GlobalBarrierManager {
let interval = Duration::from_millis(
self.env.system_params_reader().await.barrier_interval_ms() as u64,
);
self.scheduled_barriers.set_min_interval(interval);
tracing::info!(
"Starting barrier manager with: interval={:?}, enable_recovery={}, in_flight_barrier_nums={}",
interval,
Expand Down Expand Up @@ -541,6 +540,8 @@ impl GlobalBarrierManager {

self.context.set_status(BarrierManagerStatus::Running);

let mut min_interval = tokio::time::interval(interval);
min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
let (local_notification_tx, mut local_notification_rx) =
tokio::sync::mpsc::unbounded_channel();
self.env
Expand Down Expand Up @@ -568,7 +569,11 @@ impl GlobalBarrierManager {
let notification = notification.unwrap();
// Handle barrier interval and checkpoint frequency changes
if let LocalNotification::SystemParamsChange(p) = &notification {
self.scheduled_barriers.set_min_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
let new_interval = Duration::from_millis(p.barrier_interval_ms() as u64);
if new_interval != min_interval.period() {
min_interval = tokio::time::interval(new_interval);
min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
}
self.scheduled_barriers
.set_checkpoint_frequency(p.checkpoint_frequency() as usize)
}
Expand All @@ -580,11 +585,16 @@ impl GlobalBarrierManager {
)
.await;
}
scheduled = self.scheduled_barriers.next_barrier(),
if self
.checkpoint_control
.can_inject_barrier(self.in_flight_barrier_nums) => {
self.active_streaming_nodes.sync().await;

// There's barrier scheduled.
_ = self.scheduled_barriers.wait_one(), if self.checkpoint_control.can_inject_barrier(self.in_flight_barrier_nums) => {
min_interval.reset(); // Reset the interval as we have a new barrier.
let scheduled = self.scheduled_barriers.pop_or_default().await;
self.handle_new_barrier(scheduled).await;
}
// Minimum interval reached.
_ = min_interval.tick(), if self.checkpoint_control.can_inject_barrier(self.in_flight_barrier_nums) => {
let scheduled = self.scheduled_barriers.pop_or_default().await;
self.handle_new_barrier(scheduled).await;
}
}
Expand All @@ -602,6 +612,7 @@ impl GlobalBarrierManager {
span,
} = scheduled;

self.active_streaming_nodes.sync().await;
self.state
.resolve_worker_nodes(self.active_streaming_nodes.current().values().cloned());
let info = self.state.apply_command(&command);
Expand Down
74 changes: 23 additions & 51 deletions src/meta/src/barrier/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,16 @@
// limitations under the License.

use std::collections::{HashSet, VecDeque};
use std::future::pending;
use std::iter::once;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Instant;

use anyhow::{anyhow, Context};
use assert_matches::assert_matches;
use risingwave_common::catalog::TableId;
use risingwave_pb::hummock::HummockSnapshot;
use risingwave_pb::meta::PausedReason;
use tokio::select;
use tokio::sync::{oneshot, watch, RwLock};
use tokio::time::Interval;
use tracing::warn;

use super::notifier::{BarrierInfo, Notifier};
use super::{Command, Scheduled};
Expand Down Expand Up @@ -164,7 +160,6 @@ impl BarrierScheduler {
force_checkpoint: false,
checkpoint_frequency,
inner,
min_interval: None,
},
)
}
Expand Down Expand Up @@ -338,8 +333,6 @@ impl BarrierScheduler {
/// The receiver side of the barrier scheduling queue.
/// Held by the [`super::GlobalBarrierManager`] to execute these commands.
pub struct ScheduledBarriers {
min_interval: Option<Interval>,

/// Force checkpoint in next barrier.
force_checkpoint: bool,

Expand All @@ -350,59 +343,38 @@ pub struct ScheduledBarriers {
}

impl ScheduledBarriers {
/// Install a minimum-interval ticker for `min_interval`, unless one with the same
/// period is already stored.
///
/// A fresh `tokio::time::interval` is created (with `MissedTickBehavior::Delay`) when
/// no ticker has been set yet or when the stored ticker's period differs from
/// `min_interval`. Otherwise the existing ticker is left untouched.
pub(super) fn set_min_interval(&mut self, min_interval: Duration) {
    // Keep the current ticker when its period already matches the requested one.
    let period_unchanged = matches!(
        &self.min_interval,
        Some(current) if current.period() == min_interval
    );
    if period_unchanged {
        return;
    }

    let mut ticker = tokio::time::interval(min_interval);
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    self.min_interval = Some(ticker);
}

pub(super) async fn next_barrier(&mut self) -> Scheduled {
let scheduled = select! {
biased;
mut scheduled = async {
loop {
let mut rx = self.inner.changed_tx.subscribe();
let mut queue = self.inner.queue.write().await;
if queue.queue.is_empty() {
rx.changed().await.unwrap();
} else {
let scheduled = queue.queue.pop_front().expect("non-empty");

break scheduled;
}
}
} => {
if let Some(min_interval) = &mut self.min_interval {
min_interval.reset();
}
let checkpoint = self.try_get_checkpoint();
/// Pop the next scheduled barrier from the front of the queue, or build a default one
/// if the queue is empty.
///
/// The queue's write lock is taken first and `try_get_checkpoint` is evaluated once.
/// - If a barrier is queued, it is returned with its `checkpoint` flag OR-ed with the
///   value returned by `try_get_checkpoint`.
/// - If nothing is queued, a barrier is created through `new_scheduled` from
///   `Command::barrier()`, passing the `try_get_checkpoint` result as its first argument
///   (presumably the checkpoint flag; confirm against `new_scheduled`).
///
/// Before returning, `update_num_uncheckpointed_barrier` is called with the resulting
/// barrier's `checkpoint` flag.
pub(super) async fn pop_or_default(&mut self) -> Scheduled {
let mut queue = self.inner.queue.write().await;
let checkpoint = self.try_get_checkpoint();
let scheduled = match queue.queue.pop_front() {
Some(mut scheduled) => {
// Checkpoint if either the queued command asked for it or `try_get_checkpoint` did.
scheduled.checkpoint = scheduled.checkpoint || checkpoint;
scheduled
},
_ = async {
match &mut self.min_interval {
Some(min_interval) => min_interval.tick().await,
None => {
warn!("min interval not set and pending");
pending::<_>().await
}
}
} => {
let checkpoint = self.try_get_checkpoint();
}
None => {
// Nothing was queued: fall back to a plain periodic barrier.
self.inner
.new_scheduled(checkpoint, Command::barrier(), std::iter::empty())
}
};
// Release the queue write guard before updating the uncheckpointed-barrier bookkeeping.
drop(queue);
self.update_num_uncheckpointed_barrier(scheduled.checkpoint);
scheduled
}

/// Resolve once the queue holds at least one scheduled barrier.
///
/// Returns immediately when the queue is already non-empty. Otherwise a receiver is
/// subscribed to `changed_tx` and the call waits for the next change notification.
pub(super) async fn wait_one(&self) {
    let mut change_rx = {
        let guard = self.inner.queue.read().await;
        if guard.len() > 0 {
            return;
        }
        // Subscribe while the read guard is still held; the guard is released at the
        // end of this scope, before we start waiting.
        self.inner.changed_tx.subscribe()
    };

    change_rx.changed().await.unwrap();
}

/// Mark command scheduler as blocked and abort all queued scheduled command and notify with
/// specific reason.
pub(super) async fn abort_and_mark_blocked(&self, reason: impl Into<String> + Copy) {
Expand Down

0 comments on commit 7341863

Please sign in to comment.