diff --git a/e2e_test/source_inline/fs/posix_fs.slt b/e2e_test/source_inline/fs/posix_fs.slt index 79980737a0cc..96fb18e97e66 100644 --- a/e2e_test/source_inline/fs/posix_fs.slt +++ b/e2e_test/source_inline/fs/posix_fs.slt @@ -11,8 +11,19 @@ CREATE TABLE diamonds ( connector = 'posix_fs', match_pattern = 'data*.csv', posix_fs.root = 'e2e_test/source_inline/fs/data', + source_rate_limit = 0 ) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ','); +sleep 1s + +# no output due to rate limit +query TTTT rowsort +select * from diamonds; +---- + +statement ok +ALTER TABLE diamonds SET source_rate_limit TO DEFAULT; + sleep 10s query TTTT rowsort diff --git a/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs b/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs index 3f8c89d7def1..9d3f46ca77cc 100644 --- a/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs +++ b/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs @@ -119,7 +119,7 @@ impl StreamNode for StreamFsFetch { .map(|c| c.to_protobuf()) .collect_vec(), with_properties, - rate_limit: self.base.ctx().overwrite_options().source_rate_limit, + rate_limit: source_catalog.rate_limit, secret_refs, } }); diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 8496904d44c0..2da096268cff 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -21,6 +21,7 @@ use risingwave_common::hash::VnodeCountCompat; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_common::{bail, current_cluster_version}; +use risingwave_connector::WithPropertiesExt; use risingwave_meta_model::actor::ActorStatus; use risingwave_meta_model::actor_dispatcher::DispatcherType; use risingwave_meta_model::object::ObjectType; @@ -1276,6 +1277,7 @@ impl CatalogController { MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id) })?; + let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector(); let streaming_job_ids: Vec = if let Some(table_id) = source.optional_associated_table_id { vec![table_id] @@ -1330,6 +1332,20 @@ impl CatalogController { } }); } + if is_fs_source && *fragment_type_mask == PbFragmentTypeFlag::FragmentUnspecified as i32 + { + // when create table with fs connector, the fragment type is unspecified + visit_stream_node(stream_node, |node| { + if let PbNodeBody::StreamFsFetch(node) = node { + if let Some(node_inner) = &mut node.node_inner + && node_inner.source_id == source_id as u32 + { + node_inner.rate_limit = rate_limit; + found = true; + } + } + }); + } found }); diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index c2ccd4300ccf..fcd3a2c5d185 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -156,6 +156,15 @@ impl BuildingFragment { dml_node.table_id = job_id; dml_node.table_version_id = job.table_version_id().unwrap(); } + NodeBody::StreamFsFetch(fs_fetch_node) => { + if let StreamingJob::Table(table_source, _, _) = job { + if let Some(node_inner) = fs_fetch_node.node_inner.as_mut() + && let Some(source) = table_source + { + node_inner.source_id = source.id; + } + } + } NodeBody::Source(source_node) => { match job { // Note: For table without connector, it has a dummy Source node. diff --git a/src/stream/src/executor/source/fetch_executor.rs b/src/stream/src/executor/source/fetch_executor.rs index 766c42a5e2c8..19c83702a837 100644 --- a/src/stream/src/executor/source/fetch_executor.rs +++ b/src/stream/src/executor/source/fetch_executor.rs @@ -258,6 +258,11 @@ impl FsFetchExecutor { actor_to_apply.get(&self.actor_ctx.id) && *new_rate_limit != self.rate_limit_rps { + tracing::debug!( + "updating rate limit from {:?} to {:?}", + self.rate_limit_rps, + *new_rate_limit + ); self.rate_limit_rps = *new_rate_limit; need_rebuild_reader = true; } diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 6e583caf739a..6fb72fe15ea3 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -608,6 +608,11 @@ impl SourceExecutor { if let Some(new_rate_limit) = actor_to_apply.get(&self.actor_ctx.id) && *new_rate_limit != self.rate_limit_rps { + tracing::debug!( + "updating rate limit from {:?} to {:?}", + self.rate_limit_rps, + *new_rate_limit + ); self.rate_limit_rps = *new_rate_limit; // recreate from latest_split_info self.rebuild_stream_reader(&source_desc, &mut stream)