Skip to content

Commit

Permalink
fix log level
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Jun 10, 2024
1 parent ba5abf4 commit ad94e9b
Showing 1 changed file with 3 additions and 18 deletions.
21 changes: 3 additions & 18 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -646,22 +646,6 @@ where
}
}

// 2x rate limit indefinitely. Backpressure will kick in and slowdown the ingestion rate.
fn adapt_rate_limit_3(
actor_id: &ActorId,
metrics: &StreamingMetrics,
threshold_barrier_latency: f64,
highest_barrier_latency: &mut f64,
total_barrier_latency: &mut f64,
rate_limit: &mut Option<usize>,
rate_limiter: &mut Option<BackfillRateLimiter>,
) {
if let Some(rate_limit_setting) = rate_limit {
*rate_limit_setting = (*rate_limit_setting * 2).clamp(1, 100000);
*rate_limiter = create_limiter(*rate_limit_setting)
}
}

fn adapt_rate_limit_2(
actor_id: &ActorId,
metrics: &StreamingMetrics,
Expand All @@ -679,6 +663,7 @@ where
new_barrier_latency,
);
let new_rate_limit = if *highest_barrier_latency > 2_f64 * threshold_barrier_latency {
*highest_barrier_latency = 0.0;
Some(INITIAL_ADAPTIVE_RATE_LIMIT)
} else if *highest_barrier_latency > threshold_barrier_latency
&& let Some(rate_limit_set) = rate_limit
Expand All @@ -691,7 +676,7 @@ where
} else if new_total_barrier_latency > *total_barrier_latency
&& let Some(rate_limit_set) = rate_limit
{
let scaling_factor = 1.1_f64;
let scaling_factor = 1.01_f64;
let scaled_rate_limit = (*rate_limit_set as f64) * scaling_factor;
let new_rate_limit = scaled_rate_limit.ceil() as usize;
Some(new_rate_limit)
Expand All @@ -704,7 +689,7 @@ where
&& let Some(rate_limit_setting) = new_rate_limit
{
*rate_limit = new_rate_limit;
tracing::trace!(
tracing::debug!(
target: "adaptive_rate_limit",
actor_id,
?rate_limit,
Expand Down

0 comments on commit ad94e9b

Please sign in to comment.