From 734186351cc1f7c8147ba33815dcfd1ba1255d64 Mon Sep 17 00:00:00 2001 From: William Wen Date: Sun, 4 Feb 2024 22:39:10 +0800 Subject: [PATCH] reset scheduled barrier --- src/meta/src/barrier/mod.rs | 25 ++++++++--- src/meta/src/barrier/schedule.rs | 74 ++++++++++---------------------- 2 files changed, 41 insertions(+), 58 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 9addc79389c9a..cc306f67904e5 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -490,7 +490,6 @@ impl GlobalBarrierManager { let interval = Duration::from_millis( self.env.system_params_reader().await.barrier_interval_ms() as u64, ); - self.scheduled_barriers.set_min_interval(interval); tracing::info!( "Starting barrier manager with: interval={:?}, enable_recovery={}, in_flight_barrier_nums={}", interval, @@ -541,6 +540,8 @@ impl GlobalBarrierManager { self.context.set_status(BarrierManagerStatus::Running); + let mut min_interval = tokio::time::interval(interval); + min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); let (local_notification_tx, mut local_notification_rx) = tokio::sync::mpsc::unbounded_channel(); self.env @@ -568,7 +569,11 @@ impl GlobalBarrierManager { let notification = notification.unwrap(); // Handle barrier interval and checkpoint frequency changes if let LocalNotification::SystemParamsChange(p) = ¬ification { - self.scheduled_barriers.set_min_interval(Duration::from_millis(p.barrier_interval_ms() as u64)); + let new_interval = Duration::from_millis(p.barrier_interval_ms() as u64); + if new_interval != min_interval.period() { + min_interval = tokio::time::interval(new_interval); + min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + } self.scheduled_barriers .set_checkpoint_frequency(p.checkpoint_frequency() as usize) } @@ -580,11 +585,16 @@ impl GlobalBarrierManager { ) .await; } - scheduled = self.scheduled_barriers.next_barrier(), - if self - .checkpoint_control - .can_inject_barrier(self.in_flight_barrier_nums) => { - self.active_streaming_nodes.sync().await; + + // There's barrier scheduled. + _ = self.scheduled_barriers.wait_one(), if self.checkpoint_control.can_inject_barrier(self.in_flight_barrier_nums) => { + min_interval.reset(); // Reset the interval as we have a new barrier. + let scheduled = self.scheduled_barriers.pop_or_default().await; + self.handle_new_barrier(scheduled).await; + } + // Minimum interval reached. + _ = min_interval.tick(), if self.checkpoint_control.can_inject_barrier(self.in_flight_barrier_nums) => { + let scheduled = self.scheduled_barriers.pop_or_default().await; self.handle_new_barrier(scheduled).await; } } @@ -602,6 +612,7 @@ impl GlobalBarrierManager { span, } = scheduled; + self.active_streaming_nodes.sync().await; self.state .resolve_worker_nodes(self.active_streaming_nodes.current().values().cloned()); let info = self.state.apply_command(&command); diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs index bb989d13de7f9..9fd5bc81f7466 100644 --- a/src/meta/src/barrier/schedule.rs +++ b/src/meta/src/barrier/schedule.rs @@ -13,20 +13,16 @@ // limitations under the License. use std::collections::{HashSet, VecDeque}; -use std::future::pending; use std::iter::once; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Instant; use anyhow::{anyhow, Context}; use assert_matches::assert_matches; use risingwave_common::catalog::TableId; use risingwave_pb::hummock::HummockSnapshot; use risingwave_pb::meta::PausedReason; -use tokio::select; use tokio::sync::{oneshot, watch, RwLock}; -use tokio::time::Interval; -use tracing::warn; use super::notifier::{BarrierInfo, Notifier}; use super::{Command, Scheduled}; @@ -164,7 +160,6 @@ impl BarrierScheduler { force_checkpoint: false, checkpoint_frequency, inner, - min_interval: None, }, ) } @@ -338,8 +333,6 @@ impl BarrierScheduler { /// The receiver side of the barrier scheduling queue. /// Held by the [`super::GlobalBarrierManager`] to execute these commands. pub struct ScheduledBarriers { - min_interval: Option, - /// Force checkpoint in next barrier. force_checkpoint: bool, @@ -350,59 +343,38 @@ pub struct ScheduledBarriers { } impl ScheduledBarriers { - pub(super) fn set_min_interval(&mut self, min_interval: Duration) { - let set_new_interval = match &self.min_interval { - None => true, - Some(prev_min_interval) => min_interval != prev_min_interval.period(), - }; - if set_new_interval { - let mut min_interval = tokio::time::interval(min_interval); - min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); - self.min_interval = Some(min_interval); - } - } - - pub(super) async fn next_barrier(&mut self) -> Scheduled { - let scheduled = select! { - biased; - mut scheduled = async { - loop { - let mut rx = self.inner.changed_tx.subscribe(); - let mut queue = self.inner.queue.write().await; - if queue.queue.is_empty() { - rx.changed().await.unwrap(); - } else { - let scheduled = queue.queue.pop_front().expect("non-empty"); - - break scheduled; - } - } - } => { - if let Some(min_interval) = &mut self.min_interval { - min_interval.reset(); - } - let checkpoint = self.try_get_checkpoint(); + /// Pop a scheduled barrier from the queue, or a default checkpoint barrier if not exists. + pub(super) async fn pop_or_default(&mut self) -> Scheduled { + let mut queue = self.inner.queue.write().await; + let checkpoint = self.try_get_checkpoint(); + let scheduled = match queue.queue.pop_front() { + Some(mut scheduled) => { scheduled.checkpoint = scheduled.checkpoint || checkpoint; scheduled - }, - _ = async { - match &mut self.min_interval { - Some(min_interval) => min_interval.tick().await, - None => { - warn!("min interval not set and pending"); - pending::<_>().await - } - } - } => { - let checkpoint = self.try_get_checkpoint(); + } + None => { + // If no command scheduled, create a periodic barrier by default. self.inner .new_scheduled(checkpoint, Command::barrier(), std::iter::empty()) } }; + drop(queue); self.update_num_uncheckpointed_barrier(scheduled.checkpoint); scheduled } + /// Wait for at least one scheduled barrier in the queue. + pub(super) async fn wait_one(&self) { + let queue = self.inner.queue.read().await; + if queue.len() > 0 { + return; + } + let mut rx = self.inner.changed_tx.subscribe(); + drop(queue); + + rx.changed().await.unwrap(); + } + /// Mark command scheduler as blocked and abort all queued scheduled command and notify with /// specific reason. pub(super) async fn abort_and_mark_blocked(&self, reason: impl Into + Copy) {