feat: support partial checkpoint based backfill (a.k.a snapshot backfill) #17735
Conversation
src/meta/src/barrier/command.rs
Outdated
@@ -1074,6 +1195,59 @@ impl CommandContext {
            .await;
        }

        Command::CreateSnapshotBackfillStreamingJob { info, .. } => {
Why does the snapshot backfill streaming job need its own command? Why can't it just reuse that of normal streaming jobs?
Previously I introduced a new command so that I could be aware of how the new snapshot backfill affects the current logic. But it seems more elegant to reuse the original CreateStreamingJob command, so I changed to reuse it in the latest commit.
I will update the PR description soon for easier review.
5f65382 to f9246af
Left some general comments
@@ -634,7 +640,8 @@ impl PlanRoot {
    ) -> Result<StreamMaterialize> {
        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        let stream_plan = self.gen_optimized_stream_plan(false)?;
        // Snapshot backfill is not allowed for create table
Is there any blocker for supporting snapshot backfill for create table/index, or do we just want to gradually roll out this feature?
We can use snapshot backfill for mv, index and normal sink. It's disabled for table, source and sink-into-table.
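Roughly, the gating this reply describes could look like the following standalone sketch. It is not the actual frontend code: the enum variants and the `choose_stream_scan_type` helper are illustrative stand-ins for how the session variable `streaming_use_snapshot_backfill` and the statement kind might select `StreamScanType::SnapshotBackfill`.

```rust
/// Simplified stand-in for the planner's stream scan type.
#[derive(Debug, PartialEq)]
enum StreamScanType {
    ArrangementBackfill,
    SnapshotBackfill,
}

/// Kinds of streaming jobs a DDL statement can create (illustrative).
#[allow(dead_code)]
#[derive(Debug)]
enum StreamingJobKind {
    MaterializedView,
    Index,
    Sink { into_table: bool },
    Table,
    Source,
}

/// Hypothetical helper: snapshot backfill is only allowed for mv, index and
/// normal sink; everything else falls back to arrangement backfill.
fn choose_stream_scan_type(kind: &StreamingJobKind, use_snapshot_backfill: bool) -> StreamScanType {
    let allowed = matches!(
        kind,
        StreamingJobKind::MaterializedView
            | StreamingJobKind::Index
            | StreamingJobKind::Sink { into_table: false }
    );
    if use_snapshot_backfill && allowed {
        StreamScanType::SnapshotBackfill
    } else {
        StreamScanType::ArrangementBackfill
    }
}

fn main() {
    assert_eq!(
        choose_stream_scan_type(&StreamingJobKind::MaterializedView, true),
        StreamScanType::SnapshotBackfill
    );
    assert_eq!(
        choose_stream_scan_type(&StreamingJobKind::Table, true),
        StreamScanType::ArrangementBackfill
    );
    println!("scan type selection matches the described gating");
}
```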
    );
    for notifier in notifiers {
        notifier
            .notify_start_failed(anyhow!("cannot cancel creating streaming job, the job will continue creating until created or recovery").into());
Does it mean that:
- cancel job is only applicable to non-snapshot-backfill create mv?
- we can use the RECOVER command to cancel a snapshot-backfill create mv?
- snapshot-backfill create mv is non-recoverable on transient failure?
Yes. Currently snapshot backfill is non-recoverable, and cancelling the snapshot backfill job may require extra logic to inject some special barriers to stop the job, so here we just simply reject the cancel command.
    definition = info.definition,
    "new creating job"
);
let mut create_mview_tracker = CreateMviewProgressTracker::default();
For non-snapshot-backfill streaming job creation, we have one CreateMviewProgressTracker for all streaming jobs while for snapshot-backfill streaming job creation, we have one CreateMviewProgressTracker for each job. Is this expected?
Yes, this is expected. Here we just want to reuse the logic of CreateMviewProgressTracker so that we don't need to develop extra logic to track the backfill progress of snapshot backfill.
async fn consume_upstream(&mut self) -> StreamExecutorError {
    while !self.is_finished {
        let result = self.consume_until_next_barrier().await;
SnapshotBackfillExecutor will concurrently consume the upstream streaming data so that it won't cause back-pressure to the upstream. It will discard the stream chunks and buffer the received barriers.
Will there be any shuffle between upstream and the snapshot backfill actor? I am wondering whether the overhead of passing the stream chunks to snapshot backfill is high. Also, would it be cleaner if the streaming graph were physically disconnected while the job status is ConsumingSnapshot / ConsumingLogStore, and we only added the downstream back into the streaming graph after ConsumingLogStore (for example on the DropSubscriptions mutation)?
Ideally this would be nice, but for simplicity the current PR just reuses the original logic and keeps the streaming graph physically connected.
Furthermore, for the general partial ckpt implementation in the future, the streaming graph shall remain connected, so that we can dynamically decide whether to consume upstream data or log store data, and meanwhile introduce back-pressure to upstream when needed. In that future implementation, when the downstream is lagging too far behind, even though the graph is physically connected, we can ask the dispatcher not to dispatch the stream chunks to reduce the traffic.
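To make the "discard chunks, buffer barriers" behavior discussed above concrete, here is a minimal standalone sketch. It is not the real executor: the message and buffer types are simplified stand-ins for how the upstream stream can keep being drained during backfill so that the upstream is never back-pressured.

```rust
use std::collections::VecDeque;

/// Simplified stand-in for an upstream streaming message.
enum Message {
    Chunk(Vec<u64>), // payload doesn't matter for the sketch
    Barrier(u64),    // identified by its epoch
}

/// While backfilling, keep draining upstream: stream chunks are dropped,
/// barriers are kept so they can be handled later.
fn drain_upstream_during_backfill(
    upstream: impl Iterator<Item = Message>,
    buffered_barriers: &mut VecDeque<u64>,
) {
    for msg in upstream {
        match msg {
            // Data is not needed yet: the executor is still reading the
            // fixed snapshot (or the log store), so the chunk is discarded.
            Message::Chunk(_) => {}
            // Barriers must not be lost: buffer them until the corresponding
            // epoch has been consumed from the log store.
            Message::Barrier(epoch) => buffered_barriers.push_back(epoch),
        }
    }
}

fn main() {
    let upstream = vec![
        Message::Chunk(vec![1, 2, 3]),
        Message::Barrier(100),
        Message::Chunk(vec![4]),
        Message::Barrier(101),
    ];
    let mut barriers = VecDeque::new();
    drain_upstream_during_backfill(upstream.into_iter(), &mut barriers);
    assert_eq!(barriers, VecDeque::from(vec![100, 101]));
    println!("buffered barriers: {barriers:?}");
}
```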
src/meta/src/barrier/mod.rs
Outdated
    | CreatingStreamingJobStatus::ConsumingLogStore { .. } => {}
    CreatingStreamingJobStatus::Finishing(prev_epoch) => {
        if command_ctx.kind.is_checkpoint()
            && prev_epoch != command_ctx.prev_epoch.value().0
Why can we transition the job status from Finishing to Finished on `prev_epoch != command_ctx.prev_epoch.value().0`? I thought the transition should happen when we finish collecting the `prev_epoch` in `Finishing`.
The status actually reflects the state after injecting the latest barrier and does not directly relate to collecting barriers. For example, even though we have entered the `Finishing` status after injecting a barrier, theoretically there can still be some uncollected barriers that were injected at status `ConsumingSnapshot`.
BTW, the naming of the statuses was confusing. I have changed the original `Finishing` and `Finished` to `ConsumingUpstream` and `Finishing`.
I'll leave a review on Monday.
src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version.rs
I think we also need to change the fragmenter logic to follow arrangement backfill. Otherwise it will follow no-shuffle fragmentation.
Can search for meta/src/stream/stream_graph/fragment.rs::has_arrangement_backfill
I think we can do this in a subsequent PR, since the current one is rather large.
        yield Message::Barrier(first_barrier);
    }
}
// Phase 3: consume upstream
We can add a trace or an info log here to indicate that backfill is done on the CN side.
Do you mean something like an info log so that we can know from the log that the backfill is finished? Or is there currently some special trace collection mechanism that leverages a trace here?
Do you mean something like an info log so that we can know from the log that the backfill is finished? Or is there currently some special trace collection mechanism that leverages a trace here?

Sorry, I mean an info log.
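For reference, a minimal sketch of such a log (assuming the `tracing` crate, which the codebase uses; the function and field names are made up for illustration):

```rust
/// Hypothetical helper: emit an info-level log when backfill completes on this actor.
fn report_backfill_finished(table_id: u32, snapshot_epoch: u64) {
    tracing::info!(table_id, snapshot_epoch, "snapshot backfill finished, switching to upstream");
}
```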
Not sure if I'm misreading this. Will it mean we buffer everything in-memory, inside the
The things we buffer are only the SST metadata. Moreover, in the latest commit, I have changed to commit the data of the backfilling mv epoch by epoch instead of all at once. I will update the PR description accordingly.
Does this mean that when consuming the log store has almost caught up with upstream, we will merge the upstream stream and the changelog stream, and then afterwards only read from the upstream stream once the changelog is fully consumed? If not, how do we know consuming the log store is "done" and it's time to switch to consuming upstream? What's the signal for that?
Will update the doc for the implemented policy to ensure the backfill can catch up in time. In brief, when we start consuming the log store, we will store the current upstream epoch named … Maybe we can further replace the current policy of …
We may need some mechanism in the future to concurrently process upstream
Rest LGTM once tests pass (including enabled main-cron tests).
c59e800 to d4b5646
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
In this PR, we support a new type of backfill for creating streaming jobs. Unlike the current arrangement backfill, which consumes the upstream's new streaming data and the latest snapshot of the upstream table at the same time, this new type of backfill keeps consuming a fixed snapshot of the upstream table, and then consumes the upstream data epoch by epoch. For this reason, we call it snapshot backfill.

User Facing Change
This feature is currently experimental and won't be enabled by default. To enable it, we need to explicitly set a session variable: `set streaming_use_snapshot_backfill = true;`.

Besides, while consuming the fixed snapshot of the upstream table, we will inject some fake barriers into the creating streaming graph at the rate of `system.barrier_interval_ms`. For rate limiting, we can set the session variable `backfill_rate_limit` to cap the maximum number of rows that can be yielded between two barriers while consuming the snapshot of the upstream table.

Implementation Details
To specify the new backfill type, we add a new variant `SnapshotBackfill` to the enum `StreamScanType`. In the frontend, when we see that snapshot backfill is enabled by the session variable `streaming_use_snapshot_backfill`, we will set the stream scan type to `StreamScanType::SnapshotBackfill`. In the meta node, we add a new field `job_type` to the `CreateStreamingJob` command to specify whether the streaming job will be created with snapshot backfill. Some snapshot-backfill-related information will be sent to the `GlobalBarrierManager` via the `CreateStreamingJob` command.

Global Barrier Manager
For streaming jobs to be created with snapshot backfill, the `GlobalBarrierManager` handles them differently from other streaming jobs. For each newly added snapshot backfill job, a `CreatingStreamingJobControl` struct is assigned to it in the `CheckpointControl`. Basically, the struct holds fields to store the inflight and collected barriers, as well as the status of the creating streaming job. A creating streaming job can be in one of 4 statuses, described below.
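The exact definitions are not reproduced in this description; as a rough sketch (field names, types and variant payloads below are illustrative, not taken from the PR), the control struct and statuses might look like:

```rust
use std::collections::{HashMap, HashSet};

/// Rough sketch of the per-job control state kept in `CheckpointControl`.
/// Field names and types are illustrative.
#[allow(dead_code)]
struct CreatingStreamingJobControl {
    /// Barriers injected into the creating job but not yet fully collected,
    /// keyed by their prev epoch and tracking which actors still owe a response.
    inflight_barriers: HashMap<u64, HashSet<u32 /* actor id */>>,
    /// Barriers already collected from all actors, waiting to be committed.
    collected_barriers: Vec<u64>,
    /// Current status of the creating streaming job.
    status: CreatingStreamingJobStatus,
}

/// The four statuses a snapshot-backfill creating job goes through
/// (variant payloads omitted in this sketch).
#[allow(dead_code)]
enum CreatingStreamingJobStatus {
    /// Consuming the fixed upstream snapshot; fake barriers are injected.
    ConsumingSnapshot,
    /// Consuming the L0 log store of the upstream mv tables.
    ConsumingLogStore,
    /// Consuming upstream streaming data; barriers are now shared with the
    /// global graph.
    ConsumingUpstream,
    /// Waiting for the remaining barriers to be collected before committing
    /// to hummock and marking the job as created.
    Finishing,
}
```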
- When the job is newly added, it is in the `ConsumingSnapshot` status. The streaming job under this status is consuming the fixed snapshot of the upstream. Fake barriers (with a monotonically increasing fake epoch: 1, 2, 3, 4, 5, ...) are injected, and all upstream real barrier commands are buffered until the snapshot has been fully consumed. The consumption progress is tracked in the `create_mview_tracker`. Under this status, barriers are injected and collected separately within the single creating job, independently of the upstream streaming graph.
- When we see that we have finished consuming the upstream table snapshot, a barrier with `epoch {prev = lastly injected fake epoch, curr = backfill start epoch}` is injected to inform the backfill executor that consuming the upstream snapshot has finished, and the status transitions to `ConsumingLogStore`, which means the backfill executor starts consuming the L0 log store of the upstream mv tables. Like the `ConsumingSnapshot` status, barriers are injected and collected separately within the single creating job, independently of the upstream streaming graph.
- When we see that the downstream creating job has caught up with the upstream (policy to be designed), we send a barrier with a special mutation to inform the backfill executor to start consuming the upstream streaming data. After this barrier, the downstream graph joins the upstream global graph, and barriers are injected and collected together. The status transitions to `ConsumingUpstream` and then `Finishing` (the two statuses have only a slight difference; see the code for more context).

During this process, the synced SST data of table states in all epochs is only collected and is not committed to hummock. Instead, the data is stored epoch by epoch in the `CreatingStreamingJobControl` struct. After the streaming job has entered the `Finishing` status and has collected all the pending barriers, we commit all the previous state table data to hummock all at once, and then mark the streaming job as created in the catalog.

Snapshot Backfill Executor
A new `SnapshotBackfillExecutor` is implemented to support the snapshot backfill. It is created when the `StreamScanExecutorBuilder` sees that the stream scan type is `SnapshotBackfill`.

In this PR, during the process of snapshot backfill, although barriers are injected and collected independently, the physical graphs of upstream and downstream remain connected, which means the `SnapshotBackfillExecutor` still receives all the upstream messages, including both stream chunks and barriers. In the stages of consuming the upstream snapshot and the upstream log store data, the `SnapshotBackfillExecutor` concurrently consumes the upstream streaming data so that it won't cause back-pressure on the upstream. It discards the stream chunks and buffers the received barriers.

Besides, the `SnapshotBackfillExecutor` also registers a barrier sender to the local barrier worker. Therefore, the executor receives barriers both from the upstream streaming data and directly from the local barrier worker. The workflow of the executor is as follows:

- It uses the `barrier.epoch.prev` of the first barrier received from upstream to consume the snapshot of the upstream table at this epoch. It keeps yielding stream chunks while receiving the fake barriers from the barrier sender. Meanwhile, the backfill progress is reported. After it finishes consuming the upstream snapshot, it keeps receiving barriers from the sender and yields no data, until it receives a barrier whose `barrier.epoch.curr` equals the epoch we used to consume the upstream snapshot.
- After that, it uses the `barrier.epoch.prev` of each received barrier as the epoch to consume the upstream log store. After we finish consuming the log at this epoch, we then yield the barrier.
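As a simplified model of this workflow (a standalone sketch, not the real `SnapshotBackfillExecutor`; the `Barrier`, `Phase` and `drops_subscription` names are stand-ins), the phase transitions driven by the received barriers could be laid out like this:

```rust
/// Simplified stand-ins for the sketch; not the real executor types.
struct Barrier {
    prev_epoch: u64,
    curr_epoch: u64,
    /// Whether this barrier carries the mutation that tells the executor to
    /// switch to consuming upstream data (DropSubscriptions in this PR).
    drops_subscription: bool,
}

#[derive(Debug, PartialEq)]
enum Phase {
    ConsumingSnapshot { snapshot_epoch: u64 },
    ConsumingLogStore,
    ConsumingUpstream,
}

/// Advance the phase for one barrier received from the barrier sender,
/// mirroring the workflow described above.
fn on_barrier(phase: Phase, barrier: &Barrier) -> Phase {
    match phase {
        Phase::ConsumingSnapshot { snapshot_epoch } if barrier.curr_epoch == snapshot_epoch => {
            // The barrier whose curr epoch equals the snapshot epoch marks the
            // end of snapshot consumption; start reading the log store.
            Phase::ConsumingLogStore
        }
        Phase::ConsumingLogStore if barrier.drops_subscription => {
            // The special mutation tells us to stop reading the log store and
            // join the upstream streaming data.
            Phase::ConsumingUpstream
        }
        Phase::ConsumingLogStore => {
            // Read the upstream log store at this epoch, then yield the barrier
            // (the actual reading is elided in this sketch).
            let _epoch_to_read = barrier.prev_epoch;
            Phase::ConsumingLogStore
        }
        other => other,
    }
}

fn main() {
    let mut phase = Phase::ConsumingSnapshot { snapshot_epoch: 100 };
    phase = on_barrier(phase, &Barrier { prev_epoch: 5, curr_epoch: 100, drops_subscription: false });
    assert_eq!(phase, Phase::ConsumingLogStore);
    phase = on_barrier(phase, &Barrier { prev_epoch: 100, curr_epoch: 101, drops_subscription: false });
    assert_eq!(phase, Phase::ConsumingLogStore);
    phase = on_barrier(phase, &Barrier { prev_epoch: 101, curr_epoch: 102, drops_subscription: true });
    assert_eq!(phase, Phase::ConsumingUpstream);
    println!("final phase: {phase:?}");
}
```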
Enable/Disable Upstream MView Table Log Store
The downstream consumes the upstream log store in the same way as a subscription. Therefore, the creating streaming job is added as a subscriber to the upstream mv table. The subscriber id is the table fragments id of the creating streaming job. When the `GlobalBarrierManager` handles a `CreateStreamingJob` command with snapshot backfill enabled, it injects a barrier with an add mutation to the upstream graph, similar to the normal command. The only difference is that it adds the subscriber info in the `subscriptions_to_add` field added to `AddMutation` in #17897. After receiving this barrier, the mview executor becomes aware of the newly added subscriber, and will enable the log store and start writing old values.

When we finish consuming the snapshot and the progress of consuming the log store has almost caught up with the upstream, we merge the downstream into the upstream, and the downstream starts consuming the upstream streaming data. Since the downstream stops consuming the log store after this, the subscription on the upstream mv table should be removed. Therefore, we inject a barrier with a `DropSubscriptions` mutation to inform the mview executor to remove the subscriber and possibly disable the log store and stop writing old values. The `SnapshotBackfillExecutor` also leverages this barrier to get notified to stop consuming the log store and start consuming the upstream streaming data.
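A minimal sketch of the subscriber bookkeeping this implies on the mview side (standalone and with made-up names; the real executor reacts to mutations carried by barriers rather than to plain method calls):

```rust
use std::collections::HashSet;

/// Illustrative subscriber bookkeeping: old values (the log store) only need
/// to be written while at least one subscriber exists.
#[derive(Default)]
struct SubscriberTracker {
    subscribers: HashSet<u32>, // e.g. table fragments ids of creating jobs
}

impl SubscriberTracker {
    /// Handle the subscriber ids carried by an add mutation (`subscriptions_to_add`).
    fn on_add(&mut self, subscriber_ids: &[u32]) {
        self.subscribers.extend(subscriber_ids.iter().copied());
    }

    /// Handle a DropSubscriptions mutation.
    fn on_drop(&mut self, subscriber_ids: &[u32]) {
        for id in subscriber_ids {
            self.subscribers.remove(id);
        }
    }

    /// Whether the mview executor should keep writing old values.
    fn log_store_enabled(&self) -> bool {
        !self.subscribers.is_empty()
    }
}

fn main() {
    let mut tracker = SubscriberTracker::default();
    assert!(!tracker.log_store_enabled());

    // A snapshot-backfill job subscribes; old values start being written.
    tracker.on_add(&[42]);
    assert!(tracker.log_store_enabled());

    // The job catches up and drops its subscription; old values can stop.
    tracker.on_drop(&[42]);
    assert!(!tracker.log_store_enabled());
    println!("log store toggling follows the subscriber set");
}
```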
Checklist
- `./risedev check` (or alias, `./risedev c`)

Documentation
Release note
We are going to support a new type of backfill for creating streaming jobs. Unlike the current arrangement backfill, which consumes the upstream's new streaming data and the latest snapshot of the upstream table at the same time, this new type of backfill keeps consuming a fixed snapshot of the upstream table, and then consumes the upstream data epoch by epoch. For this reason, we call it snapshot backfill.

During snapshot backfill, barriers are injected and collected independently in the downstream creating jobs. Therefore, the checkpointing of downstream jobs is somewhat decoupled from the upstream created jobs, and furthermore, during backfill the workload is isolated between the backfilling downstream jobs and the upstream created jobs.

This feature is currently experimental and won't be enabled by default. To enable it, we need to explicitly set a session variable: `set streaming_use_snapshot_backfill = true;`.

Besides, while consuming the fixed snapshot of the upstream table, we will inject some fake barriers into the creating streaming graph at the rate of `system.barrier_interval_ms`. For rate limiting, we can set the session variable `backfill_rate_limit` to cap the maximum number of rows that can be yielded between two barriers while consuming the snapshot of the upstream table.
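To illustrate what the rate limit means here (a simplified standalone model, not the actual rate limiter used by the executor): at most `backfill_rate_limit` snapshot rows are emitted between two consecutive barriers, so the snapshot is spread across several barrier intervals.

```rust
/// Simplified model: emit at most `rate_limit` snapshot rows per barrier
/// interval; the remaining rows wait for the next (fake) barrier.
fn rows_per_barrier(snapshot_rows: &[u64], rate_limit: usize) -> Vec<&[u64]> {
    snapshot_rows.chunks(rate_limit).collect()
}

fn main() {
    // 10 snapshot rows with a rate limit of 4: the snapshot is spread over
    // 3 barrier intervals (4 + 4 + 2 rows).
    let snapshot: Vec<u64> = (0..10).collect();
    let batches = rows_per_barrier(&snapshot, 4);
    assert_eq!(batches.len(), 3);
    assert_eq!(batches[2], &[8, 9]);
    println!(
        "rows yielded per barrier interval: {:?}",
        batches.iter().map(|b| b.len()).collect::<Vec<_>>()
    );
}
```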