Skip to content

Commit

Permalink
remove _do_not_use
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Nov 27, 2024
1 parent 2e06afe commit 2cfb26b
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 28 deletions.
4 changes: 3 additions & 1 deletion proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,9 @@ message HopWindowNode {
}

message MergeNode {
// Note: `upstream_actor_id` stored in the plan node in `Fragment` meta model cannot be directly used.
// **WARNING**: Use this field with caution.
//
// `upstream_actor_id` stored in the plan node in `Fragment` meta model cannot be directly used.
// See `compose_fragment`.
repeated uint32 upstream_actor_id = 1;
uint32 upstream_fragment_id = 2;
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1237,7 +1237,7 @@ impl CatalogController {
.one(&txn)
.await?
.context(format!("fragment {} not found", fragment_id))?;
let (_source_id, upstream_source_fragment_id, _do_not_use_upstream_actor_id) = fragment
let (_source_id, upstream_source_fragment_id) = fragment
.stream_node
.to_protobuf()
.find_source_backfill()
Expand Down Expand Up @@ -1428,7 +1428,7 @@ impl CatalogController {

let mut source_fragment_ids = HashMap::new();
for (fragment_id, _, stream_node) in fragments {
if let Some((source_id, upstream_source_fragment_id, _do_not_use_upstream_actor_id)) =
if let Some((source_id, upstream_source_fragment_id)) =
stream_node.to_protobuf().find_source_backfill()
{
source_fragment_ids
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/model/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ impl TableFragments {

for fragment in self.fragments() {
for actor in &fragment.actors {
if let Some((source_id, upstream_source_fragment_id, _upstream_actor_id)) =
if let Some((source_id, upstream_source_fragment_id)) =
actor.nodes.as_ref().unwrap().find_source_backfill()
{
source_backfill_fragments
Expand Down
7 changes: 2 additions & 5 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -821,11 +821,8 @@ impl ScaleController {
// SourceScan is always a NoShuffle downstream, rescheduled together with the upstream Source.
if (fragment.get_fragment_type_mask() & FragmentTypeFlag::SourceScan as u32) != 0 {
let stream_node = fragment.actor_template.nodes.as_ref().unwrap();
if let Some((
_source_id,
upstream_source_fragment_id,
_do_not_use_upstream_actor_id,
)) = stream_node.find_source_backfill()
if let Some((_source_id, upstream_source_fragment_id)) =
stream_node.find_source_backfill()
{
stream_source_backfill_fragment_ids
.insert(fragment.fragment_id, upstream_source_fragment_id);
Expand Down
16 changes: 5 additions & 11 deletions src/prost/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,27 +284,21 @@ impl stream_plan::StreamNode {

/// Find the external stream source info inside the stream node, if any.
///
/// Returns (`source_id`, `upstream_source_fragment_id`, `upstream_actor_id`).
/// Returns (`source_id`, `upstream_source_fragment_id`).
///
/// Note: we must get upstream fragment id from the merge node, not from the fragment's
/// `upstream_fragment_ids`. e.g., DynamicFilter may have 2 upstream fragments, but only
/// one is the upstream source fragment.
///
/// Note: `merge.upstream_actor_id` need to be used with caution.
/// DO NOT USE if the `StreamNode` is from `Fragment` meta model.
/// OK to use if the `StreamNode` is from `TableFragments` proto.
pub fn find_source_backfill(&self) -> Option<(u32, u32, Vec<u32>)> {
pub fn find_source_backfill(&self) -> Option<(u32, u32)> {
if let Some(crate::stream_plan::stream_node::NodeBody::SourceBackfill(source)) =
self.node_body.as_ref()
{
if let crate::stream_plan::stream_node::NodeBody::Merge(merge) =
self.input[0].node_body.as_ref().unwrap()
{
return Some((
source.upstream_source_id,
merge.upstream_fragment_id,
merge.upstream_actor_id.clone(),
));
// Note: avoid using `merge.upstream_actor_id` to prevent misuse.
// See comments there for details.
return Some((source.upstream_source_id, merge.upstream_fragment_id));
} else {
unreachable!(
"source backfill must have a merge node as its input: {:?}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use anyhow::Result;
use itertools::Itertools;
use maplit::{convert_args, hashmap};
use risingwave_common::hash::WorkerSlotId;
use risingwave_pb::meta::table_fragments::Fragment;
use risingwave_pb::stream_plan::DispatcherType;
use risingwave_simulation::cluster::{Cluster, Configuration};
use risingwave_simulation::ctl_ext::predicate::{identity_contains, no_identity_contains};

Expand All @@ -27,15 +30,37 @@ CREATE SOURCE s(v1 int, v2 varchar) WITH (
topic='shared_source'
) FORMAT PLAIN ENCODE JSON;"#;

fn source_backfill_upstream(fragment: &Fragment) -> Vec<(u32, u32)> {
fragment
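/// Returns `Vec<(backfill_actor_id, source_actor_id)>`: each source backfill actor paired with
/// the upstream source actor that feeds it through a `NoShuffle` dispatcher.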
fn source_backfill_upstream(
source_backfill_fragment: &Fragment,
source_fragment: &Fragment,
) -> Vec<(u32, u32)> {
let mut no_shuffle_downstream_to_upstream = HashMap::new();
for source_actor in &source_fragment.actors {
for dispatcher in &source_actor.dispatcher {
if dispatcher.r#type == DispatcherType::NoShuffle as i32 {
assert_eq!(dispatcher.downstream_actor_id.len(), 1);
let downstream_actor_id = dispatcher.downstream_actor_id[0];
no_shuffle_downstream_to_upstream
.insert(downstream_actor_id, source_actor.actor_id);
}
}
}

source_backfill_fragment
.actors
.iter()
.map(|actor| {
let (_, _source_fragment_id, source_actor_id) =
actor.get_nodes().unwrap().find_source_backfill().unwrap();
assert!(source_actor_id.len() == 1);
(actor.actor_id, source_actor_id[0])
.map(|backfill_actor| {
let (_, source_fragment_id) = backfill_actor
.get_nodes()
.unwrap()
.find_source_backfill()
.unwrap();
assert_eq!(source_fragment.fragment_id, source_fragment_id);
(
backfill_actor.actor_id,
no_shuffle_downstream_to_upstream[&backfill_actor.actor_id],
)
})
.collect_vec()
}
Expand All @@ -44,9 +69,16 @@ async fn validate_splits_aligned(cluster: &mut Cluster) -> Result<()> {
let source_backfill_fragment = cluster
.locate_one_fragment([identity_contains("StreamSourceScan")])
.await?;
let source_fragment = cluster
.locate_one_fragment([
identity_contains("Source"),
no_identity_contains("StreamSourceScan"),
])
.await?;
// The result of scaling is non-deterministic.
// So we just print the result here, instead of asserting with a fixed value.
let actor_upstream = source_backfill_upstream(&source_backfill_fragment.inner);
let actor_upstream =
source_backfill_upstream(&source_backfill_fragment.inner, &source_fragment.inner);
tracing::info!(
"{}",
actor_upstream
Expand Down

0 comments on commit 2cfb26b

Please sign in to comment.