diff --git a/src/stream/src/from_proto/source/fs_fetch.rs b/src/stream/src/from_proto/source/fs_fetch.rs index ecf242f044b04..b6df84c8560e4 100644 --- a/src/stream/src/from_proto/source/fs_fetch.rs +++ b/src/stream/src/from_proto/source/fs_fetch.rs @@ -23,7 +23,8 @@ use risingwave_storage::StateStore; use crate::error::StreamResult; use crate::executor::{ - BoxedExecutor, Executor, FsFetchExecutor, SourceStateTableHandler, StreamSourceCore, + BoxedExecutor, Executor, FlowControlExecutor, FsFetchExecutor, SourceStateTableHandler, + StreamSourceCore, }; use crate::from_proto::ExecutorBuilder; use crate::task::{ExecutorParams, LocalStreamManagerCore}; @@ -99,7 +100,7 @@ impl ExecutorBuilder for FsFetchExecutorBuilder { state_table_handler, ); - Ok(FsFetchExecutor::new( + let executor = FsFetchExecutor::new( params.actor_context, schema, params.pk_indices, @@ -109,6 +110,11 @@ impl ExecutorBuilder for FsFetchExecutorBuilder { source_ctrl_opts, params.env.connector_params(), ) - .boxed()) + .boxed(); + + if let Ok(rate_limit) = source.get_rate_limit() { + return Ok(FlowControlExecutor::new(executor, *rate_limit).boxed()); + } + Ok(executor) } }