diff --git a/e2e_test/backfill/adaptive-rate-limit/amplification-100.slt b/e2e_test/backfill/adaptive-rate-limit/amplification-100.slt deleted file mode 100644 index b3efafde373a..000000000000 --- a/e2e_test/backfill/adaptive-rate-limit/amplification-100.slt +++ /dev/null @@ -1,100 +0,0 @@ -statement ok -set streaming_parallelism = 2; - -statement ok -create table fact(v1 int); - -statement ok -create table dim(v1 int); - -# Total backfill = 50_000 * 100 = 5M records - -statement ok -insert into fact select 1 from generate_series(1, 250000); - -statement ok -insert into dim select 1 from generate_series(1, 2000); - -statement ok -flush; - -statement ok -set background_ddl = true; - -statement ok -create materialized view m1 as select fact.v1 from fact join dim on fact.v1 = dim.v1; - -statement ok -set background_ddl = false; - -# Let at least 16 barriers pass through -# Then we have 1 * 2^16 = 65536 - -skipif in-memory -sleep 10s - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -# statement ok -# drop sink s1; - -# statement ok -# drop table fact; - -# statement ok -# drop table dim; - -statement ok -set background_ddl = true; - -statement ok -create materialized view m2 as select fact.v1 from fact join dim on fact.v1 = dim.v1; - -statement ok -set background_ddl = false; \ No newline at end of file diff --git a/e2e_test/backfill/adaptive-rate-limit/amplification-10000.slt b/e2e_test/backfill/adaptive-rate-limit/amplification-10000.slt index 75a54ef30098..32e0f65502df 100644 --- a/e2e_test/backfill/adaptive-rate-limit/amplification-10000.slt +++ b/e2e_test/backfill/adaptive-rate-limit/amplification-10000.slt @@ -1,92 +1,31 @@ +statement ok +set streaming_parallelism = 2; + statement ok create table fact(v1 int); statement ok create table dim(v1 int); -statement ok -create table dim2(v1 int); +# Total backfill = 50_000 * 100 = 5M records statement ok -insert into fact select 1 from generate_series(1, 1000000); +insert into fact select v from generate_series(1, 100000) as v ; statement ok -insert into dim select 1 from generate_series(1, 100); - -statement ok -insert into dim2 select 1 from generate_series(1, 100); +insert into dim select v from generate_series(1, 1000) as v, generate_series(1, 10000); statement ok flush; -statement ok -set background_ddl = true; - -statement ok -create sink s1 as select fact.v1 from fact join dim on fact.v1 = dim.v1 join dim2 on fact.v1 = dim2.v1 with (connector = 'blackhole'); - statement ok set background_ddl = false; -# Let at least 16 barriers pass through -# Then we have 1 * 2^16 = 65536 - -skipif in-memory -sleep 10s - -statement ok -flush; - statement ok -flush; - -statement ok -flush; - -statement ok -flush; +set BACKFILL_RATE_LIMIT = 0; statement ok -flush; - -statement ok -flush; - -statement ok -flush; +create materialized view m1 as select fact.v1 from fact join dim on fact.v1 = dim.v1; statement ok flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -# statement ok -# drop sink s1; - -# statement ok -# drop table fact; - -# statement ok -# drop table dim; \ No newline at end of file diff --git a/e2e_test/backfill/adaptive-rate-limit/amplification-500.slt b/e2e_test/backfill/adaptive-rate-limit/amplification-500.slt deleted file mode 100644 index aefbc4ee90d7..000000000000 --- a/e2e_test/backfill/adaptive-rate-limit/amplification-500.slt +++ /dev/null @@ -1,89 +0,0 @@ -statement ok -create table fact(v1 int); - -statement ok -create table dim(v1 int); - -statement ok -create table dim2(v1 int); - -statement ok -insert into fact select 1 from generate_series(1, 1000000); - -statement ok -insert into dim select 1 from generate_series(1, 500); - -statement ok -flush; - -statement ok -set background_ddl = true; - -statement ok -create materialized view m1 as select fact.v1 from fact join dim on fact.v1 = dim.v1; - -statement ok -set background_ddl = false; - -# Let at least 16 barriers pass through -# Then we have 1 * 2^16 = 65536 - -skipif in-memory -sleep 10s - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -statement ok -flush; - -# statement ok -# drop sink s1; - -# statement ok -# drop table fact; - -# statement ok -# drop table dim; \ No newline at end of file diff --git a/src/stream/src/executor/backfill/rate_limiter.rs b/src/stream/src/executor/backfill/rate_limiter.rs index 947515ff79ba..b3dc6dcff866 100644 --- a/src/stream/src/executor/backfill/rate_limiter.rs +++ b/src/stream/src/executor/backfill/rate_limiter.rs @@ -264,12 +264,27 @@ impl AdaptiveRateLimiterInner { let now = Instant::now(); let dur = now.duration_since(self.last_update); + if dur.is_zero() { + return; + } + let real_rate = info.processed_snapshot_rows as f64 / dur.as_secs_f64(); let step = (self.rate * config.step_ratio).clamp(config.step_min, config.step_max); + + // FIXME(MrCroxx): remove this. + tracing::trace!( + "=======================> rate: {}, real: {}, step: {}, dur: {:?}", + self.rate, + real_rate, + step, + dur, + ); + if real_rate >= self.rate { self.rate += step; - } else { + } + if real_rate <= self.rate * 0.9 { self.rate -= step; } self.rate = self