Skip to content

Commit

Permalink
feat(streaming): support alter rate limit for arrangement backfill (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored Apr 16, 2024
1 parent 2f560ca commit 18de4d1
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 25 deletions.
77 changes: 54 additions & 23 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ where
let upstream_table_id = self.upstream_table.table_id();
let mut upstream_table = self.upstream_table;
let vnodes = upstream_table.vnodes().clone();
let mut rate_limit = self.rate_limit;

// These builders will build data chunks.
// We must supply them with the full datatypes which correspond to
Expand All @@ -132,11 +133,8 @@ where
.vnodes()
.iter_vnodes()
.map(|vnode| {
let builder = create_builder(
self.rate_limit,
self.chunk_size,
snapshot_data_types.clone(),
);
let builder =
create_builder(rate_limit, self.chunk_size, snapshot_data_types.clone());
(vnode, builder)
})
.collect();
Expand Down Expand Up @@ -215,8 +213,7 @@ where
let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
let mut pending_barrier: Option<Barrier> = None;

let rate_limiter = self.rate_limit.and_then(create_limiter);
let rate_limit = self.rate_limit;
let mut rate_limiter = rate_limit.and_then(create_limiter);

let backfill_snapshot_read_row_count_metric = self
.metrics
Expand Down Expand Up @@ -401,25 +398,11 @@ where
};

// Process barrier:
// - handle mutations
// - consume snapshot rows left in builder.
// - consume upstream buffer chunk
// - handle mutations
// - switch snapshot

// handle mutations
if let Some(mutation) = barrier.mutation.as_deref() {
use crate::executor::Mutation;
match mutation {
Mutation::Pause => {
paused = true;
}
Mutation::Resume => {
paused = false;
}
_ => (),
}
}

// consume snapshot rows left in builder.
// NOTE(kwannoel): `zip_eq_debug` does not work here,
// we encounter "higher-ranked lifetime error".
Expand Down Expand Up @@ -507,6 +490,54 @@ where
"barrier persisted"
);

// handle mutations
if let Some(mutation) = barrier.mutation.as_deref() {
use crate::executor::Mutation;
match mutation {
Mutation::Pause => {
paused = true;
}
Mutation::Resume => {
paused = false;
}
Mutation::Throttle(actor_to_apply) => {
let new_rate_limit_entry = actor_to_apply.get(&self.actor_id);
if let Some(new_rate_limit) = new_rate_limit_entry {
let new_rate_limit = new_rate_limit.as_ref().map(|x| *x as _);
if new_rate_limit != rate_limit {
rate_limit = new_rate_limit;
tracing::info!(
id = self.actor_id,
new_rate_limit = ?rate_limit,
"actor rate limit changed",
);
// The builder is emptied above via `DataChunkBuilder::consume_all`.
for (_, builder) in builders {
assert!(
builder.is_empty(),
"builder should already be emptied"
);
}
builders = upstream_table
.vnodes()
.iter_vnodes()
.map(|vnode| {
let builder = create_builder(
rate_limit,
self.chunk_size,
snapshot_data_types.clone(),
);
(vnode, builder)
})
.collect();
rate_limiter = new_rate_limit.and_then(create_limiter);
}
}
}
_ => {}
}
}

yield Message::Barrier(barrier);

// We will switch snapshot at the start of the next iteration of the backfill loop.
Expand Down Expand Up @@ -601,7 +632,7 @@ where
if paused {
#[for_await]
for _ in tokio_stream::pending() {
yield None;
bail!("BUG: paused stream should not yield");
}
} else {
// Checked the rate limit is not zero.
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/backfill/no_shuffle_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ where
rate_limit = new_rate_limit;
tracing::info!(
id = self.actor_id,
new_rate_limit = ?self.rate_limit,
new_rate_limit = ?rate_limit,
"actor rate limit changed",
);
// The builder is emptied above via `DataChunkBuilder::consume_all`.
Expand Down Expand Up @@ -656,7 +656,7 @@ where
if paused {
#[for_await]
for _ in tokio_stream::pending() {
yield None;
bail!("BUG: paused stream should not yield");
}
} else {
// Checked the rate limit is not zero.
Expand Down

0 comments on commit 18de4d1

Please sign in to comment.