diff --git a/proto/stream_service.proto b/proto/stream_service.proto
index 419a61f113edf..54ffc3d5ff79c 100644
--- a/proto/stream_service.proto
+++ b/proto/stream_service.proto
@@ -17,31 +17,6 @@ message BuildActorInfo {
   map<uint32, SubscriptionIds> related_subscriptions = 2;
 }

-// Describe the fragments which will be running on this node
-message UpdateActorsRequest {
-  string request_id = 1;
-  repeated BuildActorInfo actors = 2;
-}
-
-message UpdateActorsResponse {
-  common.Status status = 1;
-}
-
-message BroadcastActorInfoTableRequest {
-  repeated common.ActorInfo info = 1;
-}
-
-// Create channels and gRPC connections for a fragment
-message BuildActorsRequest {
-  string request_id = 1;
-  repeated uint32 actor_id = 2;
-}
-
-message BuildActorsResponse {
-  string request_id = 1;
-  common.Status status = 2;
-}
-
 message DropActorsRequest {
   string request_id = 1;
   repeated uint32 actor_ids = 2;
@@ -68,6 +43,9 @@ message InjectBarrierRequest {
   // we specify the set of snapshot backfill actors that needs to be pre-synced with the upstream barrier mutation,
   // so that the input executor won't be blocked at waiting for the mutation of upstream barriers.
   repeated uint32 actor_ids_to_pre_sync_barrier_mutation = 7;
+
+  repeated common.ActorInfo broadcast_info = 8;
+  repeated BuildActorInfo actors_to_build = 9;
 }

 message BarrierCompleteResponse {
@@ -95,11 +73,6 @@ message BarrierCompleteResponse {
   uint64 epoch = 9;
 }

-// Before starting streaming, the leader node broadcast the actor-host table to needed workers.
-message BroadcastActorInfoTableResponse {
-  common.Status status = 1;
-}
-
 message WaitEpochCommitRequest {
   uint64 epoch = 1;
 }
@@ -136,9 +109,6 @@ message StreamingControlStreamResponse {
 }

 service StreamService {
-  rpc UpdateActors(UpdateActorsRequest) returns (UpdateActorsResponse);
-  rpc BuildActors(BuildActorsRequest) returns (BuildActorsResponse);
-  rpc BroadcastActorInfoTable(BroadcastActorInfoTableRequest) returns (BroadcastActorInfoTableResponse);
   rpc DropActors(DropActorsRequest) returns (DropActorsResponse);
   rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse);
   rpc StreamingControlStream(stream StreamingControlStreamRequest) returns (stream StreamingControlStreamResponse);
diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs
index 2222f09e45f3d..eb055a174b3ea 100644
--- a/src/compute/src/rpc/service/stream_service.rs
+++ b/src/compute/src/rpc/service/stream_service.rs
@@ -19,7 +19,6 @@ use risingwave_pb::stream_service::*;
 use risingwave_storage::dispatch_state_store;
 use risingwave_stream::error::StreamError;
 use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
-use thiserror_ext::AsReport;
 use tokio::sync::mpsc::unbounded_channel;
 use tokio_stream::wrappers::UnboundedReceiverStream;
 use tonic::{Request, Response, Status, Streaming};
@@ -41,62 +40,6 @@ impl StreamService for StreamServiceImpl {
     type StreamingControlStreamStream = impl Stream<Item = std::result::Result<StreamingControlStreamResponse, Status>>;

-    #[cfg_attr(coverage, coverage(off))]
-    async fn update_actors(
-        &self,
-        request: Request<UpdateActorsRequest>,
-    ) -> std::result::Result<Response<UpdateActorsResponse>, Status> {
-        let req = request.into_inner();
-        let res = self.mgr.update_actors(req.actors).await;
-        match res {
-            Err(e) => {
-                error!(error = %e.as_report(), "failed to update stream actor");
-                Err(e.into())
-            }
-            Ok(()) => Ok(Response::new(UpdateActorsResponse { status: None })),
-        }
-    }
-
-    #[cfg_attr(coverage, coverage(off))]
-    async fn build_actors(
-        &self,
-        request: Request<BuildActorsRequest>,
-    ) -> std::result::Result<Response<BuildActorsResponse>, Status> {
-        let req =
request.into_inner(); - - let actor_id = req.actor_id; - let res = self.mgr.build_actors(actor_id).await; - match res { - Err(e) => { - error!(error = %e.as_report(), "failed to build actors"); - Err(e.into()) - } - Ok(()) => Ok(Response::new(BuildActorsResponse { - request_id: req.request_id, - status: None, - })), - } - } - - #[cfg_attr(coverage, coverage(off))] - async fn broadcast_actor_info_table( - &self, - request: Request, - ) -> std::result::Result, Status> { - let req = request.into_inner(); - - let res = self.mgr.update_actor_info(req.info).await; - match res { - Err(e) => { - error!(error = %e.as_report(), "failed to update actor info table actor"); - Err(e.into()) - } - Ok(()) => Ok(Response::new(BroadcastActorInfoTableResponse { - status: None, - })), - } - } - #[cfg_attr(coverage, coverage(off))] async fn drop_actors( &self, diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 04b8fed711dd1..0bea5f37940d6 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -844,6 +844,42 @@ impl CommandContext { mutation } + pub fn actors_to_create(&self) -> Option>> { + match &self.command { + Command::CreateStreamingJob { info, job_type } => { + let mut map = match job_type { + CreateStreamingJobType::Normal => HashMap::new(), + CreateStreamingJobType::SinkIntoTable(replace_table) => { + replace_table.new_table_fragments.actors_to_create() + } + CreateStreamingJobType::SnapshotBackfill(_) => { + // for snapshot backfill, the actors to create is measured separately + return None; + } + }; + for (worker_id, new_actors) in info.table_fragments.actors_to_create() { + map.entry(worker_id).or_default().extend(new_actors) + } + Some(map) + } + Command::RescheduleFragment { reschedules, .. } => { + let mut map: HashMap> = HashMap::new(); + for (actor, status) in reschedules + .values() + .flat_map(|reschedule| reschedule.newly_created_actors.iter()) + { + let worker_id = status.location.as_ref().unwrap().worker_node_id; + map.entry(worker_id).or_default().push(actor.clone()); + } + Some(map) + } + Command::ReplaceTable(replace_table) => { + Some(replace_table.new_table_fragments.actors_to_create()) + } + _ => None, + } + } + fn generate_update_mutation_for_replace_table( old_table_fragments: &TableFragments, merge_updates: &[MergeUpdate], diff --git a/src/meta/src/barrier/creating_job/mod.rs b/src/meta/src/barrier/creating_job/mod.rs index 0d478fcf66142..c5a52437e2b7d 100644 --- a/src/meta/src/barrier/creating_job/mod.rs +++ b/src/meta/src/barrier/creating_job/mod.rs @@ -21,20 +21,23 @@ use std::mem::take; use std::sync::Arc; use std::time::Duration; +use itertools::Itertools; use prometheus::HistogramTimer; use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge}; use risingwave_common::util::epoch::Epoch; use risingwave_pb::common::WorkerNode; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::hummock::HummockVersionStats; -use risingwave_pb::stream_service::BarrierCompleteResponse; +use risingwave_pb::stream_service::{BarrierCompleteResponse, BuildActorInfo}; use tracing::{debug, info}; use crate::barrier::command::CommandContext; use crate::barrier::creating_job::barrier_control::{ CreatingStreamingJobBarrierControl, CreatingStreamingJobBarrierType, }; -use crate::barrier::creating_job::status::CreatingStreamingJobStatus; +use crate::barrier::creating_job::status::{ + CreatingJobInjectBarrierInfo, CreatingStreamingJobStatus, +}; use crate::barrier::info::InflightGraphInfo; use 
crate::barrier::progress::CreateMviewProgressTracker; use crate::barrier::rpc::ControlStreamManager; @@ -89,6 +92,8 @@ impl CreatingStreamingJobControl { let table_id = info.table_fragments.table_id(); let table_id_str = format!("{}", table_id.table_id); + let actors_to_create = info.table_fragments.actors_to_create(); + Self { info, snapshot_backfill_info, @@ -103,6 +108,23 @@ impl CreatingStreamingJobControl { backfill_epoch, pending_non_checkpoint_barriers: vec![], snapshot_backfill_actors, + actors_to_create: Some( + actors_to_create + .into_iter() + .map(|(worker_id, actors)| { + ( + worker_id, + actors + .into_iter() + .map(|actor| BuildActorInfo { + actor: Some(actor), + related_subscriptions: Default::default(), + }) + .collect_vec(), + ) + }) + .collect(), + ), }, upstream_lag: metrics .snapshot_backfill_lag @@ -256,7 +278,13 @@ impl CreatingStreamingJobControl { .active_graph_info() .expect("must exist when having barriers to inject"); let table_id = self.info.table_fragments.table_id(); - for (curr_epoch, prev_epoch, kind) in barriers_to_inject { + for CreatingJobInjectBarrierInfo { + curr_epoch, + prev_epoch, + kind, + new_actors, + } in barriers_to_inject + { let node_to_collect = control_stream_manager.inject_barrier( Some(table_id), None, @@ -265,6 +293,7 @@ impl CreatingStreamingJobControl { graph_info, Some(graph_info), HashMap::new(), + new_actors, )?; self.barrier_control.enqueue_epoch( prev_epoch.value().0, @@ -329,6 +358,7 @@ impl CreatingStreamingJobControl { graph_info, Some(graph_info), HashMap::new(), + None, )?; self.barrier_control.enqueue_epoch( command_ctx.prev_epoch.value().0, @@ -375,6 +405,7 @@ impl CreatingStreamingJobControl { Some(graph_info) }, HashMap::new(), + None, )?; let prev_epoch = command_ctx.prev_epoch.value().0; self.barrier_control.enqueue_epoch( diff --git a/src/meta/src/barrier/creating_job/status.rs b/src/meta/src/barrier/creating_job/status.rs index f2d0022298345..0569752b1056b 100644 --- a/src/meta/src/barrier/creating_job/status.rs +++ b/src/meta/src/barrier/creating_job/status.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use risingwave_common::util::epoch::Epoch; use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress; +use risingwave_pb::stream_service::BuildActorInfo; use crate::barrier::command::CommandContext; use crate::barrier::info::InflightGraphInfo; @@ -39,6 +40,7 @@ pub(super) enum CreatingStreamingJobStatus { /// The `prev_epoch` of pending non checkpoint barriers pending_non_checkpoint_barriers: Vec, snapshot_backfill_actors: HashMap>, + actors_to_create: Option>>, }, ConsumingLogStore { graph_info: InflightGraphInfo, @@ -53,6 +55,13 @@ pub(super) enum CreatingStreamingJobStatus { }, } +pub(super) struct CreatingJobInjectBarrierInfo { + pub curr_epoch: TracedEpoch, + pub prev_epoch: TracedEpoch, + pub kind: BarrierKind, + pub new_actors: Option>>, +} + impl CreatingStreamingJobStatus { pub(super) fn active_graph_info(&self) -> Option<&InflightGraphInfo> { match self { @@ -84,14 +93,10 @@ impl CreatingStreamingJobStatus { /// return /// - Some(vec[(`curr_epoch`, `prev_epoch`, `barrier_kind`)]) of barriers to newly inject /// - Some(`graph_info`) when the status should transit to `ConsumingLogStore` - #[expect(clippy::type_complexity)] pub(super) fn may_inject_fake_barrier( &mut self, is_checkpoint: bool, - ) -> Option<( - Vec<(TracedEpoch, TracedEpoch, BarrierKind)>, - Option, - )> { + ) -> Option<(Vec, Option)> { if let 
CreatingStreamingJobStatus::ConsumingSnapshot { prev_epoch_fake_physical_time, pending_commands, @@ -99,27 +104,32 @@ impl CreatingStreamingJobStatus { graph_info, pending_non_checkpoint_barriers, ref backfill_epoch, + actors_to_create, .. } = self { if create_mview_tracker.has_pending_finished_jobs() { + assert!(actors_to_create.is_none()); pending_non_checkpoint_barriers.push(*backfill_epoch); let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time); - let barriers_to_inject = [( - TracedEpoch::new(Epoch(*backfill_epoch)), - TracedEpoch::new(prev_epoch), - BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)), - )] - .into_iter() - .chain(pending_commands.drain(..).map(|command_ctx| { - ( - command_ctx.curr_epoch.clone(), - command_ctx.prev_epoch.clone(), - command_ctx.kind.clone(), - ) - })) - .collect(); + let barriers_to_inject = + [CreatingJobInjectBarrierInfo { + curr_epoch: TracedEpoch::new(Epoch(*backfill_epoch)), + prev_epoch: TracedEpoch::new(prev_epoch), + kind: BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)), + new_actors: None, + }] + .into_iter() + .chain(pending_commands.drain(..).map(|command_ctx| { + CreatingJobInjectBarrierInfo { + curr_epoch: command_ctx.curr_epoch.clone(), + prev_epoch: command_ctx.prev_epoch.clone(), + kind: command_ctx.kind.clone(), + new_actors: None, + } + })) + .collect(); let graph_info = take(graph_info); Some((barriers_to_inject, Some(graph_info))) @@ -135,7 +145,15 @@ impl CreatingStreamingJobStatus { } else { BarrierKind::Barrier }; - Some((vec![(curr_epoch, prev_epoch, kind)], None)) + Some(( + vec![CreatingJobInjectBarrierInfo { + curr_epoch, + prev_epoch, + kind, + new_actors: actors_to_create.take(), + }], + None, + )) } } else { None diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 41884b28a64c1..25fe1fd2ceff7 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -13,7 +13,6 @@ // limitations under the License. 
use std::collections::{BTreeSet, HashMap, HashSet}; -use std::sync::Arc; use std::time::Duration; use anyhow::{anyhow, Context}; @@ -23,25 +22,24 @@ use risingwave_common::config::DefaultParallelism; use risingwave_common::hash::WorkerSlotId; use risingwave_common::util::epoch::Epoch; use risingwave_meta_model_v2::StreamingParallelism; -use risingwave_pb::common::ActorInfo; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::table_fragments::State; use risingwave_pb::meta::{PausedReason, Recovery}; use risingwave_pb::stream_plan::barrier_mutation::Mutation; use risingwave_pb::stream_plan::AddMutation; +use risingwave_pb::stream_service::BuildActorInfo; use thiserror_ext::AsReport; use tokio::time::Instant; use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tracing::{debug, info, warn, Instrument}; use super::{CheckpointControl, TracedEpoch}; -use crate::barrier::command::CommandContext; use crate::barrier::info::{InflightGraphInfo, InflightSubscriptionInfo}; use crate::barrier::progress::CreateMviewProgressTracker; use crate::barrier::rpc::ControlStreamManager; use crate::barrier::schedule::ScheduledBarriers; use crate::barrier::state::BarrierManagerState; -use crate::barrier::{BarrierKind, Command, GlobalBarrierManager, GlobalBarrierManagerContext}; +use crate::barrier::{BarrierKind, GlobalBarrierManager, GlobalBarrierManagerContext}; use crate::controller::catalog::ReleaseContext; use crate::manager::{ActiveStreamingWorkerNodes, MetadataManager, WorkerId}; use crate::model::{MetadataModel, MigrationPlan, TableFragments, TableParallelism}; @@ -369,52 +367,38 @@ impl GlobalBarrierManager { }; // update and build all actors. - self.context - .update_actors(&info, &subscription_info, &active_streaming_nodes) + let node_actors = self + .context + .load_all_actors(&info, &subscription_info, &active_streaming_nodes) .await .inspect_err(|err| { warn!(error = %err.as_report(), "update actors failed"); })?; - self.context - .build_actors(&info, &active_streaming_nodes) - .await - .inspect_err(|err| { - warn!(error = %err.as_report(), "build_actors failed"); - })?; // get split assignments for all actors let source_split_assignments = self.context.source_manager.list_assignments().await; - let command = Command::Plain(Some(Mutation::Add(AddMutation { + let mutation = Mutation::Add(AddMutation { // Actors built during recovery is not treated as newly added actors. actor_dispatchers: Default::default(), added_actors: Default::default(), actor_splits: build_actor_connector_splits(&source_split_assignments), pause: paused_reason.is_some(), subscriptions_to_add: Default::default(), - }))); + }); // Use a different `curr_epoch` for each recovery attempt. let new_epoch = prev_epoch.next(); - // Inject the `Initial` barrier to initialize all executors. 
- let command_ctx = Arc::new(CommandContext::new( - active_streaming_nodes.current().clone(), - subscription_info.clone(), - prev_epoch.clone(), - new_epoch.clone(), - paused_reason, - command, - BarrierKind::Initial, - self.context.clone(), - tracing::Span::current(), // recovery span - )); - - let mut node_to_collect = control_stream_manager.inject_command_ctx_barrier( - &command_ctx, + let mut node_to_collect = control_stream_manager.inject_barrier( + None, + Some(mutation), + (&new_epoch, &prev_epoch), + &BarrierKind::Initial, &info, Some(&info), HashMap::new(), + Some(node_actors), )?; debug!(?node_to_collect, "inject initial barrier"); while !node_to_collect.is_empty() { @@ -422,18 +406,13 @@ impl GlobalBarrierManager { .next_complete_barrier_response() .await; let resp = result?; - assert_eq!(resp.epoch, command_ctx.prev_epoch.value().0); + assert_eq!(resp.epoch, prev_epoch.value().0); assert!(node_to_collect.remove(&worker_id)); } debug!("collected initial barrier"); ( - BarrierManagerState::new( - new_epoch, - info, - subscription_info, - command_ctx.next_paused_reason(), - ), + BarrierManagerState::new(new_epoch, info, subscription_info, paused_reason), active_streaming_nodes, control_stream_manager, tracker, @@ -1114,36 +1093,18 @@ impl GlobalBarrierManagerContext { } /// Update all actors in compute nodes. - async fn update_actors( + async fn load_all_actors( &self, info: &InflightGraphInfo, subscription_info: &InflightSubscriptionInfo, active_nodes: &ActiveStreamingWorkerNodes, - ) -> MetaResult<()> { + ) -> MetaResult>> { if info.actor_map.is_empty() { tracing::debug!("no actor to update, skipping."); - return Ok(()); + return Ok(HashMap::new()); } - let actor_infos: Vec<_> = info - .actor_map - .iter() - .map(|(node_id, actors)| { - let host = active_nodes - .current() - .get(node_id) - .ok_or_else(|| anyhow::anyhow!("worker evicted, wait for online."))? - .host - .clone(); - Ok(actors.iter().map(move |&actor_id| ActorInfo { - actor_id, - host: host.clone(), - })) as MetaResult<_> - }) - .flatten_ok() - .try_collect()?; - - let mut all_node_actors = self + let all_node_actors = self .metadata_manager .all_node_actors(false, &subscription_info.mv_depended_subscriptions) .await?; @@ -1160,45 +1121,7 @@ impl GlobalBarrierManagerContext { return Err(anyhow!("actors dropped during update").into()); } - self.stream_rpc_manager - .broadcast_update_actor_info( - active_nodes.current(), - info.actor_map.keys().cloned(), - actor_infos.into_iter(), - info.actor_map.keys().map(|node_id| { - ( - *node_id, - all_node_actors.remove(node_id).unwrap_or_default(), - ) - }), - ) - .await?; - - Ok(()) - } - - /// Build all actors in compute nodes. 
- async fn build_actors( - &self, - info: &InflightGraphInfo, - active_nodes: &ActiveStreamingWorkerNodes, - ) -> MetaResult<()> { - if info.actor_map.is_empty() { - tracing::debug!("no actor to build, skipping."); - return Ok(()); - } - - self.stream_rpc_manager - .build_actors( - active_nodes.current(), - info.actor_map.iter().map(|(node_id, actors)| { - let actors = actors.iter().cloned().collect(); - (*node_id, actors) - }), - ) - .await?; - - Ok(()) + Ok(all_node_actors) } } diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 1b4ab6207db9a..7ad468b04aa4c 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -30,12 +30,12 @@ use risingwave_hummock_sdk::HummockVersionId; use risingwave_pb::common::{ActorInfo, WorkerNode}; use risingwave_pb::stream_plan::barrier_mutation::Mutation; use risingwave_pb::stream_plan::{Barrier, BarrierMutation}; +use risingwave_pb::stream_service::build_actor_info::SubscriptionIds; use risingwave_pb::stream_service::streaming_control_stream_request::RemovePartialGraphRequest; use risingwave_pb::stream_service::{ streaming_control_stream_request, streaming_control_stream_response, BarrierCompleteResponse, - BroadcastActorInfoTableRequest, BuildActorInfo, BuildActorsRequest, DropActorsRequest, - InjectBarrierRequest, StreamingControlStreamRequest, StreamingControlStreamResponse, - UpdateActorsRequest, + BuildActorInfo, DropActorsRequest, InjectBarrierRequest, StreamingControlStreamRequest, + StreamingControlStreamResponse, }; use risingwave_rpc_client::error::RpcError; use risingwave_rpc_client::StreamClient; @@ -263,6 +263,39 @@ impl ControlStreamManager { pre_applied_graph_info, applied_graph_info, actor_ids_to_pre_sync_mutation, + command_ctx.actors_to_create().map(|actors_to_create| { + actors_to_create + .into_iter() + .map(|(worker_id, actors)| { + ( + worker_id, + actors + .into_iter() + .map(|actor| BuildActorInfo { + actor: Some(actor), + // TODO: consider subscriber of backfilling mv + related_subscriptions: command_ctx + .subscription_info + .mv_depended_subscriptions + .iter() + .map(|(table_id, subscriptions)| { + ( + table_id.table_id, + SubscriptionIds { + subscription_ids: subscriptions + .keys() + .cloned() + .collect(), + }, + ) + }) + .collect(), + }) + .collect_vec(), + ) + }) + .collect() + }), ) } @@ -275,6 +308,7 @@ impl ControlStreamManager { pre_applied_graph_info: &InflightGraphInfo, applied_graph_info: Option<&InflightGraphInfo>, actor_ids_to_pre_sync_mutation: HashMap>, + mut new_actors: Option>>, ) -> MetaResult> { fail_point!("inject_barrier_err", |_| risingwave_common::bail!( "inject_barrier_err" @@ -287,17 +321,41 @@ impl ControlStreamManager { }) .unwrap_or(u32::MAX); - for worker_id in pre_applied_graph_info.worker_ids().chain( - applied_graph_info - .into_iter() - .flat_map(|info| info.worker_ids()), - ) { + for worker_id in pre_applied_graph_info + .worker_ids() + .chain( + applied_graph_info + .into_iter() + .flat_map(|info| info.worker_ids()), + ) + .chain( + new_actors + .iter() + .flat_map(|new_actors| new_actors.keys().cloned()), + ) + { if !self.nodes.contains_key(&worker_id) { return Err(anyhow!("unconnected worker node {}", worker_id).into()); } } let mut node_need_collect = HashSet::new(); + let new_actors_location_to_broadcast = new_actors + .iter() + .flatten() + .flat_map(|(worker_id, actor_infos)| { + actor_infos.iter().map(|actor_info| ActorInfo { + actor_id: actor_info.actor.as_ref().unwrap().actor_id, + host: self + .nodes + .get(worker_id) + .expect("have 
checked exist previously") + .worker + .host + .clone(), + }) + }) + .collect_vec(); self.nodes .iter_mut() @@ -344,6 +402,14 @@ impl ControlStreamManager { .flatten() .cloned() .collect(), + broadcast_info: new_actors_location_to_broadcast.clone(), + actors_to_build: new_actors + .as_mut() + .map(|new_actors| new_actors.remove(node_id)) + .into_iter() + .flatten() + .flatten() + .collect(), }, ), ), @@ -472,73 +538,6 @@ impl StreamRpcManager { Uuid::new_v4().to_string() } - pub async fn build_actors( - &self, - node_map: &HashMap, - node_actors: impl Iterator)>, - ) -> MetaResult<()> { - self.make_request( - node_actors.map(|(worker_id, actors)| (node_map.get(&worker_id).unwrap(), actors)), - |client, actors| async move { - let request_id = Self::new_request_id(); - tracing::debug!(request_id = request_id.as_str(), actors = ?actors, "build actors"); - client - .build_actors(BuildActorsRequest { - request_id, - actor_id: actors, - }) - .await - }, - ) - .await?; - Ok(()) - } - - /// Broadcast and update actor info in CN. - /// `node_actors_to_create` must be a subset of `broadcast_worker_ids`. - pub async fn broadcast_update_actor_info( - &self, - worker_nodes: &HashMap, - broadcast_worker_ids: impl Iterator, - actor_infos_to_broadcast: impl Iterator, - node_actors_to_create: impl Iterator)>, - ) -> MetaResult<()> { - let actor_infos = actor_infos_to_broadcast.collect_vec(); - let mut node_actors_to_create = node_actors_to_create.collect::>(); - self.make_request( - broadcast_worker_ids - .map(|worker_id| { - let node = worker_nodes.get(&worker_id).unwrap(); - let actors = node_actors_to_create.remove(&worker_id); - (node, actors) - }), - |client, actors| { - let info = actor_infos.clone(); - async move { - client - .broadcast_actor_info_table(BroadcastActorInfoTableRequest { info }) - .await?; - if let Some(actors) = actors { - let request_id = Self::new_request_id(); - let actor_ids = actors.iter().map(|actor| actor.actor.as_ref().unwrap().actor_id).collect_vec(); - tracing::debug!(request_id = request_id.as_str(), actors = ?actor_ids, "update actors"); - client - .update_actors(UpdateActorsRequest { request_id, actors }) - .await?; - } - Ok(()) - } - }, - ) - .await?; - assert!( - node_actors_to_create.is_empty(), - "remaining uncreated actors: {:?}", - node_actors_to_create - ); - Ok(()) - } - pub async fn drop_actors( &self, node_map: &HashMap, diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index 6fb30ee6aa9fc..bec6b95cfb0f9 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -527,14 +527,19 @@ impl TableFragments { actors } - /// Returns actor map: `actor_id` => `StreamActor`. 
- pub fn actor_map(&self) -> HashMap { - let mut actor_map = HashMap::default(); - self.fragments.values().for_each(|fragment| { - fragment.actors.iter().for_each(|actor| { - actor_map.insert(actor.actor_id, actor.clone()); + pub fn actors_to_create(&self) -> HashMap> { + let mut actor_map: HashMap<_, Vec<_>> = HashMap::new(); + self.fragments + .values() + .flat_map(|fragment| fragment.actors.iter()) + .for_each(|actor| { + let worker_id = self + .actor_status + .get(&actor.actor_id) + .expect("should exist") + .worker_id(); + actor_map.entry(worker_id).or_default().push(actor.clone()); }); - }); actor_map } diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 42ed98b372c7d..6ecb061e509b4 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -31,7 +31,7 @@ use risingwave_common::catalog::TableId; use risingwave_common::hash::{ActorMapping, VirtualNode}; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_meta_model_v2::{actor, fragment, ObjectId, StreamingParallelism}; -use risingwave_pb::common::{ActorInfo, Buffer, PbActorLocation, WorkerNode, WorkerType}; +use risingwave_pb::common::{Buffer, PbActorLocation, WorkerNode, WorkerType}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::table_fragments::actor_status::ActorState; use risingwave_pb::meta::table_fragments::fragment::{ @@ -43,8 +43,6 @@ use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{ Dispatcher, DispatcherType, FragmentTypeFlag, PbDispatcher, PbStreamActor, StreamNode, }; -use risingwave_pb::stream_service::build_actor_info::SubscriptionIds; -use risingwave_pb::stream_service::BuildActorInfo; use thiserror_ext::AsReport; use tokio::sync::oneshot::Receiver; use tokio::sync::{oneshot, RwLock, RwLockReadGuard, RwLockWriteGuard}; @@ -187,8 +185,6 @@ pub struct RescheduleContext { actor_status: BTreeMap, /// Meta information of all `Fragment`, used to find the `Fragment`'s `Actor` fragment_map: HashMap, - /// Indexes for all `Worker`s - worker_nodes: HashMap, /// Index of all `Actor` upstreams, specific to `Dispatcher` upstream_dispatchers: HashMap>, /// Fragments with stream source @@ -891,7 +887,6 @@ impl ScaleController { actor_map, actor_status, fragment_map, - worker_nodes, upstream_dispatchers, stream_source_fragment_ids, no_shuffle_target_fragment_ids, @@ -900,42 +895,6 @@ impl ScaleController { }) } - pub(crate) async fn create_actors_on_compute_node( - &self, - worker_nodes: &HashMap, - actor_infos_to_broadcast: BTreeMap, - node_actors_to_create: HashMap>, - broadcast_worker_ids: HashSet, - ) -> MetaResult<()> { - self.stream_rpc_manager - .broadcast_update_actor_info( - worker_nodes, - broadcast_worker_ids.into_iter(), - actor_infos_to_broadcast.values().cloned(), - node_actors_to_create.clone().into_iter(), - ) - .await?; - - self.stream_rpc_manager - .build_actors( - worker_nodes, - node_actors_to_create - .iter() - .map(|(node_id, stream_actors)| { - ( - *node_id, - stream_actors - .iter() - .map(|stream_actor| stream_actor.actor.as_ref().unwrap().actor_id) - .collect_vec(), - ) - }), - ) - .await?; - - Ok(()) - } - /// From the high-level [`WorkerReschedule`] to the low-level reschedule plan [`Reschedule`]. 
/// /// Returns `(reschedule_fragment, applied_reschedules)` @@ -1296,116 +1255,6 @@ impl ScaleController { } } - if !options.skip_create_new_actors { - // After modification, for newly created actors, both upstream and downstream actor ids - // have been modified - let mut actor_infos_to_broadcast = BTreeMap::new(); - let mut node_actors_to_create: HashMap> = HashMap::new(); - let mut broadcast_worker_ids = HashSet::new(); - - let subscriptions: HashMap<_, SubscriptionIds> = self - .metadata_manager - .get_mv_depended_subscriptions() - .await? - .iter() - .map(|(table_id, subscriptions)| { - ( - table_id.table_id, - SubscriptionIds { - subscription_ids: subscriptions.keys().cloned().collect(), - }, - ) - }) - .collect(); - - for actors_to_create in fragment_actors_to_create.values() { - for (new_actor_id, worker_id) in actors_to_create { - let new_actor = new_created_actors.get(new_actor_id).unwrap(); - for upstream_actor_id in &new_actor.upstream_actor_id { - if new_created_actors.contains_key(upstream_actor_id) { - continue; - } - - let upstream_worker_id = ctx.actor_id_to_worker_id(upstream_actor_id)?; - - let upstream_worker = - ctx.worker_nodes.get(&upstream_worker_id).with_context(|| { - format!("upstream worker {} not found", upstream_worker_id) - })?; - - // Force broadcast upstream actor info, because the actor information of the new - // node may not have been synchronized yet - actor_infos_to_broadcast.insert( - *upstream_actor_id, - ActorInfo { - actor_id: *upstream_actor_id, - host: upstream_worker.host.clone(), - }, - ); - - broadcast_worker_ids.insert(upstream_worker_id); - } - - for dispatcher in &new_actor.dispatcher { - for downstream_actor_id in &dispatcher.downstream_actor_id { - if new_created_actors.contains_key(downstream_actor_id) { - continue; - } - let downstream_worker_id = - ctx.actor_id_to_worker_id(downstream_actor_id)?; - - let downstream_worker = ctx - .worker_nodes - .get(&downstream_worker_id) - .with_context(|| { - format!("downstream worker {} not found", downstream_worker_id) - })?; - - actor_infos_to_broadcast.insert( - *downstream_actor_id, - ActorInfo { - actor_id: *downstream_actor_id, - host: downstream_worker.host.clone(), - }, - ); - - broadcast_worker_ids.insert(downstream_worker_id); - } - } - - let worker = ctx.worker_nodes.get(worker_id).unwrap(); - - node_actors_to_create - .entry(worker.id) - .or_default() - .push(BuildActorInfo { - actor: Some(new_actor.clone()), - // TODO: may include only the subscriptions related to the table fragment - // of the actor. - related_subscriptions: subscriptions.clone(), - }); - - broadcast_worker_ids.insert(worker.id); - - actor_infos_to_broadcast.insert( - *new_actor_id, - ActorInfo { - actor_id: *new_actor_id, - host: worker.host.clone(), - }, - ); - } - } - - self.create_actors_on_compute_node( - &ctx.worker_nodes, - actor_infos_to_broadcast, - node_actors_to_create, - broadcast_worker_ids, - ) - .await?; - } - // For stream source fragments, we need to reallocate the splits. 
// Because we are in the Pause state, so it's no problem to reallocate let mut fragment_stream_source_actor_splits = HashMap::new(); diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 220f69bc0a58d..4ee399282a142 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -35,7 +35,7 @@ use crate::barrier::{ }; use crate::manager::{DdlType, MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob}; use crate::model::{ActorId, FragmentId, MetadataModel, TableFragments, TableParallelism}; -use crate::stream::{to_build_actor_info, SourceManagerRef}; +use crate::stream::SourceManagerRef; use crate::{MetaError, MetaResult}; pub type GlobalStreamManagerRef = Arc; @@ -291,12 +291,6 @@ impl GlobalStreamManager { // try to cancel buffered creating command. if self.barrier_scheduler.try_cancel_scheduled_create(table_id) { tracing::debug!("cancelling streaming job {table_id} in buffer queue."); - let node_actors = table_fragments.worker_actor_ids(); - let cluster_info = - self.metadata_manager.get_streaming_cluster_info().await?; - self.stream_rpc_manager - .drop_actors(&cluster_info.worker_nodes, node_actors.into_iter()) - .await?; if let MetadataManager::V1(mgr) = &self.metadata_manager { mgr.fragment_manager @@ -334,66 +328,6 @@ impl GlobalStreamManager { bail!("receiver failed to get notification version for finished stream job") } - async fn build_actors( - &self, - table_fragments: &TableFragments, - building_locations: &Locations, - existing_locations: &Locations, - subscription_depend_table_id: TableId, - ) -> MetaResult<()> { - let actor_map = table_fragments.actor_map(); - - // Actors on each stream node will need to know where their upstream lies. `actor_info` - // includes such information. It contains: - // 1. actors in the current create-streaming-job request. - // 2. all upstream actors. - let actor_infos_to_broadcast = building_locations - .actor_infos() - .chain(existing_locations.actor_infos()); - - let building_worker_actors = building_locations.worker_actors(); - let subscriptions = self - .metadata_manager - .get_mv_depended_subscriptions() - .await?; - - // We send RPC request in two stages. - // The first stage does 2 things: broadcast actor info, and send local actor ids to - // different WorkerNodes. Such that each WorkerNode knows the overall actor - // allocation, but not actually builds it. We initialize all channels in this stage. - self.stream_rpc_manager - .broadcast_update_actor_info( - &building_locations.worker_locations, - building_worker_actors.keys().cloned(), - actor_infos_to_broadcast, - building_worker_actors.iter().map(|(worker_id, actors)| { - let stream_actors = actors - .iter() - .map(|actor_id| { - to_build_actor_info( - actor_map[actor_id].clone(), - &subscriptions, - subscription_depend_table_id, - ) - }) - .collect::>(); - (*worker_id, stream_actors) - }), - ) - .await?; - - // In the second stage, each [`WorkerNode`] builds local actors and connect them with - // channels. 
- self.stream_rpc_manager - .build_actors( - &building_locations.worker_locations, - building_worker_actors.into_iter(), - ) - .await?; - - Ok(()) - } - async fn create_streaming_job_impl( &self, table_fragments: TableFragments, @@ -401,8 +335,6 @@ impl GlobalStreamManager { streaming_job, dispatchers, upstream_root_actors, - building_locations, - existing_locations, definition, create_type, ddl_type, @@ -415,27 +347,12 @@ impl GlobalStreamManager { let mut replace_table_command = None; let mut replace_table_id = None; - self.build_actors( - &table_fragments, - &building_locations, - &existing_locations, - table_fragments.table_id(), - ) - .await?; tracing::debug!( table_id = %table_fragments.table_id(), "built actors finished" ); if let Some((streaming_job, context, table_fragments)) = replace_table_job_info { - self.build_actors( - &table_fragments, - &context.building_locations, - &context.existing_locations, - context.old_table_fragments.table_id(), - ) - .await?; - match &self.metadata_manager { MetadataManager::V1(mgr) => { // Add table fragments to meta store with state: `State::Initial`. @@ -552,20 +469,11 @@ impl GlobalStreamManager { old_table_fragments, merge_updates, dispatchers, - building_locations, - existing_locations, dummy_id, streaming_job, + .. }: ReplaceTableContext, ) -> MetaResult<()> { - self.build_actors( - &table_fragments, - &building_locations, - &existing_locations, - old_table_fragments.table_id(), - ) - .await?; - let dummy_table_id = table_fragments.table_id(); let init_split_assignment = self.source_manager.allocate_splits(&dummy_table_id).await?; @@ -898,51 +806,6 @@ mod tests { type StreamingControlStreamStream = impl Stream>; - async fn update_actors( - &self, - request: Request, - ) -> std::result::Result, Status> { - let req = request.into_inner(); - let mut guard = self.inner.actor_streams.lock().unwrap(); - for actor in req.get_actors() { - let actor = actor.actor.as_ref().unwrap(); - guard.insert(actor.get_actor_id(), actor.clone()); - } - - Ok(Response::new(UpdateActorsResponse { status: None })) - } - - async fn build_actors( - &self, - request: Request, - ) -> std::result::Result, Status> { - let req = request.into_inner(); - let mut guard = self.inner.actor_ids.lock().unwrap(); - for id in req.get_actor_id() { - guard.insert(*id); - } - - Ok(Response::new(BuildActorsResponse { - request_id: "".to_string(), - status: None, - })) - } - - async fn broadcast_actor_info_table( - &self, - request: Request, - ) -> std::result::Result, Status> { - let req = request.into_inner(); - let mut guard = self.inner.actor_infos.lock().unwrap(); - for info in req.get_info() { - guard.insert(info.get_actor_id(), info.get_host()?.clone()); - } - - Ok(Response::new(BroadcastActorInfoTableResponse { - status: None, - })) - } - async fn drop_actors( &self, _request: Request, @@ -972,6 +835,24 @@ mod tests { })); } streaming_control_stream_request::Request::InjectBarrier(req) => { + { + let mut guard = inner.actor_infos.lock().unwrap(); + for info in req.broadcast_info { + guard.insert( + info.get_actor_id(), + info.get_host().unwrap().clone(), + ); + } + } + { + let mut guard = inner.actor_streams.lock().unwrap(); + let mut actor_ids = inner.actor_ids.lock().unwrap(); + for actor in req.actors_to_build { + let actor = actor.actor.as_ref().unwrap(); + assert!(actor_ids.insert(actor.actor_id)); + guard.insert(actor.get_actor_id(), actor.clone()); + } + } let _ = tx.send(Ok(StreamingControlStreamResponse { response: Some( 
streaming_control_stream_response::Response::CompleteBarrier( diff --git a/src/rpc_client/src/stream_client.rs b/src/rpc_client/src/stream_client.rs index c3f876549a36d..920b6f0777f37 100644 --- a/src/rpc_client/src/stream_client.rs +++ b/src/rpc_client/src/stream_client.rs @@ -70,10 +70,7 @@ pub type StreamClientPoolRef = Arc; macro_rules! for_all_stream_rpc { ($macro:ident) => { $macro! { - { 0, update_actors, UpdateActorsRequest, UpdateActorsResponse } - ,{ 0, build_actors, BuildActorsRequest, BuildActorsResponse } - ,{ 0, broadcast_actor_info_table, BroadcastActorInfoTableRequest, BroadcastActorInfoTableResponse } - ,{ 0, drop_actors, DropActorsRequest, DropActorsResponse } + { 0, drop_actors, DropActorsRequest, DropActorsResponse } ,{ 0, wait_epoch_commit, WaitEpochCommitRequest, WaitEpochCommitResponse } } }; diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index b6cf8c525a5ed..88e86a5998758 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -52,7 +52,6 @@ use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::runtime::BackgroundShutdownRuntime; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; use risingwave_hummock_sdk::{HummockVersionId, LocalSstableInfo, SyncResult}; -use risingwave_pb::common::ActorInfo; use risingwave_pb::stream_plan::barrier::BarrierKind; use risingwave_pb::stream_service::streaming_control_stream_request::{InitRequest, Request}; use risingwave_pb::stream_service::streaming_control_stream_response::{ @@ -215,18 +214,6 @@ pub(super) enum LocalActorOperation { actors: Vec, result_sender: oneshot::Sender<()>, }, - UpdateActors { - actors: Vec, - result_sender: oneshot::Sender>, - }, - BuildActors { - actors: Vec, - result_sender: oneshot::Sender>, - }, - UpdateActorInfo { - new_actor_infos: Vec, - result_sender: oneshot::Sender>, - }, TakeReceiver { ids: UpDownActorIds, result_sender: oneshot::Sender>, @@ -431,6 +418,14 @@ impl LocalBarrierWorker { match request.request.expect("should not be empty") { Request::InjectBarrier(req) => { let barrier = Barrier::from_protobuf(req.get_barrier().unwrap())?; + self.update_actor_info(req.broadcast_info)?; + let actors = req + .actors_to_build + .iter() + .map(|actor| actor.actor.as_ref().unwrap().actor_id) + .collect_vec(); + self.update_actors(req.actors_to_build)?; + self.start_create_actors(&actors)?; self.send_barrier( &barrier, req.actor_ids_to_collect.into_iter().collect(), @@ -506,23 +501,6 @@ impl LocalBarrierWorker { self.drop_actors(&actors); let _ = result_sender.send(()); } - LocalActorOperation::UpdateActors { - actors, - result_sender, - } => { - let result = self.update_actors(actors); - let _ = result_sender.send(result); - } - LocalActorOperation::BuildActors { - actors, - result_sender, - } => self.start_create_actors(&actors, result_sender), - LocalActorOperation::UpdateActorInfo { - new_actor_infos, - result_sender, - } => { - let _ = result_sender.send(self.update_actor_info(new_actor_infos)); - } LocalActorOperation::TakeReceiver { ids, result_sender } => { let _ = result_sender.send(self.current_shared_context.take_receiver(ids)); } @@ -1092,6 +1070,8 @@ pub(crate) mod barrier_test_utils { table_ids_to_sync: vec![], partial_graph_id: u32::MAX, actor_ids_to_pre_sync_barrier_mutation: vec![], + broadcast_info: vec![], + actors_to_build: vec![], }, )), })) diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 
a083cc3974da9..60b7341371497 100644
--- a/src/stream/src/task/stream_manager.rs
+++ b/src/stream/src/task/stream_manager.rs
@@ -42,7 +42,6 @@ use risingwave_storage::monitor::HummockTraceFutureExt;
 use risingwave_storage::{dispatch_state_store, StateStore};
 use thiserror_ext::AsReport;
 use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
-use tokio::sync::oneshot;
 use tokio::task::JoinHandle;
 use tonic::Status;
@@ -225,33 +224,6 @@ impl LocalStreamManager {
         .await
     }

-    pub async fn update_actors(&self, actors: Vec<BuildActorInfo>) -> StreamResult<()> {
-        self.actor_op_tx
-            .send_and_await(|result_sender| LocalActorOperation::UpdateActors {
-                actors,
-                result_sender,
-            })
-            .await?
-    }
-
-    pub async fn build_actors(&self, actors: Vec<ActorId>) -> StreamResult<()> {
-        self.actor_op_tx
-            .send_and_await(|result_sender| LocalActorOperation::BuildActors {
-                actors,
-                result_sender,
-            })
-            .await?
-    }
-
-    pub async fn update_actor_info(&self, new_actor_infos: Vec<ActorInfo>) -> StreamResult<()> {
-        self.actor_op_tx
-            .send_and_await(|result_sender| LocalActorOperation::UpdateActorInfo {
-                new_actor_infos,
-                result_sender,
-            })
-            .await?
-    }
-
     pub async fn take_receiver(&self, ids: UpDownActorIds) -> StreamResult<Receiver> {
         self.actor_op_tx
             .send_and_await(|result_sender| LocalActorOperation::TakeReceiver {
                 ids,
@@ -326,31 +298,18 @@ impl LocalBarrierWorker {
     /// This function could only be called once during the lifecycle of `LocalStreamManager` for
     /// now.
-    pub(super) fn start_create_actors(
-        &mut self,
-        actors: &[ActorId],
-        result_sender: oneshot::Sender<StreamResult<()>>,
-    ) {
-        let actors: Vec<_> = {
-            let actor_result = actors
-                .iter()
-                .map(|actor_id| {
-                    self.actor_manager_state
-                        .actors
-                        .remove(actor_id)
-                        .ok_or_else(|| anyhow!("No such actor with actor id:{}", actor_id))
-                })
-                .try_collect();
-            match actor_result {
-                Ok(actors) => actors,
-                Err(e) => {
-                    let _ = result_sender.send(Err(e.into()));
-                    return;
-                }
-            }
-        };
+    pub(super) fn start_create_actors(&mut self, actors: &[ActorId]) -> StreamResult<()> {
+        let actors: Vec<_> = actors
+            .iter()
+            .map(|actor_id| {
+                self.actor_manager_state
+                    .actors
+                    .remove(actor_id)
+                    .ok_or_else(|| anyhow!("No such actor with actor id:{}", actor_id))
+            })
+            .try_collect()?;
         self.spawn_actors(actors);
-        let _ = result_sender.send(Ok(()));
+        Ok(())
     }
 }
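Note on the new InjectBarrierRequest fields above: in place of the removed UpdateActors/BuildActors/BroadcastActorInfoTable RPCs, the meta node ships the full actor-location table in `broadcast_info` to every involved worker and the per-worker actor definitions in `actors_to_build`, so a compute node can register locations, build the shipped actors, and only then process the barrier. Below is a minimal sketch of how those two payloads can be derived from a `worker -> new actors` map; the helper name `split_new_actors`, the `WorkerId` alias, and the `worker_hosts` parameter are illustrative rather than part of this PR, and `related_subscriptions` is left empty for brevity.

use std::collections::HashMap;

use risingwave_pb::common::{ActorInfo, HostAddress};
use risingwave_pb::stream_plan::StreamActor;
use risingwave_pb::stream_service::BuildActorInfo;

type WorkerId = u32;

// Illustrative helper mirroring the logic in ControlStreamManager::inject_barrier:
// every worker receives the same `broadcast_info` location table, while
// `actors_to_build` only carries the actors assigned to that worker.
fn split_new_actors(
    new_actors: HashMap<WorkerId, Vec<StreamActor>>,
    worker_hosts: &HashMap<WorkerId, HostAddress>,
) -> (Vec<ActorInfo>, HashMap<WorkerId, Vec<BuildActorInfo>>) {
    // Location table broadcast alongside the barrier.
    let broadcast_info: Vec<ActorInfo> = new_actors
        .iter()
        .flat_map(|(worker_id, actors)| {
            let host = worker_hosts.get(worker_id).cloned();
            actors.iter().map(move |actor| ActorInfo {
                actor_id: actor.actor_id,
                host: host.clone(),
            })
        })
        .collect();

    // Per-worker build list; subscriptions are omitted in this sketch.
    let actors_to_build: HashMap<WorkerId, Vec<BuildActorInfo>> = new_actors
        .into_iter()
        .map(|(worker_id, actors)| {
            let build_infos = actors
                .into_iter()
                .map(|actor| BuildActorInfo {
                    actor: Some(actor),
                    related_subscriptions: Default::default(),
                })
                .collect();
            (worker_id, build_infos)
        })
        .collect();

    (broadcast_info, actors_to_build)
}

On the compute-node side, the handler added in `barrier_manager.rs` above consumes these fields in order: it registers `broadcast_info` via `update_actor_info`, builds `actors_to_build` via `update_actors` and `start_create_actors`, and only then calls `send_barrier`.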