From 6b449e02ea88f7be6d728cd79382807db2f8e21f Mon Sep 17 00:00:00 2001
From: xxchan
Date: Tue, 26 Nov 2024 01:05:43 +0800
Subject: [PATCH] fix scaling of shared source when the SourceBackfill fragment
 has multiple upstream fragments

Signed-off-by: xxchan
---
 e2e_test/source_inline/kafka/issue_19563.slt  |  10 +-
 proto/stream_plan.proto                       |   2 +
 src/ctl/src/cmd_impl/meta/cluster_info.rs     |  15 +-
 src/meta/model/src/fragment.rs                |   2 +
 src/meta/src/controller/fragment.rs           |  37 +++-
 src/meta/src/manager/metadata.rs              |  17 +-
 src/meta/src/model/stream.rs                  |   6 +-
 src/meta/src/stream/scale.rs                  |  25 +--
 src/meta/src/stream/source_manager.rs         |  43 ++---
 src/prost/src/lib.rs                          |  14 +-
 src/tests/simulation/src/ctl_ext.rs           |   9 +-
 .../integration_tests/scale/shared_source.rs  | 173 +++++++++++++++---
 12 files changed, 252 insertions(+), 101 deletions(-)

diff --git a/e2e_test/source_inline/kafka/issue_19563.slt b/e2e_test/source_inline/kafka/issue_19563.slt
index 874dbaa98ea7..1a265e1e3c4a 100644
--- a/e2e_test/source_inline/kafka/issue_19563.slt
+++ b/e2e_test/source_inline/kafka/issue_19563.slt
@@ -1,7 +1,7 @@
 control substitution on
 
 system ok
-rpk topic create test-topic-19563
+rpk topic create test-topic-19563 -p 6
 
 statement ok
 CREATE SOURCE kafkasource (
@@ -15,7 +15,7 @@ WITH (
   timestamptz.handling.mode = 'utc_without_suffix'
 );
 
-# Note that StreamSourceScan is in the StreamDynamicFilter fragment, which has 2 upstream fragments.
+# Note that StreamSourceScan is in the StreamDynamicFilter fragment, which has 3 upstream fragments.
 query T
 explain create materialized view mv1 as select v1 from kafkasource where v1 between now() and now() + interval '1 day' * 365 * 2000;
 ----
@@ -31,6 +31,12 @@ StreamMaterialize { columns: [v1, _row_id(hidden)], stream_key: [_row_id], pk_co
 └─StreamProject { exprs: [AddWithTimeZone(now, '730000 days':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] }
   └─StreamNow
 
+
+query I
+select array_length(upstream_fragment_ids) from rw_fragments where array_contains(flags, Array['SOURCE_SCAN']);
+----
+3
+
 # The following test is adapted from `temporal_filter.slt`.
 
 # This statement should be correct for the next ~1000 years

diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto
index 8438df75eae5..06d846828465 100644
--- a/proto/stream_plan.proto
+++ b/proto/stream_plan.proto
@@ -543,6 +543,8 @@ message HopWindowNode {
 }
 
 message MergeNode {
+  // Note: `upstream_actor_id` stored in the plan node in the `Fragment` meta model cannot be directly used.
+  // See `compose_fragment`.
   repeated uint32 upstream_actor_id = 1;
   uint32 upstream_fragment_id = 2;
   // Type of the upstream dispatcher. If there's always one upstream according to this
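Aside on the `MergeNode` note above: a minimal sketch of the intended contract, with illustrative names only (this helper is not part of the patch). A consumer that reads a stream plan out of the meta store should resolve a Merge's upstream actors through `upstream_fragment_id` at runtime, instead of trusting the persisted `upstream_actor_id`, which is only filled in when `compose_fragment` builds the plan actually delivered to actors.

    use std::collections::HashMap;

    // Illustrative only: resolve Merge upstreams via the fragment id, because the
    // `upstream_actor_id` list persisted in the `Fragment` meta model may be stale or empty.
    fn resolve_merge_upstreams(
        upstream_fragment_id: u32,
        running_actors_by_fragment: &HashMap<u32, Vec<u32>>,
    ) -> Vec<u32> {
        running_actors_by_fragment
            .get(&upstream_fragment_id)
            .cloned()
            .unwrap_or_default()
    }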
diff --git a/src/ctl/src/cmd_impl/meta/cluster_info.rs b/src/ctl/src/cmd_impl/meta/cluster_info.rs
index a05780c34ec1..649d25bf8804 100644
--- a/src/ctl/src/cmd_impl/meta/cluster_info.rs
+++ b/src/ctl/src/cmd_impl/meta/cluster_info.rs
@@ -63,13 +63,14 @@ pub async fn source_split_info(context: &CtlContext, ignore_id: bool) -> anyhow:
         for actor in &fragment.actors {
             if let Some(ConnectorSplits { splits }) = actor_splits.remove(&actor.actor_id) {
+                let num_splits = splits.len();
                 let splits = splits
                     .iter()
                     .map(|split| SplitImpl::try_from(split).unwrap())
                     .map(|split| split.id())
                     .collect_vec()
                     .join(",");
-                actor_splits_map.insert(actor.actor_id, (splits.len(), splits));
+                actor_splits_map.insert(actor.actor_id, (num_splits, splits));
             }
         }
     }
@@ -122,14 +123,12 @@ pub async fn source_split_info(context: &CtlContext, ignore_id: bool) -> anyhow:
             },
             split_count,
             splits,
-            // FIXME: wrong
             if !actor.upstream_actor_id.is_empty() {
-                assert!(
-                    actor.upstream_actor_id.len() == 1,
-                    "should have only one upstream actor, got {actor:?}"
-                );
-                let upstream_splits =
-                    actor_splits_map.get(&actor.upstream_actor_id[0]).unwrap();
+                let upstream_splits = actor
+                    .upstream_actor_id
+                    .iter()
+                    .find_map(|id| actor_splits_map.get(id))
+                    .expect("should have one upstream source actor");
                 format!(
                     " <- Upstream Actor{}: [{}]",
                     if ignore_id {

diff --git a/src/meta/model/src/fragment.rs b/src/meta/model/src/fragment.rs
index 45f59ed3c0ae..5f7768d19176 100644
--- a/src/meta/model/src/fragment.rs
+++ b/src/meta/model/src/fragment.rs
@@ -26,6 +26,8 @@ pub struct Model {
     pub job_id: ObjectId,
     pub fragment_type_mask: i32,
     pub distribution_type: DistributionType,
+    /// Note: the `StreamNode` is different from the final plan node used by actors.
+    /// Specifically, `Merge` nodes' `upstream_actor_id` will be filled. (See `compose_fragment`)
    pub stream_node: StreamNode,
    pub state_table_ids: I32Array,
    pub upstream_fragment_id: I32Array,
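The `find_map` above leans on an assumption worth spelling out: a SourceBackfill actor may have upstream actors from several fragments (e.g. when a DynamicFilter lives in the same fragment), but only the upstream actor that belongs to the source fragment has split assignments, so it is the unique hit in `actor_splits_map`. A minimal self-contained sketch of that lookup, with hypothetical map types:

    use std::collections::HashMap;

    // (num_splits, comma-joined split ids), keyed by actor id, mirroring the map built above.
    type SplitsMap = HashMap<u32, (usize, String)>;

    fn upstream_source_splits<'a>(
        upstream_actor_ids: &[u32],
        actor_splits_map: &'a SplitsMap,
    ) -> Option<&'a (usize, String)> {
        // Only the upstream actor from the source fragment appears in the map,
        // so the first (and only) hit is the one we want.
        upstream_actor_ids.iter().find_map(|id| actor_splits_map.get(id))
    }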
diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs
index 7ccc0f060631..2e4a4acace64 100644
--- a/src/meta/src/controller/fragment.rs
+++ b/src/meta/src/controller/fragment.rs
@@ -1292,12 +1292,22 @@ impl CatalogController {
         Ok(actors)
     }
 
-    /// Get the actor ids, and each actor's upstream actor ids of the fragment with `fragment_id` with `Running` status.
-    pub async fn get_running_actors_and_upstream_of_fragment(
+    /// Get the running actors of the fragment with `fragment_id`, together with each actor's upstream source actor id.
+    pub async fn get_running_actors_for_source_backfill(
         &self,
         fragment_id: FragmentId,
-    ) -> MetaResult<Vec<(ActorId, ActorUpstreamActors)>> {
+    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
         let inner = self.inner.read().await;
+        let txn = inner.db.begin().await?;
+        let fragment = Fragment::find_by_id(fragment_id)
+            .one(&txn)
+            .await?
+            .context(format!("fragment {} not found", fragment_id))?;
+        let (_source_id, upstream_source_fragment_id, _do_not_use_upstream_actor_id) = fragment
+            .stream_node
+            .to_protobuf()
+            .find_source_backfill()
+            .unwrap();
         let actors: Vec<(ActorId, ActorUpstreamActors)> = Actor::find()
             .select_only()
             .column(actor::Column::ActorId)
             .column(actor::Column::UpstreamActorIds)
             .filter(actor::Column::FragmentId.eq(fragment_id))
             .filter(actor::Column::Status.eq(ActorStatus::Running))
             .into_tuple()
-            .all(&inner.db)
+            .all(&txn)
             .await?;
-        Ok(actors)
+        Ok(actors
+            .into_iter()
+            .map(|(actor_id, upstream_actor_ids)| {
+                let upstream_source_actors =
+                    &upstream_actor_ids.0[&(upstream_source_fragment_id as i32)];
+                assert_eq!(
+                    upstream_source_actors.len(),
+                    1,
+                    "expect only one upstream source actor, but got {:?}, actor_id: {}, fragment_id: {}",
+                    upstream_source_actors,
+                    actor_id,
+                    fragment_id
+                );
+                (actor_id, upstream_source_actors[0])
+            })
+            .collect())
     }
 
     pub async fn get_actors_by_job_ids(&self, job_ids: Vec<ObjectId>) -> MetaResult<Vec<ActorId>> {
@@ -1469,7 +1494,7 @@ impl CatalogController {
         let mut source_fragment_ids = HashMap::new();
 
         for (fragment_id, _, stream_node) in fragments {
-            if let Some((source_id, upstream_source_fragment_id)) =
+            if let Some((source_id, upstream_source_fragment_id, _do_not_use_upstream_actor_id)) =
                 stream_node.to_protobuf().find_source_backfill()
             {
                 source_fragment_ids

diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs
index f2510a5b7548..c54d4f4a9db4 100644
--- a/src/meta/src/manager/metadata.rs
+++ b/src/meta/src/manager/metadata.rs
@@ -571,26 +571,17 @@ impl MetadataManager {
         Ok(actor_ids.into_iter().map(|id| id as ActorId).collect())
     }
 
-    pub async fn get_running_actors_and_upstream_actors_of_fragment(
+    pub async fn get_running_actors_for_source_backfill(
         &self,
         id: FragmentId,
-    ) -> MetaResult<Vec<(ActorId, Vec<ActorId>)>> {
+    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
         let actor_ids = self
             .catalog_controller
-            .get_running_actors_and_upstream_of_fragment(id as _)
+            .get_running_actors_for_source_backfill(id as _)
             .await?;
         Ok(actor_ids
             .into_iter()
-            .map(|(id, actors)| {
-                (
-                    id as ActorId,
-                    actors
-                        .into_inner()
-                        .into_iter()
-                        .flat_map(|(_, ids)| ids.into_iter().map(|id| id as ActorId))
-                        .collect(),
-                )
-            })
+            .map(|(id, upstream)| (id as ActorId, upstream as ActorId))
             .collect())
     }
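Why the `assert_eq!` on exactly one upstream source actor is safe: SourceBackfill is always the NoShuffle downstream of its source fragment, so the actor-level mapping within that fragment pair is 1:1. A sketch of the extraction step under that assumption (types simplified; the real code indexes `ActorUpstreamActors` from the meta model):

    use std::collections::HashMap;

    type ActorId = u32;
    type FragmentId = u32;

    fn pick_upstream_source_actor(
        upstream_by_fragment: &HashMap<FragmentId, Vec<ActorId>>,
        upstream_source_fragment_id: FragmentId,
    ) -> ActorId {
        let actors = &upstream_by_fragment[&upstream_source_fragment_id];
        // NoShuffle between the source and backfill fragments implies a 1:1 actor pairing.
        assert_eq!(actors.len(), 1, "expect exactly one upstream source actor");
        actors[0]
    }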
diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs
index 3c7d4329d3ec..78bf53a32cc0 100644
--- a/src/meta/src/model/stream.rs
+++ b/src/meta/src/model/stream.rs
@@ -423,6 +423,10 @@ impl StreamJobFragments {
         source_fragments
     }
 
+    /// Returns `source_id` -> (`source_backfill_fragment_id`, `upstream_source_fragment_id`).
+    ///
+    /// Note: the fragment `source_backfill_fragment_id` may actually have multiple upstream fragments,
+    /// but only one of them is the upstream source fragment, which is what we return.
     pub fn source_backfill_fragments(
         &self,
     ) -> MetadataModelResult<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>> {
@@ -430,7 +434,7 @@
 
         for fragment in self.fragments() {
             for actor in &fragment.actors {
-                if let Some((source_id, upstream_source_fragment_id)) =
+                if let Some((source_id, upstream_source_fragment_id, _upstream_actor_id)) =
                     actor.nodes.as_ref().unwrap().find_source_backfill()
                 {
                     source_backfill_fragments

diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs
index c4a97d25d739..79a96234d348 100644
--- a/src/meta/src/stream/scale.rs
+++ b/src/meta/src/stream/scale.rs
@@ -158,8 +158,8 @@ pub struct RescheduleContext {
     upstream_dispatchers: HashMap<ActorId, Vec<(FragmentId, DispatcherId, DispatcherType)>>,
     /// Fragments with `StreamSource`
     stream_source_fragment_ids: HashSet<FragmentId>,
-    /// Fragments with `StreamSourceBackfill`
-    stream_source_backfill_fragment_ids: HashSet<FragmentId>,
+    /// Fragments with `StreamSourceBackfill` and the corresponding upstream source fragment
+    stream_source_backfill_fragment_ids: HashMap<FragmentId, FragmentId>,
     /// Target fragments in `NoShuffle` relation
     no_shuffle_target_fragment_ids: HashSet<FragmentId>,
     /// Source fragments in `NoShuffle` relation
@@ -696,7 +696,7 @@ impl ScaleController {
         }
 
         let mut stream_source_fragment_ids = HashSet::new();
-        let mut stream_source_backfill_fragment_ids = HashSet::new();
+        let mut stream_source_backfill_fragment_ids = HashMap::new();
         let mut no_shuffle_reschedule = HashMap::new();
         for (fragment_id, WorkerReschedule { worker_actor_diff }) in &*reschedule {
             let fragment = fragment_map
@@ -821,8 +821,14 @@ impl ScaleController {
             // SourceScan is always a NoShuffle downstream, rescheduled together with the upstream Source.
             if (fragment.get_fragment_type_mask() & FragmentTypeFlag::SourceScan as u32) != 0 {
                 let stream_node = fragment.actor_template.nodes.as_ref().unwrap();
-                if stream_node.find_source_backfill().is_some() {
-                    stream_source_backfill_fragment_ids.insert(fragment.fragment_id);
+                if let Some((
+                    _source_id,
+                    upstream_source_fragment_id,
+                    _do_not_use_upstream_actor_id,
+                )) = stream_node.find_source_backfill()
+                {
+                    stream_source_backfill_fragment_ids
+                        .insert(fragment.fragment_id, upstream_source_fragment_id);
                 }
             }
         }
@@ -1257,17 +1263,14 @@ impl ScaleController {
         for fragment_id in reschedules.keys() {
             let actors_after_reschedule = &fragment_actors_after_reschedule[fragment_id];
 
-            if ctx
-                .stream_source_backfill_fragment_ids
-                .contains(fragment_id)
+            if let Some(upstream_source_fragment_id) =
+                ctx.stream_source_backfill_fragment_ids.get(fragment_id)
             {
-                let fragment = &ctx.fragment_map[fragment_id];
-
                 let curr_actor_ids = actors_after_reschedule.keys().cloned().collect_vec();
 
                 let actor_splits = self.source_manager.migrate_splits_for_backfill_actors(
                     *fragment_id,
-                    &fragment.upstream_fragment_ids,
+                    *upstream_source_fragment_id,
                     &curr_actor_ids,
                     &fragment_actor_splits,
                     &no_shuffle_upstream_actor_map,
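The heart of the scale.rs change, restated: the old code read the backfill fragment's sole entry in `upstream_fragment_ids`, which stops being well-defined once the fragment also contains a DynamicFilter (it then has several upstream fragments). Recording the upstream source fragment id when the context is built makes the later lookup unambiguous. A toy contrast, with invented types:

    use std::collections::HashMap;

    type FragmentId = u32;

    // Old (buggy) shape: assumes a single upstream fragment.
    fn upstream_old(upstream_fragment_ids: &[FragmentId]) -> FragmentId {
        debug_assert!(upstream_fragment_ids.len() == 1); // false with DynamicFilter branches
        upstream_fragment_ids[0]
    }

    // New shape: the upstream source fragment was resolved from the Merge node up front.
    fn upstream_new(
        backfill_to_source: &HashMap<FragmentId, FragmentId>,
        backfill_fragment_id: FragmentId,
    ) -> Option<FragmentId> {
        backfill_to_source.get(&backfill_fragment_id).copied()
    }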
diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs
index fda2027bfee3..ddd65bf9352f 100644
--- a/src/meta/src/stream/source_manager.rs
+++ b/src/meta/src/stream/source_manager.rs
@@ -392,7 +392,7 @@ impl SourceManagerCore {
         };
         let actors = match self
             .metadata_manager
-            .get_running_actors_and_upstream_actors_of_fragment(*fragment_id)
+            .get_running_actors_for_source_backfill(*fragment_id)
             .await
         {
             Ok(actors) => {
@@ -653,20 +653,16 @@ where
 }
 
 fn align_backfill_splits(
-    backfill_actors: impl IntoIterator<Item = (ActorId, Vec<ActorId>)>,
+    backfill_actors: impl IntoIterator<Item = (ActorId, ActorId)>,
     upstream_assignment: &HashMap<ActorId, Vec<SplitImpl>>,
     fragment_id: FragmentId,
-    upstream_fragment_id: FragmentId,
+    upstream_source_fragment_id: FragmentId,
 ) -> anyhow::Result<HashMap<ActorId, Vec<SplitImpl>>> {
     backfill_actors
         .into_iter()
         .map(|(actor_id, upstream_actor_id)| {
-            // FIXME: wrong
-            let err = || anyhow::anyhow!("source backfill actor should have one upstream actor, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_fragment_id}, actor_id: {actor_id}, upstream_assignment: {upstream_assignment:?}, upstream_actor_id: {upstream_actor_id:?}");
-            if upstream_actor_id.len() != 1 {
-                return Err(err());
-            }
-            let Some(splits) = upstream_assignment.get(&upstream_actor_id[0]) else {
+            let err = || anyhow::anyhow!("source backfill actor should have one upstream source actor, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_source_fragment_id}, actor_id: {actor_id}, upstream_assignment: {upstream_assignment:?}, upstream_actor_id: {upstream_actor_id:?}");
+            let Some(splits) = upstream_assignment.get(&upstream_actor_id) else {
                 return Err(err());
             };
             Ok((

     pub fn migrate_splits_for_backfill_actors(
         &self,
         fragment_id: FragmentId,
-        upstream_fragment_ids: &Vec<FragmentId>,
+        upstream_source_fragment_id: FragmentId,
         curr_actor_ids: &[ActorId],
         fragment_actor_splits: &HashMap<FragmentId, HashMap<ActorId, Vec<SplitImpl>>>,
         no_shuffle_upstream_actor_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
     ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
         // align splits for backfill fragments with its upstream source fragment
-        debug_assert!(upstream_fragment_ids.len() == 1);
-        let upstream_fragment_id = upstream_fragment_ids[0];
         let actors = no_shuffle_upstream_actor_map
             .iter()
             .filter(|(id, _)| curr_actor_ids.contains(id))
             .map(|(id, upstream_fragment_actors)| {
-                debug_assert!(upstream_fragment_actors.len() == 1);
                 (
                     *id,
-                    vec![*upstream_fragment_actors.get(&upstream_fragment_id).unwrap()],
+                    *upstream_fragment_actors
+                        .get(&upstream_source_fragment_id)
+                        .unwrap(),
                 )
             });
-        let upstream_assignment = fragment_actor_splits.get(&upstream_fragment_id).unwrap();
+        let upstream_assignment = fragment_actor_splits
+            .get(&upstream_source_fragment_id)
+            .unwrap();
         tracing::info!(
             fragment_id,
-            upstream_fragment_id,
+            upstream_source_fragment_id,
             ?upstream_assignment,
             "migrate_splits_for_backfill_actors"
         );
         Ok(align_backfill_splits(
             actors,
             upstream_assignment,
             fragment_id,
-            upstream_fragment_id,
+            upstream_source_fragment_id,
         )?)
     }
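A tiny worked example of the alignment contract that `align_backfill_splits` enforces (illustrative split ids; the real function works over `SplitImpl` and returns `anyhow::Result`): each backfill actor must end up owning exactly the splits of the one source actor it is NoShuffle-paired with.

    use std::collections::HashMap;

    fn align(
        backfill_actors: impl IntoIterator<Item = (u32, u32)>, // (backfill actor, source actor)
        upstream_assignment: &HashMap<u32, Vec<&'static str>>, // source actor -> split ids
    ) -> Option<HashMap<u32, Vec<&'static str>>> {
        backfill_actors
            .into_iter()
            .map(|(actor, upstream)| Some((actor, upstream_assignment.get(&upstream)?.clone())))
            .collect()
    }

    fn main() {
        // Backfill actor 101 is paired with source actor 1, and 102 with 2.
        let assignment = HashMap::from([(1, vec!["split-0", "split-1"]), (2, vec!["split-2"])]);
        let aligned = align([(101, 1), (102, 2)], &assignment).unwrap();
        assert_eq!(aligned[&101], vec!["split-0", "split-1"]);
        assert_eq!(aligned[&102], vec!["split-2"]);
    }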
         let mut assigned = HashMap::new();
 
         for (_source_id, fragments) in source_backfill_fragments {
-            for (fragment_id, upstream_fragment_id) in fragments {
+            for (fragment_id, upstream_source_fragment_id) in fragments {
                 let upstream_actors = core
                     .metadata_manager
-                    .get_running_actors_of_fragment(upstream_fragment_id)
+                    .get_running_actors_of_fragment(upstream_source_fragment_id)
                     .await?;
                 let mut backfill_actors = vec![];
                 for upstream_actor in upstream_actors {
                     if let Some(dispatchers) = dispatchers.get(&upstream_actor) {
                         let err = || {
                             anyhow::anyhow!(
                                 "source backfill fragment's upstream fragment should have one dispatcher, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_source_fragment_id}, upstream_actor: {upstream_actor}, dispatchers: {dispatchers:?}",
                                 fragment_id = fragment_id,
-                                upstream_fragment_id = upstream_fragment_id,
+                                upstream_source_fragment_id = upstream_source_fragment_id,
                                 upstream_actor = upstream_actor,
                                 dispatchers = dispatchers
                             )
                         };
...
                         }
 
                         backfill_actors
-                            .push((dispatchers[0].downstream_actor_id[0], vec![upstream_actor]));
+                            .push((dispatchers[0].downstream_actor_id[0], upstream_actor));
                     }
                 }
                 assigned.insert(
                     fragment_id,
                     align_backfill_splits(
                         backfill_actors,
                         upstream_assignment,
                         fragment_id,
-                        upstream_fragment_id,
+                        upstream_source_fragment_id,
                     )?,
                 );
             }
diff --git a/src/prost/src/lib.rs b/src/prost/src/lib.rs
index 2c6bede75ee4..1a657927e3b0 100644
--- a/src/prost/src/lib.rs
+++ b/src/prost/src/lib.rs
@@ -289,19 +289,27 @@ impl stream_plan::StreamNode {
     /// Find the external stream source info inside the stream node, if any.
     ///
-    /// Returns (`source_id`, `upstream_source_fragment_id`).
+    /// Returns (`source_id`, `upstream_source_fragment_id`, `upstream_actor_id`).
     ///
     /// Note: we must get upstream fragment id from the merge node, not from the fragment's
     /// `upstream_fragment_ids`. e.g., DynamicFilter may have 2 upstream fragments, but only
     /// one is the upstream source fragment.
-    pub fn find_source_backfill(&self) -> Option<(u32, u32)> {
+    ///
+    /// Note: `merge.upstream_actor_id` needs to be used with caution.
+    /// DO NOT USE it if the `StreamNode` is from the `Fragment` meta model.
+    /// It is OK to use if the `StreamNode` is from the `TableFragments` proto.
+    pub fn find_source_backfill(&self) -> Option<(u32, u32, Vec<u32>)> {
         if let Some(crate::stream_plan::stream_node::NodeBody::SourceBackfill(source)) =
             self.node_body.as_ref()
         {
             if let crate::stream_plan::stream_node::NodeBody::Merge(merge) =
                 self.input[0].node_body.as_ref().unwrap()
             {
-                return Some((source.upstream_source_id, merge.upstream_fragment_id));
+                return Some((
+                    source.upstream_source_id,
+                    merge.upstream_fragment_id,
+                    merge.upstream_actor_id.clone(),
+                ));
             } else {
                 unreachable!(
                     "source backfill must have a merge node as its input: {:?}",

diff --git a/src/tests/simulation/src/ctl_ext.rs b/src/tests/simulation/src/ctl_ext.rs
index 2a022165c852..acb49a0df4d0 100644
--- a/src/tests/simulation/src/ctl_ext.rs
+++ b/src/tests/simulation/src/ctl_ext.rs
@@ -361,14 +361,12 @@ impl Cluster {
         Ok(response)
     }
 
-    /// `table_id -> actor_id -> splits`
-    pub async fn list_source_splits(&self) -> Result<BTreeMap<u32, BTreeMap<u32, String>>> {
+    /// `actor_id -> splits`
+    pub async fn list_source_splits(&self) -> Result<BTreeMap<u32, String>> {
         let info = self.get_cluster_info().await?;
         let mut res = BTreeMap::new();
 
         for table in info.table_fragments {
-            let mut table_actor_splits = BTreeMap::new();
-
             for (actor_id, splits) in table.actor_splits {
                 let splits = splits
                     .splits
                     .iter()
                     .map(|split| SplitImpl::try_from(split).unwrap())
                     .map(|split| split.id())
                     .collect_vec()
                     .join(",");
-                table_actor_splits.insert(actor_id, splits);
+                res.insert(actor_id, splits);
             }
-            res.insert(table.table_id, table_actor_splits);
         }
 
         Ok(res)
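Two details above are related: `list_source_splits` can drop the `table_id` level because actor ids are globally unique across table fragments, and the simulation test below may safely read `upstream_actor_id` out of the plan nodes because ctl hands back `TableFragments` protos already composed by `compose_fragment`, so the meta-model caveat does not apply. A sketch of the flattening invariant, with the assumed uniqueness made explicit:

    use std::collections::BTreeMap;

    // Assumed invariant: actor ids never collide across tables, so one flat map suffices.
    fn flatten(per_table: Vec<(u32, BTreeMap<u32, String>)>) -> BTreeMap<u32, String> {
        let mut res = BTreeMap::new();
        for (_table_id, actor_splits) in per_table {
            for (actor_id, splits) in actor_splits {
                let prev = res.insert(actor_id, splits);
                assert!(prev.is_none(), "actor ids must be globally unique");
            }
        }
        res
    }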
diff --git a/src/tests/simulation/tests/integration_tests/scale/shared_source.rs b/src/tests/simulation/tests/integration_tests/scale/shared_source.rs
index 76aaae6a716a..4a048e752307 100644
--- a/src/tests/simulation/tests/integration_tests/scale/shared_source.rs
+++ b/src/tests/simulation/tests/integration_tests/scale/shared_source.rs
@@ -29,11 +29,16 @@ CREATE SOURCE s(v1 int, v2 varchar) WITH (
     topic='shared_source'
 ) FORMAT PLAIN ENCODE JSON;"#;
 
-fn actor_upstream(fragment: &Fragment) -> Vec<(u32, Vec<u32>)> {
+fn source_backfill_upstream(fragment: &Fragment) -> Vec<(u32, u32)> {
     fragment
         .actors
         .iter()
-        .map(|actor| (actor.actor_id, actor.upstream_actor_id.clone()))
+        .map(|actor| {
+            let (_, _source_fragment_id, source_actor_id) =
+                actor.get_nodes().unwrap().find_source_backfill().unwrap();
+            assert!(source_actor_id.len() == 1);
+            (actor.actor_id, source_actor_id[0])
+        })
         .collect_vec()
 }
 
@@ -43,7 +48,7 @@ async fn validate_splits_aligned(cluster: &mut Cluster) -> Result<()> {
         .await?;
     // The result of scaling is non-deterministic.
     // So we just print the result here, instead of asserting with a fixed value.
-    let actor_upstream = actor_upstream(&source_backfill_fragment.inner);
+    let actor_upstream = source_backfill_upstream(&source_backfill_fragment.inner);
     tracing::info!(
         "{}",
         actor_upstream
...
             actor_id, upstream
         )))
     );
-    let splits = cluster.list_source_splits().await?;
-    tracing::info!("{:#?}", splits);
-    let actor_splits: BTreeMap<u32, String> = splits
-        .values()
-        .flat_map(|m| m.clone().into_iter())
-        .collect();
+    let actor_splits = cluster.list_source_splits().await?;
+    tracing::info!("{:#?}", actor_splits);
     for (actor, upstream) in actor_upstream {
-        assert!(upstream.len() == 1, "invalid upstream: {:?}", upstream);
-        let upstream_actor = upstream[0];
         assert_eq!(
             actor_splits.get(&actor).unwrap(),
-            actor_splits.get(&upstream_actor).unwrap()
+            actor_splits.get(&upstream).unwrap()
         );
     }
     Ok(())
 }

@@ -112,10 +111,10 @@ async fn test_shared_source() -> Result<()> {
         .collect_vec();
     validate_splits_aligned(&mut cluster).await?;
     expect_test::expect![[r#"
-        1 6 HASH {7} {} {SOURCE} 6 256
-        2 8 HASH {9,8} {3} {MVIEW} 6 256
-        3 8 HASH {10} {1} {SOURCE_SCAN} 6 256"#]]
-    .assert_eq(&cluster.run("select * from rw_fragments;").await?);
+        1 6 HASH {SOURCE} 6 256
+        2 8 HASH {MVIEW} 6 256
+        3 8 HASH {SOURCE_SCAN} 6 256"#]]
+    .assert_eq(&cluster.run("select fragment_id, table_id, distribution_type, flags, parallelism, max_parallelism from rw_fragments;").await?);
     expect_test::expect![[r#"
         6 CREATED ADAPTIVE 256
         8 CREATED ADAPTIVE 256"#]]
     .assert_eq(&cluster.run("select * from rw_table_fragments;").await?);
...
         .await
         .unwrap();
     expect_test::expect![[r#"
-        1 6 HASH {7} {} {SOURCE} 6 256
-        2 8 HASH {9,8} {3} {MVIEW} 5 256
-        3 8 HASH {10} {1} {SOURCE_SCAN} 6 256"#]]
-    .assert_eq(&cluster.run("select * from rw_fragments;").await?);
+        1 6 HASH {SOURCE} 6 256
+        2 8 HASH {MVIEW} 5 256
+        3 8 HASH {SOURCE_SCAN} 6 256"#]]
+    .assert_eq(&cluster.run("select fragment_id, table_id, distribution_type, flags, parallelism, max_parallelism from rw_fragments;").await?);
     // source is the NoShuffle upstream. It can be scaled, and the downstream SourceBackfill will be scaled together.
     cluster
         .reschedule(source_fragment.reschedule(
             [
                 WorkerSlotId::new(source_workers[0], 0),
                 WorkerSlotId::new(source_workers[0], 1),
                 WorkerSlotId::new(source_workers[2], 0),
             ],
             [],
         ))
         .await
         .unwrap();
     validate_splits_aligned(&mut cluster).await?;
     expect_test::expect![[r#"
-        1 6 HASH {7} {} {SOURCE} 3 256
-        2 8 HASH {9,8} {3} {MVIEW} 5 256
-        3 8 HASH {10} {1} {SOURCE_SCAN} 3 256"#]]
-    .assert_eq(&cluster.run("select * from rw_fragments;").await?);
+        1 6 HASH {SOURCE} 3 256
+        2 8 HASH {MVIEW} 5 256
+        3 8 HASH {SOURCE_SCAN} 3 256"#]]
+    .assert_eq(&cluster.run("select fragment_id, table_id, distribution_type, flags, parallelism, max_parallelism from rw_fragments;").await?);
     expect_test::expect![[r#"
         6 CREATED CUSTOM 256
         8 CREATED CUSTOM 256"#]]
     .assert_eq(&cluster.run("select * from rw_table_fragments;").await?);
...
         .await
         .unwrap();
     validate_splits_aligned(&mut cluster).await?;
     expect_test::expect![[r#"
-        1 6 HASH {7} {} {SOURCE} 7 256
-        2 8 HASH {9,8} {3} {MVIEW} 5 256
-        3 8 HASH {10} {1} {SOURCE_SCAN} 7 256"#]]
-    .assert_eq(&cluster.run("select * from rw_fragments;").await?);
+        1 6 HASH {SOURCE} 7 256
+        2 8 HASH {MVIEW} 5 256
+        3 8 HASH {SOURCE_SCAN} 7 256"#]]
+    .assert_eq(&cluster.run("select fragment_id, table_id, distribution_type, flags, parallelism, max_parallelism from rw_fragments;").await?);
     expect_test::expect![[r#"
         6 CREATED CUSTOM 256
         8 CREATED CUSTOM 256"#]]
     .assert_eq(&cluster.run("select * from rw_table_fragments;").await?);
     Ok(())
 }
+
+#[tokio::test]
+async fn test_issue_19563() -> Result<()> {
+    tracing_subscriber::fmt::Subscriber::builder()
+        .with_max_level(tracing::Level::ERROR)
+        .with_env_filter("risingwave_stream::executor::source::source_backfill_executor=DEBUG,integration_tests=DEBUG")
+        .init();
+
+    let mut cluster = Cluster::start(Configuration::for_scale_shared_source()).await?;
+    cluster.create_kafka_topics(convert_args!(hashmap!(
+        "shared_source" => 4,
+    )));
+    let mut session = cluster.start_session();
+
+    session.run("set rw_implicit_flush = true;").await?;
+
+    session
+        .run(
+            r#"
+CREATE SOURCE s(v1 timestamp with time zone) WITH (
+    connector='kafka',
+    properties.bootstrap.server='192.168.11.1:29092',
+    topic='shared_source'
+) FORMAT PLAIN ENCODE JSON;"#,
+        )
+        .await?;
+    session
+        .run("create materialized view mv1 as select v1 from s where v1 between now() and now() + interval '1 day' * 365 * 2000;")
+        .await?;
+    let source_fragment = cluster
+        .locate_one_fragment([
+            identity_contains("Source"),
+            no_identity_contains("StreamSourceScan"),
+        ])
+        .await?;
+    let source_workers = source_fragment.all_worker_count().into_keys().collect_vec();
+    let source_backfill_dynamic_filter_fragment = cluster
+        .locate_one_fragment([
+            identity_contains("StreamSourceScan"),
+            identity_contains("StreamDynamicFilter"),
+        ])
+        .await?;
+    let source_backfill_workers = source_backfill_dynamic_filter_fragment
+        .all_worker_count()
+        .into_keys()
+        .collect_vec();
+    validate_splits_aligned(&mut cluster).await?;
+    expect_test::expect![[r#"
+        1 6 HASH {SOURCE} 6 256
+        2 8 HASH {MVIEW,SOURCE_SCAN} 6 256
+        3 8 SINGLE {NOW} 1 1
+        4 8 SINGLE {NOW} 1 1"#]]
+    .assert_eq(&cluster.run("select fragment_id, table_id, distribution_type, flags, parallelism, max_parallelism from rw_fragments;").await?);
+    expect_test::expect![[r#"
+        6 CREATED ADAPTIVE 256
+        8 CREATED ADAPTIVE 256"#]]
+    .assert_eq(&cluster.run("select * from rw_table_fragments;").await?);
+
+    // SourceBackfill/DynamicFilter cannot be scaled because of NoShuffle.
+    assert!(
+        cluster
+            .reschedule(
+                source_backfill_dynamic_filter_fragment
+                    .reschedule([WorkerSlotId::new(source_backfill_workers[0], 0)], []),
+            )
+            .await
+            .unwrap_err()
+            .to_string()
+            .contains("rescheduling NoShuffle downstream fragment (maybe Chain fragment) is forbidden, please use NoShuffle upstream fragment (like Materialized fragment) to scale"),
+    );
+
+    // source is the NoShuffle upstream. It can be scaled, and the downstream SourceBackfill/DynamicFilter will be scaled together.
+    cluster
+        .reschedule(source_fragment.reschedule(
+            [
+                WorkerSlotId::new(source_workers[0], 0),
+                WorkerSlotId::new(source_workers[0], 1),
+                WorkerSlotId::new(source_workers[2], 0),
+            ],
+            [],
+        ))
+        .await
+        .unwrap();
+    validate_splits_aligned(&mut cluster).await?;
+    expect_test::expect![[r#"
+        1 6 HASH {SOURCE} 3 256
+        2 8 HASH {MVIEW,SOURCE_SCAN} 3 256
+        3 8 SINGLE {NOW} 1 1
+        4 8 SINGLE {NOW} 1 1"#]]
+    .assert_eq(&cluster.run("select fragment_id, table_id, distribution_type, flags, parallelism, max_parallelism from rw_fragments;").await?);
+    expect_test::expect![[r#"
+        6 CREATED CUSTOM 256
+        8 CREATED ADAPTIVE 256"#]]
+    .assert_eq(&cluster.run("select * from rw_table_fragments;").await?);
+
+    // Rescheduling the backfill fragment with resolve_no_shuffle is OK; it scales the upstream source fragment together.
+    cluster
+        .reschedule_resolve_no_shuffle(source_backfill_dynamic_filter_fragment.reschedule(
+            [],
+            [
+                WorkerSlotId::new(source_workers[0], 0),
+                WorkerSlotId::new(source_workers[0], 1),
+                WorkerSlotId::new(source_workers[2], 0),
+                WorkerSlotId::new(source_workers[2], 1),
+            ],
+        ))
+        .await
+        .unwrap();
+    validate_splits_aligned(&mut cluster).await?;
+    expect_test::expect![[r#"
+        1 6 HASH {SOURCE} 7 256
+        2 8 HASH {MVIEW,SOURCE_SCAN} 7 256
+        3 8 SINGLE {NOW} 1 1
+        4 8 SINGLE {NOW} 1 1"#]]
+    .assert_eq(&cluster.run("select fragment_id, table_id, distribution_type, flags, parallelism, max_parallelism from rw_fragments;").await?);
+    expect_test::expect![[r#"
+        6 CREATED CUSTOM 256
+        8 CREATED ADAPTIVE 256"#]]
+    .assert_eq(&cluster.run("select * from rw_table_fragments;").await?);
+    Ok(())
+}
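A self-contained distillation of issue 19563, with illustrative structs rather than the real risingwave types: with a temporal filter, the SourceScan fragment has three upstream fragments (the source plus the two Now fragments seen as fragments 3 and 4 above), so resolving "the" upstream by position is wrong; it must be resolved via the Merge node that actually feeds the SourceBackfill operator.

    #[derive(Debug, PartialEq)]
    struct Merge {
        upstream_fragment_id: u32,
        feeds_source_backfill: bool,
    }

    fn upstream_source_fragment(merges: &[Merge]) -> Option<u32> {
        merges
            .iter()
            .find(|m| m.feeds_source_backfill)
            .map(|m| m.upstream_fragment_id)
    }

    fn main() {
        // Fragment edges as in the regression test: source (1) and two Now branches (3, 4).
        let merges = [
            Merge { upstream_fragment_id: 3, feeds_source_backfill: false },
            Merge { upstream_fragment_id: 1, feeds_source_backfill: true },
            Merge { upstream_fragment_id: 4, feeds_source_backfill: false },
        ];
        // A positional choice (e.g. upstream_fragment_ids[0]) could yield 3; the Merge
        // that feeds SourceBackfill yields the source fragment 1.
        assert_eq!(upstream_source_fragment(&merges), Some(1));
    }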