Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(frontend): rename streaming_rate_limit to backfill_rate_limit for mv backfilling and source_rate_limit for source updates #17796

Merged
merged 7 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion e2e_test/backfill/sink/create_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ statement ok
create table t (v1 int);

statement ok
SET STREAMING_RATE_LIMIT = 500;
SET BACKFILL_RATE_LIMIT = 500;

# Should finish in 20s
statement ok
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/backfill/sink/different_pk_and_dist_key.slt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ statement ok
create materialized view m1 as select t.v1, t.v2, t.v3 from t join t2 using(v1);

statement ok
set streaming_rate_limit = 1;
set backfill_rate_limit = 1;

statement ok
set background_ddl = true;
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/background_ddl/basic.slt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ statement ok
FLUSH;

statement ok
SET STREAMING_RATE_LIMIT=10000;
SET BACKFILL_RATE_LIMIT=10000;

statement ok
CREATE MATERIALIZED VIEW m1 as SELECT * FROM t;
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/background_ddl/sim/basic.slt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ statement ok
FLUSH;

statement ok
SET STREAMING_RATE_LIMIT=4000;
SET BACKFILL_RATE_LIMIT=4000;

statement ok
CREATE MATERIALIZED VIEW m1 as SELECT * FROM t;
Expand Down
1 change: 1 addition & 0 deletions e2e_test/batch/catalog/pg_settings.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ postmaster license_key
postmaster max_concurrent_creating_streaming_jobs
postmaster pause_on_next_bootstrap
user application_name
user backfill_rate_limit
user background_ddl
user batch_enable_distributed_dml
user batch_parallelism
Expand Down
4 changes: 2 additions & 2 deletions e2e_test/ddl/drop/drop_creating_mv.slt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ statement ok
flush;

statement ok
set streaming_rate_limit=1;
set backfill_rate_limit=1;

############## Test drop foreground mv
onlyif can-use-recover
Expand Down Expand Up @@ -61,7 +61,7 @@ drop materialized view m1;

############## Make sure the mv can still be successfully created later.
statement ok
set streaming_rate_limit=default;
set backfill_rate_limit=default;

statement ok
set background_ddl=false;
Expand Down
14 changes: 8 additions & 6 deletions e2e_test/ddl/throttle.slt
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
# streaming_rate_limit also applies to create sink and create source, please refer to
# e2e_test/source/basic/kafka.slt and e2e_test/sink/kafka/create_sink.slt for this part
# streaming_rate_limit applies to create source, please refer to
# e2e_test/source/basic/kafka.slt.
# backfill_rate_limit applies to create sink, please refer to
# e2e_test/sink/kafka/create_sink.slt.

statement ok
create table t1 (v1 int);

# tracked in https://github.com/risingwavelabs/risingwave/issues/13474
# create with duplicate streaming_rate_limit
# create with duplicate backfill_rate_limit
statement error Duplicated option
create materialized view mv1 with (streaming_rate_limit = 1000, streaming_rate_limit = 2000) as select * from t1;
create materialized view mv1 with (backfill_rate_limit = 1000, backfill_rate_limit = 2000) as select * from t1;

# create with unknown fields
statement error unexpected options in WITH clause
create materialized view mv1 with (streaming_rate_limit = 1000, unknown_field = 2000) as select * from t1;
create materialized view mv1 with (backfill_rate_limit = 1000, unknown_field = 2000) as select * from t1;

statement ok
create materialized view mv1 with (streaming_rate_limit = 1000) as select * from t1;
create materialized view mv1 with (backfill_rate_limit = 1000) as select * from t1;

statement ok
drop materialized view mv1;
Expand Down
4 changes: 2 additions & 2 deletions e2e_test/sink/kafka/create_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ create sink multiple_pk_throttle from t_kafka with (
topic = 'test-rw-sink-debezium',
type = 'debezium',
primary_key = 'id,v_varchar',
streaming_rate_limit = 200
backfill_rate_limit = 200
);

statement ok
Expand All @@ -165,7 +165,7 @@ create sink multiple_pk_throttle_1
topic = 'test-rw-sink-debezium',
type = 'debezium',
primary_key = 'id,v_varchar',
streaming_rate_limit = 200
backfill_rate_limit = 200
);

