Skip to content

Commit

Permalink
refactor to use the rate limiter interface
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Oct 30, 2023
1 parent 820666e commit dc9b33d
Showing 1 changed file with 9 additions and 44 deletions.
53 changes: 9 additions & 44 deletions src/stream/src/executor/flow_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ use tokio::time::sleep;

use super::*;

/// Rate limiter. We can abstract this out when there's more usecases for it.
#[cfg(not(madsim))]
type RateLimiterImpl = DefaultRateLimiter;
#[cfg(madsim)]
type RateLimiterImpl = SimRateLimiter;

/// Rate limiter. We can abstract this out when there's more use-cases for it.
/// Otherwise for now we can keep it local to `flow_control`.
trait RateLimiter {
fn new(rate_limit: u32) -> Self;
Expand Down Expand Up @@ -118,54 +123,14 @@ impl FlowControlExecutor {

#[try_stream(ok = Message, error = StreamExecutorError)]
async fn execute_inner(self) {
let get_rate_limiter = |rate_limit: u32| {
let quota = Quota::per_second(NonZeroU32::new(rate_limit).unwrap());
let clock = MonotonicClock;
GovernorRateLimiter::direct_with_clock(quota, &clock)
};
#[cfg(not(madsim))]
let rate_limiter = self.rate_limit.map(get_rate_limiter);
#[cfg(madsim)]
let mut rate_limit_quota = self.rate_limit.unwrap_or(0);
let rate_limiter = self.rate_limit.map(RateLimiterImpl::new);
#[for_await]
for msg in self.input.execute() {
let msg = msg?;
match msg {
Message::Chunk(chunk) => {
#[cfg(not(madsim))]
{
if let Some(rate_limiter) = &rate_limiter {
let result = rate_limiter
.until_n_ready(NonZeroU32::new(chunk.cardinality() as u32).unwrap())
.await;
if let Err(InsufficientCapacity(n)) = result {
tracing::error!(
"Rate Limit {:?} smaller than chunk cardinality {n}",
self.rate_limit,
);
}
}
}
#[cfg(madsim)]
{
if let Some(rate_limit_value) = self.rate_limit {
let chunk_cardinality = chunk.cardinality();
if rate_limit_quota < chunk_cardinality as u32 {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
rate_limit_quota = rate_limit_value;
// If after reset, chunk cardinality is still greater than
// rate limit, we cannot apply rate limit to it.
if rate_limit_quota < chunk_cardinality as u32 {
tracing::error!(
"Rate Limit {:?} smaller than chunk cardinality {}",
rate_limit_value,
chunk_cardinality,
);
}
}
rate_limit_quota =
rate_limit_quota.saturating_sub(chunk_cardinality as u32);
}
if let Some(rate_limiter) = &rate_limiter {
rate_limiter.until_n_ready(chunk.cardinality()).await;
}
yield Message::Chunk(chunk);
}
Expand Down

0 comments on commit dc9b33d

Please sign in to comment.