refactor: no need to resolve hanging channel #8050

Merged · 3 commits · Feb 21, 2023
Changes from 2 commits
50 changes: 1 addition & 49 deletions dashboard/proto/gen/stream_service.ts

Some generated files are not rendered by default.

6 changes: 0 additions & 6 deletions proto/stream_service.proto
@@ -9,16 +9,10 @@ import "stream_plan.proto";
option java_package = "com.risingwave.proto";
option optimize_for = SPEED;

message HangingChannel {
common.ActorInfo upstream = 1;
common.ActorInfo downstream = 2;
}

// Describe the fragments which will be running on this node
message UpdateActorsRequest {
string request_id = 1;
repeated stream_plan.StreamActor actors = 2;
repeated HangingChannel hanging_channels = 3;
}

message UpdateActorsResponse {
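For reference, after this change the request message carries only the request id and the actors to update. A rough sketch of the corresponding prost-generated Rust type (shape only; derives and prost attributes omitted, field types inferred from the proto above):

// Hedged sketch of the generated struct once `hanging_channels` is removed.
// Not the literal generated code, just its shape.
pub struct UpdateActorsRequest {
    pub request_id: String,
    pub actors: Vec<StreamActor>,
}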
5 changes: 1 addition & 4 deletions src/compute/src/rpc/service/stream_service.rs
@@ -48,10 +48,7 @@ impl StreamService for StreamServiceImpl {
request: Request<UpdateActorsRequest>,
) -> std::result::Result<Response<UpdateActorsResponse>, Status> {
let req = request.into_inner();
let res = self
.mgr
.update_actors(&req.actors, &req.hanging_channels)
.await;
let res = self.mgr.update_actors(&req.actors).await;
match res {
Err(e) => {
error!("failed to update stream actor {}", e);
1 change: 0 additions & 1 deletion src/meta/src/barrier/recovery.rs
@@ -301,7 +301,6 @@ where
.update_actors(UpdateActorsRequest {
request_id,
actors: node_actors.get(node_id).cloned().unwrap_or_default(),
..Default::default()
})
.await?;
}
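With `hanging_channels` removed, no other field at this call site relied on `Default`, so the `..Default::default()` spread can simply be dropped. A sketch of the resulting call, assembled from the surviving lines above (the `client` receiver is assumed from context outside the hunk):

// Sketch only: recovery now sends the actors without any hanging channels.
client
    .update_actors(UpdateActorsRequest {
        request_id,
        actors: node_actors.get(node_id).cloned().unwrap_or_default(),
    })
    .await?;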
63 changes: 6 additions & 57 deletions src/meta/src/stream/scale.rs
@@ -35,7 +35,7 @@ use risingwave_pb::stream_plan::{
DispatcherType, FragmentTypeFlag, PauseMutation, ResumeMutation, StreamActor, StreamNode,
};
use risingwave_pb::stream_service::{
BroadcastActorInfoTableRequest, BuildActorsRequest, HangingChannel, UpdateActorsRequest,
BroadcastActorInfoTableRequest, BuildActorsRequest, UpdateActorsRequest,
};
use uuid::Uuid;

@@ -784,9 +784,6 @@ where
}
}

// Note: we must create hanging channels first; creating hanging channels with upstream
// actors after scaling will cause the barrier to fail to push down.
let mut worker_hanging_channels: HashMap<WorkerId, Vec<HangingChannel>> = HashMap::new();
let mut new_created_actors = HashMap::new();
for fragment_id in reschedules.keys() {
let actors_to_create = fragment_actors_to_create
@@ -803,36 +800,8 @@
.zip_eq_debug(repeat(fragment.actors.first().unwrap()).take(actors_to_create.len()))
{
let new_actor_id = actor_to_create.0;
let new_parallel_unit_id = actor_to_create.1;
let mut new_actor = sample_actor.clone();

let worker = ctx.parallel_unit_id_to_worker(new_parallel_unit_id)?;

// Because no-shuffle edges are transferable, the upstream and downstream actors of a
// no-shuffle edge are always created at the same time, so scaling a 1-1 no-shuffle
// pair does not require a hanging channel. However, for a no-shuffle downstream with
// multiple upstreams (e.g. the dynamic filter actor), hanging channels from the
// remaining upstreams to the new actor still need to be created.
for upstream_actor_id in &new_actor.upstream_actor_id {
let upstream_worker_id = ctx
.actor_id_to_parallel_unit(upstream_actor_id)?
.worker_node_id;
worker_hanging_channels
.entry(upstream_worker_id)
.or_default()
.push(HangingChannel {
upstream: Some(ActorInfo {
actor_id: *upstream_actor_id,
host: None,
}),
downstream: Some(ActorInfo {
actor_id: *new_actor_id,
host: worker.host.clone(),
}),
});
}

// This should be assigned before the `modify_actor_upstream_and_downstream` call,
// because we need to use the new actor id to find the upstream and
// downstream in the NoShuffle relationship
@@ -935,8 +904,7 @@ where
}

self.create_actors_on_compute_node(
&ctx,
worker_hanging_channels,
&ctx.worker_nodes,
actor_infos_to_broadcast,
node_actors_to_create,
broadcast_worker_ids,
@@ -1173,14 +1141,13 @@

async fn create_actors_on_compute_node(
&self,
ctx: &RescheduleContext,
worker_hanging_channels: HashMap<WorkerId, Vec<HangingChannel>>,
worker_nodes: &HashMap<WorkerId, WorkerNode>,
actor_infos_to_broadcast: BTreeMap<u32, ActorInfo>,
node_actors_to_create: HashMap<WorkerId, Vec<StreamActor>>,
broadcast_worker_ids: HashSet<u32>,
) -> MetaResult<()> {
for worker_id in &broadcast_worker_ids {
let node = ctx.worker_nodes.get(worker_id).unwrap();
let node = worker_nodes.get(worker_id).unwrap();
let client = self.env.stream_client_pool().get(node).await?;

let actor_infos_to_broadcast = actor_infos_to_broadcast.values().cloned().collect();
@@ -1193,38 +1160,20 @@
.await?;
}

let mut worker_hanging_channels = worker_hanging_channels;

for (node_id, stream_actors) in &node_actors_to_create {
let node = ctx.worker_nodes.get(node_id).unwrap();
let node = worker_nodes.get(node_id).unwrap();
let client = self.env.stream_client_pool().get(node).await?;
let request_id = Uuid::new_v4().to_string();
let request = UpdateActorsRequest {
request_id,
actors: stream_actors.clone(),
hanging_channels: worker_hanging_channels.remove(node_id).unwrap_or_default(),
};

client.to_owned().update_actors(request).await?;
}

for (node_id, hanging_channels) in worker_hanging_channels {
let node = ctx.worker_nodes.get(&node_id).unwrap();
let client = self.env.stream_client_pool().get(node).await?;
let request_id = Uuid::new_v4().to_string();

client
.to_owned()
.update_actors(UpdateActorsRequest {
request_id,
actors: vec![],
hanging_channels,
})
.await?;
}

for (node_id, stream_actors) in node_actors_to_create {
let node = ctx.worker_nodes.get(&node_id).unwrap();
let node = worker_nodes.get(&node_id).unwrap();
let client = self.env.stream_client_pool().get(node).await?;
let request_id = Uuid::new_v4().to_string();

7 changes: 0 additions & 7 deletions src/meta/src/stream/stream_graph/schedule.rs
@@ -348,13 +348,6 @@ impl Locations {
.into_group_map()
}

/// Returns the `ActorInfo` map for every actor.
pub fn actor_info_map(&self) -> HashMap<ActorId, ActorInfo> {
self.actor_infos()
.map(|info| (info.actor_id, info))
.collect()
}

/// Returns an iterator of `ActorInfo`.
pub fn actor_infos(&self) -> impl Iterator<Item = ActorInfo> + '_ {
self.actor_locations
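The removed helper was used to look up downstream `ActorInfo`s when building hanging channels (see the removal in `stream_manager.rs` below). If a keyed map is ever needed again, it can be rebuilt inline from the surviving iterator; a sketch, where `locations` stands for any `Locations` value:

// Equivalent inline construction, mirroring the deleted helper's body.
let actor_info_map: HashMap<ActorId, ActorInfo> = locations
    .actor_infos()
    .map(|info| (info.actor_id, info))
    .collect();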
62 changes: 1 addition & 61 deletions src/meta/src/stream/stream_manager.rs
@@ -19,12 +19,10 @@ use futures::future::{try_join_all, BoxFuture};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_pb::catalog::Table;
use risingwave_pb::common::ActorInfo;
use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
use risingwave_pb::stream_plan::Dispatcher;
use risingwave_pb::stream_service::{
BroadcastActorInfoTableRequest, BuildActorsRequest, DropActorsRequest, HangingChannel,
UpdateActorsRequest,
BroadcastActorInfoTableRequest, BuildActorsRequest, DropActorsRequest, UpdateActorsRequest,
};
use tokio::sync::mpsc::Sender;
use tokio::sync::Mutex;
@@ -334,48 +332,7 @@ where
.chain(existing_locations.actor_infos())
.collect_vec();

let building_actor_infos = building_locations.actor_info_map();
let building_worker_actors = building_locations.worker_actors();
let existing_worker_actors = existing_locations.worker_actors();

// Hanging channels for each worker node.
let mut hanging_channels = {
// upstream_actor_id -> Vec<downstream_actor_info>
let up_id_to_down_info = dispatchers
.iter()
.map(|(&up_id, dispatchers)| {
let down_infos = dispatchers
.iter()
.flat_map(|d| d.downstream_actor_id.iter())
.map(|down_id| building_actor_infos[down_id].clone())
.collect_vec();
(up_id, down_infos)
})
.collect::<HashMap<_, _>>();

existing_worker_actors
.iter()
.map(|(&worker_id, up_ids)| {
(
worker_id,
up_ids
.iter()
.flat_map(|up_id| {
up_id_to_down_info[up_id]
.iter()
.map(|down_info| HangingChannel {
upstream: Some(ActorInfo {
actor_id: *up_id,
host: None,
}),
downstream: Some(down_info.clone()),
})
})
.collect_vec(),
)
})
.collect::<HashMap<_, _>>()
};

// We send RPC request in two stages.
// The first stage does 2 things: broadcast actor info, and send local actor ids to
@@ -402,23 +359,6 @@
.update_actors(UpdateActorsRequest {
request_id,
actors: stream_actors.clone(),
hanging_channels: hanging_channels.remove(worker_id).unwrap_or_default(),
})
.await?;
}

// Build **remaining** hanging channels on compute nodes.
for (worker_id, hanging_channels) in hanging_channels {
let worker_node = building_locations.worker_locations.get(&worker_id).unwrap();
let client = self.env.stream_client_pool().get(worker_node).await?;

let request_id = Uuid::new_v4().to_string();

client
.update_actors(UpdateActorsRequest {
request_id,
actors: vec![],
hanging_channels,
})
.await?;
}
15 changes: 3 additions & 12 deletions src/stream/src/executor/dispatch.rs
@@ -866,7 +866,7 @@ mod tests {
use crate::executor::exchange::output::Output;
use crate::executor::exchange::permit::channel_for_test;
use crate::executor::receiver::ReceiverExecutor;
use crate::task::test_utils::{add_local_channels, helper_make_local_actor};
use crate::task::test_utils::helper_make_local_actor;

#[derive(Debug)]
pub struct MockOutput {
@@ -977,24 +977,15 @@
let (untouched, old, new) = (234, 235, 238); // broadcast downstream actors
let (old_simple, new_simple) = (114, 514); // simple downstream actors

// 1. Register info and channels in context.
// 1. Register info in context.
{
let mut actor_infos = ctx.actor_infos.write();

for local_actor_id in [actor_id, untouched, old, new, old_simple, new_simple] {
actor_infos.insert(local_actor_id, helper_make_local_actor(local_actor_id));
}
}
add_local_channels(
ctx.clone(),
vec![
(actor_id, untouched),
(actor_id, old),
(actor_id, new),
(actor_id, old_simple),
(actor_id, new_simple),
],
);
// actor_id -> untouched, old, new, old_simple, new_simple

let broadcast_dispatcher_id = 666;
let broadcast_dispatcher = DispatcherImpl::new(