From 9c6cec19592a16367ab76f5a26c57b730645bd69 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Tue, 17 Sep 2024 14:48:46 +0800 Subject: [PATCH] fix: revert split assignment check (#18554) --- src/meta/src/barrier/command.rs | 22 ++---- src/meta/src/stream/source_manager.rs | 98 +-------------------------- 2 files changed, 7 insertions(+), 113 deletions(-) diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 927374da31586..c18ad5d0f2b3b 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -46,9 +46,7 @@ use super::trace::TracedEpoch; use crate::barrier::{GlobalBarrierManagerContext, InflightSubscriptionInfo}; use crate::manager::{DdlType, InflightFragmentInfo, MetadataManager, StreamingJob, WorkerId}; use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism}; -use crate::stream::{ - build_actor_connector_splits, validate_assignment, SplitAssignment, ThrottleConfig, -}; +use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig}; use crate::MetaResult; /// [`Reschedule`] is for the [`Command::RescheduleFragment`], which is used for rescheduling actors @@ -525,6 +523,7 @@ impl Command { Command::SourceSplitAssignment(change) => { let mut diff = HashMap::new(); + for actor_splits in change.values() { diff.extend(actor_splits.clone()); } @@ -573,16 +572,7 @@ impl Command { }) .collect(); let added_actors = table_fragments.actor_ids(); - - let mut checked_split_assignment = split_assignment.clone(); - checked_split_assignment - .iter_mut() - .for_each(|(_, assignment)| { - // No related actor running before, we don't need to check the mutation - // should be wrapped with pause. - validate_assignment(assignment); - }); - let actor_splits = checked_split_assignment + let actor_splits = split_assignment .values() .flat_map(build_actor_connector_splits) .collect(); @@ -788,11 +778,7 @@ impl Command { let mut actor_splits = HashMap::new(); for reschedule in reschedules.values() { - let mut checked_assignment = reschedule.actor_splits.clone(); - // Update mutation always wrapped by Pause and Resume mutation. no further action needed. - _ = validate_assignment(&mut checked_assignment); - - for (actor_id, splits) in &checked_assignment { + for (actor_id, splits) in &reschedule.actor_splits { actor_splits.insert( *actor_id as ActorId, ConnectorSplits { diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index 68d74902c1c77..8fcceac82c0cf 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -588,37 +588,6 @@ where ) } -pub fn validate_assignment(assignment: &mut HashMap>) -> bool { - let mut dup_assignment_found_flag = false; - - // check if one split is assign to multiple actors - let mut split_to_actor = HashMap::new(); - for (actor_id, splits) in &mut *assignment { - for split in splits { - split_to_actor - .entry(split.id()) - .or_insert_with(Vec::new) - .push(*actor_id); - } - } - - for (split_id, actor_ids) in &mut split_to_actor { - if actor_ids.len() > 1 { - tracing::warn!(split_id = ?split_id, actor_ids = ?actor_ids, "split is assigned to multiple actors"); - dup_assignment_found_flag = true; - } - // keep the first actor and remove the rest from the assignment - for actor_id in actor_ids.iter().skip(1) { - assignment - .get_mut(actor_id) - .unwrap() - .retain(|split| split.id() != *split_id); - } - } - - dup_assignment_found_flag -} - fn align_backfill_splits( backfill_actors: impl IntoIterator)>, upstream_assignment: &HashMap>, @@ -1141,28 +1110,15 @@ impl SourceManager { /// The command will first updates `SourceExecutor`'s splits, and finally calls `Self::apply_source_change` /// to update states in `SourceManager`. async fn tick(&self) -> MetaResult<()> { - let mut split_assignment = { + let split_assignment = { let core_guard = self.core.lock().await; core_guard.reassign_splits().await? }; - let dup_assignment_flag = split_assignment - .iter_mut() - .map(|(_, assignment)| validate_assignment(assignment)) - .reduce(|a, b| a || b) - .unwrap_or(false); - if !split_assignment.is_empty() { let command = Command::SourceSplitAssignment(split_assignment); tracing::info!(command = ?command, "pushing down split assignment command"); - if dup_assignment_flag { - tracing::warn!("duplicate split assignment found, wrap with pause and resume"); - self.barrier_scheduler - .run_config_change_command_with_pause(command) - .await?; - } else { - self.barrier_scheduler.run_command(command).await?; - } + self.barrier_scheduler.run_command(command).await?; } Ok(()) @@ -1223,14 +1179,11 @@ mod tests { use risingwave_common::types::JsonbVal; use risingwave_connector::error::ConnectorResult; - use risingwave_connector::source::test_source::TestSourceSplit; - use risingwave_connector::source::{SplitId, SplitImpl, SplitMetaData}; + use risingwave_connector::source::{SplitId, SplitMetaData}; use serde::{Deserialize, Serialize}; - use super::validate_assignment; use crate::model::{ActorId, FragmentId}; use crate::stream::source_manager::{reassign_splits, SplitDiffOptions}; - use crate::stream::SplitAssignment; #[derive(Debug, Copy, Clone, Serialize, Deserialize)] struct TestSplit { @@ -1351,51 +1304,6 @@ mod tests { assert!(!diff.is_empty()) } - #[test] - fn test_validate_assignment() { - let mut fragment_assignment: SplitAssignment; - let test_assignment: HashMap> = maplit::hashmap! { - 0 => vec![SplitImpl::Test( - TestSourceSplit {id: "1".into(), properties: Default::default(), offset: Default::default()} - ), SplitImpl::Test( - TestSourceSplit {id: "2".into(), properties: Default::default(), offset: Default::default()} - )], - 1 => vec![SplitImpl::Test( - TestSourceSplit {id: "3".into(), properties: Default::default(), offset: Default::default()} - )], - 2 => vec![SplitImpl::Test( - TestSourceSplit {id: "1".into(), properties: Default::default(), offset: Default::default()} - )], - }; - fragment_assignment = maplit::hashmap! { - 1 => test_assignment, - }; - - let dup_assignment_flag = fragment_assignment - .iter_mut() - .map(|(_, assignment)| validate_assignment(assignment)) - .reduce(|a, b| a || b) - .unwrap_or(false); - assert!(dup_assignment_flag); - { - let mut split_to_actor = HashMap::new(); - for actor_to_splits in fragment_assignment.values() { - for (actor_id, splits) in actor_to_splits { - let _ = splits.iter().map(|split| { - split_to_actor - .entry(split.id()) - .or_insert_with(Vec::new) - .push(*actor_id) - }); - } - } - - for actor_ids in split_to_actor.values() { - assert_eq!(actor_ids.len(), 1); - } - } - } - #[test] fn test_reassign_splits() { let actor_splits = HashMap::new();