Skip to content

Commit

Permalink
rename source
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Jul 29, 2024
1 parent 717e5dc commit 160924c
Show file tree
Hide file tree
Showing 9 changed files with 20 additions and 21 deletions.
6 changes: 3 additions & 3 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type SessionConfigResult<T> = std::result::Result<T, SessionConfigError>;
// NOTE(kwannoel): We declare it separately as a constant,
// otherwise seems like it can't infer the type of -1 when written inline.
const DISABLE_BACKFILL_RATE_LIMIT: i32 = -1;
const DISABLE_STREAMING_RATE_LIMIT: i32 = -1;
const DISABLE_SOURCE_RATE_LIMIT: i32 = -1;

#[serde_as]
/// This is the Session Config of RisingWave.
Expand Down Expand Up @@ -263,8 +263,8 @@ pub struct SessionConfig {
/// Set streaming rate limit (rows per second) for each parallelism for mv / source backfilling, source reads.
/// If set to -1, disable rate limit.
/// If set to 0, this pauses the snapshot read / source read.
#[parameter(default = DISABLE_STREAMING_RATE_LIMIT)]
streaming_rate_limit: i32,
#[parameter(default = DISABLE_SOURCE_RATE_LIMIT)]
source_rate_limit: i32,

/// Cache policy for partition cache in streaming over window.
/// Can be "full", "recent", "`recent_first_n`" or "`recent_last_n`".
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/alter_streaming_rate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub async fn handle_alter_streaming_rate_limit(
let source_id = if let Some(id) = table.associated_source_id {
id.table_id()
} else {
bail!("ALTER STREAMING_RATE_LIMIT is not for table without source")
bail!("ALTER SOURCE_RATE_LIMIT is not for table without source")
};
(StatementType::ALTER_SOURCE, source_id)
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ pub async fn handle(
} => alter_table_with_sr::handle_refresh_schema(handler_args, name).await,
Statement::AlterTable {
name,
operation: AlterTableOperation::SetStreamingRateLimit { rate_limit },
operation: AlterTableOperation::SetSourceRateLimit { rate_limit },
} => {
alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
handler_args,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl StreamNode for StreamFsFetch {
.map(|c| c.to_protobuf())
.collect_vec(),
with_properties,
rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
rate_limit: self.base.ctx().overwrite_options().source_rate_limit,
secret_refs,
}
});
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl StreamNode for StreamSource {
.map(|c| c.to_protobuf())
.collect_vec(),
with_properties,
rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
rate_limit: self.base.ctx().overwrite_options().source_rate_limit,
secret_refs,
}
});
Expand Down
12 changes: 6 additions & 6 deletions src/frontend/src/utils/overwrite_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@ use crate::handler::HandlerArgs;

#[derive(Debug, Clone, Default)]
pub struct OverwriteOptions {
pub streaming_rate_limit: Option<u32>,
pub source_rate_limit: Option<u32>,
pub backfill_rate_limit: Option<u32>,
}

impl OverwriteOptions {
pub(crate) const BACKFILL_RATE_LIMIT_KEY: &'static str = "backfill_rate_limit";
pub(crate) const STREAMING_RATE_LIMIT_KEY: &'static str = "streaming_rate_limit";
pub(crate) const SOURCE_RATE_LIMIT_KEY: &'static str = "source_rate_limit";

pub fn new(args: &mut HandlerArgs) -> Self {
let streaming_rate_limit = {
if let Some(x) = args.with_options.remove(Self::STREAMING_RATE_LIMIT_KEY) {
let source_rate_limit = {
if let Some(x) = args.with_options.remove(Self::SOURCE_RATE_LIMIT_KEY) {
// FIXME(tabVersion): validate the value
Some(x.parse::<u32>().unwrap())
} else {
let rate_limit = args.session.config().streaming_rate_limit();
let rate_limit = args.session.config().source_rate_limit();
if rate_limit < 0 {
None
} else {
Expand All @@ -52,7 +52,7 @@ impl OverwriteOptions {
}
};
Self {
streaming_rate_limit,
source_rate_limit,
backfill_rate_limit,
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/frontend/src/utils/with_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ impl WithOptions {
.inner
.into_iter()
.filter(|(key, _)| {
key != OverwriteOptions::STREAMING_RATE_LIMIT_KEY
&& key != options::RETENTION_SECONDS
key != OverwriteOptions::SOURCE_RATE_LIMIT_KEY && key != options::RETENTION_SECONDS
})
.collect();

Expand Down
8 changes: 4 additions & 4 deletions src/sqlparser/src/ast/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ pub enum AlterTableOperation {
deferred: bool,
},
RefreshSchema,
/// `SET STREAMING_RATE_LIMIT TO <rate_limit>`
SetStreamingRateLimit {
/// `SET SOURCE_RATE_LIMIT TO <rate_limit>`
SetSourceRateLimit {
rate_limit: i32,
},
}
Expand Down Expand Up @@ -290,8 +290,8 @@ impl fmt::Display for AlterTableOperation {
AlterTableOperation::RefreshSchema => {
write!(f, "REFRESH SCHEMA")
}
AlterTableOperation::SetStreamingRateLimit { rate_limit } => {
write!(f, "SET STREAMING_RATE_LIMIT TO {}", rate_limit)
AlterTableOperation::SetSourceRateLimit { rate_limit } => {
write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/sqlparser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3089,7 +3089,7 @@ impl Parser<'_> {
deferred,
}
} else if let Some(rate_limit) = self.parse_alter_source_rate_limit(true)? {
AlterTableOperation::SetStreamingRateLimit { rate_limit }
AlterTableOperation::SetSourceRateLimit { rate_limit }
} else {
return self.expected("SCHEMA/PARALLELISM/SOURCE_RATE_LIMIT after SET");
}
Expand Down Expand Up @@ -3402,7 +3402,7 @@ impl Parser<'_> {
AlterSourceOperation::RefreshSchema
} else {
return self.expected(
"RENAME, ADD COLUMN, OWNER TO, SET or STREAMING_RATE_LIMIT after ALTER SOURCE",
"RENAME, ADD COLUMN, OWNER TO, SET or SOURCE_RATE_LIMIT after ALTER SOURCE",
);
};

Expand Down

0 comments on commit 160924c

Please sign in to comment.