diff --git a/src/meta/src/barrier/creating_job_control.rs b/src/meta/src/barrier/creating_job_control.rs index 54f2ac94705e7..92183629dc458 100644 --- a/src/meta/src/barrier/creating_job_control.rs +++ b/src/meta/src/barrier/creating_job_control.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use itertools::Itertools; use risingwave_common::util::epoch::Epoch; -use risingwave_pb::common::WorkerNode; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::stream_service::BarrierCompleteResponse; @@ -211,7 +210,6 @@ impl CreatingStreamingJobControl { pub(super) fn may_inject_fake_barrier( &mut self, control_stream_manager: &mut ControlStreamManager, - node_map: &HashMap, is_checkpoint: bool, ) -> MetaResult<()> { if let CreatingStreamingJobStatus::ConsumingSnapshot { @@ -230,7 +228,6 @@ impl CreatingStreamingJobControl { let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time); let node_to_collect = control_stream_manager.inject_barrier( table_id, - node_map, None, ( &TracedEpoch::new(self.backfill_epoch), @@ -248,7 +245,6 @@ impl CreatingStreamingJobControl { for command in pending_commands { let node_to_collect = control_stream_manager.inject_barrier( table_id, - node_map, command.to_mutation(), (&command.curr_epoch, &command.prev_epoch), &command.kind, @@ -273,7 +269,6 @@ impl CreatingStreamingJobControl { }; let node_to_collect = control_stream_manager.inject_barrier( table_id, - node_map, None, (&curr_epoch, &prev_epoch), &kind, @@ -313,7 +308,6 @@ impl CreatingStreamingJobControl { CreatingStreamingJobStatus::ConsumingLogStore { graph_info } => { let node_to_collect = control_stream_manager.inject_barrier( Some(table_id), - &command_ctx.node_map, if to_finish { // erase the mutation on upstream except the last Finish command command_ctx.to_mutation() diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index b1e6ebebfffd8..e241e7de14c51 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -968,11 +968,7 @@ impl GlobalBarrierManager { .creating_streaming_job_controls .values_mut() { - creating_job.may_inject_fake_barrier( - &mut self.control_stream_manager, - self.active_streaming_nodes.current(), - checkpoint, - )? + creating_job.may_inject_fake_barrier(&mut self.control_stream_manager, checkpoint)? } self.pending_non_checkpoint_barriers diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 89a1e96c3a36d..80c437b32a32c 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -256,7 +256,6 @@ impl ControlStreamManager { ) -> MetaResult> { self.inject_barrier( None, - &command_ctx.node_map, command_ctx.to_mutation(), (&command_ctx.curr_epoch, &command_ctx.prev_epoch), &command_ctx.kind, @@ -269,7 +268,6 @@ impl ControlStreamManager { pub(super) fn inject_barrier( &mut self, creating_table_id: Option, - node_map: &HashMap, mutation: Option, (curr_epoch, prev_epoch): (&TracedEpoch, &TracedEpoch), kind: &BarrierKind, @@ -288,17 +286,21 @@ impl ControlStreamManager { }) .unwrap_or(u32::MAX); - for worker_id in pre_applied_graph_info.worker_ids() { - if !node_map.contains_key(&worker_id) { - return Err(anyhow!("worker id {} not exist", worker_id).into()); + for worker_id in pre_applied_graph_info.worker_ids().chain( + applied_graph_info + .into_iter() + .flat_map(|info| info.worker_ids()), + ) { + if !self.nodes.contains_key(&worker_id) { + return Err(anyhow!("unconnected worker node {}", worker_id).into()); } } let mut node_need_collect = HashSet::new(); - node_map - .iter() - .map(|(node_id, worker_node)| { + self.nodes + .iter_mut() + .map(|(node_id, node)| { let actor_ids_to_send: Vec<_> = pre_applied_graph_info.actor_ids_to_send(*node_id).collect(); let actor_ids_to_collect: Vec<_> = pre_applied_graph_info @@ -318,15 +320,6 @@ impl ControlStreamManager { }; { - let Some(node) = self.nodes.get_mut(node_id) else { - if actor_ids_to_collect.is_empty() { - // Worker node get disconnected but has no actor to collect. Simply skip it. - return Ok(()); - } - return Err( - anyhow!("unconnected worker node: {:?}", worker_node.host).into() - ); - }; let mutation = mutation.clone(); let barrier = Barrier { epoch: Some(risingwave_pb::data::Epoch {