feat: support inject and collect barrier from partial graph #17758

Merged
merged 24 commits into from
Jul 30, 2024

wenym1
Contributor

@wenym1 wenym1 commented Jul 19, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Currently, on the compute node (CN), barriers are injected and collected together on a single global streaming graph. To support partial checkpoints, we may need to inject and collect barriers independently for different partial graphs.

We currently use ManagedBarrierState to manage the barriers injected into and collected from the global streaming graph. To manage barriers for different partial graphs, we can reuse this ManagedBarrierState and assign one to each partial graph. In this PR, the existing ManagedBarrierState is renamed to PartialGraphManagedBarrierState, and the name ManagedBarrierState now refers to a struct that holds the state of all partial graphs as HashMap<PartialGraphId, PartialGraphManagedBarrierState>. In the later partial-checkpoint-based backfill implementation (#17735), the PartialGraphId of the global streaming graph is u32::MAX, and that of each creating streaming job is the table_id of its TableFragments.
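
As a rough illustration of the layout described above, the following minimal sketch keeps one per-graph state in a map keyed by partial graph id. Only the type names ManagedBarrierState, PartialGraphManagedBarrierState, and PartialGraphId and the u32::MAX sentinel come from the description; the newtype wrapper, the field names, and the inject/collect_oldest methods are assumptions made for illustration and are not the actual RisingWave code.

```rust
use std::collections::HashMap;

/// Identifier of a partial graph. Wrapping a `u32` in a newtype is an assumption.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct PartialGraphId(pub u32);

impl PartialGraphId {
    /// The description uses `u32::MAX` for the global streaming graph.
    pub const GLOBAL: PartialGraphId = PartialGraphId(u32::MAX);
}

/// Hypothetical stand-in for the per-graph state:
/// epochs that were injected but not yet collected, oldest first.
#[derive(Debug, Default)]
pub struct PartialGraphManagedBarrierState {
    inflight_epochs: Vec<u64>,
}

/// Holds one per-graph state for every partial graph.
#[derive(Debug, Default)]
pub struct ManagedBarrierState {
    graph_states: HashMap<PartialGraphId, PartialGraphManagedBarrierState>,
}

impl ManagedBarrierState {
    /// Record a barrier injected into one partial graph,
    /// creating that graph's state on first use.
    pub fn inject(&mut self, graph: PartialGraphId, epoch: u64) {
        self.graph_states
            .entry(graph)
            .or_default()
            .inflight_epochs
            .push(epoch);
    }

    /// Mark the oldest in-flight barrier of one graph as collected
    /// and return its epoch. Other graphs are not affected.
    pub fn collect_oldest(&mut self, graph: PartialGraphId) -> Option<u64> {
        let state = self.graph_states.get_mut(&graph)?;
        if state.inflight_epochs.is_empty() {
            None
        } else {
            Some(state.inflight_epochs.remove(0))
        }
    }
}
```

The point of the sketch is only that collecting a barrier on one graph does not touch the in-flight barriers of another graph.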

An actor may first belong to the partial graph of a creating streaming job and then be merged into the global partial graph once the streaming job is created, so the partial graph that an actor belongs to is not fixed. Therefore, this PR introduces a struct InflightActorState, which extends the previous ActorMutationSubscribers with a field inflight_barriers: BTreeMap<u64, (PartialGraphId, Option<Arc<Mutation>>)>. The field stores the actor's inflight barriers together with the id of the partial graph that the actor belongs to in each epoch.
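
The per-actor bookkeeping described above could look roughly like the sketch below. The struct name and the type of inflight_barriers are taken from the description; the Mutation placeholder enum, the newtype for PartialGraphId, and the three methods are hypothetical and only illustrate how an epoch-keyed map records which graph an actor belonged to at each barrier.

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

/// Identifier of a partial graph (the newtype wrapper is an assumption).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct PartialGraphId(pub u32);

/// Placeholder for the real mutation type, which is not shown in the PR text.
#[derive(Debug, PartialEq)]
pub enum Mutation {
    Stop,
}

/// Per-actor state. Only the `inflight_barriers` field is named in the description.
#[derive(Debug, Default)]
pub struct InflightActorState {
    /// Keyed by epoch: the graph the actor belonged to and the optional mutation.
    pub inflight_barriers: BTreeMap<u64, (PartialGraphId, Option<Arc<Mutation>>)>,
}

impl InflightActorState {
    /// Record that a barrier for `epoch` was issued to this actor via `graph`.
    pub fn issue_barrier(
        &mut self,
        epoch: u64,
        graph: PartialGraphId,
        mutation: Option<Arc<Mutation>>,
    ) {
        self.inflight_barriers.insert(epoch, (graph, mutation));
    }

    /// Graph the actor belonged to at `epoch`, if that barrier is still in flight.
    pub fn graph_at(&self, epoch: u64) -> Option<PartialGraphId> {
        self.inflight_barriers.get(&epoch).map(|(graph, _)| *graph)
    }

    /// Remove the oldest in-flight barrier and return its epoch and graph.
    pub fn collect_oldest(&mut self) -> Option<(u64, PartialGraphId)> {
        self.inflight_barriers
            .pop_first()
            .map(|(epoch, (graph, _))| (epoch, graph))
    }
}
```

Because the map is ordered by epoch, an actor that moves from a creating job's graph to the global graph simply has older entries tagged with the job's graph id and newer entries tagged with u32::MAX.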

Previously, the InjectBarrierRequest in the StreamingControlStream RPC carried the barrier collection information of the whole streaming graph. In this PR, we add a new field partial_graph_id to InjectBarrierRequest, and the original barrier collection information is now carried per partial graph. This PR is purely a refactor: on the meta node's global barrier manager, everything is still managed on a single global streaming graph, so partial_graph_id is always u32::MAX.
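
To make the request change concrete, here is a simplified sketch of a request that carries a graph id. The real InjectBarrierRequest is an RPC message whose full field list is not shown in this conversation, so the epoch and actor_ids_to_collect fields, the constant name, and the helper function are all hypothetical; only partial_graph_id and the u32::MAX value come from the description.

```rust
/// Sentinel graph id that the description gives for the global streaming graph.
/// The constant name is an assumption.
pub const GLOBAL_PARTIAL_GRAPH_ID: u32 = u32::MAX;

/// Simplified stand-in for the RPC message.
#[derive(Debug, Clone, PartialEq)]
pub struct InjectBarrierRequest {
    /// Hypothetical field: epoch of the barrier being injected.
    pub epoch: u64,
    /// Hypothetical field: actors whose barriers must be collected.
    pub actor_ids_to_collect: Vec<u32>,
    /// Field added by this PR: the partial graph the request applies to.
    pub partial_graph_id: u32,
}

/// Build a request the way the meta node does after this PR according to the
/// description: everything still targets the single global streaming graph.
pub fn global_graph_request(epoch: u64, actor_ids_to_collect: Vec<u32>) -> InjectBarrierRequest {
    InjectBarrierRequest {
        epoch,
        actor_ids_to_collect,
        partial_graph_id: GLOBAL_PARTIAL_GRAPH_ID,
    }
}
```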

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@kwannoel
Contributor

You can add a PR description of the conceptual and implementation design, and ping me when it's ready for review. Seems like there's still changes happening.

@wenym1 wenym1 changed the title feat: support inject and barrier from partial graph feat: support inject and collect barrier from partial graph Jul 26, 2024
@wenym1
Contributor Author

wenym1 commented Jul 26, 2024

> You can add a PR description of the conceptual and implementation design, and ping me when it's ready for review. Seems like there's still changes happening.

@kwannoel @BugenZhao @yezizp2012 The PR is ready for review now. I just updated the PR description to add some implementation-wise descriptions. PTAL.

@graphite-app graphite-app bot requested a review from a team July 26, 2024 06:54
@wenym1
Contributor Author

wenym1 commented Jul 29, 2024

In later development, I found that we have no scenario that injects a barrier into multiple partial graphs in a single request. Therefore, I removed the newly introduced PartialGraphInfo, moved its fields back into InjectBarrierRequest, and added a new field partial_graph_id to it.

@wenym1 wenym1 requested review from hzxa21 and kwannoel July 29, 2024 07:52
@@ -173,7 +173,7 @@ pub struct FragmentManager {
     core: RwLock<FragmentManagerCore>,
 }
 
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct InflightFragmentInfo {
Contributor

Could we add some docs about the lifetime and usage of this struct?

Like when it is constructed, what it is used for, when it is dropped?

Contributor Author

InflightFragmentInfo is plain data rather than state, so it has no lifecycle transitions. Did you mean InflightActorState?

@kwannoel
Contributor

> In later partial checkpoint based backfill implementation #17735, the PartialGraphId of the global streaming graph is u32::MAX, and the one of each creating streaming job is the table_id of the TableFragments.

Why is there still a "global streaming graph"? I thought that after the partial-checkpoint-based implementation there would no longer be a global graph, just a set of partial graphs.

@wenym1
Contributor Author

wenym1 commented Jul 30, 2024

> In later partial checkpoint based backfill implementation #17735, the PartialGraphId of the global streaming graph is u32::MAX, and the one of each creating streaming job is the table_id of the TableFragments.
>
> Why is there still a "global streaming graph"? I thought that after the partial-checkpoint-based implementation there would no longer be a global graph, just a set of partial graphs.

Since the frontend node still manages a global snapshot, all created streaming graphs still share a single global streaming graph, and barriers are injected and collected together on this global streaming graph. A creating streaming job does its backfill in an individual partial graph and, once creation finishes, joins the global streaming graph of the created streaming jobs. Therefore, we still need this global streaming graph to maintain the global snapshot.
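
The life cycle described in this reply can be sketched as a reassignment of actors from a job-specific graph id to the global one. This is only an illustration under assumptions: the ActorGraphAssignment type, its methods, and the use of raw u32 ids are hypothetical, and the actual merge logic belongs to the follow-up work in #17735, which is not shown here.

```rust
use std::collections::HashMap;

/// Graph id that the PR description gives for the global streaming graph.
pub const GLOBAL_PARTIAL_GRAPH_ID: u32 = u32::MAX;

/// Hypothetical tracker of which partial graph each actor currently belongs to.
#[derive(Debug, Default)]
pub struct ActorGraphAssignment {
    graph_of_actor: HashMap<u32, u32>,
}

impl ActorGraphAssignment {
    /// While a job is being created, its actors live in a partial graph whose id
    /// is the job's `table_id` (as stated in the PR description).
    pub fn add_creating_job(&mut self, table_id: u32, actor_ids: &[u32]) {
        for &actor_id in actor_ids {
            self.graph_of_actor.insert(actor_id, table_id);
        }
    }

    /// When creation finishes, move the job's actors into the global graph.
    /// Returns how many actors were moved.
    pub fn finish_creating(&mut self, table_id: u32) -> usize {
        let mut moved = 0;
        for graph in self.graph_of_actor.values_mut() {
            if *graph == table_id {
                *graph = GLOBAL_PARTIAL_GRAPH_ID;
                moved += 1;
            }
        }
        moved
    }

    /// Current graph of an actor, if the actor is known.
    pub fn graph_of(&self, actor_id: u32) -> Option<u32> {
        self.graph_of_actor.get(&actor_id).copied()
    }
}
```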

let mut node_to_collect = control_stream_manager.inject_barrier(
&command_ctx,
&info.fragment_infos,
Some(&info.fragment_infos),
Contributor

Why do we pass the same info twice?

Contributor Author

The two parameters have different semantics. They just happen to be the same value during recovery.

"different stop epoch"
);
}
}
Contributor

What's this change here in relation to?

Contributor Author

The previous assertion was hit in some earlier commits, and the cause was fixed in a later commit.

The assertion was an insignificant sanity check, and I was concerned that it might be too strict in a production environment and cause unnecessary panics, so I changed it to a warning log.
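
The general pattern described here, downgrading a hard assertion to a logged warning, can be sketched as follows. The function name, its parameters, and the use of eprintln! are assumptions made for a self-contained example; the actual change compares values that are not visible in this snippet and presumably uses the project's own logging facility.

```rust
/// Sanity check that logs instead of panicking.
///
/// Returns `true` when the two epochs agree. On a mismatch it prints a warning
/// and returns `false`, so the caller can continue instead of aborting.
pub fn check_stop_epoch(expected: u64, actual: u64) -> bool {
    if expected == actual {
        true
    } else {
        // Before: something like `assert_eq!(expected, actual, "different stop epoch")`,
        // which would panic on mismatch.
        eprintln!("warning: different stop epoch: expected {expected}, actual {actual}");
        false
    }
}
```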

Collaborator

@hzxa21 hzxa21 left a comment

LGTM.

Contributor

@kwannoel kwannoel left a comment

Rest LGTM. @yezizp2012 could you PTAL as well?

@BugenZhao
Member

I would also like to take a look later. 🙏

@wenym1
Contributor Author

wenym1 commented Jul 30, 2024

> I would also like to take a look later. 🙏

I will merge the PR first to unblock further development. You can leave comments on the PR and I will address them in later PRs.

@wenym1 wenym1 added this pull request to the merge queue Jul 30, 2024
@yezizp2012
Member

> Rest LGTM. @yezizp2012 could you PTAL as well?

Yes sure. I will take a look tmr. 🫡

Merged via the queue into main with commit 2fb78f0 Jul 30, 2024
33 of 34 checks passed
@wenym1 wenym1 deleted the yiming/local-partial-graph-barrier-manager branch July 30, 2024 10:37
Member

@yezizp2012 yezizp2012 left a comment

LGTM!
