Skip to content

Commit

Permalink
fix: revert split assignment check (#18554)
Browse files Browse the repository at this point in the history
  • Loading branch information
fuyufjh authored Sep 17, 2024
1 parent 11ad34b commit 9c6cec1
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 113 deletions.
22 changes: 4 additions & 18 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ use super::trace::TracedEpoch;
use crate::barrier::{GlobalBarrierManagerContext, InflightSubscriptionInfo};
use crate::manager::{DdlType, InflightFragmentInfo, MetadataManager, StreamingJob, WorkerId};
use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism};
use crate::stream::{
build_actor_connector_splits, validate_assignment, SplitAssignment, ThrottleConfig,
};
use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig};
use crate::MetaResult;

/// [`Reschedule`] is for the [`Command::RescheduleFragment`], which is used for rescheduling actors
Expand Down Expand Up @@ -525,6 +523,7 @@ impl Command {

Command::SourceSplitAssignment(change) => {
let mut diff = HashMap::new();

for actor_splits in change.values() {
diff.extend(actor_splits.clone());
}
Expand Down Expand Up @@ -573,16 +572,7 @@ impl Command {
})
.collect();
let added_actors = table_fragments.actor_ids();

let mut checked_split_assignment = split_assignment.clone();
checked_split_assignment
.iter_mut()
.for_each(|(_, assignment)| {
// No related actor running before, we don't need to check the mutation
// should be wrapped with pause.
validate_assignment(assignment);
});
let actor_splits = checked_split_assignment
let actor_splits = split_assignment
.values()
.flat_map(build_actor_connector_splits)
.collect();
Expand Down Expand Up @@ -788,11 +778,7 @@ impl Command {
let mut actor_splits = HashMap::new();

for reschedule in reschedules.values() {
let mut checked_assignment = reschedule.actor_splits.clone();
// Update mutation always wrapped by Pause and Resume mutation. no further action needed.
_ = validate_assignment(&mut checked_assignment);

for (actor_id, splits) in &checked_assignment {
for (actor_id, splits) in &reschedule.actor_splits {
actor_splits.insert(
*actor_id as ActorId,
ConnectorSplits {
Expand Down
98 changes: 3 additions & 95 deletions src/meta/src/stream/source_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,37 +588,6 @@ where
)
}

pub fn validate_assignment(assignment: &mut HashMap<ActorId, Vec<SplitImpl>>) -> bool {
let mut dup_assignment_found_flag = false;

// check if one split is assign to multiple actors
let mut split_to_actor = HashMap::new();
for (actor_id, splits) in &mut *assignment {
for split in splits {
split_to_actor
.entry(split.id())
.or_insert_with(Vec::new)
.push(*actor_id);
}
}

for (split_id, actor_ids) in &mut split_to_actor {
if actor_ids.len() > 1 {
tracing::warn!(split_id = ?split_id, actor_ids = ?actor_ids, "split is assigned to multiple actors");
dup_assignment_found_flag = true;
}
// keep the first actor and remove the rest from the assignment
for actor_id in actor_ids.iter().skip(1) {
assignment
.get_mut(actor_id)
.unwrap()
.retain(|split| split.id() != *split_id);
}
}

dup_assignment_found_flag
}

fn align_backfill_splits(
backfill_actors: impl IntoIterator<Item = (ActorId, Vec<ActorId>)>,
upstream_assignment: &HashMap<ActorId, Vec<SplitImpl>>,
Expand Down Expand Up @@ -1141,28 +1110,15 @@ impl SourceManager {
/// The command will first updates `SourceExecutor`'s splits, and finally calls `Self::apply_source_change`
/// to update states in `SourceManager`.
async fn tick(&self) -> MetaResult<()> {
let mut split_assignment = {
let split_assignment = {
let core_guard = self.core.lock().await;
core_guard.reassign_splits().await?
};

let dup_assignment_flag = split_assignment
.iter_mut()
.map(|(_, assignment)| validate_assignment(assignment))
.reduce(|a, b| a || b)
.unwrap_or(false);

if !split_assignment.is_empty() {
let command = Command::SourceSplitAssignment(split_assignment);
tracing::info!(command = ?command, "pushing down split assignment command");
if dup_assignment_flag {
tracing::warn!("duplicate split assignment found, wrap with pause and resume");
self.barrier_scheduler
.run_config_change_command_with_pause(command)
.await?;
} else {
self.barrier_scheduler.run_command(command).await?;
}
self.barrier_scheduler.run_command(command).await?;
}

Ok(())
Expand Down Expand Up @@ -1223,14 +1179,11 @@ mod tests {

use risingwave_common::types::JsonbVal;
use risingwave_connector::error::ConnectorResult;
use risingwave_connector::source::test_source::TestSourceSplit;
use risingwave_connector::source::{SplitId, SplitImpl, SplitMetaData};
use risingwave_connector::source::{SplitId, SplitMetaData};
use serde::{Deserialize, Serialize};

use super::validate_assignment;
use crate::model::{ActorId, FragmentId};
use crate::stream::source_manager::{reassign_splits, SplitDiffOptions};
use crate::stream::SplitAssignment;

#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
struct TestSplit {
Expand Down Expand Up @@ -1351,51 +1304,6 @@ mod tests {
assert!(!diff.is_empty())
}

#[test]
fn test_validate_assignment() {
let mut fragment_assignment: SplitAssignment;
let test_assignment: HashMap<ActorId, Vec<SplitImpl>> = maplit::hashmap! {
0 => vec![SplitImpl::Test(
TestSourceSplit {id: "1".into(), properties: Default::default(), offset: Default::default()}
), SplitImpl::Test(
TestSourceSplit {id: "2".into(), properties: Default::default(), offset: Default::default()}
)],
1 => vec![SplitImpl::Test(
TestSourceSplit {id: "3".into(), properties: Default::default(), offset: Default::default()}
)],
2 => vec![SplitImpl::Test(
TestSourceSplit {id: "1".into(), properties: Default::default(), offset: Default::default()}
)],
};
fragment_assignment = maplit::hashmap! {
1 => test_assignment,
};

let dup_assignment_flag = fragment_assignment
.iter_mut()
.map(|(_, assignment)| validate_assignment(assignment))
.reduce(|a, b| a || b)
.unwrap_or(false);
assert!(dup_assignment_flag);
{
let mut split_to_actor = HashMap::new();
for actor_to_splits in fragment_assignment.values() {
for (actor_id, splits) in actor_to_splits {
let _ = splits.iter().map(|split| {
split_to_actor
.entry(split.id())
.or_insert_with(Vec::new)
.push(*actor_id)
});
}
}

for actor_ids in split_to_actor.values() {
assert_eq!(actor_ids.len(), 1);
}
}
}

#[test]
fn test_reassign_splits() {
let actor_splits = HashMap::new();
Expand Down

0 comments on commit 9c6cec1

Please sign in to comment.