Skip to content

Commit

Permalink
refactor: register actor barrier sender asynchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Aug 19, 2024
1 parent cfea9f3 commit 4c1c09c
Show file tree
Hide file tree
Showing 16 changed files with 164 additions and 215 deletions.
3 changes: 1 addition & 2 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ message DropActorsResponse {
message InjectBarrierRequest {
string request_id = 1;
stream_plan.Barrier barrier = 2;
repeated uint32 actor_ids_to_send = 3;
repeated uint32 actor_ids_to_collect = 4;
repeated uint32 table_ids_to_sync = 5;
uint32 partial_graph_id = 6;
Expand Down Expand Up @@ -145,4 +144,4 @@ service StreamService {
rpc StreamingControlStream(stream StreamingControlStreamRequest) returns (stream StreamingControlStreamResponse);
}

// TODO: Lifecycle management for actors.
// TODO: Lifecycle management for actors.
18 changes: 0 additions & 18 deletions src/meta/src/barrier/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,24 +259,6 @@ impl InflightGraphInfo {
})
}

/// Returns actor list to send in the target worker node.
pub fn actor_ids_to_send(&self, node_id: WorkerId) -> impl Iterator<Item = ActorId> + '_ {
self.fragment_infos
.values()
.filter(|info| info.is_injectable)
.flat_map(move |info| {
info.actors
.iter()
.filter_map(move |(actor_id, actor_node_id)| {
if *actor_node_id == node_id {
Some(*actor_id)
} else {
None
}
})
})
}

pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
self.fragment_infos
.values()
Expand Down
7 changes: 0 additions & 7 deletions src/meta/src/barrier/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,15 +302,9 @@ impl ControlStreamManager {
self.nodes
.iter_mut()
.map(|(node_id, node)| {
let actor_ids_to_send: Vec<_> =
pre_applied_graph_info.actor_ids_to_send(*node_id).collect();
let actor_ids_to_collect: Vec<_> = pre_applied_graph_info
.actor_ids_to_collect(*node_id)
.collect();
if actor_ids_to_collect.is_empty() {
// No need to send or collect barrier for this node.
assert!(actor_ids_to_send.is_empty());
}
let table_ids_to_sync = if let Some(graph_info) = applied_graph_info {
graph_info
.existing_table_ids()
Expand Down Expand Up @@ -340,7 +334,6 @@ impl ControlStreamManager {
InjectBarrierRequest {
request_id: StreamRpcManager::new_request_id(),
barrier: Some(barrier),
actor_ids_to_send,
actor_ids_to_collect,
table_ids_to_sync,
partial_graph_id,
Expand Down
8 changes: 4 additions & 4 deletions src/stream/src/executor/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1271,7 +1271,7 @@ mod tests {
actor_new_dispatchers: Default::default(),
},
));
barrier_test_env.inject_barrier(&b1, [], [actor_id]);
barrier_test_env.inject_barrier(&b1, [actor_id]);
tx.send(Message::Barrier(b1.clone().into_dispatcher()))
.await
.unwrap();
Expand All @@ -1291,7 +1291,7 @@ mod tests {

// 6. Send another barrier.
let b2 = Barrier::new_test_barrier(test_epoch(2));
barrier_test_env.inject_barrier(&b2, [], [actor_id]);
barrier_test_env.inject_barrier(&b2, [actor_id]);
tx.send(Message::Barrier(b2.into_dispatcher()))
.await
.unwrap();
Expand Down Expand Up @@ -1330,7 +1330,7 @@ mod tests {
actor_new_dispatchers: Default::default(),
},
));
barrier_test_env.inject_barrier(&b3, [], [actor_id]);
barrier_test_env.inject_barrier(&b3, [actor_id]);
tx.send(Message::Barrier(b3.into_dispatcher()))
.await
.unwrap();
Expand All @@ -1344,7 +1344,7 @@ mod tests {

// 11. Send another barrier.
let b4 = Barrier::new_test_barrier(test_epoch(4));
barrier_test_env.inject_barrier(&b4, [], [actor_id]);
barrier_test_env.inject_barrier(&b4, [actor_id]);
tx.send(Message::Barrier(b4.into_dispatcher()))
.await
.unwrap();
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/executor/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ async fn test_merger_sum_aggr() {

let mut epoch = test_epoch(1);
let b1 = Barrier::new_test_barrier(epoch);
barrier_test_env.inject_barrier(&b1, [], actors.clone());
barrier_test_env.inject_barrier(&b1, actors.clone());
input
.send(Message::Barrier(b1.into_dispatcher()))
.await
Expand All @@ -244,7 +244,7 @@ async fn test_merger_sum_aggr() {
input.send(Message::Chunk(chunk)).await.unwrap();
}
let b = Barrier::new_test_barrier(epoch);
barrier_test_env.inject_barrier(&b, [], actors.clone());
barrier_test_env.inject_barrier(&b, actors.clone());
input
.send(Message::Barrier(b.into_dispatcher()))
.await
Expand All @@ -253,7 +253,7 @@ async fn test_merger_sum_aggr() {
}
let b = Barrier::new_test_barrier(epoch)
.with_mutation(Mutation::Stop(actors.clone().into_iter().collect()));
barrier_test_env.inject_barrier(&b, [], actors);
barrier_test_env.inject_barrier(&b, actors);
input
.send(Message::Barrier(b.into_dispatcher()))
.await
Expand Down
7 changes: 3 additions & 4 deletions src/stream/src/executor/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,13 +524,13 @@ mod tests {
.map(|(_, epoch)| {
let barrier = Barrier::with_prev_epoch_for_test(*epoch, *prev_epoch);
*prev_epoch = *epoch;
barrier_test_env.inject_barrier(&barrier, [], [actor_id]);
barrier_test_env.inject_barrier(&barrier, [actor_id]);
(*epoch, barrier)
})
.collect();
let b2 = Barrier::with_prev_epoch_for_test(test_epoch(1000), *prev_epoch)
.with_mutation(Mutation::Stop(HashSet::default()));
barrier_test_env.inject_barrier(&b2, [], [actor_id]);
barrier_test_env.inject_barrier(&b2, [actor_id]);

for (tx_id, tx) in txs.into_iter().enumerate() {
let epochs = epochs.clone();
Expand Down Expand Up @@ -703,7 +703,7 @@ mod tests {
actor_new_dispatchers: Default::default(),
},
));
barrier_test_env.inject_barrier(&b1, [], [actor_id]);
barrier_test_env.inject_barrier(&b1, [actor_id]);
send!(
[untouched, old],
Message::Barrier(b1.clone().into_dispatcher())
Expand Down Expand Up @@ -822,7 +822,6 @@ mod tests {

test_env.inject_barrier(
&exchange_client_test_barrier(),
[],
[remote_input.actor_id()],
);

Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ mod tests {
},
));

barrier_test_env.inject_barrier(&b1, [], [actor_id]);
barrier_test_env.inject_barrier(&b1, [actor_id]);

send!([new], Message::Barrier(b1.clone().into_dispatcher()));
assert_recv_pending!(); // We should not receive the barrier, as new is not the upstream.
Expand Down
3 changes: 2 additions & 1 deletion src/stream/src/from_proto/barrier_recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ impl ExecutorBuilder for BarrierRecvExecutorBuilder {

let (sender, barrier_receiver) = unbounded_channel();
params
.create_actor_context
.shared_context
.local_barrier_manager
.register_sender(params.actor_context.id, sender);

let exec = BarrierRecvExecutor::new(params.actor_context, barrier_receiver);
Expand Down
3 changes: 2 additions & 1 deletion src/stream/src/from_proto/now.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ impl ExecutorBuilder for NowExecutorBuilder {
) -> StreamResult<Executor> {
let (sender, barrier_receiver) = unbounded_channel();
params
.create_actor_context
.shared_context
.local_barrier_manager
.register_sender(params.actor_context.id, sender);

let mode = if let Ok(pb_mode) = node.get_mode() {
Expand Down
3 changes: 2 additions & 1 deletion src/stream/src/from_proto/source/trad_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ impl ExecutorBuilder for SourceExecutorBuilder {
) -> StreamResult<Executor> {
let (sender, barrier_receiver) = unbounded_channel();
params
.create_actor_context
.shared_context
.local_barrier_manager
.register_sender(params.actor_context.id, sender);
let system_params = params.env.system_params_manager_ref().get_params();

Expand Down
3 changes: 2 additions & 1 deletion src/stream/src/from_proto/stream_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ impl ExecutorBuilder for StreamScanExecutorBuilder {
let vnodes = params.vnode_bitmap.map(Arc::new);
let (barrier_tx, barrier_rx) = mpsc::unbounded_channel();
params
.create_actor_context
.shared_context
.local_barrier_manager
.register_sender(params.actor_context.id, barrier_tx);

let upstream_table =
Expand Down
3 changes: 2 additions & 1 deletion src/stream/src/from_proto/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ impl ExecutorBuilder for ValuesExecutorBuilder {
) -> StreamResult<Executor> {
let (sender, barrier_receiver) = unbounded_channel();
params
.create_actor_context
.shared_context
.local_barrier_manager
.register_sender(params.actor_context.id, sender);
let progress = params
.local_barrier_manager
Expand Down
Loading

0 comments on commit 4c1c09c

Please sign in to comment.