diff --git a/e2e_test/source/basic/alter/rate_limit_source_kafka.slt b/e2e_test/source/basic/alter/rate_limit_source_kafka.slt index e76cf72c0220a..ac56a7f48e3a5 100644 --- a/e2e_test/source/basic/alter/rate_limit_source_kafka.slt +++ b/e2e_test/source/basic/alter/rate_limit_source_kafka.slt @@ -31,6 +31,7 @@ create source kafka_source (v1 int) with ( topic = 'kafka_source', properties.bootstrap.server = 'message_queue:29092', scan.startup.mode = 'earliest', + source_rate_limit = 0, ) FORMAT PLAIN ENCODE JSON statement ok @@ -43,8 +44,9 @@ sleep 3s ############## Create MV on source +# This should be ignored. statement ok -SET SOURCE_RATE_LIMIT=0; +SET SOURCE_RATE_LIMIT=1000; statement ok create materialized view rl_mv1 as select count(*) from kafka_source; @@ -55,9 +57,6 @@ create materialized view rl_mv2 as select count(*) from kafka_source; statement ok create materialized view rl_mv3 as select count(*) from kafka_source; -statement ok -SET SOURCE_RATE_LIMIT=default; - ############## MVs should have 0 records, since source has (rate_limit = 0) statement ok diff --git a/e2e_test/source/basic/alter/rate_limit_table_kafka.slt b/e2e_test/source/basic/alter/rate_limit_table_kafka.slt index d93687799cac7..1a23bbf1009de 100644 --- a/e2e_test/source/basic/alter/rate_limit_table_kafka.slt +++ b/e2e_test/source/basic/alter/rate_limit_table_kafka.slt @@ -13,18 +13,21 @@ create sink kafka_sink from kafka_seed_data with ( properties.bootstrap.server = 'message_queue:29092', - topic = 'kafka_source', + topic = 'rate_limit_source_kafka_0', type = 'append-only', force_append_only='true', connector = 'kafka' ); +# topic may not be created yet +sleep 4s + ############## Source from kafka (rate_limit = 0) statement ok create table kafka_source (v1 int) with ( connector = 'kafka', - topic = 'kafka_source', + topic = 'rate_limit_source_kafka_0', properties.bootstrap.server = 'message_queue:29092', scan.startup.mode = 'earliest', source_rate_limit = 0 @@ -63,21 +66,34 @@ skipif 
in-memory query I alter table kafka_source set source_rate_limit to 1000; +skipif in-memory +sleep 3s + skipif in-memory query I -alter table kafka_source set source_rate_limit to default; +select count(*) > 1 from kafka_source; +---- +t + +############## New MV created should have rate limit = 1000. + +statement ok +create materialized view mv as select * from kafka_source; skipif in-memory sleep 3s skipif in-memory query I -select count(*) > 1 from kafka_source; +select count(*) > 1 from mv; ---- t ############## Cleanup +statement ok +drop materialized view mv; + statement ok drop table kafka_source; diff --git a/proto/catalog.proto b/proto/catalog.proto index d407dbe3936a4..e13a99ac3c2f9 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -135,6 +135,8 @@ message Source { // Per-source catalog version, used by schema change. uint64 version = 100; + + optional uint32 rate_limit = 101; } enum SinkType { diff --git a/src/frontend/src/catalog/source_catalog.rs b/src/frontend/src/catalog/source_catalog.rs index f060fad72c78d..8e64a6db4e2b9 100644 --- a/src/frontend/src/catalog/source_catalog.rs +++ b/src/frontend/src/catalog/source_catalog.rs @@ -44,6 +44,7 @@ pub struct SourceCatalog { pub version: SourceVersionId, pub created_at_cluster_version: Option, pub initialized_at_cluster_version: Option, + pub rate_limit: Option, } impl SourceCatalog { @@ -77,6 +78,7 @@ impl SourceCatalog { created_at_cluster_version: self.created_at_cluster_version.clone(), initialized_at_cluster_version: self.initialized_at_cluster_version.clone(), secret_refs, + rate_limit: self.rate_limit, } } @@ -121,6 +123,7 @@ impl From<&PbSource> for SourceCatalog { let version = prost.version; let connection_id = prost.connection_id; + let rate_limit = prost.rate_limit; Self { id, @@ -141,6 +144,7 @@ impl From<&PbSource> for SourceCatalog { version, created_at_cluster_version: prost.created_at_cluster_version.clone(), initialized_at_cluster_version: 
prost.initialized_at_cluster_version.clone(), + rate_limit, } } } diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 32628a1abe8e0..1c9d0b5893421 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -85,7 +85,9 @@ use crate::handler::HandlerArgs; use crate::optimizer::plan_node::generic::SourceNodeKind; use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext}; use crate::session::SessionImpl; -use crate::utils::{resolve_privatelink_in_with_option, resolve_secret_ref_in_with_options}; +use crate::utils::{ + resolve_privatelink_in_with_option, resolve_secret_ref_in_with_options, OverwriteOptions, +}; use crate::{bind_data_type, build_graph, OptimizerContext, WithOptions, WithOptionsSecResolved}; pub(crate) const UPSTREAM_SOURCE_KEY: &str = "connector"; @@ -1441,6 +1443,7 @@ pub async fn bind_create_source_or_table_with_connector( col_id_gen: &mut ColumnIdGenerator, // `true` for "create source", `false` for "create table with connector" is_create_source: bool, + source_rate_limit: Option, ) -> Result<(SourceCatalog, DatabaseId, SchemaId)> { let session = &handler_args.session; let db_name: &str = session.database(); @@ -1581,15 +1584,17 @@ pub async fn bind_create_source_or_table_with_connector( version: INITIAL_SOURCE_VERSION_ID, created_at_cluster_version: None, initialized_at_cluster_version: None, + rate_limit: source_rate_limit, }; Ok((source, database_id, schema_id)) } pub async fn handle_create_source( - handler_args: HandlerArgs, + mut handler_args: HandlerArgs, stmt: CreateSourceStatement, ) -> Result { let session = handler_args.session.clone(); + let overwrite_options = OverwriteOptions::new(&mut handler_args); if let Either::Right(resp) = session.check_relation_name_duplicated( stmt.source_name.clone(), @@ -1638,6 +1643,7 @@ pub async fn handle_create_source( stmt.include_column_options, &mut col_id_gen, true, + 
overwrite_options.source_rate_limit, ) .await?; diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index ac58bb808867d..6e57ab52c87fe 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -67,6 +67,7 @@ use crate::optimizer::property::{Order, RequiredDist}; use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot}; use crate::session::SessionImpl; use crate::stream_fragmenter::build_graph; +use crate::utils::OverwriteOptions; use crate::{Binder, TableCatalog, WithOptions}; /// Column ID generator for a new table or a new version of an existing table to alter. @@ -459,7 +460,7 @@ pub fn bind_pk_and_row_id_on_relation( /// stream source. #[allow(clippy::too_many_arguments)] pub(crate) async fn gen_create_table_plan_with_source( - handler_args: HandlerArgs, + mut handler_args: HandlerArgs, explain_options: ExplainOptions, table_name: ObjectName, column_defs: Vec, @@ -490,6 +491,8 @@ pub(crate) async fn gen_create_table_plan_with_source( let (columns_from_resolve_source, source_info) = bind_columns_from_source(session, &source_schema, Either::Left(&with_properties)).await?; + let overwrite_options = OverwriteOptions::new(&mut handler_args); + let rate_limit = overwrite_options.source_rate_limit; let (source_catalog, database_id, schema_id) = bind_create_source_or_table_with_connector( handler_args.clone(), table_name, @@ -504,6 +507,7 @@ pub(crate) async fn gen_create_table_plan_with_source( include_column_options, &mut col_id_gen, false, + rate_limit, ) .await?; @@ -673,6 +677,7 @@ fn gen_table_plan_inner( let session = context.session_ctx().clone(); let retention_seconds = context.with_options().retention_seconds(); let is_external_source = source_catalog.is_some(); + let source_node: PlanRef = LogicalSource::new( source_catalog.map(|source| Rc::new(source.clone())), columns.clone(), diff --git a/src/frontend/src/optimizer/plan_node/stream_source.rs 
b/src/frontend/src/optimizer/plan_node/stream_source.rs
index 159748bbdc18a..d7808e4be51ce 100644
--- a/src/frontend/src/optimizer/plan_node/stream_source.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_source.rs
@@ -112,7 +112,7 @@ impl StreamNode for StreamSource {
                 .map(|c| c.to_protobuf())
                 .collect_vec(),
             with_properties,
-            rate_limit: self.base.ctx().overwrite_options().source_rate_limit,
+            rate_limit: source_catalog.rate_limit,
             secret_refs,
         }
     });
