fix scaling
Signed-off-by: xxchan <[email protected]>
xxchan committed Nov 25, 2024
1 parent 860e859 commit 6b449e0
Showing 12 changed files with 255 additions and 101 deletions.
13 changes: 11 additions & 2 deletions e2e_test/source_inline/kafka/issue_19563.slt
@@ -1,7 +1,7 @@
control substitution on

system ok
rpk topic create test-topic-19563
rpk topic create test-topic-19563 -p 6

statement ok
CREATE SOURCE kafkasource (
@@ -15,7 +15,7 @@ WITH (
timestamptz.handling.mode = 'utc_without_suffix'
);

# Note that StreamSourceScan is in the StreamDynamicFilter fragment, which has 2 upstream fragments.
# Note that StreamSourceScan is in the StreamDynamicFilter fragment, which has 3 upstream fragments.
query T
explain create materialized view mv1 as select v1 from kafkasource where v1 between now() and now() + interval '1 day' * 365 * 2000;
----
@@ -31,6 +31,12 @@ StreamMaterialize { columns: [v1, _row_id(hidden)], stream_key: [_row_id], pk_co
└─StreamProject { exprs: [AddWithTimeZone(now, '730000 days':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] }
└─StreamNow


query I
select array_length(upstream_fragment_ids) from rw_fragments where array_contains(flags, Array['SOURCE_SCAN']);
----
3

# The following test is adapted from `temporal_filter.slt`.

# This statement should be correct for the next ~1000 years
@@ -58,6 +64,9 @@ select * from mv1 order by v1;
3031-01-01 21:00:00+00:00


halt


statement ok
DROP SOURCE kafkasource CASCADE;

2 changes: 2 additions & 0 deletions proto/stream_plan.proto
@@ -543,6 +543,8 @@ message HopWindowNode {
}

message MergeNode {
// Note: `upstream_actor_id` stored in the plan node in `Fragment` meta model cannot be directly used.
// See `compose_fragment`.
repeated uint32 upstream_actor_id = 1;
uint32 upstream_fragment_id = 2;
// Type of the upstream dispatcher. If there's always one upstream according to this
15 changes: 7 additions & 8 deletions src/ctl/src/cmd_impl/meta/cluster_info.rs
@@ -63,13 +63,14 @@ pub async fn source_split_info(context: &CtlContext, ignore_id: bool) -> anyhow:

for actor in &fragment.actors {
if let Some(ConnectorSplits { splits }) = actor_splits.remove(&actor.actor_id) {
let num_splits = splits.len();
let splits = splits
.iter()
.map(|split| SplitImpl::try_from(split).unwrap())
.map(|split| split.id())
.collect_vec()
.join(",");
actor_splits_map.insert(actor.actor_id, (splits.len(), splits));
actor_splits_map.insert(actor.actor_id, (num_splits, splits));
}
}
}
@@ -122,14 +123,12 @@ pub async fn source_split_info(context: &CtlContext, ignore_id: bool) -> anyhow:
},
split_count,
splits,
// FIXME: wrong
if !actor.upstream_actor_id.is_empty() {
assert!(
actor.upstream_actor_id.len() == 1,
"should have only one upstream actor, got {actor:?}"
);
let upstream_splits =
actor_splits_map.get(&actor.upstream_actor_id[0]).unwrap();
let upstream_splits = actor
.upstream_actor_id
.iter()
.find_map(|id| actor_splits_map.get(id))
.expect("should have one upstream source actor");
format!(
" <- Upstream Actor{}: [{}]",
if ignore_id {
2 changes: 2 additions & 0 deletions src/meta/model/src/fragment.rs
@@ -26,6 +26,8 @@ pub struct Model {
pub job_id: ObjectId,
pub fragment_type_mask: i32,
pub distribution_type: DistributionType,
/// Note: the `StreamNode` is different from the final plan node used by actors.
/// Specifically, `Merge` nodes' `upstream_actor_id` will be filled. (See `compose_fragment`)
pub stream_node: StreamNode,
pub state_table_ids: I32Array,
pub upstream_fragment_id: I32Array,
37 changes: 31 additions & 6 deletions src/meta/src/controller/fragment.rs
@@ -1292,22 +1292,47 @@ impl CatalogController {
Ok(actors)
}

/// Get the actor ids, and each actor's upstream actor ids of the fragment with `fragment_id` with `Running` status.
pub async fn get_running_actors_and_upstream_of_fragment(
/// Get the actor ids, and each actor's upstream source actor ids of the fragment with `fragment_id` with `Running` status.
pub async fn get_running_actors_for_source_backfill(
&self,
fragment_id: FragmentId,
) -> MetaResult<Vec<(ActorId, ActorUpstreamActors)>> {
) -> MetaResult<Vec<(ActorId, ActorId)>> {
let inner = self.inner.read().await;
let txn = inner.db.begin().await?;
let fragment = Fragment::find_by_id(fragment_id)
.one(&txn)
.await?
.context(format!("fragment {} not found", fragment_id))?;
let (_source_id, upstream_source_fragment_id, _do_not_use_upstream_actor_id) = fragment
.stream_node
.to_protobuf()
.find_source_backfill()
.unwrap();
let actors: Vec<(ActorId, ActorUpstreamActors)> = Actor::find()
.select_only()
.column(actor::Column::ActorId)
.column(actor::Column::UpstreamActorIds)
.filter(actor::Column::FragmentId.eq(fragment_id))
.filter(actor::Column::Status.eq(ActorStatus::Running))
.into_tuple()
.all(&inner.db)
.all(&txn)
.await?;
Ok(actors)
Ok(actors
.into_iter()
.map(|(actor_id, upstream_actor_ids)| {
let upstream_source_actors =
&upstream_actor_ids.0[&(upstream_source_fragment_id as i32)];
assert_eq!(
upstream_source_actors.len(),
1,
"expect only one upstream source actor, but got {:?}, actor_id: {}, fragment_id: {}",
upstream_source_actors,
actor_id,
fragment_id
);
(actor_id, upstream_source_actors[0])
})
.collect())
}

pub async fn get_actors_by_job_ids(&self, job_ids: Vec<ObjectId>) -> MetaResult<Vec<ActorId>> {
@@ -1469,7 +1494,7 @@ impl CatalogController {

let mut source_fragment_ids = HashMap::new();
for (fragment_id, _, stream_node) in fragments {
if let Some((source_id, upstream_source_fragment_id)) =
if let Some((source_id, upstream_source_fragment_id, _do_not_use_upstream_actor_id)) =
stream_node.to_protobuf().find_source_backfill()
{
source_fragment_ids
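The crux of the fix: a `SourceScan` (source backfill) fragment can have multiple upstream fragments — for example the upstream source fragment plus the `Now` fragment of a temporal filter, as the updated slt test's "3 upstream fragments" note shows — so the single upstream *source* actor must be looked up by the upstream source fragment id rather than assumed to be the only upstream. A minimal sketch of that selection, using hypothetical toy types rather than the real meta-model structs:

```rust
use std::collections::HashMap;

type ActorId = u32;
type FragmentId = u32;

/// Pick the single upstream *source* actor of a backfill actor from its upstream
/// actors grouped by upstream fragment id. Other upstream fragments (e.g. a `Now`
/// fragment feeding a temporal filter) are ignored.
fn upstream_source_actor(
    upstream_actors_by_fragment: &HashMap<FragmentId, Vec<ActorId>>,
    upstream_source_fragment_id: FragmentId,
) -> ActorId {
    let actors = &upstream_actors_by_fragment[&upstream_source_fragment_id];
    assert_eq!(actors.len(), 1, "expect exactly one upstream source actor");
    actors[0]
}

fn main() {
    // The backfill actor merges from source fragment 1 (actor 100) and a Now fragment 2 (actor 200).
    let upstreams = HashMap::from([(1u32, vec![100u32]), (2, vec![200])]);
    assert_eq!(upstream_source_actor(&upstreams, 1), 100);
}
```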
17 changes: 4 additions & 13 deletions src/meta/src/manager/metadata.rs
@@ -571,26 +571,17 @@ impl MetadataManager {
Ok(actor_ids.into_iter().map(|id| id as ActorId).collect())
}

pub async fn get_running_actors_and_upstream_actors_of_fragment(
pub async fn get_running_actors_for_source_backfill(
&self,
id: FragmentId,
) -> MetaResult<HashSet<(ActorId, Vec<ActorId>)>> {
) -> MetaResult<HashSet<(ActorId, ActorId)>> {
let actor_ids = self
.catalog_controller
.get_running_actors_and_upstream_of_fragment(id as _)
.get_running_actors_for_source_backfill(id as _)
.await?;
Ok(actor_ids
.into_iter()
.map(|(id, actors)| {
(
id as ActorId,
actors
.into_inner()
.into_iter()
.flat_map(|(_, ids)| ids.into_iter().map(|id| id as ActorId))
.collect(),
)
})
.map(|(id, upstream)| (id as ActorId, upstream as ActorId))
.collect())
}

6 changes: 5 additions & 1 deletion src/meta/src/model/stream.rs
@@ -423,14 +423,18 @@ impl StreamJobFragments {
source_fragments
}

/// Returns (`source_id`, -> (`source_backfill_fragment_id`, `upstream_source_fragment_id`)).
///
/// Note: the fragment `source_backfill_fragment_id` may actually have multiple upstream fragments,
/// but only one of them is the upstream source fragment, which is what we return.
pub fn source_backfill_fragments(
&self,
) -> MetadataModelResult<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>> {
let mut source_backfill_fragments = HashMap::new();

for fragment in self.fragments() {
for actor in &fragment.actors {
if let Some((source_id, upstream_source_fragment_id)) =
if let Some((source_id, upstream_source_fragment_id, _upstream_actor_id)) =
actor.nodes.as_ref().unwrap().find_source_backfill()
{
source_backfill_fragments
25 changes: 14 additions & 11 deletions src/meta/src/stream/scale.rs
@@ -158,8 +158,8 @@ pub struct RescheduleContext {
upstream_dispatchers: HashMap<ActorId, Vec<(FragmentId, DispatcherId, DispatcherType)>>,
/// Fragments with `StreamSource`
stream_source_fragment_ids: HashSet<FragmentId>,
/// Fragments with `StreamSourceBackfill`
stream_source_backfill_fragment_ids: HashSet<FragmentId>,
/// Fragments with `StreamSourceBackfill` and the corresponding upstream source fragment
stream_source_backfill_fragment_ids: HashMap<FragmentId, FragmentId>,
/// Target fragments in `NoShuffle` relation
no_shuffle_target_fragment_ids: HashSet<FragmentId>,
/// Source fragments in `NoShuffle` relation
@@ -696,7 +696,7 @@ impl ScaleController {
}

let mut stream_source_fragment_ids = HashSet::new();
let mut stream_source_backfill_fragment_ids = HashSet::new();
let mut stream_source_backfill_fragment_ids = HashMap::new();
let mut no_shuffle_reschedule = HashMap::new();
for (fragment_id, WorkerReschedule { worker_actor_diff }) in &*reschedule {
let fragment = fragment_map
@@ -821,8 +821,14 @@ impl ScaleController {
// SourceScan is always a NoShuffle downstream, rescheduled together with the upstream Source.
if (fragment.get_fragment_type_mask() & FragmentTypeFlag::SourceScan as u32) != 0 {
let stream_node = fragment.actor_template.nodes.as_ref().unwrap();
if stream_node.find_source_backfill().is_some() {
stream_source_backfill_fragment_ids.insert(fragment.fragment_id);
if let Some((
_source_id,
upstream_source_fragment_id,
_do_not_use_upstream_actor_id,
)) = stream_node.find_source_backfill()
{
stream_source_backfill_fragment_ids
.insert(fragment.fragment_id, upstream_source_fragment_id);
}
}
}
@@ -1257,17 +1263,14 @@ impl ScaleController {
for fragment_id in reschedules.keys() {
let actors_after_reschedule = &fragment_actors_after_reschedule[fragment_id];

if ctx
.stream_source_backfill_fragment_ids
.contains(fragment_id)
if let Some(upstream_source_fragment_id) =
ctx.stream_source_backfill_fragment_ids.get(fragment_id)
{
let fragment = &ctx.fragment_map[fragment_id];

let curr_actor_ids = actors_after_reschedule.keys().cloned().collect_vec();

let actor_splits = self.source_manager.migrate_splits_for_backfill_actors(
*fragment_id,
&fragment.upstream_fragment_ids,
*upstream_source_fragment_id,
&curr_actor_ids,
&fragment_actor_splits,
&no_shuffle_upstream_actor_map,
43 changes: 20 additions & 23 deletions src/meta/src/stream/source_manager.rs
@@ -392,7 +392,7 @@ impl SourceManagerCore {
};
let actors = match self
.metadata_manager
.get_running_actors_and_upstream_actors_of_fragment(*fragment_id)
.get_running_actors_for_source_backfill(*fragment_id)
.await
{
Ok(actors) => {
@@ -653,20 +653,16 @@ where
}

fn align_backfill_splits(
backfill_actors: impl IntoIterator<Item = (ActorId, Vec<ActorId>)>,
backfill_actors: impl IntoIterator<Item = (ActorId, ActorId)>,
upstream_assignment: &HashMap<ActorId, Vec<SplitImpl>>,
fragment_id: FragmentId,
upstream_fragment_id: FragmentId,
upstream_source_fragment_id: FragmentId,
) -> anyhow::Result<HashMap<ActorId, Vec<SplitImpl>>> {
backfill_actors
.into_iter()
.map(|(actor_id, upstream_actor_id)| {
// FIXME: wrong
let err = || anyhow::anyhow!("source backfill actor should have one upstream actor, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_fragment_id}, actor_id: {actor_id}, upstream_assignment: {upstream_assignment:?}, upstream_actor_id: {upstream_actor_id:?}");
if upstream_actor_id.len() != 1 {
return Err(err());
}
let Some(splits) = upstream_assignment.get(&upstream_actor_id[0]) else {
let err = || anyhow::anyhow!("source backfill actor should have one upstream source actor, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_source_fragment_id}, actor_id: {actor_id}, upstream_assignment: {upstream_assignment:?}, upstream_actor_id: {upstream_actor_id:?}");
let Some(splits) = upstream_assignment.get(&upstream_actor_id) else {
return Err(err());
};
Ok((
@@ -845,36 +841,37 @@ impl SourceManager {
pub fn migrate_splits_for_backfill_actors(
&self,
fragment_id: FragmentId,
upstream_fragment_ids: &Vec<FragmentId>,
upstream_source_fragment_id: FragmentId,
curr_actor_ids: &[ActorId],
fragment_actor_splits: &HashMap<FragmentId, HashMap<ActorId, Vec<SplitImpl>>>,
no_shuffle_upstream_actor_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
// align splits for backfill fragments with its upstream source fragment
debug_assert!(upstream_fragment_ids.len() == 1);
let upstream_fragment_id = upstream_fragment_ids[0];
let actors = no_shuffle_upstream_actor_map
.iter()
.filter(|(id, _)| curr_actor_ids.contains(id))
.map(|(id, upstream_fragment_actors)| {
debug_assert!(upstream_fragment_actors.len() == 1);
(
*id,
vec![*upstream_fragment_actors.get(&upstream_fragment_id).unwrap()],
*upstream_fragment_actors
.get(&upstream_source_fragment_id)
.unwrap(),
)
});
let upstream_assignment = fragment_actor_splits.get(&upstream_fragment_id).unwrap();
let upstream_assignment = fragment_actor_splits
.get(&upstream_source_fragment_id)
.unwrap();
tracing::info!(
fragment_id,
upstream_fragment_id,
upstream_source_fragment_id,
?upstream_assignment,
"migrate_splits_for_backfill_actors"
);
Ok(align_backfill_splits(
actors,
upstream_assignment,
fragment_id,
upstream_fragment_id,
upstream_source_fragment_id,
)?)
}

@@ -961,19 +958,19 @@ impl SourceManager {
let mut assigned = HashMap::new();

for (_source_id, fragments) in source_backfill_fragments {
for (fragment_id, upstream_fragment_id) in fragments {
for (fragment_id, upstream_source_fragment_id) in fragments {
let upstream_actors = core
.metadata_manager
.get_running_actors_of_fragment(upstream_fragment_id)
.get_running_actors_of_fragment(upstream_source_fragment_id)
.await?;
let mut backfill_actors = vec![];
for upstream_actor in upstream_actors {
if let Some(dispatchers) = dispatchers.get(&upstream_actor) {
let err = || {
anyhow::anyhow!(
"source backfill fragment's upstream fragment should have one dispatcher, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_fragment_id}, upstream_actor: {upstream_actor}, dispatchers: {dispatchers:?}",
"source backfill fragment's upstream fragment should have one dispatcher, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_source_fragment_id}, upstream_actor: {upstream_actor}, dispatchers: {dispatchers:?}",
fragment_id = fragment_id,
upstream_fragment_id = upstream_fragment_id,
upstream_source_fragment_id = upstream_source_fragment_id,
upstream_actor = upstream_actor,
dispatchers = dispatchers
)
@@ -983,7 +980,7 @@
}

backfill_actors
.push((dispatchers[0].downstream_actor_id[0], vec![upstream_actor]));
.push((dispatchers[0].downstream_actor_id[0], upstream_actor));
}
}
assigned.insert(
Expand All @@ -992,7 +989,7 @@ impl SourceManager {
backfill_actors,
upstream_assignment,
fragment_id,
upstream_fragment_id,
upstream_source_fragment_id,
)?,
);
}
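With the new one-to-one `(backfill_actor, upstream_source_actor)` pairing, split alignment reduces to copying each upstream source actor's assignment to its backfill peer. A simplified sketch of that idea, using hypothetical `String` splits in place of `SplitImpl`:

```rust
use std::collections::HashMap;

type ActorId = u32;

/// Each backfill actor takes over exactly the splits of its single upstream source actor.
fn align_backfill_splits(
    backfill_actors: impl IntoIterator<Item = (ActorId, ActorId)>,
    upstream_assignment: &HashMap<ActorId, Vec<String>>,
) -> Result<HashMap<ActorId, Vec<String>>, String> {
    backfill_actors
        .into_iter()
        .map(|(actor_id, upstream_actor_id)| {
            upstream_assignment
                .get(&upstream_actor_id)
                .map(|splits| (actor_id, splits.clone()))
                .ok_or_else(|| format!("no splits assigned to upstream actor {upstream_actor_id}"))
        })
        .collect()
}

fn main() -> Result<(), String> {
    // Upstream source actor 100 owns two splits; backfill actor 10 mirrors them.
    let upstream = HashMap::from([(100u32, vec!["split-0".to_string(), "split-1".to_string()])]);
    let aligned = align_backfill_splits([(10u32, 100u32)], &upstream)?;
    assert_eq!(aligned[&10], vec!["split-0", "split-1"]);
    Ok(())
}
```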