diff --git a/src/stream/src/from_proto/barrier_recv.rs b/src/stream/src/from_proto/barrier_recv.rs index ce38cb6a49e87..21bbdece8008e 100644 --- a/src/stream/src/from_proto/barrier_recv.rs +++ b/src/stream/src/from_proto/barrier_recv.rs @@ -13,7 +13,6 @@ // limitations under the License. use risingwave_pb::stream_plan::BarrierRecvNode; -use tokio::sync::mpsc::unbounded_channel; use super::*; use crate::executor::BarrierRecvExecutor; @@ -33,11 +32,10 @@ impl ExecutorBuilder for BarrierRecvExecutorBuilder { "barrier receiver should not have input" ); - let (sender, barrier_receiver) = unbounded_channel(); - params + let barrier_receiver = params .shared_context .local_barrier_manager - .register_sender(params.actor_context.id, sender); + .subscribe_barrier(params.actor_context.id); let exec = BarrierRecvExecutor::new(params.actor_context, barrier_receiver); Ok((params.info, exec).into()) diff --git a/src/stream/src/from_proto/now.rs b/src/stream/src/from_proto/now.rs index 6b79eb81e6c89..d3cc352150292 100644 --- a/src/stream/src/from_proto/now.rs +++ b/src/stream/src/from_proto/now.rs @@ -18,7 +18,6 @@ use risingwave_common::util::value_encoding::DatumFromProtoExt; use risingwave_pb::stream_plan::now_node::PbMode as PbNowMode; use risingwave_pb::stream_plan::{NowNode, PbNowModeGenerateSeries}; use risingwave_storage::StateStore; -use tokio::sync::mpsc::unbounded_channel; use super::ExecutorBuilder; use crate::common::table::state_table::StateTable; @@ -36,11 +35,10 @@ impl ExecutorBuilder for NowExecutorBuilder { node: &NowNode, store: impl StateStore, ) -> StreamResult { - let (sender, barrier_receiver) = unbounded_channel(); - params + let barrier_receiver = params .shared_context .local_barrier_manager - .register_sender(params.actor_context.id, sender); + .subscribe_barrier(params.actor_context.id); let mode = if let Ok(pb_mode) = node.get_mode() { match pb_mode { diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs index 919cf37e70e3a..98746a672e43c 100644 --- a/src/stream/src/from_proto/source/trad_source.rs +++ b/src/stream/src/from_proto/source/trad_source.rs @@ -27,7 +27,6 @@ use risingwave_pb::plan_common::{ }; use risingwave_pb::stream_plan::SourceNode; use risingwave_storage::panic_store::PanicStateStore; -use tokio::sync::mpsc::unbounded_channel; use super::*; use crate::executor::source::{ @@ -141,11 +140,10 @@ impl ExecutorBuilder for SourceExecutorBuilder { node: &Self::Node, store: impl StateStore, ) -> StreamResult { - let (sender, barrier_receiver) = unbounded_channel(); - params + let barrier_receiver = params .shared_context .local_barrier_manager - .register_sender(params.actor_context.id, sender); + .subscribe_barrier(params.actor_context.id); let system_params = params.env.system_params_manager_ref().get_params(); if let Some(source) = &node.source_inner { diff --git a/src/stream/src/from_proto/stream_scan.rs b/src/stream/src/from_proto/stream_scan.rs index 3a3811421a455..eaa4367524554 100644 --- a/src/stream/src/from_proto/stream_scan.rs +++ b/src/stream/src/from_proto/stream_scan.rs @@ -20,7 +20,6 @@ use risingwave_common::util::value_encoding::BasicSerde; use risingwave_pb::plan_common::StorageTableDesc; use risingwave_pb::stream_plan::{StreamScanNode, StreamScanType}; use risingwave_storage::table::batch_table::storage_table::StorageTable; -use tokio::sync::mpsc; use super::*; use crate::common::table::state_table::{ReplicatedStateTable, StateTable}; @@ -154,11 +153,10 @@ impl ExecutorBuilder for StreamScanExecutorBuilder { .collect_vec(); let vnodes = params.vnode_bitmap.map(Arc::new); - let (barrier_tx, barrier_rx) = mpsc::unbounded_channel(); - params + let barrier_rx = params .shared_context .local_barrier_manager - .register_sender(params.actor_context.id, barrier_tx); + .subscribe_barrier(params.actor_context.id); let upstream_table = StorageTable::new_partial(state_store.clone(), column_ids, vnodes, table_desc); diff --git a/src/stream/src/from_proto/values.rs b/src/stream/src/from_proto/values.rs index 42fae84d8024c..10654c5f75b6a 100644 --- a/src/stream/src/from_proto/values.rs +++ b/src/stream/src/from_proto/values.rs @@ -16,7 +16,6 @@ use itertools::Itertools; use risingwave_expr::expr::build_non_strict_from_prost; use risingwave_pb::stream_plan::ValuesNode; use risingwave_storage::StateStore; -use tokio::sync::mpsc::unbounded_channel; use super::ExecutorBuilder; use crate::error::StreamResult; @@ -35,11 +34,10 @@ impl ExecutorBuilder for ValuesExecutorBuilder { node: &ValuesNode, _store: impl StateStore, ) -> StreamResult { - let (sender, barrier_receiver) = unbounded_channel(); - params + let barrier_receiver = params .shared_context .local_barrier_manager - .register_sender(params.actor_context.id, sender); + .subscribe_barrier(params.actor_context.id); let progress = params .local_barrier_manager .register_create_mview_progress(params.actor_context.id); diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 99c6bb2f29730..b6cf8c525a5ed 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -893,11 +893,13 @@ impl LocalBarrierManager { rx } - pub fn register_sender(&self, actor_id: ActorId, tx: UnboundedSender) { + pub fn subscribe_barrier(&self, actor_id: ActorId) -> UnboundedReceiver { + let (tx, rx) = mpsc::unbounded_channel(); self.send_event(LocalBarrierEvent::RegisterBarrierSender { actor_id, barrier_sender: tx, - }) + }); + rx } } diff --git a/src/stream/src/task/barrier_manager/tests.rs b/src/stream/src/task/barrier_manager/tests.rs index 0fb71e5240431..d6a8256aebb61 100644 --- a/src/stream/src/task/barrier_manager/tests.rs +++ b/src/stream/src/task/barrier_manager/tests.rs @@ -31,11 +31,10 @@ async fn test_managed_barrier_collection() -> StreamResult<()> { let manager = &test_env.shared_context.local_barrier_manager; let register_sender = |actor_id: u32| { - let (barrier_tx, barrier_rx) = unbounded_channel(); - test_env + let barrier_rx = test_env .shared_context .local_barrier_manager - .register_sender(actor_id, barrier_tx); + .subscribe_barrier(actor_id); (actor_id, barrier_rx) }; @@ -91,11 +90,10 @@ async fn test_managed_barrier_collection_separately() -> StreamResult<()> { let manager = &test_env.shared_context.local_barrier_manager; let register_sender = |actor_id: u32| { - let (barrier_tx, barrier_rx) = unbounded_channel(); - test_env + let barrier_rx = test_env .shared_context .local_barrier_manager - .register_sender(actor_id, barrier_tx); + .subscribe_barrier(actor_id); (actor_id, barrier_rx) }; @@ -170,11 +168,10 @@ async fn test_late_register_barrier_sender() -> StreamResult<()> { let manager = &test_env.shared_context.local_barrier_manager; let register_sender = |actor_id: u32| { - let (barrier_tx, barrier_rx) = unbounded_channel(); - test_env + let barrier_rx = test_env .shared_context .local_barrier_manager - .register_sender(actor_id, barrier_tx); + .subscribe_barrier(actor_id); (actor_id, barrier_rx) };