diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs
index e5ba6503d79d..51778c7a71a0 100644
--- a/src/meta/src/manager/catalog/fragment.rs
+++ b/src/meta/src/manager/catalog/fragment.rs
@@ -394,6 +394,17 @@ impl FragmentManager {
 
         table_fragments.insert(table_id, table_fragment.clone());
 
+        // Fragment replace map.
+        let fragment_replace_map: HashMap<_, _> = merge_updates
+            .iter()
+            .map(|update| {
+                (
+                    update.upstream_fragment_id,
+                    update.new_upstream_fragment_id.unwrap(),
+                )
+            })
+            .collect();
+
         // Update downstream `Merge`s.
         let mut merge_updates: HashMap<_, _> = merge_updates
             .iter()
@@ -416,24 +427,34 @@
             .get_mut(table_id)
             .with_context(|| format!("table_fragment not exist: id={}", table_id))?;
 
-        for actor in table_fragment
-            .fragments
-            .values_mut()
-            .flat_map(|f| &mut f.actors)
-        {
-            if let Some(merge_update) = merge_updates.remove(&actor.actor_id) {
-                assert!(merge_update.removed_upstream_actor_id.is_empty());
-                assert!(merge_update.new_upstream_fragment_id.is_some());
-
-                let stream_node = actor.nodes.as_mut().unwrap();
-                visit_stream_node(stream_node, |body| {
-                    if let NodeBody::Merge(m) = body
-                        && m.upstream_fragment_id == merge_update.upstream_fragment_id
-                    {
-                        m.upstream_fragment_id = merge_update.new_upstream_fragment_id.unwrap();
-                        m.upstream_actor_id = merge_update.added_upstream_actor_id.clone();
-                    }
-                });
+        for fragment in table_fragment.fragments.values_mut() {
+            for actor in &mut fragment.actors {
+                if let Some(merge_update) = merge_updates.remove(&actor.actor_id) {
+                    assert!(merge_update.removed_upstream_actor_id.is_empty());
+                    assert!(merge_update.new_upstream_fragment_id.is_some());
+
+                    let stream_node = actor.nodes.as_mut().unwrap();
+                    let mut upstream_actor_ids = HashSet::new();
+                    visit_stream_node(stream_node, |body| {
+                        if let NodeBody::Merge(m) = body {
+                            if m.upstream_fragment_id == merge_update.upstream_fragment_id {
+                                m.upstream_fragment_id =
+                                    merge_update.new_upstream_fragment_id.unwrap();
+                                m.upstream_actor_id =
+                                    merge_update.added_upstream_actor_id.clone();
+                            }
+                            upstream_actor_ids.extend(m.upstream_actor_id.clone());
+                        }
+                    });
+                    actor.upstream_actor_id = upstream_actor_ids.into_iter().collect();
+                }
+            }
+            for upstream_fragment_id in &mut fragment.upstream_fragment_ids {
+                if let Some(new_upstream_fragment_id) =
+                    fragment_replace_map.get(upstream_fragment_id)
+                {
+                    *upstream_fragment_id = *new_upstream_fragment_id;
+                }
             }
         }
     }
diff --git a/src/tests/simulation/tests/integration_tests/scale/table.rs b/src/tests/simulation/tests/integration_tests/scale/table.rs
index d493f1ef7c2f..04293c1701bc 100644
--- a/src/tests/simulation/tests/integration_tests/scale/table.rs
+++ b/src/tests/simulation/tests/integration_tests/scale/table.rs
@@ -85,3 +85,44 @@ async fn test_mv_on_scaled_table() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn test_scale_on_schema_change() -> Result<()> {
+    let mut cluster = Cluster::start(Configuration::for_scale()).await?;
+    cluster.run(ROOT_TABLE_CREATE).await?;
+
+    cluster.run(MV1).await?;
+
+    let fragment = cluster
+        .locate_one_fragment([identity_contains("materialize"), identity_contains("union")])
+        .await?;
+
+    cluster
+        .reschedule(fragment.reschedule([0, 2, 4], []))
+        .await?;
+
+    insert_and_flush!(cluster);
+
+    cluster.run("alter table t add column v2 int").await?;
+
+    let fragment = cluster
+        .locate_one_fragment([
+            identity_contains("materialize"),
+            identity_contains("StreamTableScan"),
+        ])
+        .await?;
+
+    cluster
+        .reschedule_resolve_no_shuffle(fragment.reschedule([1], [0, 4]))
+        .await?;
+
+    let fragment = cluster
+        .locate_one_fragment([identity_contains("materialize"), identity_contains("union")])
+        .await?;
+    let (_, used) = fragment.parallel_unit_usage();
+    assert_eq!(used.len(), 4);
+
+    insert_and_flush!(cluster);
+
+    Ok(())
+}