Skip to content

Commit

Permalink
feat: refactor algo
Browse files Browse the repository at this point in the history
Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx committed Dec 16, 2024
1 parent ade0464 commit f9b4348
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 259 deletions.
100 changes: 0 additions & 100 deletions e2e_test/backfill/adaptive-rate-limit/amplification-100.slt

This file was deleted.

77 changes: 8 additions & 69 deletions e2e_test/backfill/adaptive-rate-limit/amplification-10000.slt
Original file line number Diff line number Diff line change
@@ -1,92 +1,31 @@
statement ok
set streaming_parallelism = 2;

statement ok
create table fact(v1 int);

statement ok
create table dim(v1 int);

statement ok
create table dim2(v1 int);
# Total backfill = 50_000 * 100 = 5M records

statement ok
insert into fact select 1 from generate_series(1, 1000000);
insert into fact select v from generate_series(1, 100000) as v ;

statement ok
insert into dim select 1 from generate_series(1, 100);

statement ok
insert into dim2 select 1 from generate_series(1, 100);
insert into dim select v from generate_series(1, 1000) as v, generate_series(1, 10000);

statement ok
flush;

statement ok
set background_ddl = true;

statement ok
create sink s1 as select fact.v1 from fact join dim on fact.v1 = dim.v1 join dim2 on fact.v1 = dim2.v1 with (connector = 'blackhole');

statement ok
set background_ddl = false;

# Let at least 16 barriers pass through
# Then we have 1 * 2^16 = 65536

skipif in-memory
sleep 10s

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;
set BACKFILL_RATE_LIMIT = 0;

statement ok
flush;

statement ok
flush;

statement ok
flush;
create materialized view m1 as select fact.v1 from fact join dim on fact.v1 = dim.v1;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
flush;

# statement ok
# drop sink s1;

# statement ok
# drop table fact;

# statement ok
# drop table dim;
89 changes: 0 additions & 89 deletions e2e_test/backfill/adaptive-rate-limit/amplification-500.slt

This file was deleted.

17 changes: 16 additions & 1 deletion src/stream/src/executor/backfill/rate_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,12 +264,27 @@ impl AdaptiveRateLimiterInner {
let now = Instant::now();
let dur = now.duration_since(self.last_update);

if dur.is_zero() {
return;
}

let real_rate = info.processed_snapshot_rows as f64 / dur.as_secs_f64();

let step = (self.rate * config.step_ratio).clamp(config.step_min, config.step_max);

// FIXME(MrCroxx): remove this.
tracing::trace!(
"=======================> rate: {}, real: {}, step: {}, dur: {:?}",
self.rate,
real_rate,
step,
dur,
);

if real_rate >= self.rate {
self.rate += step;
} else {
}
if real_rate <= self.rate * 0.9 {
self.rate -= step;
}
self.rate = self
Expand Down

0 comments on commit f9b4348

Please sign in to comment.