Skip to content

Commit

Permalink
make adaptive rate limit configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed May 31, 2024
1 parent e842dad commit ba5abf4
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 7 deletions.
5 changes: 4 additions & 1 deletion e2e_test/backfill/adaptive-rate-limit/amplification-100.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set streaming_parallelism = 2;

statement ok
create table fact(v1 int);

Expand All @@ -10,7 +13,7 @@ statement ok
insert into fact select 1 from generate_series(1, 250000);

statement ok
insert into dim select 1 from generate_series(1, 100);
insert into dim select 1 from generate_series(1, 2000);

statement ok
flush;
Expand Down
20 changes: 14 additions & 6 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,7 @@ where
let upstream_table_id = self.upstream_table.table_id();
let mut upstream_table = self.upstream_table;
let vnodes = upstream_table.vnodes().clone();
let rate_limit = self.rate_limit;
self.chunk_size = 1;
let mut rate_limit = self.rate_limit;

// These builders will build data chunks.
// We must supply them with the full datatypes which correspond to
Expand Down Expand Up @@ -147,9 +146,18 @@ where
highest_barrier_latency * 2.0
}
};
tracing::debug!(target: "adaptive_rate_limit", highest_barrier_latency, threshold_barrier_latency, "initial configs");
let adaptive_rate_limit = true;
let mut rate_limit = Some(INITIAL_ADAPTIVE_RATE_LIMIT);
let adaptive_rate_limit = {
use std::env;
let key = "ADAPTIVE_RATE_LIMIT";
match env::var(key) {
Ok(val) => true,
Err(_) => false,
}
};
tracing::debug!(target: "adaptive_rate_limit", adaptive_rate_limit, highest_barrier_latency, threshold_barrier_latency, "initial configs");
if adaptive_rate_limit {
rate_limit = Some(INITIAL_ADAPTIVE_RATE_LIMIT);
}

// Poll the upstream to get the first barrier.
let first_barrier = expect_first_barrier(&mut upstream).await?;
Expand Down Expand Up @@ -557,7 +565,7 @@ where

// Adapt Rate Limit
if adaptive_rate_limit {
Self::adapt_rate_limit_3(
Self::adapt_rate_limit_2(
&self.actor_id,
&self.metrics,
threshold_barrier_latency,
Expand Down

0 comments on commit ba5abf4

Please sign in to comment.