diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs index f599456f94bea..c481b756b1828 100644 --- a/src/meta/src/barrier/schedule.rs +++ b/src/meta/src/barrier/schedule.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::collections::{HashSet, VecDeque}; -use std::future::pending; use std::iter::once; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -27,7 +26,6 @@ use risingwave_pb::meta::PausedReason; use tokio::select; use tokio::sync::{oneshot, watch}; use tokio::time::Interval; -use tracing::warn; use super::notifier::{BarrierInfo, Notifier}; use super::{Command, Scheduled}; @@ -364,37 +362,17 @@ impl ScheduledBarriers { } pub(super) async fn next_barrier(&mut self) -> Scheduled { + let checkpoint = self.try_get_checkpoint(); let scheduled = select! { biased; - mut scheduled = async { - loop { - let mut rx = self.inner.changed_tx.subscribe(); - { - let mut queue = self.inner.queue.lock(); - if let Some(scheduled) = queue.queue.pop_front() { - break scheduled - } - } - rx.changed().await.unwrap(); - } - } => { + mut scheduled = self.inner.next_scheduled() => { if let Some(min_interval) = &mut self.min_interval { min_interval.reset(); } - let checkpoint = self.try_get_checkpoint(); scheduled.checkpoint = scheduled.checkpoint || checkpoint; scheduled }, - _ = async { - match &mut self.min_interval { - Some(min_interval) => min_interval.tick().await, - None => { - warn!("min interval not set and pending"); - pending::<_>().await - } - } - } => { - let checkpoint = self.try_get_checkpoint(); + _ = self.min_interval.as_mut().expect("should have set min interval").tick() => { self.inner .new_scheduled(checkpoint, Command::barrier(), std::iter::empty()) } @@ -402,7 +380,24 @@ impl ScheduledBarriers { self.update_num_uncheckpointed_barrier(scheduled.checkpoint); scheduled } +} + +impl Inner { + async fn next_scheduled(&self) -> Scheduled { + loop { + let mut rx = self.changed_tx.subscribe(); + { + let mut queue = self.queue.lock(); + if let Some(scheduled) = queue.queue.pop_front() { + break scheduled; + } + } + rx.changed().await.unwrap(); + } + } +} +impl ScheduledBarriers { /// Mark command scheduler as blocked and abort all queued scheduled command and notify with /// specific reason. pub(super) fn abort_and_mark_blocked(&self, reason: impl Into + Copy) {