Skip to content

Commit

Permalink
Refactor unwrapping to safe check in scale.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Feb 4, 2024
1 parent eeed1e9 commit 28f0936
Showing 1 changed file with 15 additions and 10 deletions.
25 changes: 15 additions & 10 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1732,16 +1732,21 @@ impl ScaleController {
"no shuffle should have exactly one downstream actor id",
);

let downstream_fragment_id = actor_fragment_id_map_for_check
.get(downstream_actor_id)
.unwrap();

// dispatcher_id of dispatcher should be exactly same as downstream fragment id
// but we need to check it to make sure
debug_assert_eq!(
*downstream_fragment_id,
dispatcher.dispatcher_id as FragmentId
);
if let Some(downstream_fragment_id) =
actor_fragment_id_map_for_check.get(downstream_actor_id)
{
// dispatcher_id of dispatcher should be exactly same as downstream fragment id
// but we need to check it to make sure
debug_assert_eq!(
*downstream_fragment_id,
dispatcher.dispatcher_id as FragmentId
);
} else {
tracing::warn!(
"downstream actor id {} not found in fragment_actor_id_map",
downstream_actor_id
);
}

no_shuffle_target_fragment_ids
.insert(dispatcher.dispatcher_id as FragmentId);
Expand Down

0 comments on commit 28f0936

Please sign in to comment.