diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index e611d0b0d5884..525364b60dc1c 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -164,7 +164,8 @@ impl StreamService for StreamServiceImpl { } self.mgr - .send_barrier(&barrier, req.actor_ids_to_send, req.actor_ids_to_collect)?; + .send_barrier(&barrier, req.actor_ids_to_send, req.actor_ids_to_collect) + .await?; Ok(Response::new(InjectBarrierResponse { request_id: req.request_id, diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 5129f04d5e8ce..dc07c6986c8e0 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -33,6 +33,7 @@ use risingwave_common::util::runtime::BackgroundShutdownRuntime; use risingwave_hummock_sdk::LocalSstableInfo; use risingwave_pb::common::ActorInfo; use risingwave_pb::stream_plan; +use risingwave_pb::stream_plan::barrier::BarrierKind; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::StreamNode; use risingwave_storage::monitor::HummockTraceFutureExt; @@ -219,7 +220,7 @@ impl LocalStreamManager { } /// Broadcast a barrier to all senders. Save a receiver in barrier manager - pub fn send_barrier( + pub async fn send_barrier( &self, barrier: &Barrier, actor_ids_to_send: impl IntoIterator, @@ -229,6 +230,11 @@ impl LocalStreamManager { .streaming_metrics .barrier_inflight_latency .start_timer(); + if barrier.kind == BarrierKind::Initial { + let core = self.core.lock().await; + core.get_watermark_epoch() + .store(barrier.epoch.curr, std::sync::atomic::Ordering::SeqCst); + } let mut barrier_manager = self.context.lock_barrier_manager(); barrier_manager.send_barrier( barrier,