Skip to content

Commit

Permalink
Renamed DDL vars for clarity
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Jul 9, 2024
1 parent 50c27b6 commit d579465
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 8 deletions.
1 change: 0 additions & 1 deletion src/meta/src/manager/catalog/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1361,7 +1361,6 @@ impl FragmentManager {
let mut actor_to_vnode_bitmap = HashMap::with_capacity(fragment.actors.len());
for actor in &fragment.actors {
let actor_status = &actor_status[&actor.actor_id];
// let parallel_unit_id = actor_status.parallel_unit.as_ref().unwrap().id;
let worker_id = actor_status.parallel_unit.as_ref().unwrap().worker_node_id;
actor_to_worker.insert(actor.actor_id, worker_id);

Expand Down
10 changes: 6 additions & 4 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1955,7 +1955,7 @@ impl DdlController {
};

// Map the column indices in the dispatchers with the given mapping.
let (downstream_fragments, downstream_actor_locations) = self
let (downstream_fragments, downstream_actor_location) = self
.metadata_manager
.get_downstream_chain_fragments(id)
.await?;
Expand All @@ -1981,22 +1981,24 @@ impl DdlController {
fragment_graph,
original_table_fragment.fragment_id,
downstream_fragments,
downstream_actor_locations,
downstream_actor_location,
ddl_type,
)?,

TableJobType::SharedCdcSource => {
// get the upstream fragment which should be the cdc source
let (upstream_root_fragments, existing_actor_location) = self
let (upstream_root_fragments, upstream_actor_location) = self
.metadata_manager
.get_upstream_root_fragments(fragment_graph.dependent_table_ids())
.await?;

CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
fragment_graph,
upstream_root_fragments,
upstream_actor_location,
original_table_fragment.fragment_id,
downstream_fragments,
existing_actor_location,
downstream_actor_location,
ddl_type,
)?
}
Expand Down
7 changes: 4 additions & 3 deletions src/meta/src/stream/stream_graph/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,21 +620,22 @@ impl CompleteStreamFragmentGraph {
pub fn with_upstreams_and_downstreams(
graph: StreamFragmentGraph,
upstream_root_fragments: HashMap<TableId, Fragment>,
upstream_actor_location: HashMap<ActorId, u32>,
original_table_fragment_id: FragmentId,
downstream_fragments: Vec<(DispatchStrategy, Fragment)>,
existing_actor_location: HashMap<ActorId, u32>,
downstream_actor_location: HashMap<ActorId, u32>,
ddl_type: DdlType,
) -> MetaResult<Self> {
Self::build_helper(
graph,
Some(FragmentGraphUpstreamContext {
upstream_root_fragments,
upstream_actor_location: existing_actor_location.clone(),
upstream_actor_location,
}),
Some(FragmentGraphDownstreamContext {
original_table_fragment_id,
downstream_fragments,
downstream_actor_location: existing_actor_location,
downstream_actor_location,
}),
ddl_type,
)
Expand Down

0 comments on commit d579465

Please sign in to comment.