diff --git a/src/stream/src/executor/flow_control.rs b/src/stream/src/executor/flow_control.rs index e449392c9799c..880d9149827d3 100644 --- a/src/stream/src/executor/flow_control.rs +++ b/src/stream/src/executor/flow_control.rs @@ -25,7 +25,12 @@ use tokio::time::sleep; use super::*; -/// Rate limiter. We can abstract this out when there's more usecases for it. +#[cfg(not(madsim))] +type RateLimiterImpl = DefaultRateLimiter; +#[cfg(madsim)] +type RateLimiterImpl = SimRateLimiter; + +/// Rate limiter. We can abstract this out when there's more use-cases for it. /// Otherwise for now we can keep it local to `flow_control`. trait RateLimiter { fn new(rate_limit: u32) -> Self; @@ -118,54 +123,14 @@ impl FlowControlExecutor { #[try_stream(ok = Message, error = StreamExecutorError)] async fn execute_inner(self) { - let get_rate_limiter = |rate_limit: u32| { - let quota = Quota::per_second(NonZeroU32::new(rate_limit).unwrap()); - let clock = MonotonicClock; - GovernorRateLimiter::direct_with_clock(quota, &clock) - }; - #[cfg(not(madsim))] - let rate_limiter = self.rate_limit.map(get_rate_limiter); - #[cfg(madsim)] - let mut rate_limit_quota = self.rate_limit.unwrap_or(0); + let rate_limiter = self.rate_limit.map(RateLimiterImpl::new); #[for_await] for msg in self.input.execute() { let msg = msg?; match msg { Message::Chunk(chunk) => { - #[cfg(not(madsim))] - { - if let Some(rate_limiter) = &rate_limiter { - let result = rate_limiter - .until_n_ready(NonZeroU32::new(chunk.cardinality() as u32).unwrap()) - .await; - if let Err(InsufficientCapacity(n)) = result { - tracing::error!( - "Rate Limit {:?} smaller than chunk cardinality {n}", - self.rate_limit, - ); - } - } - } - #[cfg(madsim)] - { - if let Some(rate_limit_value) = self.rate_limit { - let chunk_cardinality = chunk.cardinality(); - if rate_limit_quota < chunk_cardinality as u32 { - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - rate_limit_quota = rate_limit_value; - // If after reset, chunk cardinality is still greater than - // rate limit, we cannot apply rate limit to it. - if rate_limit_quota < chunk_cardinality as u32 { - tracing::error!( - "Rate Limit {:?} smaller than chunk cardinality {}", - rate_limit_value, - chunk_cardinality, - ); - } - } - rate_limit_quota = - rate_limit_quota.saturating_sub(chunk_cardinality as u32); - } + if let Some(rate_limiter) = &rate_limiter { + rate_limiter.until_n_ready(chunk.cardinality()).await; } yield Message::Chunk(chunk); }