From 820666e7c7316f217644aef11a6160ec3511c3bb Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Mon, 30 Oct 2023 19:40:42 +0800 Subject: [PATCH] add rate limit interface --- src/stream/src/executor/flow_control.rs | 80 ++++++++++++++++++++++++- 1 file changed, 77 insertions(+), 3 deletions(-) diff --git a/src/stream/src/executor/flow_control.rs b/src/stream/src/executor/flow_control.rs index 0db50fe1b2132..e449392c9799c 100644 --- a/src/stream/src/executor/flow_control.rs +++ b/src/stream/src/executor/flow_control.rs @@ -14,13 +14,86 @@ use std::fmt::{Debug, Formatter}; use std::num::NonZeroU32; +use std::time::Duration; use governor::clock::MonotonicClock; -use governor::{InsufficientCapacity, Quota, RateLimiter}; +use governor::state::{InMemoryState, NotKeyed}; +use governor::{InsufficientCapacity, Quota, RateLimiter as GovernorRateLimiter}; use risingwave_common::catalog::Schema; +use tokio::sync::Semaphore; +use tokio::time::sleep; use super::*; +/// Rate limiter. We can abstract this out when there's more usecases for it. +/// Otherwise for now we can keep it local to `flow_control`. +trait RateLimiter { + fn new(rate_limit: u32) -> Self; + async fn until_n_ready(&self, n: usize); +} + +struct DefaultRateLimiter { + inner: GovernorRateLimiter, + rate_limit: u32, +} + +impl RateLimiter for DefaultRateLimiter { + fn new(rate_limit: u32) -> Self { + let quota = Quota::per_second(NonZeroU32::new(rate_limit).unwrap()); + let clock = MonotonicClock; + DefaultRateLimiter { + inner: GovernorRateLimiter::direct_with_clock(quota, &clock), + rate_limit, + } + } + + async fn until_n_ready(&self, n: usize) { + let result = self + .inner + .until_n_ready(NonZeroU32::new(n as u32).unwrap()) + .await; + if let Err(InsufficientCapacity(n)) = result { + tracing::error!( + "Rate Limit {:?} smaller than chunk cardinality {n}", + self.rate_limit, + ); + } + } +} + +struct SimRateLimiter { + inner: Arc, + rate_limit: u32, +} + +impl RateLimiter for SimRateLimiter { + fn new(rate_limit: u32) -> Self { + Self { + inner: Arc::new(Semaphore::new(rate_limit as usize)), + rate_limit, + } + } + + async fn until_n_ready(&self, n: usize) { + if n > self.rate_limit as usize { + tracing::error!( + "Rate Limit {:?} smaller than chunk cardinality {n}", + self.rate_limit, + ); + return; + } + for _ in 0..n { + let semaphore_ref = self.inner.clone(); + tokio::spawn(async move { + if let Ok(permit) = semaphore_ref.acquire().await { + sleep(Duration::from_secs(1)).await; + permit.forget(); + } + }); + } + } +} + /// Flow Control Executor is used to control the rate of the input executor. /// /// Currently it is placed after the `BackfillExecutor`: @@ -48,7 +121,7 @@ impl FlowControlExecutor { let get_rate_limiter = |rate_limit: u32| { let quota = Quota::per_second(NonZeroU32::new(rate_limit).unwrap()); let clock = MonotonicClock; - RateLimiter::direct_with_clock(quota, &clock) + GovernorRateLimiter::direct_with_clock(quota, &clock) }; #[cfg(not(madsim))] let rate_limiter = self.rate_limit.map(get_rate_limiter); @@ -90,7 +163,8 @@ impl FlowControlExecutor { ); } } - rate_limit_quota = rate_limit_quota.saturating_sub(chunk_cardinality as u32); + rate_limit_quota = + rate_limit_quota.saturating_sub(chunk_cardinality as u32); } } yield Message::Chunk(chunk);