diff --git a/src/meta/model_v2/migration/README.md b/src/meta/model_v2/migration/README.md
index c7677b2272f84..2ed136b8d60f1 100644
--- a/src/meta/model_v2/migration/README.md
+++ b/src/meta/model_v2/migration/README.md
@@ -50,3 +50,7 @@
    ```sh
    cargo run -- status
    ```
+
+## Adding a migration
+
+- Add a new column to an existing catalog table. You can check out the migration [m20240617_070131_index_column_properties.rs](src/m20240617_070131_index_column_properties.rs) as a reference.
\ No newline at end of file
diff --git a/src/meta/model_v2/migration/src/lib.rs b/src/meta/model_v2/migration/src/lib.rs
index 79cf3c05e7e08..dc1f3dd3f9808 100644
--- a/src/meta/model_v2/migration/src/lib.rs
+++ b/src/meta/model_v2/migration/src/lib.rs
@@ -18,6 +18,7 @@ mod m20240630_131430_remove_parallel_unit;
 mod m20240701_060504_hummock_time_travel;
 mod m20240702_080451_system_param_value;
 mod m20240702_084927_unnecessary_fk;
+mod m20240806_143329_add_rate_limit_to_source_catalog;
 
 pub struct Migrator;
 
@@ -34,13 +35,14 @@ impl MigratorTrait for Migrator {
             Box::new(m20240418_142249_function_runtime::Migration),
             Box::new(m20240506_112555_subscription_partial_ckpt::Migration),
             Box::new(m20240525_090457_secret::Migration),
-            Box::new(m20240618_072634_function_compressed_binary::Migration),
             Box::new(m20240617_070131_index_column_properties::Migration),
             Box::new(m20240617_071625_sink_into_table_column::Migration),
+            Box::new(m20240618_072634_function_compressed_binary::Migration),
             Box::new(m20240630_131430_remove_parallel_unit::Migration),
+
Box::new(m20240701_060504_hummock_time_travel::Migration), Box::new(m20240702_080451_system_param_value::Migration), Box::new(m20240702_084927_unnecessary_fk::Migration), - Box::new(m20240701_060504_hummock_time_travel::Migration), + Box::new(m20240806_143329_add_rate_limit_to_source_catalog::Migration), ] } } diff --git a/src/meta/model_v2/migration/src/m20240806_143329_add_rate_limit_to_source_catalog.rs b/src/meta/model_v2/migration/src/m20240806_143329_add_rate_limit_to_source_catalog.rs new file mode 100644 index 0000000000000..7049e40e6d66c --- /dev/null +++ b/src/meta/model_v2/migration/src/m20240806_143329_add_rate_limit_to_source_catalog.rs @@ -0,0 +1,35 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Source::Table) + .add_column(ColumnDef::new(Source::RateLimit).integer()) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Source::Table) + .drop_column(Source::RateLimit) + .to_owned(), + ) + .await + } +} + +#[derive(DeriveIden)] +enum Source { + Table, + RateLimit, +} diff --git a/src/meta/model_v2/src/source.rs b/src/meta/model_v2/src/source.rs index 9dd4bb5b1a11a..39c7dc556cf3e 100644 --- a/src/meta/model_v2/src/source.rs +++ b/src/meta/model_v2/src/source.rs @@ -41,6 +41,7 @@ pub struct Model { pub version: i64, // `secret_ref` stores the mapping info mapping from property name to secret id and type. 
pub secret_ref: Option, + pub rate_limit: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] @@ -104,6 +105,7 @@ impl From for ActiveModel { connection_id: Set(source.connection_id.map(|id| id as _)), version: Set(source.version as _), secret_ref: Set(Some(SecretRef::from(source.secret_refs))), + rate_limit: Set(source.rate_limit.map(|id| id as _)), } } } diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index 7228ddbb36eb9..43078ea812723 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -200,6 +200,7 @@ impl From> for PbSource { initialized_at_cluster_version: value.1.initialized_at_cluster_version, created_at_cluster_version: value.1.created_at_cluster_version, secret_refs: secret_ref_map, + rate_limit: value.0.rate_limit.map(|v| v as _), } } } diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 6447eccbb7abe..f5d97eca1c564 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -1214,12 +1214,23 @@ impl CatalogController { let inner = self.inner.read().await; let txn = inner.db.begin().await?; - let source = Source::find_by_id(source_id) + { + let active_source = source::ActiveModel { + source_id: Set(source_id), + rate_limit: Set(rate_limit.map(|v| v as i32)), + ..Default::default() + }; + active_source.update(&txn).await?; + } + + let (source, obj) = Source::find_by_id(source_id) + .find_also_related(Object) .one(&txn) .await? 
.ok_or_else(|| { MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id) })?; + let streaming_job_ids: Vec = if let Some(table_id) = source.optional_associated_table_id { vec![table_id] @@ -1295,6 +1306,19 @@ impl CatalogController { txn.commit().await?; + let relation_info = PbRelationInfo::Source(ObjectModel(source, obj.unwrap()).into()); + let relation = PbRelation { + relation_info: Some(relation_info), + }; + let _version = self + .notify_frontend( + NotificationOperation::Update, + NotificationInfo::RelationGroup(PbRelationGroup { + relations: vec![relation], + }), + ) + .await; + Ok(fragment_actors) } diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 360529e58b72a..25c96dd4ec630 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -4847,4 +4847,36 @@ impl CatalogManager { } users_need_update } + + pub async fn update_source_rate_limit_by_source_id( + &self, + source_id: SourceId, + rate_limit: Option, + ) -> MetaResult<()> { + let source_relation: PbSource; + { + let core = &mut *self.core.lock().await; + let database_core = &mut core.database; + let mut sources = BTreeMapTransaction::new(&mut database_core.sources); + let mut source = sources.get_mut(source_id); + let Some(source_catalog) = source.as_mut() else { + bail!("source {} not found", source_id) + }; + source_relation = source_catalog.clone(); + source_catalog.rate_limit = rate_limit; + commit_meta!(self, sources)?; + } + + let _version = self + .notify_frontend( + Operation::Update, + Info::RelationGroup(RelationGroup { + relations: vec![Relation { + relation_info: RelationInfo::Source(source_relation).into(), + }], + }), + ) + .await; + Ok(()) + } } diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index e99259810b117..f634024a3a605 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -802,6 +802,9 @@ impl MetadataManager { ) -> 
MetaResult>> { match self { MetadataManager::V1(mgr) => { + mgr.catalog_manager + .update_source_rate_limit_by_source_id(source_id as u32, rate_limit) + .await?; mgr.fragment_manager .update_source_rate_limit_by_source_id(source_id, rate_limit) .await