Skip to content

Commit

Permalink
add rate limit interface
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Oct 30, 2023
1 parent d444c64 commit 820666e
Showing 1 changed file with 77 additions and 3 deletions.
80 changes: 77 additions & 3 deletions src/stream/src/executor/flow_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,86 @@

use std::fmt::{Debug, Formatter};
use std::num::NonZeroU32;
use std::time::Duration;

use governor::clock::MonotonicClock;
use governor::{InsufficientCapacity, Quota, RateLimiter};
use governor::state::{InMemoryState, NotKeyed};
use governor::{InsufficientCapacity, Quota, RateLimiter as GovernorRateLimiter};
use risingwave_common::catalog::Schema;
use tokio::sync::Semaphore;
use tokio::time::sleep;

use super::*;

/// Rate limiter. We can abstract this out when there's more usecases for it.
/// Otherwise for now we can keep it local to `flow_control`.
trait RateLimiter {
fn new(rate_limit: u32) -> Self;
async fn until_n_ready(&self, n: usize);
}

struct DefaultRateLimiter {
inner: GovernorRateLimiter<NotKeyed, InMemoryState, MonotonicClock>,
rate_limit: u32,
}

impl RateLimiter for DefaultRateLimiter {
fn new(rate_limit: u32) -> Self {
let quota = Quota::per_second(NonZeroU32::new(rate_limit).unwrap());
let clock = MonotonicClock;
DefaultRateLimiter {
inner: GovernorRateLimiter::direct_with_clock(quota, &clock),
rate_limit,
}
}

async fn until_n_ready(&self, n: usize) {
let result = self
.inner
.until_n_ready(NonZeroU32::new(n as u32).unwrap())
.await;
if let Err(InsufficientCapacity(n)) = result {
tracing::error!(
"Rate Limit {:?} smaller than chunk cardinality {n}",
self.rate_limit,
);
}
}
}

struct SimRateLimiter {
inner: Arc<Semaphore>,
rate_limit: u32,
}

impl RateLimiter for SimRateLimiter {
fn new(rate_limit: u32) -> Self {
Self {
inner: Arc::new(Semaphore::new(rate_limit as usize)),
rate_limit,
}
}

async fn until_n_ready(&self, n: usize) {
if n > self.rate_limit as usize {
tracing::error!(
"Rate Limit {:?} smaller than chunk cardinality {n}",
self.rate_limit,
);
return;
}
for _ in 0..n {
let semaphore_ref = self.inner.clone();
tokio::spawn(async move {
if let Ok(permit) = semaphore_ref.acquire().await {
sleep(Duration::from_secs(1)).await;
permit.forget();
}
});
}
}
}

/// Flow Control Executor is used to control the rate of the input executor.
///
/// Currently it is placed after the `BackfillExecutor`:
Expand Down Expand Up @@ -48,7 +121,7 @@ impl FlowControlExecutor {
let get_rate_limiter = |rate_limit: u32| {
let quota = Quota::per_second(NonZeroU32::new(rate_limit).unwrap());
let clock = MonotonicClock;
RateLimiter::direct_with_clock(quota, &clock)
GovernorRateLimiter::direct_with_clock(quota, &clock)
};
#[cfg(not(madsim))]
let rate_limiter = self.rate_limit.map(get_rate_limiter);
Expand Down Expand Up @@ -90,7 +163,8 @@ impl FlowControlExecutor {
);
}
}
rate_limit_quota = rate_limit_quota.saturating_sub(chunk_cardinality as u32);
rate_limit_quota =
rate_limit_quota.saturating_sub(chunk_cardinality as u32);
}
}
yield Message::Chunk(chunk);
Expand Down

0 comments on commit 820666e

Please sign in to comment.