Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: register actor barrier sender asynchronously #18104

Merged
merged 10 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,9 @@
common.Status status = 2;
}

message InjectBarrierRequest {

Check failure on line 55 in proto/stream_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "3" with name "actor_ids_to_send" on message "InjectBarrierRequest" was deleted without reserving the name "actor_ids_to_send".

Check failure on line 55 in proto/stream_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "3" with name "actor_ids_to_send" on message "InjectBarrierRequest" was deleted without reserving the number "3".
string request_id = 1;
stream_plan.Barrier barrier = 2;
repeated uint32 actor_ids_to_send = 3;
repeated uint32 actor_ids_to_collect = 4;
repeated uint32 table_ids_to_sync = 5;
uint32 partial_graph_id = 6;
Expand Down
18 changes: 0 additions & 18 deletions src/meta/src/barrier/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,24 +259,6 @@ impl InflightGraphInfo {
})
}

/// Returns actor list to send in the target worker node.
pub fn actor_ids_to_send(&self, node_id: WorkerId) -> impl Iterator<Item = ActorId> + '_ {
self.fragment_infos
.values()
.filter(|info| info.is_injectable)
.flat_map(move |info| {
info.actors
.iter()
.filter_map(move |(actor_id, actor_node_id)| {
if *actor_node_id == node_id {
Some(*actor_id)
} else {
None
}
})
})
}

pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
self.fragment_infos
.values()
Expand Down
7 changes: 0 additions & 7 deletions src/meta/src/barrier/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,15 +302,9 @@ impl ControlStreamManager {
self.nodes
.iter_mut()
.map(|(node_id, node)| {
let actor_ids_to_send: Vec<_> =
pre_applied_graph_info.actor_ids_to_send(*node_id).collect();
let actor_ids_to_collect: Vec<_> = pre_applied_graph_info
.actor_ids_to_collect(*node_id)
.collect();
if actor_ids_to_collect.is_empty() {
// No need to send or collect barrier for this node.
assert!(actor_ids_to_send.is_empty());
}
let table_ids_to_sync = if let Some(graph_info) = applied_graph_info {
graph_info
.existing_table_ids()
Expand Down Expand Up @@ -340,7 +334,6 @@ impl ControlStreamManager {
InjectBarrierRequest {
request_id: StreamRpcManager::new_request_id(),
barrier: Some(barrier),
actor_ids_to_send,
actor_ids_to_collect,
table_ids_to_sync,
partial_graph_id,
Expand Down
8 changes: 4 additions & 4 deletions src/stream/src/executor/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1271,7 +1271,7 @@ mod tests {
actor_new_dispatchers: Default::default(),
},
));
barrier_test_env.inject_barrier(&b1, [], [actor_id]);
barrier_test_env.inject_barrier(&b1, [actor_id]);
tx.send(Message::Barrier(b1.clone().into_dispatcher()))
.await
.unwrap();
Expand All @@ -1291,7 +1291,7 @@ mod tests {

// 6. Send another barrier.
let b2 = Barrier::new_test_barrier(test_epoch(2));
barrier_test_env.inject_barrier(&b2, [], [actor_id]);
barrier_test_env.inject_barrier(&b2, [actor_id]);
tx.send(Message::Barrier(b2.into_dispatcher()))
.await
.unwrap();
Expand Down Expand Up @@ -1330,7 +1330,7 @@ mod tests {
actor_new_dispatchers: Default::default(),
},
));
barrier_test_env.inject_barrier(&b3, [], [actor_id]);
barrier_test_env.inject_barrier(&b3, [actor_id]);
tx.send(Message::Barrier(b3.into_dispatcher()))
.await
.unwrap();
Expand All @@ -1344,7 +1344,7 @@ mod tests {

// 11. Send another barrier.
let b4 = Barrier::new_test_barrier(test_epoch(4));
barrier_test_env.inject_barrier(&b4, [], [actor_id]);
barrier_test_env.inject_barrier(&b4, [actor_id]);
tx.send(Message::Barrier(b4.into_dispatcher()))
.await
.unwrap();
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/executor/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ async fn test_merger_sum_aggr() {

let mut epoch = test_epoch(1);
let b1 = Barrier::new_test_barrier(epoch);
barrier_test_env.inject_barrier(&b1, [], actors.clone());
barrier_test_env.inject_barrier(&b1, actors.clone());
input
.send(Message::Barrier(b1.into_dispatcher()))
.await
Expand All @@ -244,7 +244,7 @@ async fn test_merger_sum_aggr() {
input.send(Message::Chunk(chunk)).await.unwrap();
}
let b = Barrier::new_test_barrier(epoch);
barrier_test_env.inject_barrier(&b, [], actors.clone());
barrier_test_env.inject_barrier(&b, actors.clone());
input
.send(Message::Barrier(b.into_dispatcher()))
.await
Expand All @@ -253,7 +253,7 @@ async fn test_merger_sum_aggr() {
}
let b = Barrier::new_test_barrier(epoch)
.with_mutation(Mutation::Stop(actors.clone().into_iter().collect()));
barrier_test_env.inject_barrier(&b, [], actors);
barrier_test_env.inject_barrier(&b, actors);
input
.send(Message::Barrier(b.into_dispatcher()))
.await
Expand Down
12 changes: 4 additions & 8 deletions src/stream/src/executor/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,13 +524,13 @@ mod tests {
.map(|(_, epoch)| {
let barrier = Barrier::with_prev_epoch_for_test(*epoch, *prev_epoch);
*prev_epoch = *epoch;
barrier_test_env.inject_barrier(&barrier, [], [actor_id]);
barrier_test_env.inject_barrier(&barrier, [actor_id]);
(*epoch, barrier)
})
.collect();
let b2 = Barrier::with_prev_epoch_for_test(test_epoch(1000), *prev_epoch)
.with_mutation(Mutation::Stop(HashSet::default()));
barrier_test_env.inject_barrier(&b2, [], [actor_id]);
barrier_test_env.inject_barrier(&b2, [actor_id]);

for (tx_id, tx) in txs.into_iter().enumerate() {
let epochs = epochs.clone();
Expand Down Expand Up @@ -703,7 +703,7 @@ mod tests {
actor_new_dispatchers: Default::default(),
},
));
barrier_test_env.inject_barrier(&b1, [], [actor_id]);
barrier_test_env.inject_barrier(&b1, [actor_id]);
send!(
[untouched, old],
Message::Barrier(b1.clone().into_dispatcher())
Expand Down Expand Up @@ -820,11 +820,7 @@ mod tests {
)
};

test_env.inject_barrier(
&exchange_client_test_barrier(),
[],
[remote_input.actor_id()],
);
test_env.inject_barrier(&exchange_client_test_barrier(), [remote_input.actor_id()]);

pin_mut!(remote_input);

Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ mod tests {
},
));

barrier_test_env.inject_barrier(&b1, [], [actor_id]);
barrier_test_env.inject_barrier(&b1, [actor_id]);

send!([new], Message::Barrier(b1.clone().into_dispatcher()));
assert_recv_pending!(); // We should not receive the barrier, as new is not the upstream.
Expand Down
9 changes: 4 additions & 5 deletions src/stream/src/from_proto/barrier_recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use risingwave_pb::stream_plan::BarrierRecvNode;
use tokio::sync::mpsc::unbounded_channel;

use super::*;
use crate::executor::BarrierRecvExecutor;
Expand All @@ -33,10 +32,10 @@ impl ExecutorBuilder for BarrierRecvExecutorBuilder {
"barrier receiver should not have input"
);

let (sender, barrier_receiver) = unbounded_channel();
params
.create_actor_context
.register_sender(params.actor_context.id, sender);
let barrier_receiver = params
.shared_context
.local_barrier_manager
.subscribe_barrier(params.actor_context.id);

let exec = BarrierRecvExecutor::new(params.actor_context, barrier_receiver);
Ok((params.info, exec).into())
Expand Down
9 changes: 4 additions & 5 deletions src/stream/src/from_proto/now.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use risingwave_common::util::value_encoding::DatumFromProtoExt;
use risingwave_pb::stream_plan::now_node::PbMode as PbNowMode;
use risingwave_pb::stream_plan::{NowNode, PbNowModeGenerateSeries};
use risingwave_storage::StateStore;
use tokio::sync::mpsc::unbounded_channel;

use super::ExecutorBuilder;
use crate::common::table::state_table::StateTable;
Expand All @@ -36,10 +35,10 @@ impl ExecutorBuilder for NowExecutorBuilder {
node: &NowNode,
store: impl StateStore,
) -> StreamResult<Executor> {
let (sender, barrier_receiver) = unbounded_channel();
params
.create_actor_context
.register_sender(params.actor_context.id, sender);
let barrier_receiver = params
.shared_context
.local_barrier_manager
.subscribe_barrier(params.actor_context.id);

let mode = if let Ok(pb_mode) = node.get_mode() {
match pb_mode {
Expand Down
9 changes: 4 additions & 5 deletions src/stream/src/from_proto/source/trad_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use risingwave_pb::plan_common::{
};
use risingwave_pb::stream_plan::SourceNode;
use risingwave_storage::panic_store::PanicStateStore;
use tokio::sync::mpsc::unbounded_channel;

use super::*;
use crate::executor::source::{
Expand Down Expand Up @@ -141,10 +140,10 @@ impl ExecutorBuilder for SourceExecutorBuilder {
node: &Self::Node,
store: impl StateStore,
) -> StreamResult<Executor> {
let (sender, barrier_receiver) = unbounded_channel();
params
.create_actor_context
.register_sender(params.actor_context.id, sender);
let barrier_receiver = params
.shared_context
.local_barrier_manager
.subscribe_barrier(params.actor_context.id);
let system_params = params.env.system_params_manager_ref().get_params();

if let Some(source) = &node.source_inner {
Expand Down
9 changes: 4 additions & 5 deletions src/stream/src/from_proto/stream_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_pb::plan_common::StorageTableDesc;
use risingwave_pb::stream_plan::{StreamScanNode, StreamScanType};
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use tokio::sync::mpsc;

use super::*;
use crate::common::table::state_table::{ReplicatedStateTable, StateTable};
Expand Down Expand Up @@ -154,10 +153,10 @@ impl ExecutorBuilder for StreamScanExecutorBuilder {
.collect_vec();

let vnodes = params.vnode_bitmap.map(Arc::new);
let (barrier_tx, barrier_rx) = mpsc::unbounded_channel();
params
.create_actor_context
.register_sender(params.actor_context.id, barrier_tx);
let barrier_rx = params
.shared_context
.local_barrier_manager
.subscribe_barrier(params.actor_context.id);

let upstream_table =
StorageTable::new_partial(state_store.clone(), column_ids, vnodes, table_desc);
Expand Down
9 changes: 4 additions & 5 deletions src/stream/src/from_proto/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use itertools::Itertools;
use risingwave_expr::expr::build_non_strict_from_prost;
use risingwave_pb::stream_plan::ValuesNode;
use risingwave_storage::StateStore;
use tokio::sync::mpsc::unbounded_channel;

use super::ExecutorBuilder;
use crate::error::StreamResult;
Expand All @@ -35,10 +34,10 @@ impl ExecutorBuilder for ValuesExecutorBuilder {
node: &ValuesNode,
_store: impl StateStore,
) -> StreamResult<Executor> {
let (sender, barrier_receiver) = unbounded_channel();
params
.create_actor_context
.register_sender(params.actor_context.id, sender);
let barrier_receiver = params
.shared_context
.local_barrier_manager
.subscribe_barrier(params.actor_context.id);
let progress = params
.local_barrier_manager
.register_create_mview_progress(params.actor_context.id);
Expand Down
Loading
Loading