Skip to content

Commit

Permalink
refactor: register actor barrier sender asynchronously (#18104)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Aug 21, 2024
1 parent dd65156 commit da46c4d
Show file tree
Hide file tree
Showing 16 changed files with 310 additions and 358 deletions.
1 change: 0 additions & 1 deletion proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ message DropActorsResponse {
message InjectBarrierRequest {
string request_id = 1;
stream_plan.Barrier barrier = 2;
repeated uint32 actor_ids_to_send = 3;
repeated uint32 actor_ids_to_collect = 4;
repeated uint32 table_ids_to_sync = 5;
uint32 partial_graph_id = 6;
Expand Down
18 changes: 0 additions & 18 deletions src/meta/src/barrier/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,24 +259,6 @@ impl InflightGraphInfo {
})
}

/// Returns actor list to send in the target worker node.
pub fn actor_ids_to_send(&self, node_id: WorkerId) -> impl Iterator<Item = ActorId> + '_ {
self.fragment_infos
.values()
.filter(|info| info.is_injectable)
.flat_map(move |info| {
info.actors
.iter()
.filter_map(move |(actor_id, actor_node_id)| {
if *actor_node_id == node_id {
Some(*actor_id)
} else {
None
}
})
})
}

pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
self.fragment_infos
.values()
Expand Down
7 changes: 0 additions & 7 deletions src/meta/src/barrier/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,15 +302,9 @@ impl ControlStreamManager {
self.nodes
.iter_mut()
.map(|(node_id, node)| {
let actor_ids_to_send: Vec<_> =
pre_applied_graph_info.actor_ids_to_send(*node_id).collect();
let actor_ids_to_collect: Vec<_> = pre_applied_graph_info
.actor_ids_to_collect(*node_id)
.collect();
if actor_ids_to_collect.is_empty() {
// No need to send or collect barrier for this node.
assert!(actor_ids_to_send.is_empty());
}
let table_ids_to_sync = if let Some(graph_info) = applied_graph_info {
graph_info
.existing_table_ids()
Expand Down Expand Up @@ -340,7 +334,6 @@ impl ControlStreamManager {
InjectBarrierRequest {
request_id: StreamRpcManager::new_request_id(),
barrier: Some(barrier),
actor_ids_to_send,
actor_ids_to_collect,
table_ids_to_sync,
partial_graph_id,
Expand Down
8 changes: 4 additions & 4 deletions src/stream/src/executor/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1271,7 +1271,7 @@ mod tests {
actor_new_dispatchers: Default::default(),
},
));
barrier_test_env.inject_barrier(&b1, [], [actor_id]);
barrier_test_env.inject_barrier(&b1, [actor_id]);
tx.send(Message::Barrier(b1.clone().into_dispatcher()))
.await
.unwrap();
Expand All @@ -1291,7 +1291,7 @@ mod tests {

// 6. Send another barrier.
let b2 = Barrier::new_test_barrier(test_epoch(2));
barrier_test_env.inject_barrier(&b2, [], [actor_id]);
barrier_test_env.inject_barrier(&b2, [actor_id]);
tx.send(Message::Barrier(b2.into_dispatcher()))
.await
.unwrap();
Expand Down Expand Up @@ -1330,7 +1330,7 @@ mod tests {
actor_new_dispatchers: Default::default(),
},
));
barrier_test_env.inject_barrier(&b3, [], [actor_id]);
barrier_test_env.inject_barrier(&b3, [actor_id]);
tx.send(Message::Barrier(b3.into_dispatcher()))
.await
.unwrap();
Expand All @@ -1344,7 +1344,7 @@ mod tests {

// 11. Send another barrier.
let b4 = Barrier::new_test_barrier(test_epoch(4));
barrier_test_env.inject_barrier(&b4, [], [actor_id]);
barrier_test_env.inject_barrier(&b4, [actor_id]);
tx.send(Message::Barrier(b4.into_dispatcher()))
.await
.unwrap();
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/executor/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ async fn test_merger_sum_aggr() {

let mut epoch = test_epoch(1);
let b1 = Barrier::new_test_barrier(epoch);
barrier_test_env.inject_barrier(&b1, [], actors.clone());
barrier_test_env.inject_barrier(&b1, actors.clone());
input
.send(Message::Barrier(b1.into_dispatcher()))
.await
Expand All @@ -244,7 +244,7 @@ async fn test_merger_sum_aggr() {
input.send(Message::Chunk(chunk)).await.unwrap();
}
let b = Barrier::new_test_barrier(epoch);
barrier_test_env.inject_barrier(&b, [], actors.clone());
barrier_test_env.inject_barrier(&b, actors.clone());
input
.send(Message::Barrier(b.into_dispatcher()))
.await
Expand All @@ -253,7 +253,7 @@ async fn test_merger_sum_aggr() {
}
let b = Barrier::new_test_barrier(epoch)
.with_mutation(Mutation::Stop(actors.clone().into_iter().collect()));
barrier_test_env.inject_barrier(&b, [], actors);
barrier_test_env.inject_barrier(&b, actors);
input
.send(Message::Barrier(b.into_dispatcher()))
.await
Expand Down
12 changes: 4 additions & 8 deletions src/stream/src/executor/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,13 +524,13 @@ mod tests {
.map(|(_, epoch)| {
let barrier = Barrier::with_prev_epoch_for_test(*epoch, *prev_epoch);
*prev_epoch = *epoch;
barrier_test_env.inject_barrier(&barrier, [], [actor_id]);
barrier_test_env.inject_barrier(&barrier, [actor_id]);
(*epoch, barrier)
})
.collect();
let b2 = Barrier::with_prev_epoch_for_test(test_epoch(1000), *prev_epoch)
.with_mutation(Mutation::Stop(HashSet::default()));
barrier_test_env.inject_barrier(&b2, [], [actor_id]);
barrier_test_env.inject_barrier(&b2, [actor_id]);

for (tx_id, tx) in txs.into_iter().enumerate() {
let epochs = epochs.clone();
Expand Down Expand Up @@ -703,7 +703,7 @@ mod tests {
actor_new_dispatchers: Default::default(),
},
));
barrier_test_env.inject_barrier(&b1, [], [actor_id]);
barrier_test_env.inject_barrier(&b1, [actor_id]);
send!(
[untouched, old],
Message::Barrier(b1.clone().into_dispatcher())
Expand Down Expand Up @@ -820,11 +820,7 @@ mod tests {
)
};

test_env.inject_barrier(
&exchange_client_test_barrier(),
[],
[remote_input.actor_id()],
);
test_env.inject_barrier(&exchange_client_test_barrier(), [remote_input.actor_id()]);

pin_mut!(remote_input);

Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ mod tests {
},
));

barrier_test_env.inject_barrier(&b1, [], [actor_id]);
barrier_test_env.inject_barrier(&b1, [actor_id]);

send!([new], Message::Barrier(b1.clone().into_dispatcher()));
assert_recv_pending!(); // We should not receive the barrier, as new is not the upstream.
Expand Down
9 changes: 4 additions & 5 deletions src/stream/src/from_proto/barrier_recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use risingwave_pb::stream_plan::BarrierRecvNode;
use tokio::sync::mpsc::unbounded_channel;

use super::*;
use crate::executor::BarrierRecvExecutor;
Expand All @@ -33,10 +32,10 @@ impl ExecutorBuilder for BarrierRecvExecutorBuilder {
"barrier receiver should not have input"
);

let (sender, barrier_receiver) = unbounded_channel();
params
.create_actor_context
.register_sender(params.actor_context.id, sender);
let barrier_receiver = params
.shared_context
.local_barrier_manager
.subscribe_barrier(params.actor_context.id);

let exec = BarrierRecvExecutor::new(params.actor_context, barrier_receiver);
Ok((params.info, exec).into())
Expand Down
9 changes: 4 additions & 5 deletions src/stream/src/from_proto/now.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use risingwave_common::util::value_encoding::DatumFromProtoExt;
use risingwave_pb::stream_plan::now_node::PbMode as PbNowMode;
use risingwave_pb::stream_plan::{NowNode, PbNowModeGenerateSeries};
use risingwave_storage::StateStore;
use tokio::sync::mpsc::unbounded_channel;

use super::ExecutorBuilder;
use crate::common::table::state_table::StateTable;
Expand All @@ -36,10 +35,10 @@ impl ExecutorBuilder for NowExecutorBuilder {
node: &NowNode,
store: impl StateStore,
) -> StreamResult<Executor> {
let (sender, barrier_receiver) = unbounded_channel();
params
.create_actor_context
.register_sender(params.actor_context.id, sender);
let barrier_receiver = params
.shared_context
.local_barrier_manager
.subscribe_barrier(params.actor_context.id);

let mode = if let Ok(pb_mode) = node.get_mode() {
match pb_mode {
Expand Down
9 changes: 4 additions & 5 deletions src/stream/src/from_proto/source/trad_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use risingwave_pb::plan_common::{
};
use risingwave_pb::stream_plan::SourceNode;
use risingwave_storage::panic_store::PanicStateStore;
use tokio::sync::mpsc::unbounded_channel;

use super::*;
use crate::executor::source::{
Expand Down Expand Up @@ -141,10 +140,10 @@ impl ExecutorBuilder for SourceExecutorBuilder {
node: &Self::Node,
store: impl StateStore,
) -> StreamResult<Executor> {
let (sender, barrier_receiver) = unbounded_channel();
params
.create_actor_context
.register_sender(params.actor_context.id, sender);
let barrier_receiver = params
.shared_context
.local_barrier_manager
.subscribe_barrier(params.actor_context.id);
let system_params = params.env.system_params_manager_ref().get_params();

if let Some(source) = &node.source_inner {
Expand Down
9 changes: 4 additions & 5 deletions src/stream/src/from_proto/stream_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_pb::plan_common::StorageTableDesc;
use risingwave_pb::stream_plan::{StreamScanNode, StreamScanType};
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use tokio::sync::mpsc;

use super::*;
use crate::common::table::state_table::{ReplicatedStateTable, StateTable};
Expand Down Expand Up @@ -154,10 +153,10 @@ impl ExecutorBuilder for StreamScanExecutorBuilder {
.collect_vec();

let vnodes = params.vnode_bitmap.map(Arc::new);
let (barrier_tx, barrier_rx) = mpsc::unbounded_channel();
params
.create_actor_context
.register_sender(params.actor_context.id, barrier_tx);
let barrier_rx = params
.shared_context
.local_barrier_manager
.subscribe_barrier(params.actor_context.id);

let upstream_table =
StorageTable::new_partial(state_store.clone(), column_ids, vnodes, table_desc);
Expand Down
9 changes: 4 additions & 5 deletions src/stream/src/from_proto/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use itertools::Itertools;
use risingwave_expr::expr::build_non_strict_from_prost;
use risingwave_pb::stream_plan::ValuesNode;
use risingwave_storage::StateStore;
use tokio::sync::mpsc::unbounded_channel;

use super::ExecutorBuilder;
use crate::error::StreamResult;
Expand All @@ -35,10 +34,10 @@ impl ExecutorBuilder for ValuesExecutorBuilder {
node: &ValuesNode,
_store: impl StateStore,
) -> StreamResult<Executor> {
let (sender, barrier_receiver) = unbounded_channel();
params
.create_actor_context
.register_sender(params.actor_context.id, sender);
let barrier_receiver = params
.shared_context
.local_barrier_manager
.subscribe_barrier(params.actor_context.id);
let progress = params
.local_barrier_manager
.register_create_mview_progress(params.actor_context.id);
Expand Down
Loading

0 comments on commit da46c4d

Please sign in to comment.