Skip to content

Commit

Permalink
refactor: always apply flow control on source/chain (#13057)
Browse files Browse the repository at this point in the history
  • Loading branch information
tabVersion authored Oct 26, 2023
1 parent 71851d6 commit 0f9e783
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 27 deletions.
31 changes: 18 additions & 13 deletions src/stream/src/executor/flow_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,37 +32,42 @@ use super::*;
/// It is used to throttle problematic MVs that are consuming too much resources.
pub struct FlowControlExecutor {
input: BoxedExecutor,
rate_limit: u32,
rate_limit: Option<u32>,
}

impl FlowControlExecutor {
#[allow(clippy::too_many_arguments)]
pub fn new(input: Box<dyn Executor>, rate_limit: u32) -> Self {
pub fn new(input: Box<dyn Executor>, rate_limit: Option<u32>) -> Self {
#[cfg(madsim)]
println!("FlowControlExecutor rate limiter is disabled in madsim as it will spawn system threads");
Self { input, rate_limit }
}

#[try_stream(ok = Message, error = StreamExecutorError)]
async fn execute_inner(self) {
let quota = Quota::per_second(NonZeroU32::new(self.rate_limit).unwrap());
let clock = MonotonicClock;
let rate_limiter = RateLimiter::direct_with_clock(quota, &clock);
let get_rate_limiter = |rate_limit: u32| {
let quota = Quota::per_second(NonZeroU32::new(rate_limit).unwrap());
let clock = MonotonicClock;
RateLimiter::direct_with_clock(quota, &clock)
};
let rate_limiter = self.rate_limit.map(get_rate_limiter);
#[for_await]
for msg in self.input.execute() {
let msg = msg?;
match msg {
Message::Chunk(chunk) => {
#[cfg(not(madsim))]
{
let result = rate_limiter
.until_n_ready(NonZeroU32::new(chunk.cardinality() as u32).unwrap())
.await;
if let Err(InsufficientCapacity(n)) = result {
tracing::error!(
"Rate Limit {} smaller than chunk cardinality {n}",
self.rate_limit,
);
if let Some(rate_limiter) = &rate_limiter {
let result = rate_limiter
.until_n_ready(NonZeroU32::new(chunk.cardinality() as u32).unwrap())
.await;
if let Err(InsufficientCapacity(n)) = result {
tracing::error!(
"Rate Limit {:?} smaller than chunk cardinality {n}",
self.rate_limit,
);
}
}
}
yield Message::Chunk(chunk);
Expand Down
7 changes: 2 additions & 5 deletions src/stream/src/from_proto/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,7 @@ impl ExecutorBuilder for ChainExecutorBuilder {
}
ChainType::ChainUnspecified => unreachable!(),
};
if let Ok(rate_limit) = node.get_rate_limit() {
Ok(FlowControlExecutor::new(executor, *rate_limit).boxed())
} else {
Ok(executor)
}
let rate_limit = node.get_rate_limit().cloned().ok();
Ok(FlowControlExecutor::new(executor, rate_limit).boxed())
}
}
6 changes: 2 additions & 4 deletions src/stream/src/from_proto/source/fs_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,7 @@ impl ExecutorBuilder for FsFetchExecutorBuilder {
)
.boxed();

if let Ok(rate_limit) = source.get_rate_limit() {
return Ok(FlowControlExecutor::new(executor, *rate_limit).boxed());
}
Ok(executor)
let rate_limit = source.get_rate_limit().cloned().ok();
Ok(FlowControlExecutor::new(executor, rate_limit).boxed())
}
}
7 changes: 2 additions & 5 deletions src/stream/src/from_proto/source/trad_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,8 @@ impl ExecutorBuilder for SourceExecutorBuilder {
}
}
};
if let Ok(rate_limit) = source.get_rate_limit() {
Ok(FlowControlExecutor::new(executor, *rate_limit).boxed())
} else {
Ok(executor)
}
let rate_limit = source.get_rate_limit().cloned().ok();
Ok(FlowControlExecutor::new(executor, rate_limit).boxed())
} else {
// If there is no external stream source, then no data should be persisted. We pass a
// `PanicStateStore` type here for indication.
Expand Down

0 comments on commit 0f9e783

Please sign in to comment.