Skip to content

Commit

Permalink
feat(frontend): change streaming_rate_limit=0 to pause stream inste…
Browse files Browse the repository at this point in the history
…ad of disable rate limit (#16333)
  • Loading branch information
kwannoel authored Apr 18, 2024
1 parent 144986e commit ebf0104
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 10 deletions.
2 changes: 1 addition & 1 deletion e2e_test/udf/always_retry_python.slt
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ SELECT count(*) > 0 FROM mv_no_retry where s1 is NULL;
t

statement ok
SET STREAMING_RATE_LIMIT=0;
SET STREAMING_RATE_LIMIT TO DEFAULT;

statement ok
SET BACKGROUND_DDL=false;
Expand Down
13 changes: 9 additions & 4 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ pub enum SessionConfigError {

type SessionConfigResult<T> = std::result::Result<T, SessionConfigError>;

// NOTE(kwannoel): We declare it separately as a constant,
// otherwise seems like it can't infer the type of -1 when written inline.
const DISABLE_STREAMING_RATE_LIMIT: i32 = -1;

#[serde_as]
/// This is the Session Config of RisingWave.
#[derive(Clone, Debug, Deserialize, Serialize, SessionConfig, ConfigDoc, PartialEq)]
Expand Down Expand Up @@ -241,10 +245,11 @@ pub struct SessionConfig {
#[parameter(default = STANDARD_CONFORMING_STRINGS)]
standard_conforming_strings: String,

/// Set streaming rate limit (rows per second) for each parallelism for mv backfilling
#[serde_as(as = "DisplayFromStr")]
#[parameter(default = ConfigNonZeroU64::default())]
streaming_rate_limit: ConfigNonZeroU64,
/// Set streaming rate limit (rows per second) for each parallelism for mv / source backfilling, source reads.
/// If set to -1, disable rate limit.
/// If set to 0, this pauses the snapshot read / source read.
#[parameter(default = DISABLE_STREAMING_RATE_LIMIT)]
streaming_rate_limit: i32,

/// Cache policy for partition cache in streaming over window.
/// Can be "full", "recent", "`recent_first_n`" or "`recent_last_n`".
Expand Down
11 changes: 7 additions & 4 deletions src/frontend/src/utils/overwrite_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ impl OverwriteOptions {

pub fn new(args: &mut HandlerArgs) -> Self {
let streaming_rate_limit = {
// CREATE MATERIALIZED VIEW m1 WITH (rate_limit = N) ...
if let Some(x) = args
.with_options
.inner_mut()
Expand All @@ -32,10 +33,12 @@ impl OverwriteOptions {
// FIXME(tabVersion): validate the value
Some(x.parse::<u32>().unwrap())
} else {
args.session
.config()
.streaming_rate_limit()
.map(|limit| limit.get() as u32)
let rate_limit = args.session.config().streaming_rate_limit();
if rate_limit < 0 {
None
} else {
Some(rate_limit as u32)
}
}
};
Self {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const SEED_TABLE_100: &str = "INSERT INTO t SELECT generate_series FROM generate
const SET_BACKGROUND_DDL: &str = "SET BACKGROUND_DDL=true;";
const SET_RATE_LIMIT_2: &str = "SET STREAMING_RATE_LIMIT=2;";
const SET_RATE_LIMIT_1: &str = "SET STREAMING_RATE_LIMIT=1;";
const RESET_RATE_LIMIT: &str = "SET STREAMING_RATE_LIMIT=0;";
const RESET_RATE_LIMIT: &str = "SET STREAMING_RATE_LIMIT=DEFAULT;";
const CREATE_MV1: &str = "CREATE MATERIALIZED VIEW mv1 as SELECT * FROM t;";
const DROP_MV1: &str = "DROP MATERIALIZED VIEW mv1;";
const WAIT: &str = "WAIT;";
Expand Down

0 comments on commit ebf0104

Please sign in to comment.