statement ok
Expand Down
4 changes: 2 additions & 2 deletions e2e_test/slow_tests/backfill/rate_limit/slow-udf.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ statement ok
insert into t select 2 from generate_series(1, 1000000);

statement ok
set streaming_rate_limit=1;
set backfill_rate_limit=1;

statement ok
set background_ddl=true;
Expand All @@ -25,7 +25,7 @@ statement ok
set background_ddl = false;

statement ok
set streaming_rate_limit=default;
set backfill_rate_limit=default;

statement ok
flush;
Expand Down
4 changes: 2 additions & 2 deletions e2e_test/slow_tests/udf/always_retry_python.slt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ statement ok
flush;

statement ok
SET STREAMING_RATE_LIMIT=1;
SET BACKFILL_RATE_LIMIT=1;

statement ok
SET BACKGROUND_DDL=true;
Expand Down Expand Up @@ -57,7 +57,7 @@ SELECT count(*) FROM mv_always_retry where s1 is NULL;
# t

statement ok
SET STREAMING_RATE_LIMIT TO DEFAULT;
SET BACKFILL_RATE_LIMIT TO DEFAULT;

statement ok
SET BACKGROUND_DDL=false;
Expand Down
8 changes: 4 additions & 4 deletions e2e_test/source/basic/alter/rate_limit_source_kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ sleep 3s
############## Create MV on source

statement ok
SET STREAMING_RATE_LIMIT=0;
SET SOURCE_RATE_LIMIT=0;

statement ok
create materialized view rl_mv1 as select count(*) from kafka_source;
Expand All @@ -56,7 +56,7 @@ statement ok
create materialized view rl_mv3 as select count(*) from kafka_source;

statement ok
SET STREAMING_RATE_LIMIT=default;
SET SOURCE_RATE_LIMIT=default;

############## MVs should have 0 records, since source has (rate_limit = 0)

Expand All @@ -82,11 +82,11 @@ select * from rl_mv3;

skipif in-memory
query I
alter source kafka_source set streaming_rate_limit to 1000;
alter source kafka_source set source_rate_limit to 1000;

skipif in-memory
query I
alter source kafka_source set streaming_rate_limit to default;
alter source kafka_source set source_rate_limit to default;

skipif in-memory
sleep 3s
Expand Down
6 changes: 3 additions & 3 deletions e2e_test/source/basic/alter/rate_limit_table_kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ create table kafka_source (v1 int) with (
topic = 'kafka_source',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest',
streaming_rate_limit = 0
source_rate_limit = 0
) FORMAT PLAIN ENCODE JSON

statement ok
Expand Down Expand Up @@ -61,11 +61,11 @@ select count(*) from kafka_source;

skipif in-memory
query I
alter table kafka_source set streaming_rate_limit to 1000;
alter table kafka_source set source_rate_limit to 1000;

skipif in-memory
query I
alter table kafka_source set streaming_rate_limit to default;
alter table kafka_source set source_rate_limit to default;

skipif in-memory
sleep 3s
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/source/basic/kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ create table s29 (id bytea) with (
topic = 'kafka_source_format_bytes',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest',
streaming_rate_limit = 200
source_rate_limit = 200
) FORMAT PLAIN ENCODE BYTES

statement ok
Expand Down
4 changes: 2 additions & 2 deletions e2e_test/streaming/rate_limit/basic.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ statement ok
CREATE TABLE t1(v1 int, v2 int);

statement ok
SET STREAMING_RATE_LIMIT TO 2000;
SET BACKFILL_RATE_LIMIT TO 2000;

statement ok
CREATE MATERIALIZED VIEW m AS SELECT * FROM t1;
Expand Down Expand Up @@ -35,7 +35,7 @@ FLUSH;
# Test with small rate_limit. In this case, the stream chunk needs to be split

statement ok
SET STREAMING_RATE_LIMIT TO 1;
SET BACKFILL_RATE_LIMIT TO 1;

statement ok
CREATE MATERIALIZED VIEW m AS SELECT * FROM t1;
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/streaming/rate_limit/snapshot_amplification.slt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ statement ok
SET STREAMING_PARALLELISM=2;

statement ok
SET STREAMING_RATE_LIMIT=1;
SET BACKFILL_RATE_LIMIT=1;

statement ok
CREATE TABLE table (i1 int);
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/streaming/rate_limit/upstream_amplification.slt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ statement ok
SET STREAMING_PARALLELISM=2;

