feat(frontend): support alter rate limit in mv, source, table with source (#16399)
kwannoel authored Apr 29, 2024
1 parent 8ee413b commit 03e6283
Showing 15 changed files with 480 additions and 6 deletions.
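In short: this commit makes the streaming rate limit alterable at runtime for three kinds of streaming jobs. A usage sketch assembled from the tests in this diff (object names are illustrative, taken from the tests below):

    alter source kafka_source set streaming_rate_limit to 1000;
    alter table kafka_source set streaming_rate_limit to default;
    ALTER MATERIALIZED VIEW streaming_rate_limit_0 SET STREAMING_RATE_LIMIT = 1;

Setting the limit to 0 pauses connector ingestion; setting it to default clears the throttle (the frontend sends a negative value, which the handler maps to None). The materialized-view variant is scaffolded here, but its e2e test is still ignored.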
4 changes: 4 additions & 0 deletions ci/scripts/e2e-source-test.sh
@@ -156,6 +156,10 @@ risedev slt './e2e_test/source/basic/*.slt'
risedev slt './e2e_test/source/basic/old_row_format_syntax/*.slt'
risedev slt './e2e_test/source/basic/alter/kafka.slt'

echo "--- e2e, kafka alter source rate limit"
risedev slt './e2e_test/source/basic/alter/rate_limit_source_kafka.slt'
risedev slt './e2e_test/source/basic/alter/rate_limit_table_kafka.slt'

echo "--- e2e, kafka alter source"
chmod +x ./scripts/source/prepare_data_after_alter.sh
./scripts/source/prepare_data_after_alter.sh 2
130 changes: 130 additions & 0 deletions e2e_test/source/basic/alter/rate_limit_source_kafka.slt
@@ -0,0 +1,130 @@
############## Create kafka seed data

statement ok
create table kafka_seed_data (v1 int);

statement ok
insert into kafka_seed_data select * from generate_series(1, 1000);

############## Sink into kafka

statement ok
create sink kafka_sink
from
  kafka_seed_data with (
    properties.bootstrap.server = 'message_queue:29092',
    topic = 'kafka_source',
    type = 'append-only',
    force_append_only = 'true',
    connector = 'kafka'
  );

############## Source from kafka (rate_limit = 0 applied via session variable below)

# Wait for the topic to be created
skipif in-memory
sleep 5s

statement ok
create source kafka_source (v1 int) with (
  connector = 'kafka',
  topic = 'kafka_source',
  properties.bootstrap.server = 'message_queue:29092',
  scan.startup.mode = 'earliest',
) FORMAT PLAIN ENCODE JSON

statement ok
flush;

############## Check data

skipif in-memory
sleep 3s

############## Create MV on source

statement ok
SET STREAMING_RATE_LIMIT=0;

statement ok
create materialized view rl_mv1 as select count(*) from kafka_source;

statement ok
create materialized view rl_mv2 as select count(*) from kafka_source;

statement ok
create materialized view rl_mv3 as select count(*) from kafka_source;

statement ok
SET STREAMING_RATE_LIMIT=default;

############## MVs should have 0 records, since they were created with rate_limit = 0

statement ok
flush;

query I
select * from rl_mv1;
----
0

query I
select * from rl_mv2;
----
0

query I
select * from rl_mv3;
----
0

############## Alter Source (rate_limit = 0 --> rate_limit = 1000)

skipif in-memory
query I
alter source kafka_source set streaming_rate_limit to 1000;

skipif in-memory
query I
alter source kafka_source set streaming_rate_limit to default;

skipif in-memory
sleep 3s

skipif in-memory
query I
select count(*) > 0 from rl_mv1;
----
t

skipif in-memory
query I
select count(*) > 0 from rl_mv2;
----
t

skipif in-memory
query I
select count(*) > 0 from rl_mv3;
----
t

############## Cleanup

statement ok
drop materialized view rl_mv1;

statement ok
drop materialized view rl_mv2;

statement ok
drop materialized view rl_mv3;

statement ok
drop source kafka_source;

statement ok
drop sink kafka_sink;

statement ok
drop table kafka_seed_data;
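Note on the mechanism tested above: the zero rate limit is not a property of the source itself; it is captured per materialized view at creation time via the SET STREAMING_RATE_LIMIT=0 session variable. A single ALTER SOURCE then lifts the limit for all three MVs at once, which is what the count(*) > 0 checks verify.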
88 changes: 88 additions & 0 deletions e2e_test/source/basic/alter/rate_limit_table_kafka.slt
@@ -0,0 +1,88 @@
############## Create kafka seed data

statement ok
create table kafka_seed_data (v1 int);

statement ok
insert into kafka_seed_data select * from generate_series(1, 1000);

############## Sink into kafka

statement ok
create sink kafka_sink
from
  kafka_seed_data with (
    properties.bootstrap.server = 'message_queue:29092',
    topic = 'kafka_source',
    type = 'append-only',
    force_append_only = 'true',
    connector = 'kafka'
  );

############## Table with kafka source (rate_limit = 0)

statement ok
create table kafka_source (v1 int) with (
  connector = 'kafka',
  topic = 'kafka_source',
  properties.bootstrap.server = 'message_queue:29092',
  scan.startup.mode = 'earliest',
  streaming_rate_limit = 0
) FORMAT PLAIN ENCODE JSON

statement ok
flush;

############## Check data

skipif in-memory
sleep 3s

skipif in-memory
query I
select count(*) from kafka_source;
----
0

############## Can still insert data when rate limit = 0

statement ok
insert into kafka_source values(1);

statement ok
flush;

query I
select count(*) from kafka_source;
----
1

############## Alter table (rate_limit = 0 --> rate_limit = 1000)

skipif in-memory
query I
alter table kafka_source set streaming_rate_limit to 1000;

skipif in-memory
query I
alter table kafka_source set streaming_rate_limit to default;

skipif in-memory
sleep 3s

skipif in-memory
query I
select count(*) > 1 from kafka_source;
----
t

############## Cleanup

statement ok
drop table kafka_source;

statement ok
drop sink kafka_sink;

statement ok
drop table kafka_seed_data;
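Note: unlike the source test, here the limit is declared directly in the table's WITH clause and changed with ALTER TABLE. The insert/flush check above also pins down a subtlety: the rate limit throttles only connector ingestion, while ordinary INSERTs into the table keep working.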
35 changes: 35 additions & 0 deletions e2e_test/streaming/rate_limit/alter_rate_limit.slt.ignore
@@ -0,0 +1,35 @@
-- This test is ignored until alter mv rate limit is fully supported.

statement ok
CREATE TABLE t (v1 int);

statement ok
INSERT INTO t VALUES (1);

statement ok
flush;

statement ok
SET BACKGROUND_DDL=true;

statement ok
CREATE MATERIALIZED VIEW streaming_rate_limit_0 with ( streaming_rate_limit = 0 ) AS SELECT * FROM t;

skipif in-memory
sleep 1s

query I
select progress from rw_ddl_progress;
----
0.00%

statement ok
ALTER MATERIALIZED VIEW streaming_rate_limit_0 SET STREAMING_RATE_LIMIT = 1;

statement ok
wait;

query I
select * from streaming_rate_limit_0;
----
1
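Because of the .slt.ignore suffix the harness skips this file for now, as its leading comment says. The intended flow: create a background-DDL MV with streaming_rate_limit = 0, observe its progress stuck at 0.00% in rw_ddl_progress, raise the limit to 1 with ALTER MATERIALIZED VIEW, then wait for the backfill to complete.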
1 change: 1 addition & 0 deletions proto/meta.proto
@@ -254,6 +254,7 @@ enum ThrottleTarget {
  THROTTLE_TARGET_UNSPECIFIED = 0;
  SOURCE = 1;
  MV = 2;
  TABLE_WITH_SOURCE = 3;
}

message ApplyThrottleRequest {
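The new TABLE_WITH_SOURCE variant lets the frontend distinguish an ALTER on a connector-backed table from a plain ALTER SOURCE; as the handler below shows, the frontend resolves it to the table's associated source id before asking meta to apply the throttle.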
1 change: 1 addition & 0 deletions src/frontend/src/catalog/root_catalog.rs
@@ -597,6 +597,7 @@ impl Catalog {
        ))
    }

    /// Used to get `TableCatalog` for Materialized Views, Tables and Indexes.
    pub fn get_table_by_name<'a>(
        &self,
        db_name: &str,
89 changes: 89 additions & 0 deletions src/frontend/src/handler/alter_streaming_rate_limit.rs
@@ -0,0 +1,89 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::bail;
use risingwave_pb::meta::PbThrottleTarget;
use risingwave_sqlparser::ast::ObjectName;

use super::{HandlerArgs, RwPgResponse};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::table_catalog::TableType;
use crate::error::{ErrorCode, Result};
use crate::Binder;

pub async fn handle_alter_streaming_rate_limit(
    handler_args: HandlerArgs,
    kind: PbThrottleTarget,
    table_name: ObjectName,
    rate_limit: i32,
) -> Result<RwPgResponse> {
    let session = handler_args.session;
    let db_name = session.database();
    let (schema_name, real_table_name) =
        Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
    let search_path = session.config().search_path();
    let user_name = &session.auth_context().user_name;

    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

    let (stmt_type, id) = match kind {
        PbThrottleTarget::Mv => {
            let reader = session.env().catalog_reader().read_guard();
            let (table, schema_name) =
                reader.get_table_by_name(db_name, schema_path, &real_table_name)?;
            if table.table_type != TableType::MaterializedView {
                return Err(ErrorCode::InvalidInputSyntax(format!(
                    "\"{table_name}\" is not a materialized view",
                ))
                .into());
            }
            session.check_privilege_for_drop_alter(schema_name, &**table)?;
            (StatementType::ALTER_MATERIALIZED_VIEW, table.id.table_id)
        }
        PbThrottleTarget::Source => {
            let reader = session.env().catalog_reader().read_guard();
            let (source, schema_name) =
                reader.get_source_by_name(db_name, schema_path, &real_table_name)?;
            session.check_privilege_for_drop_alter(schema_name, &**source)?;
            (StatementType::ALTER_SOURCE, source.id)
        }
        PbThrottleTarget::TableWithSource => {
            let reader = session.env().catalog_reader().read_guard();
            let (table, schema_name) =
                reader.get_table_by_name(db_name, schema_path, &real_table_name)?;
            session.check_privilege_for_drop_alter(schema_name, &**table)?;
            // Get the corresponding source catalog.
            let source_id = if let Some(id) = table.associated_source_id {
                id.table_id()
            } else {
                bail!("ALTER STREAMING_RATE_LIMIT is not supported for tables without an associated source")
            };
            (StatementType::ALTER_SOURCE, source_id)
        }
        _ => bail!("Unsupported throttle target: {:?}", kind),
    };

    let meta_client = session.env().meta_client();

    // A negative value means "no limit": forward `None` to remove any existing throttle.
    let rate_limit = if rate_limit < 0 {
        None
    } else {
        Some(rate_limit as u32)
    };

    meta_client.apply_throttle(kind, id, rate_limit).await?;

    Ok(PgResponse::empty_result(stmt_type))
}
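Two details worth noting in this handler: a negative rate_limit (which, judging by the tests, is how SET ... TO DEFAULT arrives here) is forwarded to meta as None, i.e. remove the throttle; and the TableWithSource arm returns StatementType::ALTER_SOURCE, so the command tag reported for ALTER TABLE ... SET STREAMING_RATE_LIMIT reads as an ALTER SOURCE.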