Skip to content

Commit

Permalink
support rate limiting in madsim
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Oct 30, 2023
1 parent f05ab4a commit b321e12
Showing 1 changed file with 25 additions and 2 deletions.
27 changes: 25 additions & 2 deletions src/stream/src/executor/flow_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ type RateLimitConfig = Option<u32>;
impl FlowControlExecutor {
#[allow(clippy::too_many_arguments)]
pub fn new(input: Box<dyn Executor>, rate_limit: Option<u32>) -> Self {
#[cfg(madsim)]
tracing::warn!("FlowControlExecutor rate limiter is disabled in madsim as it will spawn system threads");
Self { input, rate_limit }
}

Expand All @@ -52,6 +50,7 @@ impl FlowControlExecutor {
let mut input_stream = input.execute();
loop {
match &rate_limit {
// If no rate limit, just need to handle cfg change.
None =>
{
#[for_await]
Expand All @@ -74,10 +73,13 @@ impl FlowControlExecutor {
}
}
}
// If rate limit, need to handle cfg change, and rate limit.
Some(rate_limit_value) => {
let quota = Quota::per_second(NonZeroU32::new(*rate_limit_value).unwrap());
let clock = MonotonicClock;
let rate_limiter = RateLimiter::direct_with_clock(quota, &clock);
#[cfg(madsim)]
let mut rate_limit_quota = *rate_limit_value;
#[for_await]
for msg in &mut input_stream {
let msg = msg?;
Expand All @@ -97,6 +99,27 @@ impl FlowControlExecutor {
);
}
}
#[cfg(madsim)]
{
let chunk_cardinality = chunk.cardinality();
if rate_limit_quota < chunk_cardinality as u32 {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
rate_limit_quota = *rate_limit_value;
// If after reset, chunk cardinality is still greater than
// rate limit, we cannot apply rate limit to it.
if rate_limit_quota < chunk_cardinality as u32 {
tracing::error!(
"Rate Limit {:?} smaller than chunk cardinality {}",
rate_limit_value,
chunk_cardinality,
);
} else {
rate_limit_quota -= chunk_cardinality as u32;
}
} else {
rate_limit_quota -= chunk_cardinality as u32;
}
}
yield msg;
}
Message::Barrier(ref b) => {
Expand Down

0 comments on commit b321e12

Please sign in to comment.