diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs
index 8f9b2f3d313ac..ce7fa7985007f 100644
--- a/src/meta/node/src/server.rs
+++ b/src/meta/node/src/server.rs
@@ -29,6 +29,7 @@ use risingwave_common::telemetry::manager::TelemetryManager;
 use risingwave_common::telemetry::telemetry_env_enabled;
 use risingwave_common_service::metrics_manager::MetricsManager;
 use risingwave_common_service::tracing::TracingExtractLayer;
+use risingwave_meta::barrier::StreamRpcManager;
 use risingwave_meta::controller::catalog::CatalogController;
 use risingwave_meta::controller::cluster::ClusterController;
 use risingwave_meta::manager::MetadataManager;
@@ -525,6 +526,8 @@ pub async fn start_service_as_election_leader(
     let (sink_manager, shutdown_handle) = SinkCoordinatorManager::start_worker();
     let mut sub_tasks = vec![shutdown_handle];
 
+    let stream_rpc_manager = StreamRpcManager::new(env.clone());
+
     let barrier_manager = GlobalBarrierManager::new(
         scheduled_barriers,
         env.clone(),
@@ -533,6 +536,7 @@ pub async fn start_service_as_election_leader(
         source_manager.clone(),
         sink_manager.clone(),
         meta_metrics.clone(),
+        stream_rpc_manager.clone(),
     );
 
     {
@@ -549,6 +553,7 @@ pub async fn start_service_as_election_leader(
             barrier_scheduler.clone(),
             source_manager.clone(),
             hummock_manager.clone(),
+            stream_rpc_manager,
         )
         .unwrap(),
     );
diff --git a/src/meta/service/src/scale_service.rs b/src/meta/service/src/scale_service.rs
index 4c9e3ba2b5f71..33270fc2204f9 100644
--- a/src/meta/service/src/scale_service.rs
+++ b/src/meta/service/src/scale_service.rs
@@ -53,6 +53,7 @@ impl ScaleServiceImpl {
         let scale_controller = Arc::new(ScaleController::new(
             &metadata_manager,
             source_manager.clone(),
+            stream_manager.stream_rpc_manager.clone(),
             stream_manager.env.clone(),
         ));
 
diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index 19a9c8b48a806..6b1b73d6ca697 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -35,9 +35,8 @@ use risingwave_pb::stream_plan::{
     PauseMutation, ResumeMutation, SourceChangeSplitMutation, StopMutation, StreamActor,
     ThrottleMutation, UpdateMutation,
 };
-use risingwave_pb::stream_service::{DropActorsRequest, WaitEpochCommitRequest};
+use risingwave_pb::stream_service::WaitEpochCommitRequest;
 use thiserror_ext::AsReport;
-use uuid::Uuid;
 
 use super::info::{ActorDesc, CommandActorChanges, InflightActorInfo};
 use super::trace::TracedEpoch;
@@ -739,27 +738,16 @@ impl CommandContext {
     /// Clean up actors in CNs if needed, used by drop, cancel and reschedule commands.
async fn clean_up(&self, actors: Vec) -> MetaResult<()> { - let futures = self.info.node_map.values().map(|node| { - let request_id = Uuid::new_v4().to_string(); - let actor_ids = actors.clone(); - - async move { - let client = self - .barrier_manager_context - .env - .stream_client_pool() - .get(node) - .await?; - let request = DropActorsRequest { - request_id, - actor_ids, - }; - client.drop_actors(request).await - } - }); - - try_join_all(futures).await?; - Ok(()) + self.barrier_manager_context + .stream_rpc_manager + .drop_actors( + &self.info.node_map, + self.info + .node_map + .keys() + .map(|worker_id| (*worker_id, actors.clone())), + ) + .await } pub async fn wait_epoch_commit(&self, epoch: HummockEpoch) -> MetaResult<()> { diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index c6d57b01dca53..8536a611aafaa 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -71,6 +71,7 @@ mod state; mod trace; pub use self::command::{Command, ReplaceTablePlan, Reschedule}; +pub use self::rpc::StreamRpcManager; pub use self::schedule::BarrierScheduler; pub use self::trace::TracedEpoch; @@ -149,6 +150,8 @@ pub struct GlobalBarrierManagerContext { metrics: Arc, + stream_rpc_manager: StreamRpcManager, + env: MetaSrvEnv, } @@ -381,6 +384,7 @@ impl GlobalBarrierManager { source_manager: SourceManagerRef, sink_manager: SinkCoordinatorManager, metrics: Arc, + stream_rpc_manager: StreamRpcManager, ) -> Self { let enable_recovery = env.opts.enable_recovery; let in_flight_barrier_nums = env.opts.in_flight_barrier_nums; @@ -397,6 +401,7 @@ impl GlobalBarrierManager { let scale_controller = Arc::new(ScaleController::new( &metadata_manager, source_manager.clone(), + stream_rpc_manager.clone(), env.clone(), )); @@ -409,6 +414,7 @@ impl GlobalBarrierManager { sink_manager, metrics, tracker: Arc::new(Mutex::new(tracker)), + stream_rpc_manager, env: env.clone(), }; diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 37c270da30c78..0f8ecd027dcd0 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -17,9 +17,6 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use anyhow::{anyhow, Context}; -use futures::future::try_join_all; -use futures::stream::FuturesUnordered; -use futures::TryStreamExt; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::compaction_group::StateTableId; @@ -29,14 +26,10 @@ use risingwave_pb::meta::PausedReason; use risingwave_pb::stream_plan::barrier::BarrierKind; use risingwave_pb::stream_plan::barrier_mutation::Mutation; use risingwave_pb::stream_plan::AddMutation; -use risingwave_pb::stream_service::{ - BroadcastActorInfoTableRequest, BuildActorsRequest, ForceStopActorsRequest, UpdateActorsRequest, -}; use thiserror_ext::AsReport; use tokio::sync::oneshot; use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tracing::{debug, warn, Instrument}; -use uuid::Uuid; use super::TracedEpoch; use crate::barrier::command::CommandContext; @@ -979,31 +972,19 @@ impl GlobalBarrierManagerContext { return Err(anyhow!("actors dropped during update").into()); } - info.actor_map.iter().map(|(node_id, actors)| { - let node_actors = all_node_actors.remove(node_id).unwrap_or_default(); - let node = info.node_map.get(node_id).unwrap(); - let actor_infos = actor_infos.clone(); - - async move { - let client = self.env.stream_client_pool().get(node).await?; - client - .broadcast_actor_info_table(BroadcastActorInfoTableRequest { - info: 
actor_infos, - }) - .await?; - - let request_id = Uuid::new_v4().to_string(); - tracing::debug!(request_id = request_id.as_str(), actors = ?actors, "update actors"); - client - .update_actors(UpdateActorsRequest { - request_id, - actors: node_actors, - }) - .await?; - - Ok(()) as MetaResult<()> - } - }).collect::>().try_collect::<()>().await?; + self.stream_rpc_manager + .broadcast_update_actor_info( + &info.node_map, + info.actor_map.keys().cloned(), + actor_infos.into_iter(), + info.actor_map.keys().map(|node_id| { + ( + *node_id, + all_node_actors.remove(node_id).unwrap_or_default(), + ) + }), + ) + .await?; Ok(()) } @@ -1015,26 +996,14 @@ impl GlobalBarrierManagerContext { return Ok(()); } - info.actor_map - .iter() - .map(|(node_id, actors)| async move { - let actors = actors.iter().cloned().collect(); - let node = info.node_map.get(node_id).unwrap(); - let client = self.env.stream_client_pool().get(node).await?; - - let request_id = Uuid::new_v4().to_string(); - tracing::debug!(request_id = request_id.as_str(), actors = ?actors, "build actors"); - client - .build_actors(BuildActorsRequest { - request_id, - actor_id: actors, - }) - .await?; - - Ok(()) as MetaResult<_> - }) - .collect::>() - .try_collect::<()>() + self.stream_rpc_manager + .build_actors( + &info.node_map, + info.actor_map.iter().map(|(node_id, actors)| { + let actors = actors.iter().cloned().collect(); + (*node_id, actors) + }), + ) .await?; Ok(()) @@ -1042,17 +1011,11 @@ impl GlobalBarrierManagerContext { /// Reset all compute nodes by calling `force_stop_actors`. async fn reset_compute_nodes(&self, info: &InflightActorInfo) -> MetaResult<()> { - let futures = info.node_map.values().map(|worker_node| async move { - let client = self.env.stream_client_pool().get(worker_node).await?; - debug!(worker = ?worker_node.id, "force stop actors"); - client - .force_stop_actors(ForceStopActorsRequest { - request_id: Uuid::new_v4().to_string(), - }) - .await - }); + debug!(worker = ?info.node_map.keys().collect_vec(), "force stop actors"); + self.stream_rpc_manager + .force_stop_actors(info.node_map.values()) + .await?; - try_join_all(futures).await?; debug!("all compute nodes have been reset."); Ok(()) diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 55c9fce4c4081..670ee7cf10929 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -14,7 +14,6 @@ use std::collections::HashMap; use std::future::Future; -use std::ops::Deref; use std::sync::Arc; use anyhow::anyhow; @@ -24,10 +23,16 @@ use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt}; use itertools::Itertools; use risingwave_common::bail; +use risingwave_common::hash::ActorId; use risingwave_common::util::tracing::TracingContext; -use risingwave_pb::stream_plan::{Barrier, BarrierMutation}; -use risingwave_pb::stream_service::{BarrierCompleteRequest, InjectBarrierRequest}; -use risingwave_rpc_client::StreamClientPoolRef; +use risingwave_pb::common::{ActorInfo, WorkerNode}; +use risingwave_pb::stream_plan::{Barrier, BarrierMutation, StreamActor}; +use risingwave_pb::stream_service::{ + BarrierCompleteRequest, BroadcastActorInfoTableRequest, BuildActorsRequest, DropActorsRequest, + ForceStopActorsRequest, InjectBarrierRequest, UpdateActorsRequest, +}; +use risingwave_rpc_client::error::RpcError; +use risingwave_rpc_client::StreamClient; use rw_futures_util::pending_on_none; use tokio::sync::oneshot; use uuid::Uuid; @@ -77,17 +82,21 @@ impl GlobalBarrierManagerContext { ) -> BarrierCompletionFuture { let (tx, 
rx) = oneshot::channel(); let prev_epoch = command_context.prev_epoch.value().0; - let result = self.inject_barrier_inner(command_context.clone()).await; + let result = self + .stream_rpc_manager + .inject_barrier(command_context.clone()) + .await; match result { Ok(node_need_collect) => { // todo: the collect handler should be abort when recovery. - tokio::spawn(Self::collect_barrier( - self.env.clone(), - node_need_collect, - self.env.stream_client_pool_ref(), - command_context, - tx, - )); + tokio::spawn({ + let stream_rpc_manager = self.stream_rpc_manager.clone(); + async move { + stream_rpc_manager + .collect_barrier(node_need_collect, command_context, tx) + .await + } + }); } Err(e) => { let _ = tx.send(BarrierCompletion { @@ -104,9 +113,11 @@ impl GlobalBarrierManagerContext { }, }) } +} +impl StreamRpcManager { /// Send inject-barrier-rpc to stream service and wait for its response before returns. - async fn inject_barrier_inner( + async fn inject_barrier( &self, command_context: Arc, ) -> MetaResult> { @@ -114,38 +125,44 @@ impl GlobalBarrierManagerContext { let mutation = command_context.to_mutation().await?; let info = command_context.info.clone(); let mut node_need_collect = HashMap::new(); - let inject_futures = info.node_map.iter().filter_map(|(node_id, node)| { - let actor_ids_to_send = info.actor_ids_to_send(node_id).collect_vec(); - let actor_ids_to_collect = info.actor_ids_to_collect(node_id).collect_vec(); - if actor_ids_to_collect.is_empty() { - // No need to send or collect barrier for this node. - assert!(actor_ids_to_send.is_empty()); - node_need_collect.insert(*node_id, false); - None - } else { - node_need_collect.insert(*node_id, true); - let mutation = mutation.clone(); - let request_id = Uuid::new_v4().to_string(); - let barrier = Barrier { - epoch: Some(risingwave_pb::data::Epoch { - curr: command_context.curr_epoch.value().0, - prev: command_context.prev_epoch.value().0, - }), - mutation: mutation.clone().map(|_| BarrierMutation { mutation }), - tracing_context: TracingContext::from_span(command_context.curr_epoch.span()) + self.make_request( + info.node_map.iter().filter_map(|(node_id, node)| { + let actor_ids_to_send = info.actor_ids_to_send(node_id).collect_vec(); + let actor_ids_to_collect = info.actor_ids_to_collect(node_id).collect_vec(); + if actor_ids_to_collect.is_empty() { + // No need to send or collect barrier for this node. 
+ assert!(actor_ids_to_send.is_empty()); + node_need_collect.insert(*node_id, false); + None + } else { + node_need_collect.insert(*node_id, true); + let mutation = mutation.clone(); + let barrier = Barrier { + epoch: Some(risingwave_pb::data::Epoch { + curr: command_context.curr_epoch.value().0, + prev: command_context.prev_epoch.value().0, + }), + mutation: mutation.clone().map(|_| BarrierMutation { mutation }), + tracing_context: TracingContext::from_span( + command_context.curr_epoch.span(), + ) .to_protobuf(), - kind: command_context.kind as i32, - passed_actors: vec![], - }; - async move { - let client = self.env.stream_client_pool().get(node).await?; - - let request = InjectBarrierRequest { - request_id, - barrier: Some(barrier), - actor_ids_to_send, - actor_ids_to_collect, + kind: command_context.kind as i32, + passed_actors: vec![], }; + Some(( + node, + InjectBarrierRequest { + request_id: Self::new_request_id(), + barrier: Some(barrier), + actor_ids_to_send, + actor_ids_to_collect, + }, + )) + } + }), + |client, request| { + async move { tracing::debug!( target: "events::meta::barrier::inject_barrier", ?request, "inject barrier request" @@ -154,10 +171,10 @@ impl GlobalBarrierManagerContext { // This RPC returns only if this worker node has injected this barrier. client.inject_barrier(request).await } - .into() - } - }); - try_join_all(inject_futures).await.inspect_err(|e| { + }, + ) + .await + .inspect_err(|e| { // Record failure in event log. use risingwave_pb::meta::event_log; use thiserror_ext::AsReport; @@ -175,9 +192,8 @@ impl GlobalBarrierManagerContext { /// Send barrier-complete-rpc and wait for responses from all CNs async fn collect_barrier( - env: MetaSrvEnv, + &self, node_need_collect: HashMap, - client_pool_ref: StreamClientPoolRef, command_context: Arc, barrier_complete_tx: oneshot::Sender, ) { @@ -186,34 +202,34 @@ impl GlobalBarrierManagerContext { TracingContext::from_span(command_context.prev_epoch.span()).to_protobuf(); let info = command_context.info.clone(); - let client_pool = client_pool_ref.deref(); - let collect_futures = info.node_map.iter().filter_map(|(node_id, node)| { - if !*node_need_collect.get(node_id).unwrap() { - // No need to send or collect barrier for this node. - None - } else { - let request_id = Uuid::new_v4().to_string(); - let tracing_context = tracing_context.clone(); - async move { - let client = client_pool.get(node).await?; - let request = BarrierCompleteRequest { - request_id, - prev_epoch, - tracing_context, - }; - tracing::debug!( - target: "events::meta::barrier::barrier_complete", - ?request, "barrier complete" - ); + let result = self + .broadcast( + info.node_map.iter().filter_map(|(node_id, node)| { + if !*node_need_collect.get(node_id).unwrap() { + // No need to send or collect barrier for this node. + None + } else { + Some(node) + } + }), + |client| { + let tracing_context = tracing_context.clone(); + async move { + let request = BarrierCompleteRequest { + request_id: Self::new_request_id(), + prev_epoch, + tracing_context, + }; + tracing::debug!( + target: "events::meta::barrier::barrier_complete", + ?request, "barrier complete" + ); - // This RPC returns only if this worker node has collected this barrier. - client.barrier_complete(request).await - } - .into() - } - }); - - let result = try_join_all(collect_futures) + // This RPC returns only if this worker node has collected this barrier. + client.barrier_complete(request).await + } + }, + ) .await .inspect_err(|e| { // Record failure in event log. 
@@ -224,7 +240,8 @@ impl GlobalBarrierManagerContext {
                     cur_epoch: command_context.curr_epoch.value().0,
                     error: e.to_report_string(),
                 };
-                env.event_log_manager_ref()
+                self.env
+                    .event_log_manager_ref()
                     .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
             })
             .map_err(Into::into);
@@ -233,3 +250,144 @@ impl GlobalBarrierManagerContext {
             .inspect_err(|_| tracing::warn!(prev_epoch, "failed to notify barrier completion"));
     }
 }
+
+#[derive(Clone)]
+pub struct StreamRpcManager {
+    env: MetaSrvEnv,
+}
+
+impl StreamRpcManager {
+    pub fn new(env: MetaSrvEnv) -> Self {
+        Self { env }
+    }
+
+    async fn make_request<REQ, RSP, Fut: Future<Output = Result<RSP, RpcError>> + 'static>(
+        &self,
+        request: impl Iterator<Item = (&WorkerNode, REQ)>,
+        f: impl Fn(StreamClient, REQ) -> Fut,
+    ) -> MetaResult<Vec<RSP>> {
+        let pool = self.env.stream_client_pool();
+        let f = &f;
+        Ok(try_join_all(request.map(|(node, input)| async move {
+            let client = pool.get(node).await?;
+            f(client, input).await
+        }))
+        .await?)
+    }
+
+    async fn broadcast<RSP, Fut: Future<Output = Result<RSP, RpcError>> + 'static>(
+        &self,
+        nodes: impl Iterator<Item = &WorkerNode>,
+        f: impl Fn(StreamClient) -> Fut,
+    ) -> MetaResult<Vec<RSP>> {
+        self.make_request(nodes.map(|node| (node, ())), |client, ()| f(client))
+            .await
+    }
+
+    fn new_request_id() -> String {
+        Uuid::new_v4().to_string()
+    }
+
+    pub async fn build_actors(
+        &self,
+        node_map: &HashMap<WorkerId, WorkerNode>,
+        node_actors: impl Iterator<Item = (WorkerId, Vec<ActorId>)>,
+    ) -> MetaResult<()> {
+        self.make_request(
+            node_actors.map(|(worker_id, actors)| (node_map.get(&worker_id).unwrap(), actors)),
+            |client, actors| async move {
+                let request_id = Self::new_request_id();
+                tracing::debug!(request_id = request_id.as_str(), actors = ?actors, "build actors");
+                client
+                    .build_actors(BuildActorsRequest {
+                        request_id,
+                        actor_id: actors,
+                    })
+                    .await
+            },
+        )
+        .await?;
+        Ok(())
+    }
+
+    /// Broadcast and update actor info in CN.
+    /// `node_actors_to_create` must be a subset of `broadcast_worker_ids`.
+    pub async fn broadcast_update_actor_info(
+        &self,
+        worker_nodes: &HashMap<WorkerId, WorkerNode>,
+        broadcast_worker_ids: impl Iterator<Item = WorkerId>,
+        actor_infos_to_broadcast: impl Iterator<Item = ActorInfo>,
+        node_actors_to_create: impl Iterator<Item = (WorkerId, Vec<StreamActor>)>,
+    ) -> MetaResult<()> {
+        let actor_infos = actor_infos_to_broadcast.collect_vec();
+        let mut node_actors_to_create = node_actors_to_create.collect::<HashMap<_, _>>();
+        self.make_request(
+            broadcast_worker_ids
+                .map(|worker_id| {
+                    let node = worker_nodes.get(&worker_id).unwrap();
+                    let actors = node_actors_to_create.remove(&worker_id);
+                    (node, actors)
+                }),
+            |client, actors| {
+                let info = actor_infos.clone();
+                async move {
+                    client
+                        .broadcast_actor_info_table(BroadcastActorInfoTableRequest { info })
+                        .await?;
+                    if let Some(actors) = actors {
+                        let request_id = Self::new_request_id();
+                        let actor_ids = actors.iter().map(|actor| actor.actor_id).collect_vec();
+                        tracing::debug!(request_id = request_id.as_str(), actors = ?actor_ids, "update actors");
+                        client
+                            .update_actors(UpdateActorsRequest { request_id, actors })
+                            .await?;
+                    }
+                    Ok(())
+                }
+            },
+        )
+        .await?;
+        assert!(
+            node_actors_to_create.is_empty(),
+            "remaining uncreated actors: {:?}",
+            node_actors_to_create
+        );
+        Ok(())
+    }
+
+    pub async fn drop_actors(
+        &self,
+        node_map: &HashMap<WorkerId, WorkerNode>,
+        node_actors: impl Iterator<Item = (WorkerId, Vec<ActorId>)>,
+    ) -> MetaResult<()> {
+        self.make_request(
+            node_actors
+                .map(|(worker_id, actor_ids)| (node_map.get(&worker_id).unwrap(), actor_ids)),
+            |client, actor_ids| async move {
+                client
+                    .drop_actors(DropActorsRequest {
+                        request_id: Self::new_request_id(),
+                        actor_ids,
+                    })
+                    .await
+            },
+        )
+        .await?;
+        Ok(())
+    }
+
+    pub async fn force_stop_actors(
+        &self,
+        nodes: impl Iterator<Item = &WorkerNode>,
+    ) -> MetaResult<()> {
+        self.broadcast(nodes, |client| async move {
+            client
+                .force_stop_actors(ForceStopActorsRequest {
+                    request_id: Self::new_request_id(),
+                })
+                .await
+        })
+        .await?;
+        Ok(())
+    }
+}
diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs
index 3a21c812086f7..2bd711b0a9d31 100644
--- a/src/meta/src/stream/scale.rs
+++ b/src/meta/src/stream/scale.rs
@@ -39,17 +39,13 @@ use risingwave_pb::meta::table_fragments::{self, ActorStatus, Fragment};
 use risingwave_pb::meta::FragmentParallelUnitMappings;
 use risingwave_pb::stream_plan::stream_node::NodeBody;
 use risingwave_pb::stream_plan::{DispatcherType, FragmentTypeFlag, StreamActor, StreamNode};
-use risingwave_pb::stream_service::{
-    BroadcastActorInfoTableRequest, BuildActorsRequest, UpdateActorsRequest,
-};
 use thiserror_ext::AsReport;
 use tokio::sync::oneshot;
 use tokio::sync::oneshot::Receiver;
 use tokio::task::JoinHandle;
 use tokio::time::MissedTickBehavior;
-use uuid::Uuid;
 
-use crate::barrier::{Command, Reschedule};
+use crate::barrier::{Command, Reschedule, StreamRpcManager};
 use crate::manager::{IdCategory, LocalNotification, MetaSrvEnv, MetadataManager, WorkerId};
 use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism};
 use crate::serving::{
@@ -375,6 +371,8 @@ pub struct ScaleController {
 
     pub source_manager: SourceManagerRef,
 
+    pub stream_rpc_manager: StreamRpcManager,
+
     pub env: MetaSrvEnv,
 }
 
@@ -382,9 +380,11 @@ impl ScaleController {
     pub fn new(
         metadata_manager: &MetadataManager,
        source_manager: SourceManagerRef,
+        stream_rpc_manager: StreamRpcManager,
         env: MetaSrvEnv,
     ) -> Self {
         Self {
+            stream_rpc_manager,
             metadata_manager: metadata_manager.clone(),
             source_manager,
             env,
@@ -693,52 +693,35 @@ impl ScaleController {
     async fn create_actors_on_compute_node(
         &self,
         worker_nodes: &HashMap<WorkerId, WorkerNode>,
-
actor_infos_to_broadcast: BTreeMap, + actor_infos_to_broadcast: BTreeMap, node_actors_to_create: HashMap>, - broadcast_worker_ids: HashSet, + broadcast_worker_ids: HashSet, ) -> MetaResult<()> { - for worker_id in &broadcast_worker_ids { - let node = worker_nodes.get(worker_id).unwrap(); - let client = self.env.stream_client_pool().get(node).await?; - - let actor_infos_to_broadcast = actor_infos_to_broadcast.values().cloned().collect(); - - client - .to_owned() - .broadcast_actor_info_table(BroadcastActorInfoTableRequest { - info: actor_infos_to_broadcast, - }) - .await?; - } - - for (node_id, stream_actors) in &node_actors_to_create { - let node = worker_nodes.get(node_id).unwrap(); - let client = self.env.stream_client_pool().get(node).await?; - let request_id = Uuid::new_v4().to_string(); - let request = UpdateActorsRequest { - request_id, - actors: stream_actors.clone(), - }; - - client.to_owned().update_actors(request).await?; - } - - for (node_id, stream_actors) in node_actors_to_create { - let node = worker_nodes.get(&node_id).unwrap(); - let client = self.env.stream_client_pool().get(node).await?; - let request_id = Uuid::new_v4().to_string(); + self.stream_rpc_manager + .broadcast_update_actor_info( + worker_nodes, + broadcast_worker_ids.into_iter(), + actor_infos_to_broadcast.values().cloned(), + node_actors_to_create.clone().into_iter(), + ) + .await?; - client - .to_owned() - .build_actors(BuildActorsRequest { - request_id, - actor_id: stream_actors - .iter() - .map(|stream_actor| stream_actor.actor_id) - .collect(), - }) - .await?; - } + self.stream_rpc_manager + .build_actors( + worker_nodes, + node_actors_to_create + .iter() + .map(|(node_id, stream_actors)| { + ( + *node_id, + stream_actors + .iter() + .map(|stream_actor| stream_actor.actor_id) + .collect_vec(), + ) + }), + ) + .await?; Ok(()) } diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 4d14f0caa2828..b098800168fd3 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -15,26 +15,20 @@ use std::collections::{BTreeSet, HashMap, HashSet}; use std::sync::Arc; -use futures::future::{join_all, try_join_all, BoxFuture}; -use futures::stream::FuturesUnordered; -use futures::TryStreamExt; +use futures::future::{join_all, BoxFuture}; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_pb::catalog::{CreateType, Table}; use risingwave_pb::stream_plan::update_mutation::MergeUpdate; use risingwave_pb::stream_plan::Dispatcher; -use risingwave_pb::stream_service::{ - BroadcastActorInfoTableRequest, BuildActorsRequest, DropActorsRequest, UpdateActorsRequest, -}; use thiserror_ext::AsReport; use tokio::sync::mpsc::Sender; use tokio::sync::{oneshot, Mutex, RwLock}; use tracing::Instrument; -use uuid::Uuid; use super::{Locations, RescheduleOptions, ScaleController, ScaleControllerRef, TableResizePolicy}; -use crate::barrier::{BarrierScheduler, Command, ReplaceTablePlan}; +use crate::barrier::{BarrierScheduler, Command, ReplaceTablePlan, StreamRpcManager}; use crate::hummock::HummockManagerRef; use crate::manager::{DdlType, MetaSrvEnv, MetadataManager, StreamingJob}; use crate::model::{ActorId, TableFragments, TableParallelism}; @@ -202,6 +196,8 @@ pub struct GlobalStreamManager { pub reschedule_lock: RwLock<()>, pub(crate) scale_controller: ScaleControllerRef, + + pub stream_rpc_manager: StreamRpcManager, } impl GlobalStreamManager { @@ -211,10 +207,12 @@ 
impl GlobalStreamManager { barrier_scheduler: BarrierScheduler, source_manager: SourceManagerRef, hummock_manager: HummockManagerRef, + stream_rpc_manager: StreamRpcManager, ) -> MetaResult { let scale_controller = Arc::new(ScaleController::new( &metadata_manager, source_manager.clone(), + stream_rpc_manager.clone(), env.clone(), )); @@ -227,6 +225,7 @@ impl GlobalStreamManager { creating_job_info: Arc::new(CreatingStreamingJobInfo::default()), reschedule_lock: RwLock::new(()), scale_controller, + stream_rpc_manager, }) } @@ -306,28 +305,12 @@ impl GlobalStreamManager { let node_actors = table_fragments.worker_actor_ids(); let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?; - let node_actors = node_actors - .into_iter() - .map(|(id, actor_ids)| { - ( - cluster_info.worker_nodes.get(&id).cloned().unwrap(), - actor_ids, - ) - }) - .collect_vec(); - let futures = node_actors.into_iter().map(|(node, actor_ids)| { - let request_id = Uuid::new_v4().to_string(); - async move { - let client = - self.env.stream_client_pool().get(&node).await?; - let request = DropActorsRequest { - request_id, - actor_ids, - }; - client.drop_actors(request).await - } - }); - try_join_all(futures).await?; + self.stream_rpc_manager + .drop_actors( + &cluster_info.worker_nodes, + node_actors.into_iter(), + ) + .await?; if let MetadataManager::V1(mgr) = &self.metadata_manager { mgr.fragment_manager @@ -381,8 +364,7 @@ impl GlobalStreamManager { // 2. all upstream actors. let actor_infos_to_broadcast = building_locations .actor_infos() - .chain(existing_locations.actor_infos()) - .collect_vec(); + .chain(existing_locations.actor_infos()); let building_worker_actors = building_locations.worker_actors(); @@ -390,57 +372,28 @@ impl GlobalStreamManager { // The first stage does 2 things: broadcast actor info, and send local actor ids to // different WorkerNodes. Such that each WorkerNode knows the overall actor // allocation, but not actually builds it. We initialize all channels in this stage. - building_worker_actors.iter().map(|(worker_id, actors)| { - let stream_actors = actors - .iter() - .map(|actor_id| actor_map[actor_id].clone()) - .collect::>(); - let worker_node = building_locations.worker_locations.get(worker_id).unwrap(); - let actor_infos_to_broadcast = &actor_infos_to_broadcast; - async move { - let client = self.env.stream_client_pool().get(worker_node).await?; - - client - .broadcast_actor_info_table(BroadcastActorInfoTableRequest { - info: actor_infos_to_broadcast.clone(), - }) - .await?; - - let request_id = Uuid::new_v4().to_string(); - tracing::debug!(request_id = request_id.as_str(), actors = ?actors, "update actors"); - client - .update_actors(UpdateActorsRequest { - request_id, - actors: stream_actors.clone(), - }) - .await?; - - Ok(()) as MetaResult<_> - } - }).collect::>().try_collect::<()>().await?; + self.stream_rpc_manager + .broadcast_update_actor_info( + &building_locations.worker_locations, + building_worker_actors.keys().cloned(), + actor_infos_to_broadcast, + building_worker_actors.iter().map(|(worker_id, actors)| { + let stream_actors = actors + .iter() + .map(|actor_id| actor_map[actor_id].clone()) + .collect::>(); + (*worker_id, stream_actors) + }), + ) + .await?; // In the second stage, each [`WorkerNode`] builds local actors and connect them with // channels. 
- building_worker_actors - .iter() - .map(|(worker_id, actors)| async move { - let worker_node = building_locations.worker_locations.get(worker_id).unwrap(); - - let client = self.env.stream_client_pool().get(worker_node).await?; - - let request_id = Uuid::new_v4().to_string(); - tracing::debug!(request_id = request_id.as_str(), actors = ?actors, "build actors"); - client - .build_actors(BuildActorsRequest { - request_id, - actor_id: actors.clone(), - }) - .await?; - - Ok(()) as MetaResult<()> - }) - .collect::>() - .try_collect::<()>() + self.stream_rpc_manager + .build_actors( + &building_locations.worker_locations, + building_worker_actors.into_iter(), + ) .await?; Ok(()) @@ -825,7 +778,7 @@ mod tests { use tonic::{Request, Response, Status}; use super::*; - use crate::barrier::GlobalBarrierManager; + use crate::barrier::{GlobalBarrierManager, StreamRpcManager}; use crate::hummock::{CompactorManager, HummockManager}; use crate::manager::sink_coordination::SinkCoordinatorManager; use crate::manager::{ @@ -852,7 +805,7 @@ mod tests { impl StreamService for FakeStreamService { async fn update_actors( &self, - request: Request, + request: Request, ) -> std::result::Result, Status> { let req = request.into_inner(); let mut guard = self.inner.actor_streams.lock().unwrap(); @@ -881,7 +834,7 @@ mod tests { async fn broadcast_actor_info_table( &self, - request: Request, + request: Request, ) -> std::result::Result, Status> { let req = request.into_inner(); let mut guard = self.inner.actor_infos.lock().unwrap(); @@ -1033,6 +986,8 @@ mod tests { let (sink_manager, _) = SinkCoordinatorManager::start_worker(); + let stream_rpc_manager = StreamRpcManager::new(env.clone()); + let barrier_manager = GlobalBarrierManager::new( scheduled_barriers, env.clone(), @@ -1041,6 +996,7 @@ mod tests { source_manager.clone(), sink_manager, meta_metrics.clone(), + stream_rpc_manager.clone(), ); let stream_manager = GlobalStreamManager::new( @@ -1049,6 +1005,7 @@ mod tests { barrier_scheduler.clone(), source_manager.clone(), hummock_manager, + stream_rpc_manager, )?; let (join_handle_2, shutdown_tx_2) = GlobalBarrierManager::start(barrier_manager);
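
The heart of this patch is the `make_request` helper that every public `StreamRpcManager` method is built on: resolve a `StreamClient` per `WorkerNode` from the env's client pool, issue one request per node, and join the results with `try_join_all`, failing fast on the first error. The following is a minimal, self-contained sketch of that fan-out shape; the `Node`, `Client` and `RpcManager` types, the `String` error, and the `println!` standing in for the gRPC call are all illustrative assumptions (it only needs `tokio` and `futures`), not the actual meta-service code.

use std::collections::HashMap;
use std::future::Future;

use futures::future::try_join_all;

// Hypothetical stand-ins for WorkerNode, StreamClient and RpcError.
struct Node {
    addr: String,
}

struct Client {
    addr: String,
}

type RpcResult<T> = Result<T, String>;

struct RpcManager;

impl RpcManager {
    // Stands in for `env.stream_client_pool().get(node)` in the real code.
    async fn get_client(&self, node: &Node) -> RpcResult<Client> {
        Ok(Client {
            addr: node.addr.clone(),
        })
    }

    // Fan one request out per node and fail fast on the first error,
    // mirroring the shape of `StreamRpcManager::make_request`.
    async fn make_request<REQ, RSP, Fut>(
        &self,
        requests: impl Iterator<Item = (&Node, REQ)>,
        f: impl Fn(Client, REQ) -> Fut,
    ) -> RpcResult<Vec<RSP>>
    where
        Fut: Future<Output = RpcResult<RSP>>,
    {
        // Take a shared reference so the non-move closure can copy it into each future.
        let f = &f;
        try_join_all(requests.map(|(node, req)| async move {
            let client = self.get_client(node).await?;
            f(client, req).await
        }))
        .await
    }

    // Caller-facing wrapper in the style of `drop_actors`: the node map plus
    // per-worker payloads keyed by worker id.
    async fn drop_actors(
        &self,
        node_map: &HashMap<u32, Node>,
        node_actors: impl Iterator<Item = (u32, Vec<u32>)>,
    ) -> RpcResult<()> {
        self.make_request(
            node_actors.map(|(worker_id, actor_ids)| (&node_map[&worker_id], actor_ids)),
            |client, actor_ids| async move {
                // The real implementation sends a `DropActorsRequest` over gRPC here.
                println!("drop actors {:?} on {}", actor_ids, client.addr);
                Ok::<_, String>(())
            },
        )
        .await?;
        Ok(())
    }
}

#[tokio::main]
async fn main() -> RpcResult<()> {
    let node_map: HashMap<u32, Node> = [
        (1, Node { addr: "cn-1".into() }),
        (2, Node { addr: "cn-2".into() }),
    ]
    .into_iter()
    .collect();

    let manager = RpcManager;
    manager
        .drop_actors(&node_map, node_map.keys().map(|id| (*id, vec![100, 101])))
        .await
}

The same shape backs `broadcast_update_actor_info`, which additionally drains `node_actors_to_create` while building the per-node requests and asserts afterwards that the map is empty, i.e. that the workers receiving new actors are a subset of the workers that got the broadcast.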
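
The other structural change in `rpc.rs` is barrier collection: instead of a free-standing `collect_barrier` that receives `MetaSrvEnv` and a `StreamClientPoolRef` as loose arguments, the caller now injects the barrier and then spawns a task that owns a clone of the `StreamRpcManager` and reports back through a oneshot channel. A toy version of that hand-off, with a hypothetical `Completion` payload standing in for `BarrierCompletion`, is sketched below; again this illustrates the pattern, not the real code.

use tokio::sync::oneshot;

// Hypothetical completion payload, standing in for `BarrierCompletion`.
struct Completion {
    prev_epoch: u64,
    ok: bool,
}

#[derive(Clone)]
struct Manager;

impl Manager {
    async fn inject(&self, prev_epoch: u64) -> Result<(), String> {
        // Real code: fan `InjectBarrierRequest` out to every worker with actors to collect.
        println!("injected barrier on top of epoch {prev_epoch}");
        Ok(())
    }

    async fn collect(&self, prev_epoch: u64, tx: oneshot::Sender<Completion>) {
        // Real code: fan `BarrierCompleteRequest` out and forward the joined result.
        let _ = tx.send(Completion {
            prev_epoch,
            ok: true,
        });
    }
}

#[tokio::main]
async fn main() {
    let manager = Manager;
    let (tx, rx) = oneshot::channel();
    let prev_epoch = 42;

    match manager.inject(prev_epoch).await {
        Ok(()) => {
            // Collection is detached, as in the patch: the spawned task owns a clone
            // of the manager and reports back through the oneshot sender.
            let collector = manager.clone();
            tokio::spawn(async move { collector.collect(prev_epoch, tx).await });
        }
        Err(err) => {
            // Injection failures are reported on the same channel in the patch.
            println!("inject failed: {err}");
            let _ = tx.send(Completion {
                prev_epoch,
                ok: false,
            });
        }
    }

    let completion = rx.await.expect("collector dropped without reporting");
    println!(
        "barrier on top of epoch {} collected: ok = {}",
        completion.prev_epoch, completion.ok
    );
}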