From 2cfb26b463fa2a03c2f5cb22858525c1b3bc8abe Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 27 Nov 2024 16:00:55 +0800 Subject: [PATCH] remove _do_not_use Signed-off-by: xxchan --- proto/stream_plan.proto | 4 +- src/meta/src/controller/fragment.rs | 4 +- src/meta/src/model/stream.rs | 2 +- src/meta/src/stream/scale.rs | 7 +-- src/prost/src/lib.rs | 16 ++----- .../integration_tests/scale/shared_source.rs | 48 +++++++++++++++---- 6 files changed, 53 insertions(+), 28 deletions(-) diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 05f2dc3ddbf2..379a6001d9d5 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -541,7 +541,9 @@ message HopWindowNode { } message MergeNode { - // Note: `upstream_actor_id` stored in the plan node in `Fragment` meta model cannot be directly used. + // **WARNING**: Use this field with caution. + // + // `upstream_actor_id` stored in the plan node in `Fragment` meta model cannot be directly used. // See `compose_fragment`. repeated uint32 upstream_actor_id = 1; uint32 upstream_fragment_id = 2; diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index f6979202b615..d4dd44359050 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -1237,7 +1237,7 @@ impl CatalogController { .one(&txn) .await? 
.context(format!("fragment {} not found", fragment_id))?; - let (_source_id, upstream_source_fragment_id, _do_not_use_upstream_actor_id) = fragment + let (_source_id, upstream_source_fragment_id) = fragment .stream_node .to_protobuf() .find_source_backfill() @@ -1428,7 +1428,7 @@ impl CatalogController { let mut source_fragment_ids = HashMap::new(); for (fragment_id, _, stream_node) in fragments { - if let Some((source_id, upstream_source_fragment_id, _do_not_use_upstream_actor_id)) = + if let Some((source_id, upstream_source_fragment_id)) = stream_node.to_protobuf().find_source_backfill() { source_fragment_ids diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index 3b18b59dc249..75282682e620 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -450,7 +450,7 @@ impl TableFragments { for fragment in self.fragments() { for actor in &fragment.actors { - if let Some((source_id, upstream_source_fragment_id, _upstream_actor_id)) = + if let Some((source_id, upstream_source_fragment_id)) = actor.nodes.as_ref().unwrap().find_source_backfill() { source_backfill_fragments diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index a49c3348120e..f0fd113f3cf9 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -821,11 +821,8 @@ impl ScaleController { // SourceScan is always a NoShuffle downstream, rescheduled together with the upstream Source. 
if (fragment.get_fragment_type_mask() & FragmentTypeFlag::SourceScan as u32) != 0 { let stream_node = fragment.actor_template.nodes.as_ref().unwrap(); - if let Some(( - _source_id, - upstream_source_fragment_id, - _do_not_use_upstream_actor_id, - )) = stream_node.find_source_backfill() + if let Some((_source_id, upstream_source_fragment_id)) = + stream_node.find_source_backfill() { stream_source_backfill_fragment_ids .insert(fragment.fragment_id, upstream_source_fragment_id); diff --git a/src/prost/src/lib.rs b/src/prost/src/lib.rs index c9f7033e8f2b..80eea383f576 100644 --- a/src/prost/src/lib.rs +++ b/src/prost/src/lib.rs @@ -284,27 +284,21 @@ impl stream_plan::StreamNode { /// Find the external stream source info inside the stream node, if any. /// - /// Returns (`source_id`, `upstream_source_fragment_id`, `upstream_actor_id`). + /// Returns (`source_id`, `upstream_source_fragment_id`). /// /// Note: we must get upstream fragment id from the merge node, not from the fragment's /// `upstream_fragment_ids`. e.g., DynamicFilter may have 2 upstream fragments, but only /// one is the upstream source fragment. - /// - /// Note: `merge.upstream_actor_id` need to be used with caution. - /// DO NOT USE if the `StreamNode` is from `Fragment` meta model. - /// OK to use if the `StreamNode` is from `TableFragments` proto. - pub fn find_source_backfill(&self) -> Option<(u32, u32, Vec)> { + pub fn find_source_backfill(&self) -> Option<(u32, u32)> { if let Some(crate::stream_plan::stream_node::NodeBody::SourceBackfill(source)) = self.node_body.as_ref() { if let crate::stream_plan::stream_node::NodeBody::Merge(merge) = self.input[0].node_body.as_ref().unwrap() { - return Some(( - source.upstream_source_id, - merge.upstream_fragment_id, - merge.upstream_actor_id.clone(), - )); + // Note: avoid using `merge.upstream_actor_id` to prevent misuse. + // See comments there for details. 
+ return Some((source.upstream_source_id, merge.upstream_fragment_id)); } else { unreachable!( "source backfill must have a merge node as its input: {:?}", diff --git a/src/tests/simulation/tests/integration_tests/scale/shared_source.rs b/src/tests/simulation/tests/integration_tests/scale/shared_source.rs index c4473b722afd..905a6ddfb39a 100644 --- a/src/tests/simulation/tests/integration_tests/scale/shared_source.rs +++ b/src/tests/simulation/tests/integration_tests/scale/shared_source.rs @@ -12,11 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; + use anyhow::Result; use itertools::Itertools; use maplit::{convert_args, hashmap}; use risingwave_common::hash::WorkerSlotId; use risingwave_pb::meta::table_fragments::Fragment; +use risingwave_pb::stream_plan::DispatcherType; use risingwave_simulation::cluster::{Cluster, Configuration}; use risingwave_simulation::ctl_ext::predicate::{identity_contains, no_identity_contains}; @@ -27,15 +30,37 @@ CREATE SOURCE s(v1 int, v2 varchar) WITH ( topic='shared_source' ) FORMAT PLAIN ENCODE JSON;"#; -fn source_backfill_upstream(fragment: &Fragment) -> Vec<(u32, u32)> { - fragment +/// Returns `Vec<(backfill_actor_id, source_actor_id)>` +fn source_backfill_upstream( + source_backfill_fragment: &Fragment, + source_fragment: &Fragment, +) -> Vec<(u32, u32)> { + let mut no_shuffle_downstream_to_upstream = HashMap::new(); + for source_actor in &source_fragment.actors { + for dispatcher in &source_actor.dispatcher { + if dispatcher.r#type == DispatcherType::NoShuffle as i32 { + assert_eq!(dispatcher.downstream_actor_id.len(), 1); + let downstream_actor_id = dispatcher.downstream_actor_id[0]; + no_shuffle_downstream_to_upstream + .insert(downstream_actor_id, source_actor.actor_id); + } + } + } + + source_backfill_fragment .actors .iter() - .map(|actor| { - let (_, _source_fragment_id, source_actor_id) = - 
actor.get_nodes().unwrap().find_source_backfill().unwrap(); - assert!(source_actor_id.len() == 1); - (actor.actor_id, source_actor_id[0]) + .map(|backfill_actor| { + let (_, source_fragment_id) = backfill_actor + .get_nodes() + .unwrap() + .find_source_backfill() + .unwrap(); + assert_eq!(source_fragment.fragment_id, source_fragment_id); + ( + backfill_actor.actor_id, + no_shuffle_downstream_to_upstream[&backfill_actor.actor_id], + ) }) .collect_vec() } @@ -44,9 +69,16 @@ async fn validate_splits_aligned(cluster: &mut Cluster) -> Result<()> { let source_backfill_fragment = cluster .locate_one_fragment([identity_contains("StreamSourceScan")]) .await?; + let source_fragment = cluster + .locate_one_fragment([ + identity_contains("Source"), + no_identity_contains("StreamSourceScan"), + ]) + .await?; // The result of scaling is non-deterministic. // So we just print the result here, instead of asserting with a fixed value. - let actor_upstream = source_backfill_upstream(&source_backfill_fragment.inner); + let actor_upstream = + source_backfill_upstream(&source_backfill_fragment.inner, &source_fragment.inner); tracing::info!( "{}", actor_upstream