
feat: support partial checkpoint based backfill (a.k.a snapshot backfill) #17735

Merged
merged 5 commits into from
Aug 15, 2024

Conversation

wenym1
Contributor

@wenym1 wenym1 commented Jul 18, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

In this PR, we support a new type of backfill for creating streaming jobs. Unlike the current arrangement backfill, which consumes the upstream's new streaming data and the latest snapshot of the upstream table at the same time, this new type of backfill first consumes a fixed snapshot of the upstream table, and then consumes the upstream data epoch by epoch. For this reason, we call it snapshot backfill.

User Facing Change

This feature is currently experimental and won't be enabled by default. To enable it, we need to explicitly set a session variable: `set streaming_use_snapshot_backfill = true;`.

Besides, while consuming the fixed snapshot of the upstream table, we inject fake barriers into the creating streaming graph at the rate of system.barrier_interval_ms. For rate limiting, we can set the session variable backfill_rate_limit to cap the number of rows that can be yielded between two barriers while consuming the snapshot of the upstream table.

Implementation Details

To specify the new backfill type, we add a new variant SnapshotBackfill to the enum StreamScanType. In the frontend, when we see that snapshot backfill is enabled by the session variable streaming_use_snapshot_backfill, we set the stream scan type to StreamScanType::SnapshotBackfill. In the meta node, we add a new field job_type to the CreateStreamingJob command to specify whether the streaming job will be created with snapshot backfill. Snapshot-backfill-related information is sent to the GlobalBarrierManager via the CreateStreamingJob command.
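
The frontend decision above can be sketched as follows. This is a simplified illustration, not the actual planner code; only `StreamScanType` and the session variable name come from this PR.

```rust
// Simplified sketch of the frontend decision described above; the real
// logic lives in the RisingWave frontend planner.
#[derive(Debug, PartialEq)]
enum StreamScanType {
    ArrangementBackfill,
    SnapshotBackfill,
}

fn choose_scan_type(streaming_use_snapshot_backfill: bool) -> StreamScanType {
    if streaming_use_snapshot_backfill {
        StreamScanType::SnapshotBackfill
    } else {
        // Fall back to the existing arrangement backfill.
        StreamScanType::ArrangementBackfill
    }
}
```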

Global Barrier Manager

For streaming jobs created with snapshot backfill, the GlobalBarrierManager handles them differently from other streaming jobs. For each newly added snapshot backfill job, a CreatingStreamingJobControl struct is assigned to it in the CheckpointControl. The struct has the following fields to store the inflight and collected barriers, as well as the status of the creating streaming job.

struct CreatingStreamingJobControl {
    inflight_barrier_queue: BTreeMap<u64, CreatingStreamingJobEpochState>,
    collected_barrier: Vec<(u64, Vec<BarrierCompleteResponse>)>,
    status: CreatingStreamingJobStatus,
    backfill_epoch: Epoch, // the epoch at which backfill starts
    ...
}

The status of the creating streaming job is as follows:

enum CreatingStreamingJobStatus {
    ConsumingSnapshot {
        /// the fake physical time of the previously injected fake barrier
        prev_epoch_fake_physical_time: u64,
        /// the pending real barriers injected to upstream
        pending_commands: Vec<Arc<CommandContext>>,
        /// track the progress of snapshot backfill
        create_mview_tracker: CreateMviewProgressTracker,
        ...
    },
    ConsumingLogStore,
    ConsumingUpstream,
    Finishing,
}

A creating streaming job can be in one of four statuses.

  1. When the job is newly added, it is in the ConsumingSnapshot status. A streaming job in this status is consuming the fixed snapshot of the upstream. Fake barriers (with monotonically increasing fake epochs 1, 2, 3, 4, 5, ...) are injected, and all upstream real barrier commands are buffered until the snapshot has been fully consumed. The consumption progress is tracked in the create_mview_tracker. In this status, barriers are injected and collected separately within the creating job's own streaming graph, independently of the upstream streaming graph.

  2. When we see that the upstream table snapshot has been fully consumed, a barrier with epoch {prev = the last injected fake epoch, curr = the backfill start epoch} is injected to inform the backfill executor that the upstream snapshot has been fully consumed, and the status then transitions to ConsumingLogStore, which means the backfill executor starts consuming the L0 log store of the upstream mv tables. As in the ConsumingSnapshot status, barriers are injected and collected separately within the creating job's own streaming graph, independently of the upstream streaming graph.

  3. When we see that the downstream creating job has caught up with the upstream (policy to be designed), we send a barrier with a special mutation to inform the backfill executor to start consuming the upstream streaming data. After this barrier, the downstream graph joins the upstream global graph, and barriers are injected and collected together. The status transitions to ConsumingUpstream and then Finishing (the two statuses differ only slightly; review the code for more context).
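
The transitions in the steps above can be sketched as a simple state machine. This is a hedged simplification: the real `CreatingStreamingJobStatus` carries per-status state, and the event names below are illustrative, not names from the PR.

```rust
// Hedged sketch of the transitions in steps 1-3, using a simplified
// version of `CreatingStreamingJobStatus`.
#[derive(Debug, PartialEq)]
enum Status {
    ConsumingSnapshot,
    ConsumingLogStore,
    ConsumingUpstream,
    Finishing,
}

enum Event {
    SnapshotFullyConsumed, // step 2: barrier {prev = last fake epoch, curr = backfill epoch}
    CaughtUpWithUpstream,  // step 3: barrier with the special mutation
    JoinedGlobalBarrier,   // step 3: downstream now checkpoints with the upstream
}

fn transit(status: Status, event: Event) -> Status {
    match (status, event) {
        (Status::ConsumingSnapshot, Event::SnapshotFullyConsumed) => Status::ConsumingLogStore,
        (Status::ConsumingLogStore, Event::CaughtUpWithUpstream) => Status::ConsumingUpstream,
        (Status::ConsumingUpstream, Event::JoinedGlobalBarrier) => Status::Finishing,
        // Any other event leaves the status unchanged in this sketch.
        (status, _) => status,
    }
}
```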

During this process, the synced SST data of the table states in all epochs is only collected and is not committed to hummock. Instead, the data is stored epoch by epoch in the CreatingStreamingJobControl struct. After the streaming job has entered the Finishing status and has collected all the pending barriers, we commit all the previous state table data to hummock at once, and then mark the streaming job as created in the catalog.

Snapshot Backfill Executor

A new SnapshotBackfillExecutor is implemented to support the snapshot backfill. It will be created when the StreamScanExecutorBuilder sees that the stream scan type is SnapshotBackfill.

In this PR, during snapshot backfill, although barriers are injected and collected independently, the physical graphs of the upstream and downstream are connected, which means the SnapshotBackfillExecutor still receives all the upstream messages, including both stream chunks and barriers. In the stages of consuming the upstream snapshot and the upstream log store data, the SnapshotBackfillExecutor concurrently consumes the upstream streaming data so that it does not cause back-pressure on the upstream: it discards the stream chunks and buffers the received barriers.

Besides, the SnapshotBackfillExecutor also registers a barrier sender with the local barrier worker. Therefore, the executor receives barriers both from the upstream streaming data and directly from the local barrier worker. The workflow of the executor is as follows.

  1. When launched, the executor receives the first barrier from both the upstream and the barrier sender. If the two barriers have the same epoch, the streaming job has already been created, and the executor can skip backfill and simply keep consuming the upstream streaming data.
  2. If the barrier received from the upstream has an epoch greater than that of the barrier received from the barrier sender, we should do the backfill. The executor takes barrier.epoch.prev of the first barrier received from the upstream and consumes the snapshot of the upstream table at that epoch. It keeps yielding stream chunks while receiving the fake barriers from the barrier sender, and the backfill progress is reported along the way. After finishing the upstream snapshot, it keeps receiving barriers from the sender and yields no data, until it receives a barrier whose barrier.epoch.curr equals the epoch used to consume the upstream snapshot.
  3. After receiving the barrier with this epoch, the executor starts consuming the upstream log store. It still receives barriers from the sender and takes barrier.epoch.prev as the epoch at which to consume the upstream log store. After finishing the log at this epoch, it yields the barrier.
  4. The executor keeps consuming the log store until it receives a barrier with a special mutation (described in a later section) informing it to start consuming the upstream streaming data. From this barrier on, it simply consumes the upstream streaming data and yields it, and the snapshot backfill finishes.
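
The launch-time decision in steps 1-2 can be sketched as below. Epochs are plain u64s here, and the type and function names are illustrative, not the executor's actual API.

```rust
// Sketch of the launch-time decision: compare the first barrier from
// the upstream with the first barrier from the barrier sender.
#[derive(Debug, PartialEq)]
enum BackfillDecision {
    /// Same epoch on both sides: the job is already created.
    SkipBackfill,
    /// The upstream is ahead: consume the upstream table snapshot at
    /// `snapshot_epoch` (the `prev` epoch of the first upstream barrier).
    Backfill { snapshot_epoch: u64 },
}

fn decide(
    first_upstream_epoch: (u64, u64), // (prev, curr) of the first upstream barrier
    first_sender_epoch_curr: u64,     // curr epoch of the first barrier-sender barrier
) -> BackfillDecision {
    let (upstream_prev, upstream_curr) = first_upstream_epoch;
    if upstream_curr == first_sender_epoch_curr {
        BackfillDecision::SkipBackfill
    } else {
        // The upstream barrier has a greater epoch, so backfill from the
        // snapshot at its `prev` epoch.
        debug_assert!(upstream_curr > first_sender_epoch_curr);
        BackfillDecision::Backfill { snapshot_epoch: upstream_prev }
    }
}
```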

Enable/Disable Upstream MView Table Log Store

The downstream consumes the upstream log store in the same way as a subscription. Therefore, the creating streaming job is added as a subscriber to the upstream mv table, with the table fragments id of the creating streaming job as the subscriber id. When the GlobalBarrierManager handles a CreateStreamingJob command with snapshot backfill enabled, it injects a barrier with an add mutation into the upstream graph, similar to the normal command. The only difference is that it adds the subscriber info to the subscriptions_to_add field added to AddMutation in #17897. After receiving this barrier, the mview executor becomes aware of the newly added subscriber, enables the log store, and starts writing old values.

When the snapshot has been fully consumed and the progress of consuming the log store has almost caught up with the upstream, we merge the downstream into the upstream, and the downstream starts consuming the upstream streaming data. Since the downstream then stops consuming the log store, its subscription on the upstream mv table should be removed. Therefore, we inject a barrier with a DropSubscriptions mutation to inform the mview executor to remove the subscriber and possibly disable the log store and stop writing old values. The SnapshotBackfillExecutor also leverages this barrier as the notification to stop consuming the log store and start consuming the upstream streaming data.
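
The enable/disable behavior above can be sketched as simple subscriber bookkeeping. This is a hypothetical illustration: the type and method names below are made up, not the actual RisingWave API.

```rust
use std::collections::HashSet;

// Sketch: the mview executor keeps writing old values (log store
// enabled) while at least one subscriber is registered.
#[derive(Default)]
struct MviewSubscribers {
    /// Subscriber ids; a snapshot-backfill job uses its table
    /// fragments id as the subscriber id.
    ids: HashSet<u64>,
}

impl MviewSubscribers {
    /// Called on an `AddMutation` carrying `subscriptions_to_add`.
    fn on_add_subscriber(&mut self, subscriber_id: u64) {
        self.ids.insert(subscriber_id);
    }
    /// Called on a `DropSubscriptions` mutation.
    fn on_drop_subscriber(&mut self, subscriber_id: u64) {
        self.ids.remove(&subscriber_id);
    }
    /// Whether the log store should stay enabled.
    fn log_store_enabled(&self) -> bool {
        !self.ids.is_empty()
    }
}
```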

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

We are going to support a new type of backfill for creating streaming jobs. Unlike the current arrangement backfill, which consumes the upstream's new streaming data and the latest snapshot of the upstream table at the same time, this new type of backfill first consumes a fixed snapshot of the upstream table, and then consumes the upstream data epoch by epoch. For this reason, we call it snapshot backfill.

During snapshot backfill, barriers are injected and collected independently in the downstream creating jobs, so the checkpointing of the downstream jobs is largely decoupled from the upstream created jobs. Furthermore, during backfill, the workload is isolated between the backfilling downstream jobs and the upstream created jobs.

This feature is currently experimental and won't be enabled by default. To enable it, we need to explicitly set a session variable: `set streaming_use_snapshot_backfill = true;`.

Besides, while consuming the fixed snapshot of the upstream table, we inject fake barriers into the creating streaming graph at the rate of system.barrier_interval_ms. For rate limiting, we can set the session variable backfill_rate_limit to cap the number of rows that can be yielded between two barriers while consuming the snapshot of the upstream table.

@@ -1074,6 +1195,59 @@ impl CommandContext {
.await;
}

Command::CreateSnapshotBackfillStreamingJob { info, .. } => {
Contributor

Why does the backfill streaming job need its own command? Why can't it just reuse that of normal streaming jobs?

Contributor Author

I previously introduced a new command so that I could see how the new snapshot backfill would affect the current logic. But it seems more elegant to reuse the original CreateStreamingJob command, so I changed to reuse it in the latest commit.

I will update the PR description soon for easier review.

@wenym1 wenym1 changed the base branch from main to li0k/storage_commit_multi_epoch July 31, 2024 09:42
Base automatically changed from li0k/storage_commit_multi_epoch to main August 2, 2024 07:06
@wenym1 wenym1 changed the base branch from main to yiming/create-sub-via-add-mutation August 2, 2024 07:10
@wenym1 wenym1 marked this pull request as ready for review August 2, 2024 10:36
Base automatically changed from yiming/create-sub-via-add-mutation to main August 5, 2024 06:54
@wenym1 wenym1 force-pushed the yiming/snapshot-backfill branch from 5f65382 to f9246af Compare August 5, 2024 10:21
@wenym1 wenym1 changed the title feat: (wip) partial checkpoint backfill feat: support partial checkpoint based backfill (a.k.a snapshot backfill) Aug 6, 2024
@wenym1 wenym1 added the user-facing-changes Contains changes that are visible to users label Aug 6, 2024
Collaborator

@hzxa21 hzxa21 left a comment


Left some general comments

proto/stream_service.proto
@@ -634,7 +640,8 @@ impl PlanRoot {
) -> Result<StreamMaterialize> {
assert_eq!(self.phase, PlanPhase::Logical);
assert_eq!(self.plan.convention(), Convention::Logical);
let stream_plan = self.gen_optimized_stream_plan(false)?;
// Snapshot backfill is not allowed for create table
Collaborator

Is there any blocker for supporting snapshot backfill for create table/index or we just want to gradually roll-out this feature?

Contributor Author

We can use snapshot backfill for mv, index and normal sink. It's disabled for table, source and sink-into-table.

);
for notifier in notifiers {
notifier
.notify_start_failed(anyhow!("cannot cancel creating streaming job, the job will continue creating until created or recovery").into());
Collaborator

Does it mean that:

  1. cancel job is only applicable to non-snapshot-backfill create mv?
  2. we can use RECOVER command to cancel snapshot-backfill create mv?
  3. snapshot-backfill create mv is non-recoverable on transient failure?

Contributor Author

Yes. Currently snapshot backfill is non-recoverable, and cancelling a snapshot backfill job may require extra logic to inject some special barriers to stop the job, so here we simply reject the cancel command.

definition = info.definition,
"new creating job"
);
let mut create_mview_tracker = CreateMviewProgressTracker::default();
Collaborator

For non-snapshot-backfill streaming job creation, we have one CreateMviewProgressTracker for all streaming jobs while for snapshot-backfill streaming job creation, we have one CreateMviewProgressTracker for each job. Is this expected?

Contributor Author

Yes, this is expected. Here we just want to reuse the logic of CreateMviewProgressTracker so that we don't need to develop extra logic to track the backfill progress of snapshot backfill.


async fn consume_upstream(&mut self) -> StreamExecutorError {
while !self.is_finished {
let result = self.consume_until_next_barrier().await;
Collaborator

the SnapshotBackfillExecutor concurrently consumes the upstream streaming data so that it does not cause back-pressure on the upstream: it discards the stream chunks and buffers the received barriers.

Will there be any shuffle between the upstream and the snapshot backfill actor? I am wondering whether the overhead of passing the stream chunks to snapshot backfill is high. Also, would it be cleaner if the streaming graph were physically disconnected when the job status is ConsumingSnapshot / ConsumingLogStore, and the downstream only added back into the streaming graph after ConsumingLogStore (for example, on the DropSubscriptions mutation)?

Contributor Author

Ideally this would be nice, but for simplicity, the current PR just reuses the original logic and keeps the streaming graph physically connected.

Furthermore, for the general partial checkpoint implementation in the future, the streaming graph shall remain connected, so that we can dynamically decide whether to consume upstream data or log store data, and meanwhile introduce back-pressure on the upstream when needed. In that future implementation, when the downstream is lagging too far behind, even though the graph is physically connected, we can ask the dispatcher not to dispatch the stream chunks to reduce the traffic.

src/meta/src/barrier/mod.rs
| CreatingStreamingJobStatus::ConsumingLogStore { .. } => {}
CreatingStreamingJobStatus::Finishing(prev_epoch) => {
if command_ctx.kind.is_checkpoint()
&& prev_epoch != command_ctx.prev_epoch.value().0
Collaborator

Why can we transition the job status from Finishing to Finished on prev_epoch != command_ctx.prev_epoch.value().0? I thought the transition should happen when we finish collecting the prev_epoch in Finishing.

Contributor Author

The status actually reflects the state after injecting the latest barrier and does not directly relate to collecting barriers. For example, even though we have entered the Finishing status after injecting a barrier, theoretically there can still be some uncollected barriers that were injected in the ConsumingSnapshot status.

BTW, the naming of the statuses was confusing. I have changed the original Finishing and Finished to ConsumingUpstream and Finishing.

@kwannoel
Contributor

I'll leave a review on Monday.

Contributor

@kwannoel kwannoel left a comment


I think we also need to change the fragmenter logic to follow arrangement backfill. Otherwise it will follow no-shuffle fragmentation.

Can search for meta/src/stream/stream_graph/fragment.rs::has_arrangement_backfill

I think we can do this in a subsequent PR, since the current one is rather large.

yield Message::Barrier(first_barrier);
}
}
// Phase 3: consume upstream
Contributor

@kwannoel kwannoel Aug 12, 2024


We can add a trace or an info log here to indicate that backfill is done on the CN side.

Contributor Author

Do you mean something like an info log so that we can know from the log that the backfill is finished? Or is there currently some special trace collection mechanism that leverages a trace here?

Contributor

Do you mean something like an info log so that we can know from the log that the backfill is finished? Or is there currently some special trace collection mechanism that leverages a trace here?

Sorry I mean an info log.

@wenym1 wenym1 requested review from hzxa21 and kwannoel August 14, 2024 10:19
@kwannoel
Contributor

During this process, the synced SST data of the table states in all epochs is only collected and is not committed to hummock. Instead, the data is stored epoch by epoch in the CreatingStreamingJobControl struct. After the streaming job has entered the Finishing status and has collected all the pending barriers, we commit all the previous state table data to hummock at once, and then mark the streaming job as created in the catalog.

Not sure if I'm misreading this. Will it mean we buffer everything in-memory, inside the CreatingStreamJobControl struct? Won't we risk OOM? Seems like I missed something.

@wenym1
Contributor Author

wenym1 commented Aug 15, 2024

During this process, the synced SST data of the table states in all epochs is only collected and is not committed to hummock. Instead, the data is stored epoch by epoch in the CreatingStreamingJobControl struct. After the streaming job has entered the Finishing status and has collected all the pending barriers, we commit all the previous state table data to hummock at once, and then mark the streaming job as created in the catalog.

Not sure if I'm misreading this. Will it mean we buffer everything in-memory, inside the CreatingStreamJobControl struct? Won't we risk OOM? Seems like I missed something.

What we buffer is only the SST metadata (SstableInfo) rather than the actual data. The size of the metadata is relatively small, so it should be fine to buffer all of it in memory.

Moreover, in the latest commit, I have changed to committing the data of the backfilling mv epoch by epoch instead of all at once. I will update the PR description accordingly.

@kwannoel
Contributor

kwannoel commented Aug 15, 2024

the progress of consuming the log store has almost caught up with the upstream, we merge the downstream into the upstream,

Does this mean when consuming log store has almost caught up with upstream, we will merge the upstream stream, and changelog stream, then afterwards only read from the upstream stream once changelog is fully consumed?

If not, how do we know consuming the log store is "done" and it's time to switch to consuming upstream? What's the signal for that?

@wenym1
Contributor Author

wenym1 commented Aug 15, 2024

the progress of consuming the log store has almost caught up with the upstream, we merge the downstream into the upstream,

Does this mean when consuming log store has almost caught up with upstream, we will merge the upstream stream, and changelog stream, then afterwards only read from the upstream stream once changelog is fully consumed?

If not how do we know consuming log store is "done" and it's time to switch to consuming upstream? What's the signal for that.

I will update the doc with the implemented policy that ensures the backfill can catch up in time.

In brief, when we start consuming the log store, we store the current upstream epoch as start_consume_log_store_epoch. When we have consumed the log store up to this start_consume_log_store_epoch, we switch to consuming upstream. Besides, while consuming upstream, we have a mechanism to ensure that the creating job catches up with the upstream eventually. For example, say the downstream still has to consume epochs 1-10 and the upstream is at epoch 11. To ensure the downstream catches up eventually, we attach downstream epoch 5 to upstream epoch 11, which means upstream epoch 11 should wait for the downstream to finish consuming epoch 5. We then attach downstream epoch 10 to upstream epoch 12, and downstream epoch 13 to upstream epoch 13. When there is no unattached downstream epoch, we transition to Finishing, and all subsequent barriers are injected and collected together in the upstream and downstream.

Maybe we can further replace the current start_consume_log_store_epoch policy with this mechanism when consuming the log store.
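
The catch-up mechanism in the example above could be sketched roughly as follows. This is a hedged illustration: the PR only gives the example (backlog 1-10 -> attach 5, then 10, then live epochs), so the "halve the remaining backlog" rule below is an assumption, and the function name is made up.

```rust
// Hedged sketch: each new upstream epoch is attached to a downstream
// epoch partway through the remaining unattached backlog, so the
// backlog shrinks every barrier and the downstream eventually catches up.
fn attach_epoch(unattached_backlog: &mut Vec<u64>) -> Option<u64> {
    if unattached_backlog.is_empty() {
        // No unattached downstream epoch: the job can transition to
        // Finishing and checkpoint together with the upstream.
        return None;
    }
    // Attach the epoch halfway through the backlog, so the upstream
    // barrier waits for the downstream to clear half of what remains.
    let mid = (unattached_backlog.len() - 1) / 2;
    let attached = unattached_backlog[mid];
    unattached_backlog.drain(..=mid);
    Some(attached)
}
```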

@kwannoel
Contributor

the SnapshotBackfillExecutor still receives all the upstream messages

We may still need some mechanism in the future to concurrently process upstream Message::Barrier, because cancel MV and alter rate limit both rely on barrier commands to signal the executor.

Contributor

@kwannoel kwannoel left a comment


Rest LGTM once tests pass (including enabled main-cron tests).

Base automatically changed from yiming/commit-partial-table to main August 15, 2024 08:59
@wenym1 wenym1 force-pushed the yiming/snapshot-backfill branch from c59e800 to d4b5646 Compare August 15, 2024 09:07
@wenym1 wenym1 added this pull request to the merge queue Aug 15, 2024
Merged via the queue into main with commit cfea9f3 Aug 15, 2024
35 of 36 checks passed
@wenym1 wenym1 deleted the yiming/snapshot-backfill branch August 15, 2024 16:30