diff --git a/src/meta/service/src/scale_service.rs b/src/meta/service/src/scale_service.rs
index 33270fc2204f9..885198172c6a5 100644
--- a/src/meta/service/src/scale_service.rs
+++ b/src/meta/service/src/scale_service.rs
@@ -141,7 +141,7 @@ impl ScaleService for ScaleServiceImpl {
         &self,
         request: Request<RescheduleRequest>,
     ) -> Result<Response<RescheduleResponse>, Status> {
-        self.barrier_manager.check_status_running().await?;
+        self.barrier_manager.check_status_running()?;
 
         let RescheduleRequest {
             reschedules,
@@ -235,7 +235,7 @@ impl ScaleService for ScaleServiceImpl {
         &self,
         request: Request<GetReschedulePlanRequest>,
     ) -> Result<Response<GetReschedulePlanResponse>, Status> {
-        self.barrier_manager.check_status_running().await?;
+        self.barrier_manager.check_status_running()?;
 
         let req = request.into_inner();
 
diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs
index 9de6df91fed39..d4641f293d785 100644
--- a/src/meta/src/barrier/mod.rs
+++ b/src/meta/src/barrier/mod.rs
@@ -19,6 +19,7 @@ use std::mem::take;
 use std::sync::Arc;
 use std::time::Duration;
 
+use arc_swap::ArcSwap;
 use fail::fail_point;
 use itertools::Itertools;
 use prometheus::HistogramTimer;
@@ -32,6 +33,7 @@ use risingwave_hummock_sdk::table_watermark::{
 };
 use risingwave_hummock_sdk::{ExtendedSstableInfo, HummockSstableObjectId};
 use risingwave_pb::catalog::table::TableType;
+use risingwave_pb::common::WorkerNode;
 use risingwave_pb::ddl_service::DdlProgress;
 use risingwave_pb::meta::subscribe_response::{Info, Operation};
 use risingwave_pb::meta::PausedReason;
@@ -134,7 +136,7 @@ struct Scheduled {
 
 #[derive(Clone)]
 pub struct GlobalBarrierManagerContext {
-    status: Arc<Mutex<BarrierManagerStatus>>,
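+    /// The status of the barrier manager, wrapped in `ArcSwap` so that it can be
+    /// read without acquiring an async lock.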
+    status: Arc<ArcSwap<BarrierManagerStatus>>,
 
     tracker: Arc<Mutex<CreateMviewProgressTracker>>,
 
@@ -406,7 +408,7 @@ impl GlobalBarrierManager {
         ));
 
         let context = GlobalBarrierManagerContext {
-            status: Arc::new(Mutex::new(BarrierManagerStatus::Starting)),
+            status: Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting))),
             metadata_manager,
             hummock_manager,
             source_manager,
@@ -480,6 +482,7 @@ impl GlobalBarrierManager {
         let interval = Duration::from_millis(
             self.env.system_params_reader().await.barrier_interval_ms() as u64,
         );
+        self.scheduled_barriers.set_min_interval(interval);
         tracing::info!(
             "Starting barrier manager with: interval={:?}, enable_recovery={}, in_flight_barrier_nums={}",
             interval,
@@ -517,8 +520,7 @@ impl GlobalBarrierManager {
             // Even if there's no actor to recover, we still go through the recovery process to
             // inject the first `Initial` barrier.
             self.context
-                .set_status(BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap))
-                .await;
+                .set_status(BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap));
             let span = tracing::info_span!("bootstrap_recovery", prev_epoch = prev_epoch.value().0);
 
             let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);
@@ -530,10 +532,8 @@ impl GlobalBarrierManager {
                 .await
         };
 
-        self.context.set_status(BarrierManagerStatus::Running).await;
+        self.context.set_status(BarrierManagerStatus::Running);
 
-        let mut min_interval = tokio::time::interval(interval);
-        min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
         let (local_notification_tx, mut local_notification_rx) =
             tokio::sync::mpsc::unbounded_channel();
         self.env
@@ -556,11 +556,7 @@ impl GlobalBarrierManager {
                     let notification = notification.unwrap();
                     // Handle barrier interval and checkpoint frequency changes
                     if let LocalNotification::SystemParamsChange(p) = &notification {
-                        let new_interval = Duration::from_millis(p.barrier_interval_ms() as u64);
-                        if new_interval != min_interval.period() {
-                            min_interval = tokio::time::interval(new_interval);
-                            min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
-                        }
+                        self.scheduled_barriers.set_min_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
                         self.scheduled_barriers
                             .set_checkpoint_frequency(p.checkpoint_frequency() as usize)
                     }
@@ -572,15 +568,17 @@ impl GlobalBarrierManager {
                     )
                     .await;
                 }
-
-                // There's barrier scheduled.
-                _ = self.scheduled_barriers.wait_one(), if self.checkpoint_control.can_inject_barrier(self.in_flight_barrier_nums) => {
-                    min_interval.reset(); // Reset the interval as we have a new barrier.
-                    self.handle_new_barrier().await;
-                }
-                // Minimum interval reached.
-                _ = min_interval.tick(), if self.checkpoint_control.can_inject_barrier(self.in_flight_barrier_nums) => {
-                    self.handle_new_barrier().await;
+                scheduled = self.scheduled_barriers.next_barrier(),
+                    if self
+                        .checkpoint_control
+                        .can_inject_barrier(self.in_flight_barrier_nums) => {
+                    let all_nodes = self
+                        .context
+                        .metadata_manager
+                        .list_active_streaming_compute_nodes()
+                        .await
+                        .unwrap();
+                    self.handle_new_barrier(scheduled, all_nodes);
                 }
             }
             self.checkpoint_control.update_barrier_nums_metrics();
@@ -588,26 +586,20 @@ impl GlobalBarrierManager {
     }
 
     /// Handle the new barrier from the scheduled queue and inject it.
-    async fn handle_new_barrier(&mut self) {
-        assert!(self
-            .checkpoint_control
-            .can_inject_barrier(self.in_flight_barrier_nums));
-
+    fn handle_new_barrier(
+        &mut self,
+        scheduled: Scheduled,
+        nodes: impl IntoIterator<Item = WorkerNode>,
+    ) {
         let Scheduled {
             command,
             mut notifiers,
             send_latency_timer,
             checkpoint,
             span,
-        } = self.scheduled_barriers.pop_or_default().await;
+        } = scheduled;
 
-        let all_nodes = self
-            .context
-            .metadata_manager
-            .list_active_streaming_compute_nodes()
-            .await
-            .unwrap();
-        self.state.resolve_worker_nodes(all_nodes);
+        self.state.resolve_worker_nodes(nodes);
         let info = self.state.apply_command(&command);
 
         let (prev_epoch, curr_epoch) = self.state.next_epoch_pair();
@@ -629,15 +621,12 @@ impl GlobalBarrierManager {
             command,
             kind,
             self.context.clone(),
-            span.clone(),
+            span,
         ));
 
         send_latency_timer.observe_duration();
 
-        self.rpc_manager
-            .inject_barrier(command_ctx.clone())
-            .instrument(span)
-            .await;
+        self.rpc_manager.inject_barrier(command_ctx.clone());
 
-        // Notify about the injection.
+        // Notify that the barrier has started to be handled.
         let prev_paused_reason = self.state.paused_reason();
@@ -649,7 +638,7 @@ impl GlobalBarrierManager {
             prev_paused_reason,
             curr_paused_reason,
         };
-        notifiers.iter_mut().for_each(|n| n.notify_injected(info));
+        notifiers.iter_mut().for_each(|n| n.notify_started(info));
 
         // Update the paused state after the barrier is injected.
         self.state.set_paused_reason(curr_paused_reason);
@@ -728,8 +717,7 @@ impl GlobalBarrierManager {
             self.context
                 .set_status(BarrierManagerStatus::Recovering(RecoveryReason::Failover(
                     err.clone(),
-                )))
-                .await;
+                )));
             let latest_snapshot = self.context.hummock_manager.latest_snapshot();
             let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into()); // we can only recover from the committed epoch
             let span = tracing::info_span!(
@@ -745,7 +733,7 @@ impl GlobalBarrierManager {
                 .recovery(prev_epoch, None, &self.scheduled_barriers)
                 .instrument(span)
                 .await;
-            self.context.set_status(BarrierManagerStatus::Running).await;
+            self.context.set_status(BarrierManagerStatus::Running);
         } else {
             panic!("failed to execute barrier: {}", err.as_report());
         }
@@ -897,9 +885,9 @@ impl GlobalBarrierManager {
 
 impl GlobalBarrierManagerContext {
     /// Check the status of barrier manager, return error if it is not `Running`.
-    pub async fn check_status_running(&self) -> MetaResult<()> {
-        let status = self.status.lock().await;
-        match &*status {
+    pub fn check_status_running(&self) -> MetaResult<()> {
+        let status = self.status.load();
+        match &**status {
             BarrierManagerStatus::Starting
             | BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap) => {
                 bail!("The cluster is bootstrapping")
@@ -912,9 +900,8 @@ impl GlobalBarrierManagerContext {
     }
 
     /// Set barrier manager status.
-    async fn set_status(&self, new_status: BarrierManagerStatus) {
-        let mut status = self.status.lock().await;
-        *status = new_status;
+    fn set_status(&self, new_status: BarrierManagerStatus) {
+        self.status.store(Arc::new(new_status));
     }
 
     /// Resolve actor information from cluster, fragment manager and `ChangedTableId`.
diff --git a/src/meta/src/barrier/notifier.rs b/src/meta/src/barrier/notifier.rs
index 1675544d7a410..e142e9be514cf 100644
--- a/src/meta/src/barrier/notifier.rs
+++ b/src/meta/src/barrier/notifier.rs
@@ -31,8 +31,8 @@ pub struct BarrierInfo {
 /// Used for notifying the status of a scheduled command/barrier.
 #[derive(Debug, Default)]
 pub(crate) struct Notifier {
-    /// Get notified when scheduled barrier is injected to compute nodes.
-    pub injected: Option<oneshot::Sender<BarrierInfo>>,
+    /// Get notified when the scheduled barrier has started to be handled.
+    pub started: Option<oneshot::Sender<BarrierInfo>>,
 
     /// Get notified when scheduled barrier is collected or failed.
     pub collected: Option<oneshot::Sender<MetaResult<()>>>,
@@ -43,8 +43,8 @@ pub(crate) struct Notifier {
 
 impl Notifier {
-    /// Notify when we have injected a barrier to compute nodes.
+    /// Notify when the barrier has started to be handled.
-    pub fn notify_injected(&mut self, info: BarrierInfo) {
-        if let Some(tx) = self.injected.take() {
+    pub fn notify_started(&mut self, info: BarrierInfo) {
+        if let Some(tx) = self.started.take() {
             tx.send(info).ok();
         }
     }
diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs
index e4b93a286a3f8..ac5fee9d5ecdf 100644
--- a/src/meta/src/barrier/recovery.rs
+++ b/src/meta/src/barrier/recovery.rs
@@ -301,8 +301,7 @@ impl GlobalBarrierManagerContext {
         &self,
         scheduled_barriers: &ScheduledBarriers,
     ) -> MetaResult<bool> {
-        let (dropped_actors, cancelled) =
-            scheduled_barriers.pre_apply_drop_cancel_scheduled().await;
+        let (dropped_actors, cancelled) = scheduled_barriers.pre_apply_drop_cancel_scheduled();
         let applied = !dropped_actors.is_empty() || !cancelled.is_empty();
         if !cancelled.is_empty() {
             match &self.metadata_manager {
@@ -337,9 +336,7 @@ impl GlobalBarrierManagerContext {
         scheduled_barriers: &ScheduledBarriers,
     ) -> BarrierManagerState {
         // Mark blocked and abort buffered schedules, they might be dirty already.
-        scheduled_barriers
-            .abort_and_mark_blocked("cluster is under recovering")
-            .await;
+        scheduled_barriers.abort_and_mark_blocked("cluster is under recovering");
 
         tracing::info!("recovery start!");
         self.clean_dirty_streaming_jobs()
@@ -437,8 +434,12 @@ impl GlobalBarrierManagerContext {
                             command_ctx.wait_epoch_commit(mce).await?;
                         }
                     };
-                    let await_barrier_complete = self.inject_barrier(command_ctx.clone()).await;
-                    let res = match await_barrier_complete.await.result {
+
+                    let res = match self
+                        .inject_barrier(command_ctx.clone(), None, None)
+                        .await
+                        .result
+                    {
                         Ok(response) => {
                             if let Err(err) = command_ctx.post_collect().await {
                                 warn!(error = %err.as_report(), "post_collect failed");
@@ -467,7 +468,7 @@ impl GlobalBarrierManagerContext {
         .expect("Retry until recovery success.");
 
         recovery_timer.observe_duration();
-        scheduled_barriers.mark_ready().await;
+        scheduled_barriers.mark_ready();
 
         tracing::info!(
             epoch = state.in_flight_prev_epoch().value().0,
diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs
index 670ee7cf10929..877f935f25207 100644
--- a/src/meta/src/barrier/rpc.rs
+++ b/src/meta/src/barrier/rpc.rs
@@ -35,6 +35,7 @@ use risingwave_rpc_client::error::RpcError;
 use risingwave_rpc_client::StreamClient;
 use rw_futures_util::pending_on_none;
 use tokio::sync::oneshot;
+use tracing::Instrument;
 use uuid::Uuid;
 
 use super::command::CommandContext;
@@ -47,6 +48,8 @@ pub(super) struct BarrierRpcManager {
 
     /// Futures that await on the completion of barrier.
     injected_in_progress_barrier: FuturesUnordered<BarrierCompletionFuture>,
+
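+    /// Receiver signaled once the previous barrier has been injected, used to keep
+    /// barrier injection in order.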
+    prev_injecting_barrier: Option<oneshot::Receiver<()>>,
 }
 
 impl BarrierRpcManager {
@@ -54,15 +57,24 @@ impl BarrierRpcManager {
         Self {
             context,
             injected_in_progress_barrier: FuturesUnordered::new(),
+            prev_injecting_barrier: None,
         }
     }
 
     pub(super) fn clear(&mut self) {
         self.injected_in_progress_barrier = FuturesUnordered::new();
+        self.prev_injecting_barrier = None;
     }
 
-    pub(super) async fn inject_barrier(&mut self, command_context: Arc<CommandContext>) {
-        let await_complete_future = self.context.inject_barrier(command_context).await;
+    pub(super) fn inject_barrier(&mut self, command_context: Arc<CommandContext>) {
+        // This is to notify that the barrier has been injected, so that the next
+        // barrier can be injected without out-of-order injection.
+        // TODO: can be removed when bidi-stream control is implemented.
+        let (inject_tx, inject_rx) = oneshot::channel();
+        let prev_inject_rx = self.prev_injecting_barrier.replace(inject_rx);
+        let await_complete_future =
+            self.context
+                .inject_barrier(command_context, Some(inject_tx), prev_inject_rx);
         self.injected_in_progress_barrier
             .push(await_complete_future);
     }
@@ -76,35 +88,49 @@ pub(super) type BarrierCompletionFuture = impl Future<Output = BarrierCompletion
 
 impl GlobalBarrierManagerContext {
     /// Inject a barrier to all CNs and spawn a task to collect it
-    pub(super) async fn inject_barrier(
+    pub(super) fn inject_barrier(
         &self,
         command_context: Arc<CommandContext>,
+        inject_tx: Option<oneshot::Sender<()>>,
+        prev_inject_rx: Option<oneshot::Receiver<()>>,
     ) -> BarrierCompletionFuture {
         let (tx, rx) = oneshot::channel();
         let prev_epoch = command_context.prev_epoch.value().0;
-        let result = self
-            .stream_rpc_manager
-            .inject_barrier(command_context.clone())
-            .await;
-        match result {
-            Ok(node_need_collect) => {
-                // todo: the collect handler should be abort when recovery.
-                tokio::spawn({
-                    let stream_rpc_manager = self.stream_rpc_manager.clone();
-                    async move {
-                        stream_rpc_manager
-                            .collect_barrier(node_need_collect, command_context, tx)
-                            .await
-                    }
-                });
+        let stream_rpc_manager = self.stream_rpc_manager.clone();
+        // TODO: the collect handler should be aborted during recovery.
+        let _join_handle = tokio::spawn(async move {
+            let span = command_context.span.clone();
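+            // Wait until the previous barrier has been injected before sending this
+            // one, so that barriers reach compute nodes in order.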
+            if let Some(prev_inject_rx) = prev_inject_rx {
+                if prev_inject_rx.await.is_err() {
+                    let _ = tx.send(BarrierCompletion {
+                        prev_epoch,
+                        result: Err(anyhow!("prev barrier failed to be injected").into()),
+                    });
+                    return;
+                }
             }
-            Err(e) => {
-                let _ = tx.send(BarrierCompletion {
-                    prev_epoch,
-                    result: Err(e),
-                });
+            let result = stream_rpc_manager
+                .inject_barrier(command_context.clone())
+                .instrument(span.clone())
+                .await;
+            match result {
+                Ok(node_need_collect) => {
+                    if let Some(inject_tx) = inject_tx {
+                        let _ = inject_tx.send(());
+                    }
+                    stream_rpc_manager
+                        .collect_barrier(node_need_collect, command_context, tx)
+                        .instrument(span.clone())
+                        .await;
+                }
+                Err(e) => {
+                    let _ = tx.send(BarrierCompletion {
+                        prev_epoch,
+                        result: Err(e),
+                    });
+                }
             }
-        }
+        });
         rx.map(move |result| match result {
             Ok(completion) => completion,
             Err(_e) => BarrierCompletion {
diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs
index 56152c18baa70..f599456f94bea 100644
--- a/src/meta/src/barrier/schedule.rs
+++ b/src/meta/src/barrier/schedule.rs
@@ -13,17 +13,21 @@
 // limitations under the License.
 
 use std::collections::{HashSet, VecDeque};
+use std::future::pending;
 use std::iter::once;
-use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::Arc;
-use std::time::Instant;
+use std::time::{Duration, Instant};
 
 use anyhow::{anyhow, Context};
 use assert_matches::assert_matches;
+use parking_lot::Mutex;
 use risingwave_common::catalog::TableId;
 use risingwave_pb::hummock::HummockSnapshot;
 use risingwave_pb::meta::PausedReason;
-use tokio::sync::{oneshot, watch, RwLock};
+use tokio::select;
+use tokio::sync::{oneshot, watch};
+use tokio::time::Interval;
+use tracing::warn;
 
 use super::notifier::{BarrierInfo, Notifier};
 use super::{Command, Scheduled};
@@ -37,19 +41,11 @@ use crate::{MetaError, MetaResult};
 /// We manually implement one here instead of using channels since we may need to update the front
 /// of the queue to add some notifiers for instant flushes.
 struct Inner {
-    queue: RwLock<ScheduledQueue>,
+    queue: Mutex<ScheduledQueue>,
 
     /// When `queue` is not empty anymore, all subscribers of this watcher will be notified.
     changed_tx: watch::Sender<()>,
 
-    /// The numbers of barrier (checkpoint = false) since the last barrier (checkpoint = true)
-    num_uncheckpointed_barrier: AtomicUsize,
-
-    /// Force checkpoint in next barrier.
-    force_checkpoint: AtomicBool,
-
-    checkpoint_frequency: AtomicUsize,
-
     /// Used for recording send latency of each barrier.
     metrics: Arc<MetaMetrics>,
 }
@@ -62,7 +58,7 @@ enum QueueStatus {
     Blocked(String),
 }
 
-struct ScheduledQueue {
+pub(super) struct ScheduledQueue {
     queue: VecDeque<Scheduled>,
     status: QueueStatus,
 }
@@ -154,11 +150,8 @@ impl BarrierScheduler {
             checkpoint_frequency,
         );
         let inner = Arc::new(Inner {
-            queue: RwLock::new(ScheduledQueue::new()),
+            queue: Mutex::new(ScheduledQueue::new()),
             changed_tx: watch::channel(()).0,
-            num_uncheckpointed_barrier: AtomicUsize::new(0),
-            checkpoint_frequency: AtomicUsize::new(checkpoint_frequency),
-            force_checkpoint: AtomicBool::new(false),
             metrics,
         });
 
@@ -167,13 +160,19 @@ impl BarrierScheduler {
                 inner: inner.clone(),
                 hummock_manager,
             },
-            ScheduledBarriers { inner },
+            ScheduledBarriers {
+                num_uncheckpointed_barrier: 0,
+                force_checkpoint: false,
+                checkpoint_frequency,
+                inner,
+                min_interval: None,
+            },
         )
     }
 
     /// Push a scheduled barrier into the queue.
-    async fn push(&self, scheduleds: impl IntoIterator<Item = Scheduled>) -> MetaResult<()> {
-        let mut queue = self.inner.queue.write().await;
+    fn push(&self, scheduleds: impl IntoIterator<Item = Scheduled>) -> MetaResult<()> {
+        let mut queue = self.inner.queue.lock();
         for scheduled in scheduleds {
             queue.push_back(scheduled)?;
             if queue.len() == 1 {
@@ -184,8 +183,8 @@ impl BarrierScheduler {
     }
 
     /// Try to cancel the scheduled command for creating a streaming job; return true if cancelled.
-    pub async fn try_cancel_scheduled_create(&self, table_id: TableId) -> bool {
-        let queue = &mut self.inner.queue.write().await;
+    pub fn try_cancel_scheduled_create(&self, table_id: TableId) -> bool {
+        let queue = &mut self.inner.queue.lock();
         if let Some(idx) = queue.queue.iter().position(|scheduled| {
             if let Command::CreateStreamingJob {
                 table_fragments, ..
@@ -207,12 +206,12 @@ impl BarrierScheduler {
     /// Attach `new_notifiers` to the very first scheduled barrier. If none is scheduled, a
     /// default barrier will be created. If `new_checkpoint` is true, the barrier will become a
     /// checkpoint.
-    async fn attach_notifiers(
+    fn attach_notifiers(
         &self,
         new_notifiers: Vec<Notifier>,
         new_checkpoint: bool,
     ) -> MetaResult<()> {
-        let mut queue = self.inner.queue.write().await;
+        let mut queue = self.inner.queue.lock();
         match queue.queue.front_mut() {
             Some(Scheduled {
                 notifiers,
@@ -243,7 +242,7 @@ impl BarrierScheduler {
             collected: Some(tx),
             ..Default::default()
         };
-        self.attach_notifiers(vec![notifier], checkpoint).await?;
+        self.attach_notifiers(vec![notifier], checkpoint)?;
         rx.await.unwrap()
     }
 
@@ -258,23 +257,23 @@ impl BarrierScheduler {
         let mut scheduleds = Vec::with_capacity(commands.len());
 
         for command in commands {
-            let (injected_tx, injected_rx) = oneshot::channel();
+            let (started_tx, started_rx) = oneshot::channel();
             let (collect_tx, collect_rx) = oneshot::channel();
             let (finish_tx, finish_rx) = oneshot::channel();
 
-            contexts.push((injected_rx, collect_rx, finish_rx));
+            contexts.push((started_rx, collect_rx, finish_rx));
             scheduleds.push(self.inner.new_scheduled(
                 command.need_checkpoint(),
                 command,
                 once(Notifier {
-                    injected: Some(injected_tx),
+                    started: Some(started_tx),
                     collected: Some(collect_tx),
                     finished: Some(finish_tx),
                 }),
             ));
         }
 
-        self.push(scheduleds).await?;
+        self.push(scheduleds)?;
 
         let mut infos = Vec::with_capacity(contexts.len());
 
@@ -340,21 +339,62 @@ impl BarrierScheduler {
 /// The receiver side of the barrier scheduling queue.
 /// Held by the [`super::GlobalBarrierManager`] to execute these commands.
 pub struct ScheduledBarriers {
+    min_interval: Option<Interval>,
+
+    /// Force checkpoint in next barrier.
+    force_checkpoint: bool,
+
+    /// The number of barriers (checkpoint = false) since the last barrier (checkpoint = true).
+    num_uncheckpointed_barrier: usize,
+    checkpoint_frequency: usize,
     inner: Arc<Inner>,
 }
 
 impl ScheduledBarriers {
-    /// Pop a scheduled barrier from the queue, or a default checkpoint barrier if not exists.
-    pub(super) async fn pop_or_default(&self) -> Scheduled {
-        let mut queue = self.inner.queue.write().await;
-        let checkpoint = self.try_get_checkpoint();
-        let scheduled = match queue.queue.pop_front() {
-            Some(mut scheduled) => {
+    pub(super) fn set_min_interval(&mut self, min_interval: Duration) {
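+        // Only replace the interval when the period actually changes, so that an
+        // unrelated system-params update does not reset the current tick schedule.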
+        let set_new_interval = match &self.min_interval {
+            None => true,
+            Some(prev_min_interval) => min_interval != prev_min_interval.period(),
+        };
+        if set_new_interval {
+            let mut min_interval = tokio::time::interval(min_interval);
+            min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
+            self.min_interval = Some(min_interval);
+        }
+    }
+
+    pub(super) async fn next_barrier(&mut self) -> Scheduled {
+        let scheduled = select! {
+            biased;
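+            // `biased` polls the arms in order, so an explicitly scheduled barrier
+            // takes priority over the periodic tick.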
+            mut scheduled = async {
+                loop {
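+                    // Subscribe before checking the queue, so that a barrier pushed
+                    // in between still triggers the notification below.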
+                    let mut rx = self.inner.changed_tx.subscribe();
+                    {
+                        let mut queue = self.inner.queue.lock();
+                        if let Some(scheduled) = queue.queue.pop_front() {
+                            break scheduled
+                        }
+                    }
+                    rx.changed().await.unwrap();
+                }
+            } => {
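+                // Reset the min interval since we are injecting a barrier now.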
+                if let Some(min_interval) = &mut self.min_interval {
+                    min_interval.reset();
+                }
+                let checkpoint = self.try_get_checkpoint();
                 scheduled.checkpoint = scheduled.checkpoint || checkpoint;
                 scheduled
-            }
-            None => {
-                // If no command scheduled, create a periodic barrier by default.
+            },
+            _ = async {
+                match &mut self.min_interval {
+                    Some(min_interval) => min_interval.tick().await,
+                    None => {
+                        warn!("min interval not set and pending");
+                        pending::<_>().await
+                    }
+                }
+            } => {
+                let checkpoint = self.try_get_checkpoint();
                 self.inner
                     .new_scheduled(checkpoint, Command::barrier(), std::iter::empty())
             }
@@ -363,22 +403,10 @@ impl ScheduledBarriers {
         scheduled
     }
 
-    /// Wait for at least one scheduled barrier in the queue.
-    pub(super) async fn wait_one(&self) {
-        let queue = self.inner.queue.read().await;
-        if queue.len() > 0 {
-            return;
-        }
-        let mut rx = self.inner.changed_tx.subscribe();
-        drop(queue);
-
-        rx.changed().await.unwrap();
-    }
-
     /// Mark the command scheduler as blocked, abort all queued scheduled commands, and
     /// notify with the specific reason.
-    pub(super) async fn abort_and_mark_blocked(&self, reason: impl Into<String> + Copy) {
-        let mut queue = self.inner.queue.write().await;
+    pub(super) fn abort_and_mark_blocked(&self, reason: impl Into<String> + Copy) {
+        let mut queue = self.inner.queue.lock();
         queue.mark_blocked(reason.into());
         while let Some(Scheduled { notifiers, .. }) = queue.queue.pop_front() {
             notifiers
@@ -388,15 +416,15 @@ impl ScheduledBarriers {
     }
 
     /// Mark the command scheduler as ready to accept new commands.
-    pub(super) async fn mark_ready(&self) {
-        let mut queue = self.inner.queue.write().await;
+    pub(super) fn mark_ready(&self) {
+        let mut queue = self.inner.queue.lock();
         queue.mark_ready();
     }
 
     /// Try to pre-apply drop and cancel scheduled commands, and return them if any.
     /// It should only be called in recovery.
-    pub(super) async fn pre_apply_drop_cancel_scheduled(&self) -> (Vec<ActorId>, HashSet<TableId>) {
-        let mut queue = self.inner.queue.write().await;
+    pub(super) fn pre_apply_drop_cancel_scheduled(&self) -> (Vec<ActorId>, HashSet<TableId>) {
+        let mut queue = self.inner.queue.lock();
         assert_matches!(queue.status, QueueStatus::Blocked(_));
         let (mut drop_table_ids, mut cancel_table_ids) = (vec![], HashSet::new());
 
@@ -426,37 +454,26 @@ impl ScheduledBarriers {
 
     /// Whether a barrier (checkpoint = true) should be injected.
     fn try_get_checkpoint(&self) -> bool {
-        self.inner
-            .num_uncheckpointed_barrier
-            .load(Ordering::Relaxed)
-            + 1
-            >= self.inner.checkpoint_frequency.load(Ordering::Relaxed)
-            || self.inner.force_checkpoint.load(Ordering::Relaxed)
+        self.num_uncheckpointed_barrier + 1 >= self.checkpoint_frequency || self.force_checkpoint
     }
 
     /// Force the `checkpoint` of the next barrier to be true.
-    pub fn force_checkpoint_in_next_barrier(&self) {
-        self.inner.force_checkpoint.store(true, Ordering::Relaxed)
+    pub fn force_checkpoint_in_next_barrier(&mut self) {
+        self.force_checkpoint = true;
     }
 
     /// Update the `checkpoint_frequency`
-    pub fn set_checkpoint_frequency(&self, frequency: usize) {
-        self.inner
-            .checkpoint_frequency
-            .store(frequency, Ordering::Relaxed);
+    pub fn set_checkpoint_frequency(&mut self, frequency: usize) {
+        self.checkpoint_frequency = frequency;
     }
 
     /// Update the `num_uncheckpointed_barrier`
-    fn update_num_uncheckpointed_barrier(&self, checkpoint: bool) {
+    fn update_num_uncheckpointed_barrier(&mut self, checkpoint: bool) {
         if checkpoint {
-            self.inner
-                .num_uncheckpointed_barrier
-                .store(0, Ordering::Relaxed);
-            self.inner.force_checkpoint.store(false, Ordering::Relaxed);
+            self.num_uncheckpointed_barrier = 0;
+            self.force_checkpoint = false;
         } else {
-            self.inner
-                .num_uncheckpointed_barrier
-                .fetch_add(1, Ordering::Relaxed);
+            self.num_uncheckpointed_barrier += 1;
         }
     }
 }
diff --git a/src/meta/src/barrier/state.rs b/src/meta/src/barrier/state.rs
index 560f17118c58f..de1f7b0bb6d09 100644
--- a/src/meta/src/barrier/state.rs
+++ b/src/meta/src/barrier/state.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use risingwave_pb::common::PbWorkerNode;
+use risingwave_pb::common::WorkerNode;
 use risingwave_pb::meta::PausedReason;
 
 use crate::barrier::info::InflightActorInfo;
@@ -70,7 +70,7 @@ impl BarrierManagerState {
     }
 
     // TODO: optimize it as incremental updates.
-    pub fn resolve_worker_nodes(&mut self, nodes: Vec<PbWorkerNode>) {
+    pub fn resolve_worker_nodes(&mut self, nodes: impl IntoIterator<Item = WorkerNode>) {
         self.inflight_actor_infos.resolve_worker_nodes(nodes);
     }
 
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 84ff4ea1de682..5c3111bd2f44e 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -269,7 +269,7 @@ impl DdlController {
     /// would be a huge hassle and pain if we don't spawn here.
     pub async fn run_command(&self, command: DdlCommand) -> MetaResult<NotificationVersion> {
         if !command.allow_in_recovery() {
-            self.barrier_manager.check_status_running().await?;
+            self.barrier_manager.check_status_running()?;
         }
         let ctrl = self.clone();
         let fut = async move {
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index 5889292756ca7..517781f944d92 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -293,11 +293,7 @@ impl GlobalStreamManager {
                             .await
                         {
                             // try to cancel buffered creating command.
-                            if self
-                                .barrier_scheduler
-                                .try_cancel_scheduled_create(table_id)
-                                .await
-                            {
+                            if self.barrier_scheduler.try_cancel_scheduled_create(table_id) {
                                 tracing::debug!(
                                     "cancelling streaming job {table_id} in buffer queue."
                                 );