diff --git a/src/stream/src/executor/flow_control.rs b/src/stream/src/executor/flow_control.rs index b601cbde176c9..bab77215c30bf 100644 --- a/src/stream/src/executor/flow_control.rs +++ b/src/stream/src/executor/flow_control.rs @@ -40,8 +40,6 @@ type RateLimitConfig = Option; impl FlowControlExecutor { #[allow(clippy::too_many_arguments)] pub fn new(input: Box, rate_limit: Option) -> Self { - #[cfg(madsim)] - tracing::warn!("FlowControlExecutor rate limiter is disabled in madsim as it will spawn system threads"); Self { input, rate_limit } } @@ -52,6 +50,7 @@ impl FlowControlExecutor { let mut input_stream = input.execute(); loop { match &rate_limit { + // If no rate limit, just need to handle cfg change. None => { #[for_await] @@ -74,10 +73,13 @@ impl FlowControlExecutor { } } } + // If rate limit, need to handle cfg change, and rate limit. Some(rate_limit_value) => { let quota = Quota::per_second(NonZeroU32::new(*rate_limit_value).unwrap()); let clock = MonotonicClock; let rate_limiter = RateLimiter::direct_with_clock(quota, &clock); + #[cfg(madsim)] + let mut rate_limit_quota = *rate_limit_value; #[for_await] for msg in &mut input_stream { let msg = msg?; @@ -97,6 +99,25 @@ impl FlowControlExecutor { ); } } + #[cfg(madsim)] + { + if rate_limit_quota < chunk.cardinality() as u32 { + tokio::time::sleep(std::time::Duration::from_secs(1)); + rate_limit_quota = *rate_limit_value; + // If after reset, chunk cardinality is still greater than + // rate limit, we cannot apply rate limit to it. + if rate_limit_quota < chunk.cardinality() as u32 { + tracing::error!( + "Rate Limit {:?} smaller than chunk cardinality {n}", + rate_limit_value, + ); + } else { + rate_limit_quota -= chunk.cardinality() as u32; + } + } else { + rate_limit_quota -= chunk.cardinality() as u32; + } + } yield msg; } Message::Barrier(ref b) => {