Commit

rename

kwannoel committed Jul 24, 2024
1 parent 5a66167 commit 2a7e975
Showing 22 changed files with 65 additions and 39 deletions.
2 changes: 1 addition & 1 deletion e2e_test/backfill/sink/create_sink.slt
@@ -5,7 +5,7 @@ statement ok
create table t (v1 int);

statement ok
-SET STREAMING_RATE_LIMIT = 500;
+SET BACKFILL_RATE_LIMIT = 500;

# Should finish in 20s
statement ok
2 changes: 1 addition & 1 deletion e2e_test/backfill/sink/different_pk_and_dist_key.slt
@@ -18,7 +18,7 @@ statement ok
create materialized view m1 as select t.v1, t.v2, t.v3 from t join t2 using(v1);

statement ok
-set streaming_rate_limit = 1;
+set backfill_rate_limit = 1;

statement ok
set background_ddl = true;
2 changes: 1 addition & 1 deletion e2e_test/background_ddl/basic.slt
@@ -14,7 +14,7 @@ statement ok
FLUSH;

statement ok
-SET STREAMING_RATE_LIMIT=10000;
+SET BACKFILL_RATE_LIMIT=10000;

statement ok
CREATE MATERIALIZED VIEW m1 as SELECT * FROM t;
2 changes: 1 addition & 1 deletion e2e_test/background_ddl/sim/basic.slt
@@ -14,7 +14,7 @@ statement ok
FLUSH;

statement ok
-SET STREAMING_RATE_LIMIT=4000;
+SET BACKFILL_RATE_LIMIT=4000;

statement ok
CREATE MATERIALIZED VIEW m1 as SELECT * FROM t;
1 change: 1 addition & 0 deletions e2e_test/batch/catalog/pg_settings.slt.part
@@ -17,6 +17,7 @@ postmaster license_key
postmaster max_concurrent_creating_streaming_jobs
postmaster pause_on_next_bootstrap
user application_name
+user backfill_rate_limit
user background_ddl
user batch_enable_distributed_dml
user batch_parallelism
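
Since backfill_rate_limit is now listed as a user-context setting, it should be inspectable like any other session variable. A sketch (assuming the Postgres-style SET/SHOW that the pg_settings catalog above implies):

SET backfill_rate_limit = 1000;
SHOW backfill_rate_limit;
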
4 changes: 2 additions & 2 deletions e2e_test/ddl/drop/drop_creating_mv.slt
@@ -8,7 +8,7 @@ statement ok
flush;

statement ok
-set streaming_rate_limit=1;
+set backfill_rate_limit=1;

############## Test drop foreground mv
onlyif can-use-recover
@@ -61,7 +61,7 @@ drop materialized view m1;

############## Make sure the mv can still be successfully created later.
statement ok
-set streaming_rate_limit=default;
+set backfill_rate_limit=default;

statement ok
set background_ddl=false;
14 changes: 8 additions & 6 deletions e2e_test/ddl/throttle.slt
@@ -1,20 +1,22 @@
-# streaming_rate_limit also applies to create sink and create source, please refer to
-# e2e_test/source/basic/kafka.slt and e2e_test/sink/kafka/create_sink.slt for this part
+# streaming_rate_limit applies to create source, please refer to
+# e2e_test/source/basic/kafka.slt.
+# backfill_rate_limit applies to create sink, please refer to
+# e2e_test/sink/kafka/create_sink.slt.

statement ok
create table t1 (v1 int);

# tracked in https://github.com/risingwavelabs/risingwave/issues/13474
-# create with duplicate streaming_rate_limit
+# create with duplicate backfill_rate_limit
statement error Duplicated option
-create materialized view mv1 with (streaming_rate_limit = 1000, streaming_rate_limit = 2000) as select * from t1;
+create materialized view mv1 with (backfill_rate_limit = 1000, backfill_rate_limit = 2000) as select * from t1;

# create with unknown fields
statement error unexpected options in WITH clause
-create materialized view mv1 with (streaming_rate_limit = 1000, unknown_field = 2000) as select * from t1;
+create materialized view mv1 with (backfill_rate_limit = 1000, unknown_field = 2000) as select * from t1;

statement ok
-create materialized view mv1 with (streaming_rate_limit = 1000) as select * from t1;
+create materialized view mv1 with (backfill_rate_limit = 1000) as select * from t1;

statement ok
drop materialized view mv1;
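
Taken together with the sqlparser changes below, the renamed knob is reachable from three surfaces. A usage sketch (object names t1/mv1 are illustrative, following the tests above):

-- Session default for subsequent backfills:
SET BACKFILL_RATE_LIMIT = 1000;
-- Per-job override at creation time, as in throttle.slt:
CREATE MATERIALIZED VIEW mv1 WITH (backfill_rate_limit = 1000) AS SELECT * FROM t1;
-- Adjust an existing materialized view; DEFAULT is parsed to -1 (see parser.rs below):
ALTER MATERIALIZED VIEW mv1 SET BACKFILL_RATE_LIMIT TO DEFAULT;
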
4 changes: 2 additions & 2 deletions e2e_test/sink/kafka/create_sink.slt
@@ -153,7 +153,7 @@ create sink multiple_pk_throttle from t_kafka with (
topic = 'test-rw-sink-debezium',
type = 'debezium',
primary_key = 'id,v_varchar',
-streaming_rate_limit = 200
+backfill_rate_limit = 200
);

statement ok
@@ -165,7 +165,7 @@ create sink multiple_pk_throttle_1
topic = 'test-rw-sink-debezium',
type = 'debezium',
primary_key = 'id,v_varchar',
-streaming_rate_limit = 200
+backfill_rate_limit = 200
);

statement ok
4 changes: 2 additions & 2 deletions e2e_test/slow_tests/backfill/rate_limit/slow-udf.slt
@@ -5,7 +5,7 @@ statement ok
insert into t select 2 from generate_series(1, 1000000);

statement ok
-set streaming_rate_limit=1;
+set backfill_rate_limit=1;

statement ok
set background_ddl=true;
@@ -25,7 +25,7 @@ statement ok
set background_ddl = false;

statement ok
-set streaming_rate_limit=default;
+set backfill_rate_limit=default;

statement ok
flush;
4 changes: 2 additions & 2 deletions e2e_test/slow_tests/udf/always_retry_python.slt
@@ -21,7 +21,7 @@ statement ok
flush;

statement ok
-SET STREAMING_RATE_LIMIT=1;
+SET BACKFILL_RATE_LIMIT=1;

statement ok
SET BACKGROUND_DDL=true;
@@ -57,7 +57,7 @@ SELECT count(*) FROM mv_always_retry where s1 is NULL;
# t

statement ok
-SET STREAMING_RATE_LIMIT TO DEFAULT;
+SET BACKFILL_RATE_LIMIT TO DEFAULT;

statement ok
SET BACKGROUND_DDL=false;
4 changes: 2 additions & 2 deletions e2e_test/streaming/rate_limit/basic.slt
@@ -5,7 +5,7 @@ statement ok
CREATE TABLE t1(v1 int, v2 int);

statement ok
-SET STREAMING_RATE_LIMIT TO 2000;
+SET BACKFILL_RATE_LIMIT TO 2000;

statement ok
CREATE MATERIALIZED VIEW m AS SELECT * FROM t1;
@@ -35,7 +35,7 @@ FLUSH;
# Test with small rate_limit. In this case, the stream chunk needs to be split

statement ok
-SET STREAMING_RATE_LIMIT TO 1;
+SET BACKFILL_RATE_LIMIT TO 1;

statement ok
CREATE MATERIALIZED VIEW m AS SELECT * FROM t1;
2 changes: 1 addition & 1 deletion e2e_test/streaming/rate_limit/snapshot_amplification.slt
@@ -6,7 +6,7 @@ statement ok
SET STREAMING_PARALLELISM=2;

statement ok
-SET STREAMING_RATE_LIMIT=1;
+SET BACKFILL_RATE_LIMIT=1;

statement ok
CREATE TABLE table (i1 int);
2 changes: 1 addition & 1 deletion e2e_test/streaming/rate_limit/upstream_amplification.slt
@@ -7,7 +7,7 @@ statement ok
SET STREAMING_PARALLELISM=2;

statement ok
-SET STREAMING_RATE_LIMIT=1;
+SET BACKFILL_RATE_LIMIT=1;

statement ok
CREATE TABLE source_table (i1 int)
2 changes: 1 addition & 1 deletion src/frontend/src/handler/mod.rs
@@ -814,7 +814,7 @@ pub async fn handle(
Statement::AlterView {
materialized,
name,
-operation: AlterViewOperation::SetStreamingRateLimit { rate_limit },
+operation: AlterViewOperation::SetBackfillRateLimit { rate_limit },
} if materialized => {
alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
handler_args,
@@ -256,7 +256,7 @@ impl StreamCdcTableScan
// The table desc used by backfill executor
state_table: Some(catalog),
cdc_table_desc: Some(self.core.cdc_table_desc.to_protobuf()),
-rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
+rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
disable_backfill: options.disable_backfill,
options: Some(options),
});
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_source_scan.rs
@@ -158,7 +158,7 @@ impl StreamSourceScan
.map(|c| c.to_protobuf())
.collect_vec(),
with_properties,
-rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
+rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
secret_refs,
};

2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_table_scan.rs
@@ -320,7 +320,7 @@ impl StreamTableScan
table_desc: Some(self.core.table_desc.try_to_protobuf()?),
state_table: Some(catalog),
arrangement_table,
-rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
+rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
..Default::default()
});

8 changes: 4 additions & 4 deletions src/sqlparser/src/ast/ddl.rs
@@ -138,8 +138,8 @@ pub enum AlterViewOperation
parallelism: SetVariableValue,
deferred: bool,
},
-/// `SET STREAMING_RATE_LIMIT TO <rate_limit>`
-SetStreamingRateLimit {
+/// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
+SetBackfillRateLimit {
rate_limit: i32,
},
}
@@ -341,8 +341,8 @@ impl fmt::Display for AlterViewOperation {
if *deferred { " DEFERRED" } else { "" }
)
}
-AlterViewOperation::SetStreamingRateLimit { rate_limit } => {
-    write!(f, "SET STREAMING_RATE_LIMIT TO {}", rate_limit)
+AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
+    write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
}
}
}
1 change: 1 addition & 0 deletions src/sqlparser/src/keywords.rs
@@ -95,6 +95,7 @@ define_keywords!(
AUTHORIZATION,
AUTO,
AVG,
+BACKFILL_RATE_LIMIT,
BASE64,
BEGIN,
BEGIN_FRAME,
28 changes: 25 additions & 3 deletions src/sqlparser/src/parser.rs
@@ -3133,6 +3133,28 @@ impl Parser<'_> {
})
}

+    /// BACKFILL_RATE_LIMIT = default | NUMBER
+    /// BACKFILL_RATE_LIMIT TO default | NUMBER
+    pub fn parse_alter_backfill_rate_limit(&mut self) -> PResult<Option<i32>> {
+        if !self.parse_keyword(Keyword::BACKFILL_RATE_LIMIT) {
+            return Ok(None);
+        }
+        if self.expect_keyword(Keyword::TO).is_err() && self.expect_token(&Token::Eq).is_err() {
+            return self.expected("TO or = after ALTER TABLE SET BACKFILL_RATE_LIMIT");
+        }
+        let rate_limit = if self.parse_keyword(Keyword::DEFAULT) {
+            -1
+        } else {
+            let s = self.parse_number_value()?;
+            if let Ok(n) = s.parse::<i32>() {
+                n
+            } else {
+                return self.expected("number or DEFAULT");
+            }
+        };
+        Ok(Some(rate_limit))
+    }

/// STREAMING_RATE_LIMIT = default | NUMBER
/// STREAMING_RATE_LIMIT TO default | NUMBER
pub fn parse_alter_streaming_rate_limit(&mut self) -> PResult<Option<i32>> {
@@ -3229,11 +3251,11 @@
deferred,
}
} else if materialized
-&& let Some(rate_limit) = self.parse_alter_streaming_rate_limit()?
+&& let Some(rate_limit) = self.parse_alter_backfill_rate_limit()?
{
-AlterViewOperation::SetStreamingRateLimit { rate_limit }
+AlterViewOperation::SetBackfillRateLimit { rate_limit }
} else {
return self.expected("SCHEMA/PARALLELISM/STREAMING_RATE_LIMIT after SET");
return self.expected("SCHEMA/PARALLELISM/BACKFILL_RATE_LIMIT after SET");
}
} else {
return self.expected(&format!(
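
For reference, the two forms the new parse_alter_backfill_rate_limit path accepts. A sketch (mv1 is illustrative; per the materialized guard above, this path applies only to materialized views):

ALTER MATERIALIZED VIEW mv1 SET BACKFILL_RATE_LIMIT = 512;
ALTER MATERIALIZED VIEW mv1 SET BACKFILL_RATE_LIMIT TO DEFAULT;  -- parsed as rate_limit = -1
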
@@ -167,7 +167,7 @@ async fn test_arrangement_backfill_replication() -> Result<()> {
session
.run("SET STREAMING_USE_ARRANGEMENT_BACKFILL=true")
.await?;
session.run("SET STREAMING_RATE_LIMIT=30").await?;
session.run("SET BACKFILL_RATE_LIMIT=30").await?;
session
.run("create materialized view m1 as select * from t")
.await?;
@@ -251,7 +251,7 @@ async fn test_arrangement_backfill_progress() -> Result<()> {
// Create arrangement backfill with rate limit
session.run("SET STREAMING_PARALLELISM=1").await?;
session.run("SET BACKGROUND_DDL=true").await?;
session.run("SET STREAMING_RATE_LIMIT=1").await?;
session.run("SET BACKFILL_RATE_LIMIT=1").await?;
session
.run("CREATE MATERIALIZED VIEW m1 AS SELECT * FROM t")
.await?;
@@ -311,7 +311,7 @@ async fn test_enable_arrangement_backfill() -> Result<()> {
async fn test_recovery_cancels_foreground_ddl() -> Result<()> {
let mut cluster = Cluster::start(Configuration::enable_arrangement_backfill()).await?;
let mut session = cluster.start_session();
session.run("SET STREAMING_RATE_LIMIT=1").await?;
session.run("SET BACKFILL_RATE_LIMIT=1").await?;
session.run("CREATE TABLE t(v1 int);").await?;
session
.run("INSERT INTO t select * from generate_series(1, 100000);")
@@ -28,9 +28,9 @@ const DROP_TABLE: &str = "DROP TABLE t;";
const SEED_TABLE_500: &str = "INSERT INTO t SELECT generate_series FROM generate_series(1, 500);";
const SEED_TABLE_100: &str = "INSERT INTO t SELECT generate_series FROM generate_series(1, 100);";
const SET_BACKGROUND_DDL: &str = "SET BACKGROUND_DDL=true;";
-const SET_RATE_LIMIT_2: &str = "SET STREAMING_RATE_LIMIT=2;";
-const SET_RATE_LIMIT_1: &str = "SET STREAMING_RATE_LIMIT=1;";
-const RESET_RATE_LIMIT: &str = "SET STREAMING_RATE_LIMIT=DEFAULT;";
+const SET_RATE_LIMIT_2: &str = "SET BACKFILL_RATE_LIMIT=2;";
+const SET_RATE_LIMIT_1: &str = "SET BACKFILL_RATE_LIMIT=1;";
+const RESET_RATE_LIMIT: &str = "SET BACKFILL_RATE_LIMIT=DEFAULT;";
const CREATE_MV1: &str = "CREATE MATERIALIZED VIEW mv1 as SELECT * FROM t;";
const DROP_MV1: &str = "DROP MATERIALIZED VIEW mv1;";
const WAIT: &str = "WAIT;";
