-
Notifications
You must be signed in to change notification settings - Fork 590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support inject and collect barrier from partial graph #17758
Conversation
You can add a PR description of the conceptual and implementation design, and ping me when it's ready for review. Seems like there's still changes happening. |
@kwannoel @BugenZhao @yezizp2012 The PR is ready for review now. I just updated the PR description to add some implementation-wise descriptions. PTAL. |
In later development, I found that we don't have scenarios to inject barrier to multiple partial graph in a single request. Therefore, I remove the newly introduced |
@@ -173,7 +173,7 @@ pub struct FragmentManager { | |||
core: RwLock<FragmentManagerCore>, | |||
} | |||
|
|||
#[derive(Clone)] | |||
#[derive(Clone, Debug)] | |||
pub struct InflightFragmentInfo { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we add some docs about the lifetime and usage of this struct?
Like when it is constructed, what it is used for, when it is dropped?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The InflightFragmentInfo
is just a plain info rather than some states, so it does not have any lifetime transition. Do you mean InflightActorState
?
Why is there still a "global streaming graph", I thought after the partial ckpt based implementation there shouldn't be anymore global graph, and instead just a set of partial graphs? |
Since in frontend node we still manage a global snapshot, all the created streaming graphs still shares a single global streaming graph, and barriers are injected and collected together at this global streaming graph. The creating streaming job will do the backfill in an individual partial graph, and after finish creating, it will join the global streaming graph of the collected streaming jobs. Therefore, we still need this global streaming graph to maintain the global snapshot. |
let mut node_to_collect = control_stream_manager.inject_barrier( | ||
&command_ctx, | ||
&info.fragment_infos, | ||
Some(&info.fragment_infos), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why we pass the same info twice
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The two parameters have different semantics. It just happens to be the same one in recovery.
"different stop epoch" | ||
); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's this change here in relation to?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The previous assertion was hit in some previous commits and get fixed in a later commit.
This assertion was a insignificant sanity check, and I am concerned it might be too strict in production env and cause unnecessary panic, so I changed to having a warning log.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rest LGTM. @yezizp2012 could you PTAL as well?
I would also like to take a look later. 🙏 |
Will merge the PR first to unblock further development. You can leave some comments on the PR and I will address it in later PRs. |
Yes sure. I will take a look tmr. 🫡 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Currently on CN, barriers are injected and collected together on a global streaming graph. When supporting partial checkpoint, we may need to support injecting and collecting barriers independently from different partial graphs.
We currently have
ManagedBarrierState
to manage the barrier injected and collected for the global streaming graph. To support the barrier management of different partial graphs, we can reuse thisManagedBarrierState
and assign one to each partial graph. In this PR, thisManagedBarrierState
is renamed toPartialGraphManagedBarrierState
. The originalManagedBarrierState
will change to hold the state of all partial graphs asHashMap<PartialGraphId, PartialGraphManagedBarrierState>
. In later partial checkpoint based backfill implementation #17735, thePartialGraphId
of the global streaming graph isu32::MAX
, and the one of each creating streaming job is thetable_id
of theTableFragments
.Actors can be firstly in the partial graph of a creating streaming job, and then gets merged to the global partial graph when the streaming job is created, which means the partial graph that an actor belongs to is not fixed. Therefore, in this PR we introduce a struct
InflightActorState
, which is an extension to the previousActorMutationSubscribers
, and add a fieldinflight_barriers: BTreeMap<u64, (PartialGraphId, Option<Arc<Mutation>>)>
to store the inflight barrier of the actor, and also the partial graph id that the actor belongs to in different epochs.Previously the
InjectBarrierRequest
in theStreamingControlStream
rpc carries the barrier collection information of the whole streaming graph. In this PR, we add a new fieldpartial_graph_id
in theInjectBarrierRequest
, and the original barrier collection information will be carried per partial graph. This PR is just a refactor PR. On meta node global barrier manager, things are still managed on a single global streaming graph, and therefore thepartial_graph_id
is alwaysu32::MAX
.Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.