refactor(meta): commit finish catalog in barrier manager #17428
Conversation
Still WIP. You can review the idea of the refactor and the general implementation.
(Still WIP, just marking it ready for review to test.)
src/meta/src/rpc/ddl_controller.rs
Outdated
@@ -952,6 +954,10 @@ impl DdlController {
    &ctx.replace_table_job_info
{
    *target_table = Some((table.clone(), source.clone()));
    if let StreamingJob::Sink(_, ref mut target_table) = &mut ctx.streaming_job
Any reason to additionally set it here? Is it a previous bug, or just necessary for the feature implemented in this PR?
In this PR, we need to propagate the `StreamingJob` struct into the barrier manager via the `StreamingJobContext`. This creates 2 instances of the `StreamingJob` object.
Hence both `ctx.streaming_job` and `streaming_job` itself need to be updated so they stay consistent. This is indeed a bad design, but the alternatives I have tried so far can't fix it.
I have tried to unify these by giving ownership of `streaming_job` to the `create_streaming_job` method. However, it encounters a stack overflow when I do this.
This should be fixed before the PR is merged, but I don't have the bandwidth at the moment, or any simple idea of how to fix it.
The `StreamingJob` struct is required by the v1 metadata manager, in order to be able to commit the streaming job.
Can you please leave some comments here? Same for v2.
I just refactored the code. The original logic is moved to `build_streaming_job` so that the logic of v1 and v2 can be consistent and we can avoid duplicating it. The ownership of `StreamingJob` in the parameters of the related methods is refined accordingly.
@@ -97,6 +97,8 @@ pub struct ReplaceTablePlan {
    /// Note that there's no `SourceBackfillExecutor` involved for table with connector, so we don't need to worry about
    /// `backfill_splits`.
    pub init_split_assignment: SplitAssignment,
    pub streaming_job: StreamingJob,
    pub dummy_id: u32,
Curious why it's named `dummy_id`? What does it mean?
And is there any reason to add this information to the `ReplaceTablePlan` command? I think in all commands that need to track the status of backfill, there should be a `CreateStreamingJob` command, and we can get the `StreamingJob` information there. Which DDL would only have the `ReplaceTablePlan` command, such that we have to store the `StreamingJob` here in the `ReplaceTablePlan` command?
Is the `dummy_id` the same as the id of `new_table_fragments`?
src/meta/src/manager/catalog/mod.rs
Outdated
/// Catalogs which were marked as finished and committed.
/// But the `DdlController` has not instantiated notification channel.
/// Once notified, we can remove the entry from this map.
pub table_id_to_version: HashMap<TableId, MetaResult<NotificationVersion>>,
I think we don't need this `table_id_to_version` to store the pending info. `table_id_to_tx` should be enough. We can check whether a streaming job has finished by directly checking the job status in the `DatabaseManager`.
Ideally things can work like this:
- When registering a tx on a table id, acquire the lock and check the job status. If the job is creating, store the tx. If the job has been created, return with ok. If the job does not exist, return an error to tell the user about it.
- On `abort_all`, notify all txs with an error.
- Every drop/cancel streaming job DDL should remove the tx of the dropped/cancelled streaming job id.
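For illustration only, here is a minimal sketch of this proposed flow. The names (`register_finish_tx`, `creating_txs`, `JobStatus`, `latest_version`) are hypothetical stand-ins, not the actual `DatabaseManager` internals:

```rust
use std::collections::HashMap;
use tokio::sync::oneshot;

// Hypothetical stand-ins for the real meta-service types.
type TableId = u32;
type NotificationVersion = u64;
type MetaResult<T> = Result<T, String>;

enum JobStatus {
    Creating,
    Created,
}

#[derive(Default)]
struct DatabaseManagerSketch {
    /// Status of known streaming jobs.
    job_status: HashMap<TableId, JobStatus>,
    /// Pending notifiers for jobs that are still creating.
    creating_txs: HashMap<TableId, oneshot::Sender<MetaResult<NotificationVersion>>>,
    /// Global latest notification version.
    latest_version: NotificationVersion,
}

impl DatabaseManagerSketch {
    /// Register a waiter for `table_id`, following the proposal above.
    fn register_finish_tx(
        &mut self,
        table_id: TableId,
        tx: oneshot::Sender<MetaResult<NotificationVersion>>,
    ) {
        match self.job_status.get(&table_id) {
            // Still creating: park the tx until the job finishes or recovery aborts it.
            Some(JobStatus::Creating) => {
                self.creating_txs.insert(table_id, tx);
            }
            // Already created: reply immediately with the latest version.
            Some(JobStatus::Created) => {
                let _ = tx.send(Ok(self.latest_version));
            }
            // Unknown job: tell the caller.
            None => {
                let _ = tx.send(Err(format!("streaming job {} not found", table_id)));
            }
        }
    }

    /// Called on recovery: fail every pending waiter.
    fn abort_all(&mut self) {
        for (_, tx) in self.creating_txs.drain() {
            let _ = tx.send(Err("aborted due to recovery".to_string()));
        }
    }
}
```

Drop/cancel DDLs would similarly remove their entry from `creating_txs` before it is notified, as described in the last bullet above.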
We also still need to propagate the result and notify the version, because the FE will wait on this version. Knowing that a job is done creating is insufficient.
It exists because the catalog manager may commit before the stream manager calls `wait_streaming_job`. In such a scenario, the catalog manager still needs to notify the stream manager of the version and error.
> We also need to propagate the result and notify the version still, because FE will wait on this version. Knowing that a job is done creating is insufficient.

Shall we just return the global latest notification version instead of the version at the moment the job turned created, so that we don't need to store the version per table? cc @yezizp2012

> It exists because the catalog manager may commit before stream manager calls wait_streaming_job. In such a scenario, catalog manager needs to still notify the stream manager of the version and error.

I think this corner case can be covered by my comment above. If the catalog manager has committed before `wait_streaming_job`, there are 3 possibilities:
- The job has been created, and then we just return ok with the global latest notification version.
- Failed to collect the first barrier. The stream manager should have got an error before calling `wait_streaming_job`, so there is no need to handle this.
- The first barrier was collected, and the job status must have been turned into `Creating`. Even though an error and recovery may have happened, we can just store the tx anyway. It will be notified with either `Ok` when the job finishes backfill afterwards, or `Err` when a later error triggers a recovery and calls `abort_all`.
LGTM. @yezizp2012 @shanicky PTAL
generally LGTM, we might need more refactors later.
@@ -63,31 +60,10 @@ impl Notifier {
    }
}

/// Notify when we have finished a barrier from all actors. This function consumes `self`.
BTW, we can change `notify_collected` to consume the ownership of the notifier, because it is expected that there won't be any call on the notifier after calling it.
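For illustration only (the real `Notifier` has more fields), taking `self` by value encodes the "no further use after collection" expectation in the type system:

```rust
use tokio::sync::oneshot;

/// Simplified stand-in for the barrier manager's notifier.
struct Notifier {
    collected: Option<oneshot::Sender<()>>,
}

impl Notifier {
    /// Consuming `self` means the compiler rejects any further use of this
    /// notifier once the barrier has been collected.
    fn notify_collected(self) {
        if let Some(tx) = self.collected {
            // Ignore the error if the receiver has already been dropped.
            let _ = tx.send(());
        }
    }
}
```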
let version = mgr
    .catalog_controller
    .finish_streaming_job(stream_job_id as _, replace_table_job_info)
Can we also update the doc on `create` and `finish`, showing that their callers are different managers?
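For example, the requested doc comments might read roughly like this; the struct name and signatures below are placeholders rather than the real API:

```rust
/// Placeholder standing in for the catalog manager / controller.
struct CatalogControllerSketch;

impl CatalogControllerSketch {
    /// Create the catalog entries of a streaming job in the `Creating` state.
    ///
    /// Called by the `DdlController` when the DDL request is handled.
    fn create_streaming_job(&self) {
        // ...
    }

    /// Mark a streaming job as created and commit its catalog.
    ///
    /// After this PR, called by the barrier manager once the streaming job
    /// has finished, instead of by the `DdlController`.
    fn finish_streaming_job(&self) {
        // ...
    }
}
```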
May I know the status after this PR? Do we notify the frontend after barrier collection, or after the creation has finished? It appears that after #17484 the frontend can resolve streaming jobs that are still being created. Also, is this task related to #12771?
Yeah, it will be required by #12771.
Nope, we will still notify it immediately on committing the catalog.

@wenym1 will be doing further work on meta, as part of supporting log-store-based backfill. So we are cleaning up some code first, to make it easier for him to do this work. The main goal is to clean up the logic of finishing stream jobs and centralize it all in the barrier manager, and also to clean up some logic (e.g. notification channels) which was only used to monitor stream jobs, since these can be monitored by the stream manager instead.
I think the behavior towards the frontend is the same as before, which is to wait for the streaming job to finish before returning. @kwannoel Can you elaborate more on the motivation of the original PR?
It's originally intended for creating an MV on a creating MV. But I'm not really prioritizing it recently, because it's not that simple to implement, and it may be further complicated by log-store-based backfill. It can also clean up some code in meta along the way.
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Move committing metadata for finished stream jobs into barrier manager. This will unify the paths for recovered and newly created stream jobs.
Then we can let the stream manager directly subscribe to the catalog manager, and we no longer need the `finished` channel in our notifiers.

Before

After
Then just wait for catalog to be committed / marked as finished.
This means that when `run_command` finishes, the initial barrier for the command has been collected. With this info we can immediately commit the catalog and make it visible to the FE at this point. This step will be done in a separate PR (making the catalog visible).
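A rough sketch of the flow after this PR, with the caveat that all names below are illustrative rather than the actual meta-service APIs: the barrier manager, not the DDL controller, commits the catalog when the job finishes, and the waiter only awaits that result.

```rust
use std::collections::HashMap;
use tokio::sync::oneshot;

// Illustrative aliases; the real types live in the meta crate.
type MetaResult<T> = Result<T, String>;
type NotificationVersion = u64;
type JobId = u32;

/// Placeholder for the streaming job metadata carried by the command.
struct StreamingJobSketch {
    id: JobId,
}

/// Placeholder barrier manager that tracks creating jobs and commits their
/// catalogs when they finish.
#[derive(Default)]
struct BarrierManagerSketch {
    tracking: HashMap<JobId, (StreamingJobSketch, oneshot::Sender<MetaResult<NotificationVersion>>)>,
}

impl BarrierManagerSketch {
    /// Handle a `CreateStreamingJob`-like command. Returning from here means
    /// the initial barrier for the command has been collected; tracking of
    /// the backfill continues inside the barrier manager.
    fn run_command(
        &mut self,
        job: StreamingJobSketch,
        finished_tx: oneshot::Sender<MetaResult<NotificationVersion>>,
    ) {
        // ... inject the barrier and wait for it to be collected ...
        self.tracking.insert(job.id, (job, finished_tx));
    }

    /// Called when the job finishes backfill: commit the catalog here
    /// (instead of in the DDL controller) and notify the waiter.
    fn on_job_finished(&mut self, job_id: JobId) {
        if let Some((job, finished_tx)) = self.tracking.remove(&job_id) {
            let result = Self::commit_finished_catalog(&job);
            let _ = finished_tx.send(result);
        }
    }

    fn commit_finished_catalog(_job: &StreamingJobSketch) -> MetaResult<NotificationVersion> {
        // ... mark the job as created / committed in the catalog manager ...
        Ok(1)
    }
}

/// The waiting side: just wait for the catalog to be committed / marked as
/// finished, and get back the notification version (or an error).
async fn wait_streaming_job(
    finished_rx: oneshot::Receiver<MetaResult<NotificationVersion>>,
) -> MetaResult<NotificationVersion> {
    finished_rx.await.map_err(|e| e.to_string())?
}
```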
Unresolved:
- To `finish` a stream job, for v1 we currently need to pass the entire `StreamingJob` struct; see `TrackingJob::pre_finish` for these logics.
- `Command::CreateStreamingJob`, to avoid redundant cloning / moving of the `StreamingJob` struct.

Credit: @wenym1 for this idea and the discussions.
Checklist
- `./risedev check` (or alias, `./risedev c`)

Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.