Skip to content

Commit

Permalink
apply adaptive rate limit immediately from first chunk
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Jun 10, 2024
1 parent ad94e9b commit 570de3c
Showing 1 changed file with 22 additions and 22 deletions.
44 changes: 22 additions & 22 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,28 +113,6 @@ where
let vnodes = upstream_table.vnodes().clone();
let mut rate_limit = self.rate_limit;

// These builders will build data chunks.
// We must supply them with the full datatypes which correspond to
// pk + output_indices.
let snapshot_data_types = self
.upstream
.schema()
.fields()
.iter()
.map(|field| field.data_type.clone())
.collect_vec();
let mut builders: Builders = upstream_table
.vnodes()
.iter_vnodes()
.map(|vnode| {
let builder =
create_builder(rate_limit, self.chunk_size, snapshot_data_types.clone());
(vnode, builder)
})
.collect();

let mut upstream = self.upstream.execute();

// Query the current barrier latency from meta.
// Permit a 2x fluctuation in barrier latency. Set threshold to 15s.
let mut total_barrier_latency = Self::get_total_barrier_latency(&self.metrics);
Expand All @@ -159,6 +137,28 @@ where
rate_limit = Some(INITIAL_ADAPTIVE_RATE_LIMIT);
}

// These builders will build data chunks.
// We must supply them with the full datatypes which correspond to
// pk + output_indices.
let snapshot_data_types = self
.upstream
.schema()
.fields()
.iter()
.map(|field| field.data_type.clone())
.collect_vec();
let mut builders: Builders = upstream_table
.vnodes()
.iter_vnodes()
.map(|vnode| {
let builder =
create_builder(rate_limit, self.chunk_size, snapshot_data_types.clone());
(vnode, builder)
})
.collect();

let mut upstream = self.upstream.execute();

// Poll the upstream to get the first barrier.
let first_barrier = expect_first_barrier(&mut upstream).await?;
let mut paused = first_barrier.is_pause_on_startup();
Expand Down

0 comments on commit 570de3c

Please sign in to comment.