
refactor(meta): commit finish catalog in barrier manager #17428

Merged
44 commits merged into main from kwannoel/commit-in-barrier on Jul 16, 2024

Conversation

@kwannoel (Contributor) commented Jun 24, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?


Move committing metadata for finished stream jobs into the barrier manager. This unifies the paths for recovered and newly created stream jobs.

The stream manager can then subscribe directly to the catalog manager, and we no longer need the finished channel in our notifiers.

Before

struct Notifier {
    pub started: Option<oneshot::Sender<BarrierInfo>>,
    pub collected: Option<oneshot::Sender<MetaResult<()>>>,
    pub finished: Option<oneshot::Sender<MetaResult<()>>>,
}

After

struct Notifier {
    pub started: Option<oneshot::Sender<BarrierInfo>>,
    pub collected: Option<oneshot::Sender<MetaResult<()>>>,
}

Instead, callers just wait for the catalog to be committed / marked as finished.

When run_command returns, the initial barrier for the command has been collected. With this information we can immediately commit the catalog and make it visible to the frontend at that point. That step (making the catalog visible) will be done in a separate PR.
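A minimal sketch of the intended flow, assuming hypothetical handle types and signatures (run_command and wait_streaming_job are named in this PR, but everything below is a simplified stand-in, not RisingWave's actual API):

// Hypothetical sketch only: stub types stand in for the real meta structures,
// and the method signatures are assumptions for illustration.
type MetaResult<T> = Result<T, String>; // stand-in for the real error type
type NotificationVersion = u64; // stand-in

struct Command; // stand-in for the barrier command
struct BarrierManagerHandle;
struct CatalogManagerHandle;

impl BarrierManagerHandle {
    /// Returns once the initial barrier for the command has been collected.
    async fn run_command(&self, _cmd: Command) -> MetaResult<()> {
        Ok(())
    }
}

impl CatalogManagerHandle {
    /// Resolves once the finished catalog has been committed (by the barrier manager).
    async fn wait_streaming_job(&self, _job_id: u32) -> MetaResult<NotificationVersion> {
        Ok(0)
    }
}

async fn create_streaming_job_sketch(
    barrier: &BarrierManagerHandle,
    catalog: &CatalogManagerHandle,
    cmd: Command,
    job_id: u32,
) -> MetaResult<NotificationVersion> {
    // Injecting the command returns once the initial barrier is collected.
    barrier.run_command(cmd).await?;
    // No `finished` channel: just wait for the catalog commit, which the
    // barrier manager performs when the job finishes.
    catalog.wait_streaming_job(job_id).await
}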

Unresolved:

  • For v1, the barrier manager currently needs the entire StreamingJob struct to be passed in so it can finish the stream job.
  • This is not necessary for v2, where we only need the streaming job id and the table to replace (if any). See TrackingJob::pre_finish for this logic.
  • As such, perhaps we should split the parameters passed to Command::CreateStreamingJob to avoid redundant cloning / moving of the StreamingJob struct (see the sketch after this list).
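A hypothetical illustration of that split; the enum and field names below are invented for this sketch, and the stand-in types are placeholders rather than the actual meta definitions:

// Hypothetical sketch: the info the barrier manager needs to finish a job,
// split per metadata backend so that v2 avoids carrying the whole struct.
struct StreamingJob; // stand-in for the real struct
type TableId = u32; // stand-in

enum FinishJobInfo {
    /// v1 metadata manager needs the full struct to commit the job.
    V1 { streaming_job: StreamingJob },
    /// v2 only needs the job id and the table being replaced, if any.
    V2 {
        job_id: u32,
        replace_table_id: Option<TableId>,
    },
}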

Credit to @wenym1 for this idea and the discussions.

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features; see Sqlsmith: Sql feature generation #7934.)
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@kwannoel (Contributor, Author):

Still WIP. You can review the idea of the refactor and the general implementation.
There are still some optimizations to make (e.g. lots of cloning) and some missing functionality, so please ignore those for now.

@kwannoel changed the title from commit finish catalog in barrier manager to refactor(meta): commit finish catalog in barrier manager on Jun 26, 2024
@kwannoel force-pushed the kwannoel/commit-in-barrier branch 2 times, most recently from b84840b to 4c82637 on July 3, 2024 10:12
@kwannoel (Contributor, Author) commented Jul 3, 2024

(Still WIP; just marking it ready for review to run the tests.)

@kwannoel marked this pull request as ready for review on July 3, 2024 13:29
@kwannoel force-pushed the kwannoel/commit-in-barrier branch 3 times, most recently from e73f69a to 0676389 on July 5, 2024 08:18
@kwannoel force-pushed the kwannoel/commit-in-barrier branch from 6a90414 to 05f455b on July 5, 2024 10:28
@kwannoel force-pushed the kwannoel/commit-in-barrier branch from e6f726a to 50026d8 on July 7, 2024 12:47
@@ -952,6 +954,10 @@ impl DdlController {
&ctx.replace_table_job_info
{
*target_table = Some((table.clone(), source.clone()));
if let StreamingJob::Sink(_, ref mut target_table) = &mut ctx.streaming_job
Contributor

Any reason to additionally set it here? Is it a previous bug, or just necessary for the feature implemented in this PR?

Contributor (Author)

In this PR, we need to propagate the StreamingJob struct into the barrier manager via the StreamingJobContext. This creates two instances of the StreamingJob object.

Hence both ctx.streaming_job and streaming_job itself need to be updated so they stay consistent. This is indeed a bad design, but the alternatives I have tried so far don't fix it.

I have tried to unify these by giving ownership of streaming_job to the create_streaming_job method. However, that runs into a stack overflow.

This should be fixed before the PR is merged, but I don't have the bandwidth at the moment, nor any simple idea of how to fix it.

Contributor (Author)

The StreamingJob struct is required by the v1 metadata manager in order to commit the streaming job.

Member

Can you please leave some comments here? Same for v2.

Contributor

I just refactored the code. The original logic is moved into build_streaming_job so that v1 and v2 stay consistent and we avoid duplicating the logic.

The ownership of StreamingJob in the parameters of the related methods is refined accordingly.

src/meta/src/barrier/progress.rs (review thread resolved)
@@ -97,6 +97,8 @@ pub struct ReplaceTablePlan {
/// Note that there's no `SourceBackfillExecutor` involved for table with connector, so we don't need to worry about
/// `backfill_splits`.
pub init_split_assignment: SplitAssignment,
pub streaming_job: StreamingJob,
pub dummy_id: u32,
Contributor

Curious why it's named dummy_id? What does it mean?

Contributor

And is there any reason to add this information to the ReplaceTablePlan command? I think every command that needs to track backfill status should come with a CreateStreamingJob command, from which we can get the StreamingJob information. Which DDL would have only a ReplaceTablePlan command, so that we have to store the StreamingJob here?

Member

Is the dummy_id the same as the id of new_table_fragments?

/// Catalogs which were marked as finished and committed,
/// but for which the `DdlController` has not yet instantiated a notification channel.
/// Once notified, we can remove the entry from this map.
pub table_id_to_version: HashMap<TableId, MetaResult<NotificationVersion>>,
Contributor

I think we don't need this table_id_to_version to store the pending info; table_id_to_tx should be enough. We can check whether a streaming job has finished by directly checking the job status in the DatabaseManager.

Ideally things can work like this (see the sketch after this list):

  1. When registering a tx on a table id, acquire the lock and check the job status. If the job is creating, store the tx. If the job has been created, return Ok. If the job does not exist, return an error to tell the user about it.
  2. On abort_all, notify all txs with an error.
  3. Every drop/cancel streaming job DDL should remove the tx of the dropped/cancelled streaming job id.
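A minimal sketch of that flow, using simplified stand-in types (JobStatus, the String error, the u32 ids, and the struct/method names are placeholders invented for this illustration):

// Hypothetical sketch of the suggested flow; all types are simplified stand-ins.
use std::collections::HashMap;
use tokio::sync::oneshot;

enum JobStatus {
    Creating,
    Created,
}

#[derive(Default)]
struct CreatingJobRegistry {
    job_status: HashMap<u32, JobStatus>,
    table_id_to_tx: HashMap<u32, oneshot::Sender<Result<(), String>>>,
}

impl CreatingJobRegistry {
    /// 1. Register a waiter for `table_id`, checking the job status under the same lock.
    fn register(&mut self, table_id: u32, tx: oneshot::Sender<Result<(), String>>) {
        match self.job_status.get(&table_id) {
            // Still creating: hold the tx until the job finishes.
            Some(JobStatus::Creating) => {
                self.table_id_to_tx.insert(table_id, tx);
            }
            // Already created: answer immediately.
            Some(JobStatus::Created) => {
                let _ = tx.send(Ok(()));
            }
            // Unknown job: surface the error to the caller.
            None => {
                let _ = tx.send(Err(format!("streaming job {table_id} does not exist")));
            }
        }
    }

    /// 2. On abort_all (e.g. during recovery), fail every pending waiter.
    fn abort_all(&mut self) {
        for (_, tx) in self.table_id_to_tx.drain() {
            let _ = tx.send(Err("aborted".into()));
        }
    }

    /// 3. Drop/cancel DDL removes the waiter of the affected job id.
    fn remove(&mut self, table_id: u32) {
        self.table_id_to_tx.remove(&table_id);
    }
}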

Contributor (Author)

We still need to propagate the result and notify the version, because the frontend will wait on this version. Knowing that a job is done creating is insufficient.

It exists because the catalog manager may commit before the stream manager calls wait_streaming_job. In such a scenario, the catalog manager still needs to notify the stream manager of the version and any error.

@wenym1 (Contributor) commented Jul 8, 2024

We still need to propagate the result and notify the version, because the frontend will wait on this version. Knowing that a job is done creating is insufficient.

Shall we just return the global latest notification version instead of the version at the moment the job turned created, so that we don't need to store the version per table? cc @yezizp2012

It exists because the catalog manager may commit before the stream manager calls wait_streaming_job. In such a scenario, the catalog manager still needs to notify the stream manager of the version and any error.

I think this corner case is covered by my comment above. If the catalog manager has committed before wait_streaming_job, there are three possibilities (a small sketch follows this list):

  • The job has been created, in which case we just return Ok with the global latest notification version.
  • The first barrier failed to be collected. The stream manager should have received an error before calling wait_streaming_job, so there's nothing to handle.
  • The first barrier was collected and the job status has turned into Creating. Even if an error and recovery happen, we can store the tx anyway: it will be notified either with Ok when the job finishes backfill, or with Err when a later error triggers a recovery and calls abort_all.
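A small sketch of the first point (returning the global latest notification version), continuing the stand-in style above; the registry, version type, and method names are hypothetical, not the real catalog manager API:

// Hypothetical sketch: notify finished jobs with the global latest
// notification version, so no per-table version map is needed.
use std::collections::HashMap;
use tokio::sync::oneshot;

struct VersionedRegistry {
    current_notification_version: u64,
    table_id_to_tx: HashMap<u32, oneshot::Sender<Result<u64, String>>>,
}

impl VersionedRegistry {
    /// Called when a job finishes backfill: answer the waiter (if any) with
    /// the global latest version instead of a stored per-table version.
    fn notify_finished(&mut self, table_id: u32) {
        if let Some(tx) = self.table_id_to_tx.remove(&table_id) {
            let _ = tx.send(Ok(self.current_notification_version));
        }
    }
}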

@wenym1 (Contributor) left a comment:

LGTM. @yezizp2012 @shanicky PTAL

@yezizp2012 (Member) left a comment:

Generally LGTM; we might need more refactors later.

@@ -63,31 +60,10 @@ impl Notifier {
}
}

/// Notify when we have finished a barrier from all actors. This function consumes `self`.
Contributor

BTW, we can change notify_collected to consume ownership of the notifier, because no further calls on the notifier are expected after calling it.
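A sketch of that suggestion, reusing the simplified Notifier from the PR description (MetaResult, BarrierInfo, and the method body below are stand-ins for illustration):

// Hypothetical sketch: taking `self` by value makes a second notification a
// compile-time error rather than a runtime surprise.
use tokio::sync::oneshot;

type MetaResult<T> = Result<T, String>; // stand-in for the real error type
struct BarrierInfo; // stand-in

struct Notifier {
    pub started: Option<oneshot::Sender<BarrierInfo>>,
    pub collected: Option<oneshot::Sender<MetaResult<()>>>,
}

impl Notifier {
    /// Notify that the barrier has been collected from all actors.
    /// Consumes the notifier, so it cannot be notified again afterwards.
    pub fn notify_collected(self) {
        if let Some(tx) = self.collected {
            let _ = tx.send(Ok(()));
        }
    }
}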

src/meta/src/controller/streaming_job.rs (review thread resolved, outdated)
src/meta/src/controller/catalog.rs (review thread resolved)
src/meta/src/manager/catalog/database.rs (review thread resolved)
src/meta/src/manager/streaming_job.rs (review thread resolved, outdated)
src/meta/src/stream/stream_manager.rs (review thread resolved)

let version = mgr
    .catalog_controller
    .finish_streaming_job(stream_job_id as _, replace_table_job_info)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also update the docs on create and finish to show that their callers are different managers?

@BugenZhao (Member):

With this info we can immediately commit the catalog, and make it visible to the fe at this point. This step will be done in a separate PR (making the catalog visible).

May I know the status after this PR? Do we notify the frontend after barrier collection or the creation finished? It appears that after #17484 the frontend can resolve the streaming jobs that are still being created.

Also, is this task related to #12771?

@kwannoel (Contributor, Author):

With this info we can immediately commit the catalog, and make it visible to the fe at this point. This step will be done in a separate PR (making the catalog visible).

May I know the status after this PR? Do we notify the frontend after barrier collection or the creation finished? It appears that after #17484 the frontend can resolve the streaming jobs that are still being created.

Also, is this task related to #12771?

Yeah it will be required by #12771.

May I know the status after this PR? Do we notify the frontend after barrier collection or the creation finished? It appears that after #17484 the frontend can resolve the streaming jobs that are still being created.

No, we will still notify it immediately when committing the creating catalogs at the start, while preparing the stream jobs, not after barrier collection. This PR does not change that.

@wenym1 will be doing further work on meta as part of supporting log-store-based backfill, so we are cleaning up some code first to make that work easier. The main goal is to clean up the logic for finishing stream jobs and centralize it all in the barrier manager, and also to remove some machinery (e.g. notification channels) that was only used to monitor stream jobs, since the stream manager can monitor them instead.

@wenym1 (Contributor) commented Jul 16, 2024

With this info we can immediately commit the catalog, and make it visible to the fe at this point. This step will be done in a separate PR (making the catalog visible).

May I know the status after this PR? Do we notify the frontend after barrier collection or the creation finished? It appears that after #17484 the frontend can resolve the streaming jobs that are still being created.

Also, is this task related to #12771?

I think the behavior towards the frontend is the same as before: we wait for the streaming job to finish before returning from create_streaming_job. This PR only changes the way we get notified of the finish. It's an alternative implementation to #17297.

@kwannoel Can you elaborate more on the motivation of the original PR?

@kwannoel (Contributor, Author) commented Jul 16, 2024

@kwannoel Can you elaborate more on the motivation of the original PR?

It was originally intended for creating an MV on top of a creating MV. But I'm not prioritizing it recently, because it's not that simple to implement and may be further complicated by log-store-based backfill.

It also cleans up some code in meta along the way.

@wenym1 added this pull request to the merge queue Jul 16, 2024
Merged via the queue into main with commit 10220ed Jul 16, 2024
31 of 32 checks passed
@wenym1 deleted the kwannoel/commit-in-barrier branch July 16, 2024 10:46