diff --git a/src/meta/service/src/scale_service.rs b/src/meta/service/src/scale_service.rs
index 33899856a57b..f36a3c3ec0f0 100644
--- a/src/meta/service/src/scale_service.rs
+++ b/src/meta/service/src/scale_service.rs
@@ -134,7 +134,7 @@ impl ScaleService for ScaleServiceImpl {
         &self,
         request: Request<RescheduleRequest>,
     ) -> Result<Response<RescheduleResponse>, Status> {
-        self.barrier_manager.check_status_running().await?;
+        self.barrier_manager.check_status_running()?;
 
         let RescheduleRequest {
             reschedules,
@@ -228,7 +228,7 @@ impl ScaleService for ScaleServiceImpl {
         &self,
         request: Request<GetReschedulePlanRequest>,
     ) -> Result<Response<GetReschedulePlanResponse>, Status> {
-        self.barrier_manager.check_status_running().await?;
+        self.barrier_manager.check_status_running()?;
 
         let req = request.into_inner();
 
diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs
index 3bad3cbd15a2..b13dc19ef61c 100644
--- a/src/meta/src/barrier/mod.rs
+++ b/src/meta/src/barrier/mod.rs
@@ -19,6 +19,7 @@ use std::mem::take;
 use std::sync::Arc;
 use std::time::Duration;
 
+use arc_swap::ArcSwap;
 use fail::fail_point;
 use itertools::Itertools;
 use prometheus::HistogramTimer;
@@ -137,7 +138,7 @@ struct Scheduled {
 
 #[derive(Clone)]
 pub struct GlobalBarrierManagerContext {
-    status: Arc<Mutex<BarrierManagerStatus>>,
+    status: Arc<ArcSwap<BarrierManagerStatus>>,
 
     tracker: Arc<Mutex<CreateMviewProgressTracker>>,
 
@@ -407,7 +408,7 @@ impl GlobalBarrierManager {
         let tracker = CreateMviewProgressTracker::new();
 
         let context = GlobalBarrierManagerContext {
-            status: Arc::new(Mutex::new(BarrierManagerStatus::Starting)),
+            status: Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting))),
             metadata_manager,
             hummock_manager,
             source_manager,
@@ -482,6 +483,7 @@ impl GlobalBarrierManager {
         let interval = Duration::from_millis(
             self.env.system_params_reader().await.barrier_interval_ms() as u64,
         );
+        self.scheduled_barriers.set_min_interval(interval);
         tracing::info!(
             "Starting barrier manager with: interval={:?}, enable_recovery={}, in_flight_barrier_nums={}",
             interval,
@@ -519,8 +521,7 @@ impl GlobalBarrierManager {
             // Even if there's no actor to recover, we still go through the recovery process to
             // inject the first `Initial` barrier.
             self.context
-                .set_status(BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap))
-                .await;
+                .set_status(BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap));
             let span = tracing::info_span!("bootstrap_recovery", prev_epoch = prev_epoch.value().0);
 
             let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);
@@ -531,10 +532,8 @@ impl GlobalBarrierManager {
                 .await;
         }
 
-        self.context.set_status(BarrierManagerStatus::Running).await;
+        self.context.set_status(BarrierManagerStatus::Running);
 
-        let mut min_interval = tokio::time::interval(interval);
-        min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
         let (local_notification_tx, mut local_notification_rx) =
             tokio::sync::mpsc::unbounded_channel();
         self.env
@@ -610,11 +609,7 @@ impl GlobalBarrierManager {
                     let notification = notification.unwrap();
                     // Handle barrier interval and checkpoint frequency changes
                     if let LocalNotification::SystemParamsChange(p) = &notification {
-                        let new_interval = Duration::from_millis(p.barrier_interval_ms() as u64);
-                        if new_interval != min_interval.period() {
-                            min_interval = tokio::time::interval(new_interval);
-                            min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
-                        }
+                        self.scheduled_barriers.set_min_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
                         self.scheduled_barriers
                             .set_checkpoint_frequency(p.checkpoint_frequency() as usize)
                     }
@@ -626,15 +621,11 @@ impl GlobalBarrierManager {
                     )
                     .await;
                 }
-
-                // There's barrier scheduled.
-                _ = self.scheduled_barriers.wait_one(), if self.checkpoint_control.can_inject_barrier(self.in_flight_barrier_nums) => {
-                    min_interval.reset(); // Reset the interval as we have a new barrier.
-                    self.handle_new_barrier().await;
-                }
-                // Minimum interval reached.
-                _ = min_interval.tick(), if self.checkpoint_control.can_inject_barrier(self.in_flight_barrier_nums) => {
-                    self.handle_new_barrier().await;
+                scheduled = self.scheduled_barriers.next_barrier(),
+                    if self
+                        .checkpoint_control
+                        .can_inject_barrier(self.in_flight_barrier_nums) => {
+                    self.handle_new_barrier(scheduled);
                 }
             }
             self.checkpoint_control.update_barrier_nums_metrics();
         }
     }
 
     /// Handle the new barrier from the scheduled queue and inject it.
-    async fn handle_new_barrier(&mut self) {
-        assert!(self
-            .checkpoint_control
-            .can_inject_barrier(self.in_flight_barrier_nums));
-
+    fn handle_new_barrier(&mut self, scheduled: Scheduled) {
         let Scheduled {
             command,
             mut notifiers,
             send_latency_timer,
             checkpoint,
             span,
-        } = self.scheduled_barriers.pop_or_default().await;
+        } = scheduled;
 
         let info = self.state.apply_command(&command);
 
@@ -676,15 +663,12 @@ impl GlobalBarrierManager {
             command,
             kind,
             self.context.clone(),
-            span.clone(),
+            span,
         ));
 
         send_latency_timer.observe_duration();
 
-        self.rpc_manager
-            .inject_barrier(command_ctx.clone())
-            .instrument(span)
-            .await;
+        self.rpc_manager.inject_barrier(command_ctx.clone());
 
         // Notify about the injection.
         let prev_paused_reason = self.state.paused_reason();
@@ -696,7 +680,7 @@ impl GlobalBarrierManager {
             prev_paused_reason,
             curr_paused_reason,
         };
-        notifiers.iter_mut().for_each(|n| n.notify_injected(info));
+        notifiers.iter_mut().for_each(|n| n.notify_started(info));
 
         // Update the paused state after the barrier is injected.
         self.state.set_paused_reason(curr_paused_reason);
@@ -775,8 +759,7 @@ impl GlobalBarrierManager {
                 self.context
                     .set_status(BarrierManagerStatus::Recovering(RecoveryReason::Failover(
                         err.clone(),
-                    )))
-                    .await;
+                    )));
                 let latest_snapshot = self.context.hummock_manager.latest_snapshot();
                 let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into()); // we can only recovery from the committed epoch
                 let span = tracing::info_span!(
@@ -788,7 +771,7 @@ impl GlobalBarrierManager {
                 // No need to clean dirty tables for barrier recovery,
                 // The foreground stream job should cleanup their own tables.
                 self.recovery(prev_epoch, None).instrument(span).await;
-                self.context.set_status(BarrierManagerStatus::Running).await;
+                self.context.set_status(BarrierManagerStatus::Running);
             } else {
                 panic!("failed to execute barrier: {}", err.as_report());
             }
@@ -940,9 +923,9 @@ impl GlobalBarrierManager {
 
 impl GlobalBarrierManagerContext {
     /// Check the status of barrier manager, return error if it is not `Running`.
-    pub async fn check_status_running(&self) -> MetaResult<()> {
-        let status = self.status.lock().await;
-        match &*status {
+    pub fn check_status_running(&self) -> MetaResult<()> {
+        let status = self.status.load();
+        match &**status {
             BarrierManagerStatus::Starting
             | BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap) => {
                 bail!("The cluster is bootstrapping")
@@ -955,9 +938,8 @@ impl GlobalBarrierManagerContext {
     }
 
     /// Set barrier manager status.
-    async fn set_status(&self, new_status: BarrierManagerStatus) {
-        let mut status = self.status.lock().await;
-        *status = new_status;
+    fn set_status(&self, new_status: BarrierManagerStatus) {
+        self.status.store(Arc::new(new_status));
     }
 
     /// Resolve actor information from cluster, fragment manager and `ChangedTableId`.
diff --git a/src/meta/src/barrier/notifier.rs b/src/meta/src/barrier/notifier.rs
index 1675544d7a41..e142e9be514c 100644
--- a/src/meta/src/barrier/notifier.rs
+++ b/src/meta/src/barrier/notifier.rs
@@ -31,8 +31,8 @@ pub struct BarrierInfo {
 /// Used for notifying the status of a scheduled command/barrier.
 #[derive(Debug, Default)]
 pub(crate) struct Notifier {
-    /// Get notified when scheduled barrier is injected to compute nodes.
-    pub injected: Option<oneshot::Sender<BarrierInfo>>,
+    /// Get notified when scheduled barrier has started to be handled.
+    pub started: Option<oneshot::Sender<BarrierInfo>>,
 
     /// Get notified when scheduled barrier is collected or failed.
     pub collected: Option<oneshot::Sender<MetaResult<()>>>,
@@ -43,8 +43,8 @@ impl Notifier {
     /// Notify when we have injected a barrier to compute nodes.
-    pub fn notify_injected(&mut self, info: BarrierInfo) {
-        if let Some(tx) = self.injected.take() {
+    pub fn notify_started(&mut self, info: BarrierInfo) {
+        if let Some(tx) = self.started.take() {
             tx.send(info).ok();
         }
     }
diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs
index 858fa937044d..3f7f3911e730 100644
--- a/src/meta/src/barrier/recovery.rs
+++ b/src/meta/src/barrier/recovery.rs
@@ -301,10 +301,7 @@ impl GlobalBarrierManagerContext {
 impl GlobalBarrierManager {
     /// Pre buffered drop and cancel command, return true if any.
     async fn pre_apply_drop_cancel(&self) -> MetaResult<bool> {
-        let (dropped_actors, cancelled) = self
-            .scheduled_barriers
-            .pre_apply_drop_cancel_scheduled()
-            .await;
+        let (dropped_actors, cancelled) = self.scheduled_barriers.pre_apply_drop_cancel_scheduled();
         let applied = !dropped_actors.is_empty() || !cancelled.is_empty();
         if !cancelled.is_empty() {
             match &self.context.metadata_manager {
@@ -335,8 +332,7 @@ impl GlobalBarrierManager {
     pub async fn recovery(&mut self, prev_epoch: TracedEpoch, paused_reason: Option<PausedReason>) {
         // Mark blocked and abort buffered schedules, they might be dirty already.
         self.scheduled_barriers
-            .abort_and_mark_blocked("cluster is under recovering")
-            .await;
+            .abort_and_mark_blocked("cluster is under recovering");
 
         tracing::info!("recovery start!");
         self.context
@@ -465,9 +461,13 @@ impl GlobalBarrierManager {
                         command_ctx.wait_epoch_commit(mce).await?;
                     }
                 };
-                let await_barrier_complete =
-                    self.context.inject_barrier(command_ctx.clone()).await;
-                let res = match await_barrier_complete.await.result {
+
+                let res = match self
+                    .context
+                    .inject_barrier(command_ctx.clone(), None, None)
+                    .await
+                    .result
+                {
                     Ok(response) => {
                         if let Err(err) = command_ctx.post_collect().await {
                             warn!(error = %err.as_report(), "post_collect failed");
@@ -499,7 +499,7 @@ impl GlobalBarrierManager {
             .expect("Retry until recovery success.");
         recovery_timer.observe_duration();
 
-        self.scheduled_barriers.mark_ready().await;
+        self.scheduled_barriers.mark_ready();
 
         tracing::info!(
             epoch = state.in_flight_prev_epoch().value().0,
diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs
index 670ee7cf1092..877f935f2520 100644
--- a/src/meta/src/barrier/rpc.rs
+++ b/src/meta/src/barrier/rpc.rs
@@ -35,6 +35,7 @@ use risingwave_rpc_client::error::RpcError;
 use risingwave_rpc_client::StreamClient;
 use rw_futures_util::pending_on_none;
 use tokio::sync::oneshot;
+use tracing::Instrument;
 use uuid::Uuid;
 
 use super::command::CommandContext;
@@ -47,6 +48,8 @@ pub(super) struct BarrierRpcManager {
 
     /// Futures that await on the completion of barrier.
     injected_in_progress_barrier: FuturesUnordered<BarrierCompletionFuture>,
+
+    prev_injecting_barrier: Option<oneshot::Receiver<()>>,
 }
 
 impl BarrierRpcManager {
@@ -54,15 +57,24 @@ impl BarrierRpcManager {
         Self {
             context,
             injected_in_progress_barrier: FuturesUnordered::new(),
+            prev_injecting_barrier: None,
         }
     }
 
     pub(super) fn clear(&mut self) {
         self.injected_in_progress_barrier = FuturesUnordered::new();
+        self.prev_injecting_barrier = None;
     }
 
-    pub(super) async fn inject_barrier(&mut self, command_context: Arc<CommandContext>) {
-        let await_complete_future = self.context.inject_barrier(command_context).await;
+    pub(super) fn inject_barrier(&mut self, command_context: Arc<CommandContext>) {
+        // This is to notify that the barrier has been injected, so that the next
+        // barrier can be injected in order and out-of-order barrier injection is avoided.
+        // TODO: this can be removed when bidi-stream control is implemented.
+        let (inject_tx, inject_rx) = oneshot::channel();
+        let prev_inject_rx = self.prev_injecting_barrier.replace(inject_rx);
+        let await_complete_future =
+            self.context
+                .inject_barrier(command_context, Some(inject_tx), prev_inject_rx);
         self.injected_in_progress_barrier
             .push(await_complete_future);
     }
@@ -76,35 +88,49 @@ pub(super) type BarrierCompletionFuture = impl Future<Output = BarrierCompletion> + Send + 'static;
         command_context: Arc<CommandContext>,
+        inject_tx: Option<oneshot::Sender<()>>,
+        prev_inject_rx: Option<oneshot::Receiver<()>>,
     ) -> BarrierCompletionFuture {
         let (tx, rx) = oneshot::channel();
         let prev_epoch = command_context.prev_epoch.value().0;
-        let result = self
-            .stream_rpc_manager
-            .inject_barrier(command_context.clone())
-            .await;
-        match result {
-            Ok(node_need_collect) => {
-                // todo: the collect handler should be abort when recovery.
-                tokio::spawn({
-                    let stream_rpc_manager = self.stream_rpc_manager.clone();
-                    async move {
-                        stream_rpc_manager
-                            .collect_barrier(node_need_collect, command_context, tx)
-                            .await
-                    }
-                });
+        let stream_rpc_manager = self.stream_rpc_manager.clone();
+        // TODO: the collect handler should be aborted when recovery is triggered.
+        let _join_handle = tokio::spawn(async move {
+            let span = command_context.span.clone();
+            if let Some(prev_inject_rx) = prev_inject_rx {
+                if prev_inject_rx.await.is_err() {
+                    let _ = tx.send(BarrierCompletion {
+                        prev_epoch,
+                        result: Err(anyhow!("prev barrier failed to be injected").into()),
+                    });
+                    return;
+                }
             }
-            Err(e) => {
-                let _ = tx.send(BarrierCompletion {
-                    prev_epoch,
-                    result: Err(e),
-                });
+            let result = stream_rpc_manager
+                .inject_barrier(command_context.clone())
+                .instrument(span.clone())
+                .await;
+            match result {
+                Ok(node_need_collect) => {
+                    if let Some(inject_tx) = inject_tx {
+                        let _ = inject_tx.send(());
+                    }
+                    stream_rpc_manager
+                        .collect_barrier(node_need_collect, command_context, tx)
+                        .instrument(span.clone())
+                        .await;
+                }
+                Err(e) => {
+                    let _ = tx.send(BarrierCompletion {
+                        prev_epoch,
+                        result: Err(e),
+                    });
+                }
             }
-        }
+        });
         rx.map(move |result| match result {
             Ok(completion) => completion,
             Err(_e) => BarrierCompletion {
diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs
index 56152c18baa7..c481b756b182 100644
--- a/src/meta/src/barrier/schedule.rs
+++ b/src/meta/src/barrier/schedule.rs
@@ -14,16 +14,18 @@
 use std::collections::{HashSet, VecDeque};
 use std::iter::once;
-use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::Arc;
-use std::time::Instant;
+use std::time::{Duration, Instant};
 
 use anyhow::{anyhow, Context};
 use assert_matches::assert_matches;
+use parking_lot::Mutex;
 use risingwave_common::catalog::TableId;
 use risingwave_pb::hummock::HummockSnapshot;
 use risingwave_pb::meta::PausedReason;
-use tokio::sync::{oneshot, watch, RwLock};
+use tokio::select;
+use tokio::sync::{oneshot, watch};
+use tokio::time::Interval;
 
 use super::notifier::{BarrierInfo, Notifier};
 use super::{Command, Scheduled};
@@ -37,19 +39,11 @@ use crate::{MetaError, MetaResult};
 /// We manually implement one here instead of using channels since we may need to update the front
 /// of the queue to add some notifiers for instant flushes.
 struct Inner {
-    queue: RwLock<ScheduledQueue>,
+    queue: Mutex<ScheduledQueue>,
 
     /// When `queue` is not empty anymore, all subscribers of this watcher will be notified.
     changed_tx: watch::Sender<()>,
 
-    /// The numbers of barrier (checkpoint = false) since the last barrier (checkpoint = true)
-    num_uncheckpointed_barrier: AtomicUsize,
-
-    /// Force checkpoint in next barrier.
-    force_checkpoint: AtomicBool,
-
-    checkpoint_frequency: AtomicUsize,
-
     /// Used for recording send latency of each barrier.
     metrics: Arc<MetaMetrics>,
 }
@@ -62,7 +56,7 @@ enum QueueStatus {
     Blocked(String),
 }
 
-struct ScheduledQueue {
+pub(super) struct ScheduledQueue {
     queue: VecDeque<Scheduled>,
     status: QueueStatus,
 }
@@ -154,11 +148,8 @@ impl BarrierScheduler {
             checkpoint_frequency,
         );
         let inner = Arc::new(Inner {
-            queue: RwLock::new(ScheduledQueue::new()),
+            queue: Mutex::new(ScheduledQueue::new()),
             changed_tx: watch::channel(()).0,
-            num_uncheckpointed_barrier: AtomicUsize::new(0),
-            checkpoint_frequency: AtomicUsize::new(checkpoint_frequency),
-            force_checkpoint: AtomicBool::new(false),
             metrics,
         });
 
@@ -167,13 +158,19 @@ impl BarrierScheduler {
                 inner: inner.clone(),
                 hummock_manager,
             },
-            ScheduledBarriers { inner },
+            ScheduledBarriers {
+                num_uncheckpointed_barrier: 0,
+                force_checkpoint: false,
+                checkpoint_frequency,
+                inner,
+                min_interval: None,
+            },
         )
     }
 
     /// Push a scheduled barrier into the queue.
-    async fn push(&self, scheduleds: impl IntoIterator<Item = Scheduled>) -> MetaResult<()> {
-        let mut queue = self.inner.queue.write().await;
+    fn push(&self, scheduleds: impl IntoIterator<Item = Scheduled>) -> MetaResult<()> {
+        let mut queue = self.inner.queue.lock();
         for scheduled in scheduleds {
             queue.push_back(scheduled)?;
             if queue.len() == 1 {
@@ -184,8 +181,8 @@ impl BarrierScheduler {
     }
 
     /// Try to cancel scheduled cmd for create streaming job, return true if cancelled.
-    pub async fn try_cancel_scheduled_create(&self, table_id: TableId) -> bool {
-        let queue = &mut self.inner.queue.write().await;
+    pub fn try_cancel_scheduled_create(&self, table_id: TableId) -> bool {
+        let queue = &mut self.inner.queue.lock();
         if let Some(idx) = queue.queue.iter().position(|scheduled| {
             if let Command::CreateStreamingJob {
                 table_fragments, ..
@@ -207,12 +204,12 @@ impl BarrierScheduler {
     /// Attach `new_notifiers` to the very first scheduled barrier. If there's no one scheduled, a
     /// default barrier will be created. If `new_checkpoint` is true, the barrier will become a
     /// checkpoint.
-    async fn attach_notifiers(
+    fn attach_notifiers(
         &self,
         new_notifiers: Vec<Notifier>,
         new_checkpoint: bool,
     ) -> MetaResult<()> {
-        let mut queue = self.inner.queue.write().await;
+        let mut queue = self.inner.queue.lock();
         match queue.queue.front_mut() {
             Some(Scheduled {
                 notifiers,
@@ -243,7 +240,7 @@ impl BarrierScheduler {
             collected: Some(tx),
             ..Default::default()
         };
-        self.attach_notifiers(vec![notifier], checkpoint).await?;
+        self.attach_notifiers(vec![notifier], checkpoint)?;
         rx.await.unwrap()
     }
 
@@ -258,23 +255,23 @@ impl BarrierScheduler {
         let mut scheduleds = Vec::with_capacity(commands.len());
 
         for command in commands {
-            let (injected_tx, injected_rx) = oneshot::channel();
+            let (started_tx, started_rx) = oneshot::channel();
             let (collect_tx, collect_rx) = oneshot::channel();
             let (finish_tx, finish_rx) = oneshot::channel();
 
-            contexts.push((injected_rx, collect_rx, finish_rx));
+            contexts.push((started_rx, collect_rx, finish_rx));
 
             scheduleds.push(self.inner.new_scheduled(
                 command.need_checkpoint(),
                 command,
                 once(Notifier {
-                    injected: Some(injected_tx),
+                    started: Some(started_tx),
                     collected: Some(collect_tx),
                     finished: Some(finish_tx),
                 }),
             ));
         }
 
-        self.push(scheduleds).await?;
+        self.push(scheduleds)?;
 
         let mut infos = Vec::with_capacity(contexts.len());
 
@@ -340,21 +337,42 @@ impl BarrierScheduler {
 
 /// The receiver side of the barrier scheduling queue.
 /// Held by the [`super::GlobalBarrierManager`] to execute these commands.
 pub struct ScheduledBarriers {
+    min_interval: Option<Interval>,
+
+    /// Force a checkpoint in the next barrier.
+    force_checkpoint: bool,
+
+    /// The number of barriers (checkpoint = false) since the last barrier (checkpoint = true).
+    num_uncheckpointed_barrier: usize,
+    checkpoint_frequency: usize,
     inner: Arc<Inner>,
 }
 
 impl ScheduledBarriers {
-    /// Pop a scheduled barrier from the queue, or a default checkpoint barrier if not exists.
-    pub(super) async fn pop_or_default(&self) -> Scheduled {
-        let mut queue = self.inner.queue.write().await;
+    pub(super) fn set_min_interval(&mut self, min_interval: Duration) {
+        let set_new_interval = match &self.min_interval {
+            None => true,
+            Some(prev_min_interval) => min_interval != prev_min_interval.period(),
+        };
+        if set_new_interval {
+            let mut min_interval = tokio::time::interval(min_interval);
+            min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
+            self.min_interval = Some(min_interval);
+        }
+    }
+
+    pub(super) async fn next_barrier(&mut self) -> Scheduled {
         let checkpoint = self.try_get_checkpoint();
-        let scheduled = match queue.queue.pop_front() {
-            Some(mut scheduled) => {
+        let scheduled = select! {
+            biased;
+            mut scheduled = self.inner.next_scheduled() => {
+                if let Some(min_interval) = &mut self.min_interval {
+                    min_interval.reset();
+                }
                 scheduled.checkpoint = scheduled.checkpoint || checkpoint;
                 scheduled
-            }
-            None => {
-                // If no command scheduled, create a periodic barrier by default.
+            },
+            _ = self.min_interval.as_mut().expect("should have set min interval").tick() => {
                 self.inner
                     .new_scheduled(checkpoint, Command::barrier(), std::iter::empty())
             }
@@ -362,23 +380,28 @@ impl ScheduledBarriers {
         self.update_num_uncheckpointed_barrier(scheduled.checkpoint);
         scheduled
     }
+}
 
-    /// Wait for at least one scheduled barrier in the queue.
-    pub(super) async fn wait_one(&self) {
-        let queue = self.inner.queue.read().await;
-        if queue.len() > 0 {
-            return;
+impl Inner {
+    async fn next_scheduled(&self) -> Scheduled {
+        loop {
+            let mut rx = self.changed_tx.subscribe();
+            {
+                let mut queue = self.queue.lock();
+                if let Some(scheduled) = queue.queue.pop_front() {
+                    break scheduled;
+                }
+            }
+            rx.changed().await.unwrap();
         }
-        let mut rx = self.inner.changed_tx.subscribe();
-        drop(queue);
-
-        rx.changed().await.unwrap();
     }
+}
 
+impl ScheduledBarriers {
     /// Mark command scheduler as blocked and abort all queued scheduled command and notify with
     /// specific reason.
-    pub(super) async fn abort_and_mark_blocked(&self, reason: impl Into<String> + Copy) {
-        let mut queue = self.inner.queue.write().await;
+    pub(super) fn abort_and_mark_blocked(&self, reason: impl Into<String> + Copy) {
+        let mut queue = self.inner.queue.lock();
         queue.mark_blocked(reason.into());
         while let Some(Scheduled { notifiers, .. }) = queue.queue.pop_front() {
             notifiers
@@ -388,15 +411,15 @@ impl ScheduledBarriers {
     }
 
     /// Mark command scheduler as ready to accept new command.
-    pub(super) async fn mark_ready(&self) {
-        let mut queue = self.inner.queue.write().await;
+    pub(super) fn mark_ready(&self) {
+        let mut queue = self.inner.queue.lock();
         queue.mark_ready();
     }
 
     /// Try to pre apply drop and cancel scheduled command and return them if any.
     /// It should only be called in recovery.
-    pub(super) async fn pre_apply_drop_cancel_scheduled(&self) -> (Vec<ActorId>, HashSet<TableId>) {
-        let mut queue = self.inner.queue.write().await;
+    pub(super) fn pre_apply_drop_cancel_scheduled(&self) -> (Vec<ActorId>, HashSet<TableId>) {
+        let mut queue = self.inner.queue.lock();
         assert_matches!(queue.status, QueueStatus::Blocked(_));
         let (mut drop_table_ids, mut cancel_table_ids) = (vec![], HashSet::new());
 
@@ -426,37 +449,26 @@ impl ScheduledBarriers {
 
     /// Whether the barrier(checkpoint = true) should be injected.
     fn try_get_checkpoint(&self) -> bool {
-        self.inner
-            .num_uncheckpointed_barrier
-            .load(Ordering::Relaxed)
-            + 1
-            >= self.inner.checkpoint_frequency.load(Ordering::Relaxed)
-            || self.inner.force_checkpoint.load(Ordering::Relaxed)
+        self.num_uncheckpointed_barrier + 1 >= self.checkpoint_frequency || self.force_checkpoint
     }
 
     /// Make the `checkpoint` of the next barrier must be true
-    pub fn force_checkpoint_in_next_barrier(&self) {
-        self.inner.force_checkpoint.store(true, Ordering::Relaxed)
+    pub fn force_checkpoint_in_next_barrier(&mut self) {
+        self.force_checkpoint = true;
     }
 
     /// Update the `checkpoint_frequency`
-    pub fn set_checkpoint_frequency(&self, frequency: usize) {
-        self.inner
-            .checkpoint_frequency
-            .store(frequency, Ordering::Relaxed);
+    pub fn set_checkpoint_frequency(&mut self, frequency: usize) {
+        self.checkpoint_frequency = frequency;
     }
 
     /// Update the `num_uncheckpointed_barrier`
-    fn update_num_uncheckpointed_barrier(&self, checkpoint: bool) {
+    fn update_num_uncheckpointed_barrier(&mut self, checkpoint: bool) {
         if checkpoint {
-            self.inner
-                .num_uncheckpointed_barrier
-                .store(0, Ordering::Relaxed);
-            self.inner.force_checkpoint.store(false, Ordering::Relaxed);
+            self.num_uncheckpointed_barrier = 0;
+            self.force_checkpoint = false;
         } else {
-            self.inner
-                .num_uncheckpointed_barrier
-                .fetch_add(1, Ordering::Relaxed);
+            self.num_uncheckpointed_barrier += 1;
         }
     }
 }
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 7de106523599..25b6ed25464a 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -269,7 +269,7 @@ impl DdlController {
     /// would be a huge hassle and pain if we don't spawn here.
     pub async fn run_command(&self, command: DdlCommand) -> MetaResult<NotificationVersion> {
         if !command.allow_in_recovery() {
-            self.barrier_manager.check_status_running().await?;
+            self.barrier_manager.check_status_running()?;
         }
         let ctrl = self.clone();
         let fut = async move {
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index d7388f50da09..e0f4f8cb82b5 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -284,11 +284,7 @@ impl GlobalStreamManager {
                 .await
             {
                 // try to cancel buffered creating command.
-                if self
-                    .barrier_scheduler
-                    .try_cancel_scheduled_create(table_id)
-                    .await
-                {
+                if self.barrier_scheduler.try_cancel_scheduled_create(table_id) {
                     tracing::debug!(
                         "cancelling streaming job {table_id} in buffer queue."
                     );
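
The `prev_injecting_barrier` handoff in `rpc.rs` is the heart of the new in-order injection: each spawned task first awaits the previous barrier's "injected" signal, performs its own injection, then signals its successor. Below is a minimal, self-contained sketch of that chaining pattern using plain tokio; the names and the `println!` stand-in for the RPC are illustrative and not from the codebase.

```rust
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    // Receiver for the previous barrier's "injected" signal, if any.
    let mut prev_injected: Option<oneshot::Receiver<()>> = None;
    let mut handles = Vec::new();

    for i in 0..3 {
        let (tx, rx) = oneshot::channel();
        // Keep our receiver for the next barrier; take the previous one for this task.
        let prev = prev_injected.replace(rx);

        handles.push(tokio::spawn(async move {
            // Wait until the previous barrier has been injected (if there was one).
            if let Some(prev) = prev {
                if prev.await.is_err() {
                    // The previous barrier never signalled; bail out rather than
                    // inject out of order.
                    return;
                }
            }
            // "Inject" this barrier, then unblock the next one immediately;
            // collection can keep running afterwards without holding it up.
            println!("injecting barrier {i}");
            let _ = tx.send(());
        }));
    }

    for handle in handles {
        let _ = handle.await;
    }
}
```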
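Swapping `Mutex<BarrierManagerStatus>` for `ArcSwap` is what lets `check_status_running` and `set_status` drop their `async`/`.await`: writers publish a new `Arc`, readers take a cheap lock-free snapshot. A rough sketch of the same pattern with a stand-in status enum (assuming the `arc-swap` crate):

```rust
use std::sync::Arc;

use arc_swap::ArcSwap;

#[derive(Debug)]
enum Status {
    Starting,
    Running,
}

fn main() {
    // Shared handle, as in `GlobalBarrierManagerContext::status`.
    let status = Arc::new(ArcSwap::new(Arc::new(Status::Starting)));

    // Equivalent of `set_status`: store a freshly allocated Arc.
    status.store(Arc::new(Status::Running));

    // Equivalent of `check_status_running`: load a snapshot and match on it.
    let snapshot = status.load();
    match &**snapshot {
        Status::Running => println!("running"),
        other => println!("not running: {other:?}"),
    }
}
```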
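`ScheduledBarriers::next_barrier` folds the old `wait_one` / `min_interval.tick()` pair into a single biased `select!`: a queued command is preferred, and taking one resets the interval so periodic barriers only fire when nothing was scheduled. A simplified sketch of that shape, using an mpsc channel in place of the shared queue (names are illustrative):

```rust
use tokio::select;
use tokio::sync::mpsc;
use tokio::time::{interval, Duration, MissedTickBehavior};

#[tokio::main]
async fn main() {
    let (tx, mut queue) = mpsc::unbounded_channel::<&'static str>();
    let mut min_interval = interval(Duration::from_millis(250));
    min_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

    // One explicitly scheduled command; the rest will be periodic barriers.
    tx.send("flush").unwrap();

    for _ in 0..3 {
        let barrier = select! {
            biased;
            // Prefer a scheduled command and reset the interval when one is taken.
            Some(cmd) = queue.recv() => {
                min_interval.reset();
                cmd
            }
            // Otherwise inject a periodic barrier once the minimum interval elapses.
            _ = min_interval.tick() => "periodic",
        };
        println!("inject {barrier}");
    }
}
```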