Skip to content

Commit

Permalink
chore: Refactor FsFetchExecutorBuilder to handle rate limiting (#13006)
Browse files Browse the repository at this point in the history
Signed-off-by: tabVersion <[email protected]>
  • Loading branch information
tabVersion authored Oct 23, 2023
1 parent 903daed commit 2563300
Showing 1 changed file with 9 additions and 3 deletions.
12 changes: 9 additions & 3 deletions src/stream/src/from_proto/source/fs_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use risingwave_storage::StateStore;

use crate::error::StreamResult;
use crate::executor::{
BoxedExecutor, Executor, FsFetchExecutor, SourceStateTableHandler, StreamSourceCore,
BoxedExecutor, Executor, FlowControlExecutor, FsFetchExecutor, SourceStateTableHandler,
StreamSourceCore,
};
use crate::from_proto::ExecutorBuilder;
use crate::task::{ExecutorParams, LocalStreamManagerCore};
Expand Down Expand Up @@ -99,7 +100,7 @@ impl ExecutorBuilder for FsFetchExecutorBuilder {
state_table_handler,
);

Ok(FsFetchExecutor::new(
let executor = FsFetchExecutor::new(
params.actor_context,
schema,
params.pk_indices,
Expand All @@ -109,6 +110,11 @@ impl ExecutorBuilder for FsFetchExecutorBuilder {
source_ctrl_opts,
params.env.connector_params(),
)
.boxed())
.boxed();

if let Ok(rate_limit) = source.get_rate_limit() {
return Ok(FlowControlExecutor::new(executor, *rate_limit).boxed());
}
Ok(executor)
}
}

0 comments on commit 2563300

Please sign in to comment.