feat(stream): Support pause_on_next_bootstrap for snapshot read (#1…
kwannoel authored Jan 29, 2024
1 parent e1e6964 commit 3a49045
Showing 2 changed files with 90 additions and 21 deletions.
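The core pattern, shared by both executors in this diff, is to build the snapshot side of the select as a stream that never yields while the executor is paused, and as the real snapshot read otherwise. Below is a minimal, self-contained sketch of that idea; it reuses the `make_snapshot_stream` name from the diff, but plain integers stand in for table rows, so this is an illustration rather than the executor code:

```rust
// Hypothetical, simplified sketch of the pause-aware snapshot stream.
// The real code yields table rows via #[try_stream]; here we use plain u64s.
use futures::stream::{self, BoxStream, StreamExt};

fn make_snapshot_stream(paused: bool) -> BoxStream<'static, u64> {
    if paused {
        // A stream that never yields: the snapshot arm stays silent while paused.
        tokio_stream::pending().boxed()
    } else {
        // Stand-in for the real per-vnode snapshot read.
        stream::iter(0u64..3).boxed()
    }
}

#[tokio::main]
async fn main() {
    // With paused = true, next().await would never resolve; unpaused, rows flow.
    let mut snapshot = make_snapshot_stream(false);
    while let Some(row) = snapshot.next().await {
        println!("snapshot row {row}");
    }
}
```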
41 changes: 39 additions & 2 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
@@ -147,6 +147,7 @@ where

// Poll the upstream to get the first barrier.
let first_barrier = expect_first_barrier(&mut upstream).await?;
let mut paused = first_barrier.is_pause_on_startup();
let first_epoch = first_barrier.epoch;
self.state_table.init_epoch(first_barrier.epoch);

@@ -225,12 +226,13 @@ where
{
let left_upstream = upstream.by_ref().map(Either::Left);

let right_snapshot = pin!(Self::snapshot_read_per_vnode(
let right_snapshot = pin!(Self::make_snapshot_stream(
&upstream_table,
backfill_state.clone(), // FIXME: Use mutable reference instead.
&mut builders,
paused,
)
.map(Either::Right),);
.map(Either::Right));

// Prefer to select upstream, so we can stop snapshot stream as soon as the
// barrier comes.
@@ -323,10 +325,25 @@ where
};

// Process barrier:
// - handle mutations
// - consume snapshot rows left in builder.
// - consume upstream buffer chunk
// - switch snapshot

// handle mutations
if let Some(mutation) = barrier.mutation.as_deref() {
use crate::executor::Mutation;
match mutation {
Mutation::Pause => {
paused = true;
}
Mutation::Resume => {
paused = false;
}
_ => (),
}
}

// consume snapshot rows left in builder.
// NOTE(kwannoel): `zip_eq_debug` does not work here,
// we encounter "higher-ranked lifetime error".
@@ -509,6 +526,26 @@ where
}
}

#[try_stream(ok = Option<(VirtualNode, StreamChunk)>, error = StreamExecutorError)]
async fn make_snapshot_stream<'a>(
upstream_table: &'a ReplicatedStateTable<S, SD>,
backfill_state: BackfillState,
builders: &'a mut [DataChunkBuilder],
paused: bool,
) {
if paused {
#[for_await]
for _ in tokio_stream::pending() {
yield None;
}
} else {
#[for_await]
for r in Self::snapshot_read_per_vnode(upstream_table, backfill_state, builders) {
yield r?;
}
}
}

/// Read snapshot per vnode.
/// These streams should be sorted in storage layer.
/// 1. Get row iterator / vnode.
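This works because the executor's select prefers the upstream arm (see the "Prefer to select upstream" comment above), so a forever-pending snapshot arm never blocks barrier processing, and a later `Resume` mutation can rebuild the stream unpaused. A rough sketch of that polling preference using `futures::stream::select_with_strategy`; whether this is the exact combinator the executor uses is not visible in this hunk, so treat that choice as an assumption:

```rust
// Rough sketch (assumed combinator): prefer the left (upstream/barrier) arm so
// a pending snapshot arm cannot block barrier processing.
use futures::stream::{self, select_with_strategy, PollNext, StreamExt};

#[tokio::main]
async fn main() {
    let barriers = stream::iter(["barrier A", "barrier B"]);
    // Simulates the paused snapshot side: never yields an item.
    let snapshot = tokio_stream::pending::<&str>();

    // Always poll the left arm first; take(2) keeps this demo finite, since the
    // pending arm would otherwise keep the merged stream open forever.
    let mut merged =
        select_with_strategy(barriers, snapshot, |_: &mut ()| PollNext::Left).take(2);
    while let Some(msg) = merged.next().await {
        println!("{msg}");
    }
}
```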
70 changes: 51 additions & 19 deletions src/stream/src/executor/backfill/no_shuffle_backfill.rs
@@ -155,6 +155,7 @@ where

// Poll the upstream to get the first barrier.
let first_barrier = expect_first_barrier(&mut upstream).await?;
let mut paused = first_barrier.is_pause_on_startup();
let init_epoch = first_barrier.epoch.prev;
if let Some(state_table) = self.state_table.as_mut() {
state_table.init_epoch(first_barrier.epoch);
@@ -232,12 +233,13 @@ where
{
let left_upstream = upstream.by_ref().map(Either::Left);

let right_snapshot = pin!(Self::snapshot_read(
let right_snapshot = pin!(Self::make_snapshot_stream(
&self.upstream_table,
snapshot_read_epoch,
current_pos.clone(),
paused
)
.map(Either::Right),);
.map(Either::Right));

// Prefer to select upstream, so we can stop snapshot stream as soon as the
// barrier comes.
@@ -328,7 +330,8 @@ where
// Before processing barrier, if did not snapshot read,
// do a snapshot read first.
// This is so we don't lose the tombstone iteration progress.
if !has_snapshot_read {
// If paused, we also can't read any snapshot records.
if !has_snapshot_read && !paused {
assert!(builder.is_empty());
let (_, snapshot) = backfill_stream.into_inner();
#[for_await]
@@ -447,23 +450,32 @@ where
);

// Update snapshot read chunk builder.
if let Some(mutation) = barrier.mutation.as_ref() {
if let Mutation::Throttle(actor_to_apply) = mutation.as_ref() {
let new_rate_limit_entry = actor_to_apply.get(&self.actor_id);
if let Some(new_rate_limit) = new_rate_limit_entry {
rate_limit = new_rate_limit.as_ref().map(|x| *x as _);
tracing::info!(
id = self.actor_id,
new_rate_limit = ?self.rate_limit,
"actor rate limit changed",
);
assert!(builder.is_empty());
builder = create_builder(
rate_limit,
self.chunk_size,
self.upstream_table.schema().data_types(),
);
if let Some(mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::Pause => {
paused = true;
}
Mutation::Resume => {
paused = false;
}
Mutation::Throttle(actor_to_apply) => {
let new_rate_limit_entry = actor_to_apply.get(&self.actor_id);
if let Some(new_rate_limit) = new_rate_limit_entry {
rate_limit = new_rate_limit.as_ref().map(|x| *x as _);
tracing::info!(
id = self.actor_id,
new_rate_limit = ?self.rate_limit,
"actor rate limit changed",
);
assert!(builder.is_empty());
builder = create_builder(
rate_limit,
self.chunk_size,
self.upstream_table.schema().data_types(),
);
}
}
_ => (),
}
}

@@ -614,6 +626,26 @@ where
}
}

#[try_stream(ok = Option<OwnedRow>, error = StreamExecutorError)]
async fn make_snapshot_stream(
upstream_table: &StorageTable<S>,
epoch: u64,
current_pos: Option<OwnedRow>,
paused: bool,
) {
if paused {
#[for_await]
for _ in tokio_stream::pending() {
yield None;
}
} else {
#[for_await]
for r in Self::snapshot_read(upstream_table, epoch, current_pos) {
yield r?;
}
}
}

/// Snapshot read the upstream mv.
/// The rows from upstream snapshot read will be buffered inside the `builder`.
/// If snapshot is dropped before its rows are consumed,
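On each barrier, both executors now inspect the mutation: `Pause` and `Resume` only toggle the local `paused` flag, which takes effect when the snapshot stream is rebuilt for the next epoch, while the existing `Throttle` handling in the no-shuffle executor is kept. A stripped-down sketch of that toggle, with a hypothetical stand-in for the real `Mutation` enum:

```rust
// Hypothetical stand-in for crate::executor::Mutation; only the variants that
// matter for the pause flag are modeled here.
#[allow(dead_code)]
enum Mutation {
    Pause,
    Resume,
    Throttle(Option<u32>),
}

/// Mirrors the barrier handling in the diff: only Pause/Resume touch `paused`.
fn apply_mutation(paused: &mut bool, mutation: &Mutation) {
    match mutation {
        Mutation::Pause => *paused = true,
        Mutation::Resume => *paused = false,
        // Throttle and other mutations leave the pause state untouched.
        _ => {}
    }
}

fn main() {
    // e.g. the first barrier carried pause_on_startup, so we begin paused.
    let mut paused = true;
    apply_mutation(&mut paused, &Mutation::Resume);
    assert!(!paused);
    println!("paused = {paused}");
}
```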
