diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs index b95aa87f0f415..d8db3bc4efbd9 100644 --- a/src/stream/src/executor/backfill/arrangement_backfill.rs +++ b/src/stream/src/executor/backfill/arrangement_backfill.rs @@ -646,22 +646,6 @@ where } } - // 2x rate limit indefinitely. Backpressure will kick in and slowdown the ingestion rate. - fn adapt_rate_limit_3( - actor_id: &ActorId, - metrics: &StreamingMetrics, - threshold_barrier_latency: f64, - highest_barrier_latency: &mut f64, - total_barrier_latency: &mut f64, - rate_limit: &mut Option, - rate_limiter: &mut Option, - ) { - if let Some(rate_limit_setting) = rate_limit { - *rate_limit_setting = (*rate_limit_setting * 2).clamp(1, 100000); - *rate_limiter = create_limiter(*rate_limit_setting) - } - } - fn adapt_rate_limit_2( actor_id: &ActorId, metrics: &StreamingMetrics, @@ -679,6 +663,7 @@ where new_barrier_latency, ); let new_rate_limit = if *highest_barrier_latency > 2_f64 * threshold_barrier_latency { + *highest_barrier_latency = 0.0; Some(INITIAL_ADAPTIVE_RATE_LIMIT) } else if *highest_barrier_latency > threshold_barrier_latency && let Some(rate_limit_set) = rate_limit @@ -691,7 +676,7 @@ where } else if new_total_barrier_latency > *total_barrier_latency && let Some(rate_limit_set) = rate_limit { - let scaling_factor = 1.1_f64; + let scaling_factor = 1.01_f64; let scaled_rate_limit = (*rate_limit_set as f64) * scaling_factor; let new_rate_limit = scaled_rate_limit.ceil() as usize; Some(new_rate_limit) @@ -704,7 +689,7 @@ where && let Some(rate_limit_setting) = new_rate_limit { *rate_limit = new_rate_limit; - tracing::trace!( + tracing::debug!( target: "adaptive_rate_limit", actor_id, ?rate_limit,