Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Aug 21, 2024
1 parent 352a139 commit 7d657c5
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 31 deletions.
6 changes: 2 additions & 4 deletions src/stream/src/from_proto/barrier_recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use risingwave_pb::stream_plan::BarrierRecvNode;
use tokio::sync::mpsc::unbounded_channel;

use super::*;
use crate::executor::BarrierRecvExecutor;
Expand All @@ -33,11 +32,10 @@ impl ExecutorBuilder for BarrierRecvExecutorBuilder {
"barrier receiver should not have input"
);

let (sender, barrier_receiver) = unbounded_channel();
params
let barrier_receiver = params
.shared_context
.local_barrier_manager
.register_sender(params.actor_context.id, sender);
.subscribe_barrier(params.actor_context.id);

let exec = BarrierRecvExecutor::new(params.actor_context, barrier_receiver);
Ok((params.info, exec).into())
Expand Down
6 changes: 2 additions & 4 deletions src/stream/src/from_proto/now.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use risingwave_common::util::value_encoding::DatumFromProtoExt;
use risingwave_pb::stream_plan::now_node::PbMode as PbNowMode;
use risingwave_pb::stream_plan::{NowNode, PbNowModeGenerateSeries};
use risingwave_storage::StateStore;
use tokio::sync::mpsc::unbounded_channel;

use super::ExecutorBuilder;
use crate::common::table::state_table::StateTable;
Expand All @@ -36,11 +35,10 @@ impl ExecutorBuilder for NowExecutorBuilder {
node: &NowNode,
store: impl StateStore,
) -> StreamResult<Executor> {
let (sender, barrier_receiver) = unbounded_channel();
params
let barrier_receiver = params
.shared_context
.local_barrier_manager
.register_sender(params.actor_context.id, sender);
.subscribe_barrier(params.actor_context.id);

let mode = if let Ok(pb_mode) = node.get_mode() {
match pb_mode {
Expand Down
6 changes: 2 additions & 4 deletions src/stream/src/from_proto/source/trad_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use risingwave_pb::plan_common::{
};
use risingwave_pb::stream_plan::SourceNode;
use risingwave_storage::panic_store::PanicStateStore;
use tokio::sync::mpsc::unbounded_channel;

use super::*;
use crate::executor::source::{
Expand Down Expand Up @@ -141,11 +140,10 @@ impl ExecutorBuilder for SourceExecutorBuilder {
node: &Self::Node,
store: impl StateStore,
) -> StreamResult<Executor> {
let (sender, barrier_receiver) = unbounded_channel();
params
let barrier_receiver = params
.shared_context
.local_barrier_manager
.register_sender(params.actor_context.id, sender);
.subscribe_barrier(params.actor_context.id);
let system_params = params.env.system_params_manager_ref().get_params();

if let Some(source) = &node.source_inner {
Expand Down
6 changes: 2 additions & 4 deletions src/stream/src/from_proto/stream_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_pb::plan_common::StorageTableDesc;
use risingwave_pb::stream_plan::{StreamScanNode, StreamScanType};
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use tokio::sync::mpsc;

use super::*;
use crate::common::table::state_table::{ReplicatedStateTable, StateTable};
Expand Down Expand Up @@ -154,11 +153,10 @@ impl ExecutorBuilder for StreamScanExecutorBuilder {
.collect_vec();

let vnodes = params.vnode_bitmap.map(Arc::new);
let (barrier_tx, barrier_rx) = mpsc::unbounded_channel();
params
let barrier_rx = params
.shared_context
.local_barrier_manager
.register_sender(params.actor_context.id, barrier_tx);
.subscribe_barrier(params.actor_context.id);

let upstream_table =
StorageTable::new_partial(state_store.clone(), column_ids, vnodes, table_desc);
Expand Down
6 changes: 2 additions & 4 deletions src/stream/src/from_proto/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use itertools::Itertools;
use risingwave_expr::expr::build_non_strict_from_prost;
use risingwave_pb::stream_plan::ValuesNode;
use risingwave_storage::StateStore;
use tokio::sync::mpsc::unbounded_channel;

use super::ExecutorBuilder;
use crate::error::StreamResult;
Expand All @@ -35,11 +34,10 @@ impl ExecutorBuilder for ValuesExecutorBuilder {
node: &ValuesNode,
_store: impl StateStore,
) -> StreamResult<Executor> {
let (sender, barrier_receiver) = unbounded_channel();
params
let barrier_receiver = params
.shared_context
.local_barrier_manager
.register_sender(params.actor_context.id, sender);
.subscribe_barrier(params.actor_context.id);
let progress = params
.local_barrier_manager
.register_create_mview_progress(params.actor_context.id);
Expand Down
6 changes: 4 additions & 2 deletions src/stream/src/task/barrier_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -893,11 +893,13 @@ impl LocalBarrierManager {
rx
}

pub fn register_sender(&self, actor_id: ActorId, tx: UnboundedSender<Barrier>) {
pub fn subscribe_barrier(&self, actor_id: ActorId) -> UnboundedReceiver<Barrier> {
let (tx, rx) = mpsc::unbounded_channel();
self.send_event(LocalBarrierEvent::RegisterBarrierSender {
actor_id,
barrier_sender: tx,
})
});
rx
}
}

Expand Down
15 changes: 6 additions & 9 deletions src/stream/src/task/barrier_manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@ async fn test_managed_barrier_collection() -> StreamResult<()> {
let manager = &test_env.shared_context.local_barrier_manager;

let register_sender = |actor_id: u32| {
let (barrier_tx, barrier_rx) = unbounded_channel();
test_env
let barrier_rx = test_env
.shared_context
.local_barrier_manager
.register_sender(actor_id, barrier_tx);
.subscribe_barrier(actor_id);
(actor_id, barrier_rx)
};

Expand Down Expand Up @@ -91,11 +90,10 @@ async fn test_managed_barrier_collection_separately() -> StreamResult<()> {
let manager = &test_env.shared_context.local_barrier_manager;

let register_sender = |actor_id: u32| {
let (barrier_tx, barrier_rx) = unbounded_channel();
test_env
let barrier_rx = test_env
.shared_context
.local_barrier_manager
.register_sender(actor_id, barrier_tx);
.subscribe_barrier(actor_id);
(actor_id, barrier_rx)
};

Expand Down Expand Up @@ -170,11 +168,10 @@ async fn test_late_register_barrier_sender() -> StreamResult<()> {
let manager = &test_env.shared_context.local_barrier_manager;

let register_sender = |actor_id: u32| {
let (barrier_tx, barrier_rx) = unbounded_channel();
test_env
let barrier_rx = test_env
.shared_context
.local_barrier_manager
.register_sender(actor_id, barrier_tx);
.subscribe_barrier(actor_id);
(actor_id, barrier_rx)
};

Expand Down

0 comments on commit 7d657c5

Please sign in to comment.