fix(source): persist rate_limit to source catalog (#16472)
Co-authored-by: August <[email protected]>
kwannoel and yezizp2012 authored Aug 8, 2024
1 parent a4d890e commit de32dab
Showing 15 changed files with 150 additions and 15 deletions.
7 changes: 3 additions & 4 deletions e2e_test/source/basic/alter/rate_limit_source_kafka.slt
@@ -31,6 +31,7 @@ create source kafka_source (v1 int) with (
topic = 'kafka_source',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest',
source_rate_limit = 0,
) FORMAT PLAIN ENCODE JSON

statement ok
@@ -43,8 +44,9 @@ sleep 3s

############## Create MV on source

# This should be ignored.
statement ok
SET SOURCE_RATE_LIMIT=0;
SET SOURCE_RATE_LIMIT=1000;

statement ok
create materialized view rl_mv1 as select count(*) from kafka_source;
@@ -55,9 +57,6 @@ create materialized view rl_mv2 as select count(*) from kafka_source;
statement ok
create materialized view rl_mv3 as select count(*) from kafka_source;

statement ok
SET SOURCE_RATE_LIMIT=default;

############## MVs should have 0 records, since source has (rate_limit = 0)

statement ok
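The behavior pinned down above — a rate limit persisted in the source catalog takes precedence over the session variable when new materialized views are created — boils down to a small rule. A toy sketch of that precedence (illustrative Rust, not the actual RisingWave resolution code):

```rust
/// Toy model of the precedence this test exercises: a limit persisted in the
/// source catalog applies to every downstream job, while the session's
/// `SOURCE_RATE_LIMIT` only matters where no catalog value exists.
fn effective_rate_limit(catalog: Option<u32>, session: Option<u32>) -> Option<u32> {
    catalog.or(session)
}

fn main() {
    // The source above was created with `source_rate_limit = 0`, so the later
    // `SET SOURCE_RATE_LIMIT=1000` is ignored and the MVs stay empty.
    assert_eq!(effective_rate_limit(Some(0), Some(1000)), Some(0));
}
```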
24 changes: 20 additions & 4 deletions e2e_test/source/basic/alter/rate_limit_table_kafka.slt
@@ -13,18 +13,21 @@ create sink kafka_sink
from
kafka_seed_data with (
properties.bootstrap.server = 'message_queue:29092',
topic = 'kafka_source',
topic = 'rate_limit_source_kafka_0',
type = 'append-only',
force_append_only='true',
connector = 'kafka'
);

# topic may not be created yet
sleep 4s

############## Source from kafka (rate_limit = 0)

statement ok
create table kafka_source (v1 int) with (
connector = 'kafka',
topic = 'kafka_source',
topic = 'rate_limit_source_kafka_0',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest',
source_rate_limit = 0
@@ -63,21 +66,34 @@ skipif in-memory
query I
alter table kafka_source set source_rate_limit to 1000;

skipif in-memory
sleep 3s

skipif in-memory
query I
alter table kafka_source set source_rate_limit to default;
select count(*) > 1 from kafka_source;
----
t

############## New MV created should have rate limit = 1000.

statement ok
create materialized view mv as select * from kafka_source;

skipif in-memory
sleep 3s

skipif in-memory
query I
select count(*) > 1 from kafka_source;
select count(*) > 1 from mv;
----
t

############## Cleanup

statement ok
drop materialized view mv;

statement ok
drop table kafka_source;

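The new tail of this test verifies that `alter table ... set source_rate_limit to 1000` is persisted, so a materialized view created afterwards streams data at the new limit. A self-contained toy model of that persist-then-read flow (all type and method names here are illustrative, not the real meta-service API):

```rust
use std::collections::HashMap;

/// Toy stand-in for the durable catalog store on the meta node.
#[derive(Default)]
struct SourceCatalogStore {
    rate_limits: HashMap<u32, Option<u32>>, // source id -> persisted limit
}

impl SourceCatalogStore {
    /// What `ALTER ... SET source_rate_limit` now does first: write the limit
    /// into the durable catalog rather than only patching running actors.
    fn persist_rate_limit(&mut self, source_id: u32, limit: Option<u32>) {
        self.rate_limits.insert(source_id, limit);
    }

    /// What planning a later `CREATE MATERIALIZED VIEW` reads back.
    fn rate_limit_of(&self, source_id: u32) -> Option<u32> {
        self.rate_limits.get(&source_id).copied().flatten()
    }
}

fn main() {
    let mut store = SourceCatalogStore::default();
    store.persist_rate_limit(1, Some(1000));
    // The MV created after the ALTER observes 1000, which is what the new
    // assertions above check end to end.
    assert_eq!(store.rate_limit_of(1), Some(1000));
}
```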
2 changes: 2 additions & 0 deletions proto/catalog.proto
@@ -135,6 +135,8 @@ message Source {

// Per-source catalog version, used by schema change.
uint64 version = 100;

optional uint32 rate_limit = 101;
}

enum SinkType {
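Because the new field is declared `optional`, prost generates it as `Option<u32>` on the Rust side, keeping "no limit configured" (`None`) distinct from "paused" (`Some(0)`, the state the e2e tests above rely on). A hand-written approximation of the generated struct — the real `PbSource` carries all the other catalog fields too:

```rust
/// Hand-written stand-in for the prost-generated `PbSource`.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct PbSource {
    pub rate_limit: Option<u32>, // from `optional uint32 rate_limit = 101`
}

fn main() {
    // Absent and zero are different states, which is exactly why the proto
    // field is `optional` rather than a plain `uint32` defaulting to 0.
    assert_ne!(PbSource { rate_limit: None }, PbSource { rate_limit: Some(0) });
}
```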
4 changes: 4 additions & 0 deletions src/frontend/src/catalog/source_catalog.rs
@@ -44,6 +44,7 @@ pub struct SourceCatalog {
pub version: SourceVersionId,
pub created_at_cluster_version: Option<String>,
pub initialized_at_cluster_version: Option<String>,
pub rate_limit: Option<u32>,
}

impl SourceCatalog {
@@ -77,6 +78,7 @@ impl SourceCatalog {
created_at_cluster_version: self.created_at_cluster_version.clone(),
initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
secret_refs,
rate_limit: self.rate_limit,
}
}

Expand Down Expand Up @@ -121,6 +123,7 @@ impl From<&PbSource> for SourceCatalog {
let version = prost.version;

let connection_id = prost.connection_id;
let rate_limit = prost.rate_limit;

Self {
id,
@@ -141,6 +144,7 @@ impl From<&PbSource> for SourceCatalog {
version,
created_at_cluster_version: prost.created_at_cluster_version.clone(),
initialized_at_cluster_version: prost.initialized_at_cluster_version.clone(),
rate_limit,
}
}
}
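The two conversions touched here form the frontend's round trip: `From<&PbSource>` when loading the catalog and `to_prost` when publishing it. Before this commit `SourceCatalog` had no slot for the limit, so the value never reached the persisted catalog. A toy round-trip check with radically simplified types (the real structs carry many more fields):

```rust
#[derive(Clone, Debug, PartialEq)]
struct PbSource { rate_limit: Option<u32> }

#[derive(Clone, Debug, PartialEq)]
struct SourceCatalog { rate_limit: Option<u32> }

impl From<&PbSource> for SourceCatalog {
    fn from(prost: &PbSource) -> Self {
        Self { rate_limit: prost.rate_limit }
    }
}

impl SourceCatalog {
    fn to_prost(&self) -> PbSource {
        PbSource { rate_limit: self.rate_limit }
    }
}

fn main() {
    // With the field threaded through both directions, the limit survives.
    let pb = PbSource { rate_limit: Some(0) };
    assert_eq!(SourceCatalog::from(&pb).to_prost(), pb);
}
```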
10 changes: 8 additions & 2 deletions src/frontend/src/handler/create_source.rs
@@ -85,7 +85,9 @@ use crate::handler::HandlerArgs;
use crate::optimizer::plan_node::generic::SourceNodeKind;
use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext};
use crate::session::SessionImpl;
use crate::utils::{resolve_privatelink_in_with_option, resolve_secret_ref_in_with_options};
use crate::utils::{
resolve_privatelink_in_with_option, resolve_secret_ref_in_with_options, OverwriteOptions,
};
use crate::{bind_data_type, build_graph, OptimizerContext, WithOptions, WithOptionsSecResolved};

pub(crate) const UPSTREAM_SOURCE_KEY: &str = "connector";
@@ -1441,6 +1443,7 @@ pub async fn bind_create_source_or_table_with_connector(
col_id_gen: &mut ColumnIdGenerator,
// `true` for "create source", `false` for "create table with connector"
is_create_source: bool,
source_rate_limit: Option<u32>,
) -> Result<(SourceCatalog, DatabaseId, SchemaId)> {
let session = &handler_args.session;
let db_name: &str = session.database();
@@ -1581,15 +1584,17 @@ pub async fn bind_create_source_or_table_with_connector(
version: INITIAL_SOURCE_VERSION_ID,
created_at_cluster_version: None,
initialized_at_cluster_version: None,
rate_limit: source_rate_limit,
};
Ok((source, database_id, schema_id))
}

pub async fn handle_create_source(
handler_args: HandlerArgs,
mut handler_args: HandlerArgs,
stmt: CreateSourceStatement,
) -> Result<RwPgResponse> {
let session = handler_args.session.clone();
let overwrite_options = OverwriteOptions::new(&mut handler_args);

if let Either::Right(resp) = session.check_relation_name_duplicated(
stmt.source_name.clone(),
Expand Down Expand Up @@ -1638,6 +1643,7 @@ pub async fn handle_create_source(
stmt.include_column_options,
&mut col_id_gen,
true,
overwrite_options.source_rate_limit,
)
.await?;

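`OverwriteOptions::new` takes `&mut HandlerArgs`, which suggests it extracts — and removes — the rate-limit setting from the statement's WITH options before the remaining options are treated as connector properties, falling back to the session's `SOURCE_RATE_LIMIT`. Its implementation is not part of this diff, so the following is only a sketch of that shape; everything except the `source_rate_limit` field name is a guess:

```rust
use std::collections::BTreeMap;

// Illustrative stand-ins for types not shown in this diff.
struct HandlerArgs {
    with_options: BTreeMap<String, String>,
    session_source_rate_limit: Option<u32>,
}

pub struct OverwriteOptions {
    pub source_rate_limit: Option<u32>,
}

impl OverwriteOptions {
    pub fn new(args: &mut HandlerArgs) -> Self {
        // Drain the option so it is not later rejected as an unknown
        // connector property.
        let from_with = args
            .with_options
            .remove("source_rate_limit")
            .and_then(|v| v.parse::<u32>().ok());
        Self {
            source_rate_limit: from_with.or(args.session_source_rate_limit),
        }
    }
}

fn main() {
    let mut args = HandlerArgs {
        with_options: BTreeMap::from([("source_rate_limit".into(), "0".into())]),
        session_source_rate_limit: Some(1000),
    };
    // The WITH clause wins over the session variable, and the option has
    // been consumed from the remaining connector properties.
    assert_eq!(OverwriteOptions::new(&mut args).source_rate_limit, Some(0));
    assert!(args.with_options.is_empty());
}
```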
7 changes: 6 additions & 1 deletion src/frontend/src/handler/create_table.rs
@@ -67,6 +67,7 @@ use crate::optimizer::property::{Order, RequiredDist};
use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot};
use crate::session::SessionImpl;
use crate::stream_fragmenter::build_graph;
use crate::utils::OverwriteOptions;
use crate::{Binder, TableCatalog, WithOptions};

/// Column ID generator for a new table or a new version of an existing table to alter.
@@ -459,7 +460,7 @@ pub fn bind_pk_and_row_id_on_relation(
/// stream source.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn gen_create_table_plan_with_source(
handler_args: HandlerArgs,
mut handler_args: HandlerArgs,
explain_options: ExplainOptions,
table_name: ObjectName,
column_defs: Vec<ColumnDef>,
@@ -490,6 +491,8 @@ pub(crate) async fn gen_create_table_plan_with_source(
let (columns_from_resolve_source, source_info) =
bind_columns_from_source(session, &source_schema, Either::Left(&with_properties)).await?;

let overwrite_options = OverwriteOptions::new(&mut handler_args);
let rate_limit = overwrite_options.source_rate_limit;
let (source_catalog, database_id, schema_id) = bind_create_source_or_table_with_connector(
handler_args.clone(),
table_name,
@@ -504,6 +507,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
include_column_options,
&mut col_id_gen,
false,
rate_limit,
)
.await?;

@@ -673,6 +677,7 @@ fn gen_table_plan_inner(
let session = context.session_ctx().clone();
let retention_seconds = context.with_options().retention_seconds();
let is_external_source = source_catalog.is_some();

let source_node: PlanRef = LogicalSource::new(
source_catalog.map(|source| Rc::new(source.clone())),
columns.clone(),
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_source.rs
@@ -112,7 +112,7 @@ impl StreamNode for StreamSource {
.map(|c| c.to_protobuf())
.collect_vec(),
with_properties,
rate_limit: self.base.ctx().overwrite_options().source_rate_limit,
rate_limit: source_catalog.rate_limit,
secret_refs,
}
});
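This one-line change is the crux of the fix: the stream plan used to read the limit from the per-session overwrite options held in the optimizer context, so the value evaporated with the session; it now reads whatever was persisted into the `SourceCatalog` by the CREATE and ALTER paths above. A minimal illustration of the new lookup:

```rust
struct SourceCatalog { rate_limit: Option<u32> }

/// Planning consults only the persisted catalog; any session-level value was
/// already folded into the catalog when the source was created or altered.
fn plan_rate_limit(catalog: &SourceCatalog) -> Option<u32> {
    catalog.rate_limit
}

fn main() {
    let catalog = SourceCatalog { rate_limit: Some(1000) };
    assert_eq!(plan_rate_limit(&catalog), Some(1000));
}
```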
4 changes: 4 additions & 0 deletions src/meta/model_v2/migration/README.md
@@ -50,3 +50,7 @@
```sh
cargo run -- status
```

## Adding a migration

- To add a new column to a catalog table, check out the migration [m20240617_070131_index_column_properties.rs](src/m20240617_070131_index_column_properties.rs) as a reference.
6 changes: 4 additions & 2 deletions src/meta/model_v2/migration/src/lib.rs
@@ -18,6 +18,7 @@ mod m20240630_131430_remove_parallel_unit;
mod m20240701_060504_hummock_time_travel;
mod m20240702_080451_system_param_value;
mod m20240702_084927_unnecessary_fk;
mod m20240806_143329_add_rate_limit_to_source_catalog;

pub struct Migrator;

@@ -34,13 +35,14 @@ impl MigratorTrait for Migrator {
Box::new(m20240418_142249_function_runtime::Migration),
Box::new(m20240506_112555_subscription_partial_ckpt::Migration),
Box::new(m20240525_090457_secret::Migration),
Box::new(m20240618_072634_function_compressed_binary::Migration),
Box::new(m20240617_070131_index_column_properties::Migration),
Box::new(m20240617_071625_sink_into_table_column::Migration),
Box::new(m20240618_072634_function_compressed_binary::Migration),
Box::new(m20240630_131430_remove_parallel_unit::Migration),
Box::new(m20240701_060504_hummock_time_travel::Migration),
Box::new(m20240702_080451_system_param_value::Migration),
Box::new(m20240702_084927_unnecessary_fk::Migration),
Box::new(m20240701_060504_hummock_time_travel::Migration),
Box::new(m20240806_143329_add_rate_limit_to_source_catalog::Migration),
]
}
}
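Migrations are applied in the order they appear in this list, with the new `m20240806_143329_add_rate_limit_to_source_catalog` entry appended last. A minimal sketch of driving the migrator by hand — the connection URL is hypothetical, a tokio runtime and the matching sea-orm runtime feature are assumed, and in production the meta node runs this itself:

```rust
use sea_orm_migration::MigratorTrait;

// Assumes the `Migrator` defined above is in scope.
#[tokio::main]
async fn main() -> Result<(), sea_orm::DbErr> {
    let db = sea_orm::Database::connect("postgres://localhost/risingwave_meta").await?;
    // Applies every pending migration in registration order, including the
    // new rate-limit column migration.
    Migrator::up(&db, None).await?;
    Ok(())
}
```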
35 changes: 35 additions & 0 deletions src/meta/model_v2/migration/src/m20240806_143329_add_rate_limit_to_source_catalog.rs
@@ -0,0 +1,35 @@
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Source::Table)
.add_column(ColumnDef::new(Source::RateLimit).integer())
.to_owned(),
)
.await
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Source::Table)
.drop_column(Source::RateLimit)
.to_owned(),
)
.await
}
}

#[derive(DeriveIden)]
enum Source {
Table,
RateLimit,
}
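`DeriveIden` turns these variants into snake_case SQL identifiers — the special `Table` variant names the table itself — so the migration above adds a nullable integer column `rate_limit` to the `source` table, matching the `Option<i32>` model field below. A quick check of that mapping (a sketch; it assumes sea-query's `Iden::to_string` is available via the migration prelude):

```rust
use sea_orm_migration::prelude::*;

#[derive(DeriveIden)]
enum Source {
    Table,     // renders as the table name: "source"
    RateLimit, // renders as the column name: "rate_limit"
}

fn main() {
    assert_eq!(Source::Table.to_string(), "source");
    assert_eq!(Source::RateLimit.to_string(), "rate_limit");
}
```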
2 changes: 2 additions & 0 deletions src/meta/model_v2/src/source.rs
@@ -41,6 +41,7 @@ pub struct Model {
pub version: i64,
// `secret_ref` stores the mapping info mapping from property name to secret id and type.
pub secret_ref: Option<SecretRef>,
pub rate_limit: Option<i32>,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
@@ -104,6 +105,7 @@ impl From<PbSource> for ActiveModel {
connection_id: Set(source.connection_id.map(|id| id as _)),
version: Set(source.version as _),
secret_ref: Set(Some(SecretRef::from(source.secret_refs))),
rate_limit: Set(source.rate_limit.map(|id| id as _)),
}
}
}
1 change: 1 addition & 0 deletions src/meta/src/controller/mod.rs
@@ -200,6 +200,7 @@ impl From<ObjectModel<source::Model>> for PbSource {
initialized_at_cluster_version: value.1.initialized_at_cluster_version,
created_at_cluster_version: value.1.created_at_cluster_version,
secret_refs: secret_ref_map,
rate_limit: value.0.rate_limit.map(|v| v as _),
}
}
}
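Note the type dance across the model and controller changes: the SQL model stores the limit as `Option<i32>` (the column is a signed 32-bit integer) while the protobuf side uses `Option<u32>`, so each direction casts with `as`. The round trip is lossless because the casts are bit-preserving:

```rust
fn main() {
    // PbSource -> ActiveModel (store) and ObjectModel -> PbSource (load).
    let pb_limit: Option<u32> = Some(1000);
    let model_limit: Option<i32> = pb_limit.map(|v| v as i32);
    let back: Option<u32> = model_limit.map(|v| v as u32);
    assert_eq!(back, pb_limit);
    // Even values above i32::MAX survive, since `as` wraps symmetrically,
    // though the column would then hold a negative number.
    assert_eq!((u32::MAX as i32) as u32, u32::MAX);
}
```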
26 changes: 25 additions & 1 deletion src/meta/src/controller/streaming_job.rs
@@ -1214,12 +1214,23 @@ impl CatalogController {
let inner = self.inner.read().await;
let txn = inner.db.begin().await?;

let source = Source::find_by_id(source_id)
{
let active_source = source::ActiveModel {
source_id: Set(source_id),
rate_limit: Set(rate_limit.map(|v| v as i32)),
..Default::default()
};
active_source.update(&txn).await?;
}

let (source, obj) = Source::find_by_id(source_id)
.find_also_related(Object)
.one(&txn)
.await?
.ok_or_else(|| {
MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
})?;

let streaming_job_ids: Vec<ObjectId> =
if let Some(table_id) = source.optional_associated_table_id {
vec![table_id]
@@ -1295,6 +1306,19 @@

txn.commit().await?;

let relation_info = PbRelationInfo::Source(ObjectModel(source, obj.unwrap()).into());
let relation = PbRelation {
relation_info: Some(relation_info),
};
let _version = self
.notify_frontend(
NotificationOperation::Update,
NotificationInfo::RelationGroup(PbRelationGroup {
relations: vec![relation],
}),
)
.await;

Ok(fragment_actors)
}

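Together, the two additions above close the loop for `ALTER ... SET source_rate_limit`: the limit is written to the `source` row inside the same transaction that patches the streaming job, and after the commit the refreshed catalog entry is broadcast to every frontend, so their cached `SourceCatalog` — and therefore any later `CREATE MATERIALIZED VIEW` plan — reflects the new value. This is the path the updated `rate_limit_table_kafka.slt` test exercises end to end.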