statement ok
SET STREAMING_RATE_LIMIT=1;
SET BACKFILL_RATE_LIMIT=1;

statement ok
CREATE TABLE source_table (i1 int)
Expand Down
13 changes: 10 additions & 3 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ type SessionConfigResult<T> = std::result::Result<T, SessionConfigError>;

// NOTE(kwannoel): We declare it separately as a constant,
// otherwise seems like it can't infer the type of -1 when written inline.
const DISABLE_STREAMING_RATE_LIMIT: i32 = -1;
const DISABLE_BACKFILL_RATE_LIMIT: i32 = -1;
const DISABLE_SOURCE_RATE_LIMIT: i32 = -1;

#[serde_as]
/// This is the Session Config of RisingWave.
Expand Down Expand Up @@ -253,11 +254,17 @@ pub struct SessionConfig {
#[parameter(default = STANDARD_CONFORMING_STRINGS)]
standard_conforming_strings: String,

/// Set streaming rate limit (rows per second) for each parallelism for mv / source / sink backfilling
/// If set to -1, disable rate limit.
/// If set to 0, this pauses the snapshot read / source read.
#[parameter(default = DISABLE_BACKFILL_RATE_LIMIT)]
backfill_rate_limit: i32,

/// Set streaming rate limit (rows per second) for each parallelism for mv / source backfilling, source reads.
/// If set to -1, disable rate limit.
/// If set to 0, this pauses the snapshot read / source read.
#[parameter(default = DISABLE_STREAMING_RATE_LIMIT)]
streaming_rate_limit: i32,
#[parameter(default = DISABLE_SOURCE_RATE_LIMIT)]
source_rate_limit: i32,

/// Cache policy for partition cache in streaming over window.
/// Can be "full", "recent", "`recent_first_n`" or "`recent_last_n`".
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/alter_streaming_rate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub async fn handle_alter_streaming_rate_limit(
let source_id = if let Some(id) = table.associated_source_id {
id.table_id()
} else {
bail!("ALTER STREAMING_RATE_LIMIT is not for table without source")
bail!("ALTER SOURCE_RATE_LIMIT is not for table without source")
};
(StatementType::ALTER_SOURCE, source_id)
}
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ pub async fn handle(
} => alter_table_with_sr::handle_refresh_schema(handler_args, name).await,
Statement::AlterTable {
name,
operation: AlterTableOperation::SetStreamingRateLimit { rate_limit },
operation: AlterTableOperation::SetSourceRateLimit { rate_limit },
} => {
alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
handler_args,
Expand Down Expand Up @@ -814,7 +814,7 @@ pub async fn handle(
Statement::AlterView {
materialized,
name,
operation: AlterViewOperation::SetStreamingRateLimit { rate_limit },
operation: AlterViewOperation::SetBackfillRateLimit { rate_limit },
} if materialized => {
alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
handler_args,
Expand Down Expand Up @@ -946,7 +946,7 @@ pub async fn handle(
} => alter_source_with_sr::handler_refresh_schema(handler_args, name).await,
Statement::AlterSource {
name,
operation: AlterSourceOperation::SetStreamingRateLimit { rate_limit },
operation: AlterSourceOperation::SetSourceRateLimit { rate_limit },
} => {
alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
handler_args,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ impl StreamCdcTableScan {
// The table desc used by backfill executor
state_table: Some(catalog),
cdc_table_desc: Some(self.core.cdc_table_desc.to_protobuf()),
rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
disable_backfill: options.disable_backfill,
options: Some(options),
});
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl StreamNode for StreamFsFetch {
.map(|c| c.to_protobuf())
.collect_vec(),
with_properties,
rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
rate_limit: self.base.ctx().overwrite_options().source_rate_limit,
secret_refs,
}
});
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl StreamNode for StreamSource {
.map(|c| c.to_protobuf())
.collect_vec(),
with_properties,
rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
rate_limit: self.base.ctx().overwrite_options().source_rate_limit,
secret_refs,
}
});
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_source_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl StreamSourceScan {
.map(|c| c.to_protobuf())
.collect_vec(),
with_properties,
rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
secret_refs,
};

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ impl StreamTableScan {
table_desc: Some(self.core.table_desc.try_to_protobuf()?),
state_table: Some(catalog),
arrangement_table,
rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
..Default::default()
});

Expand Down
Loading
Loading