Skip to content

Commit

Permalink
rate limit for cdc backfill
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW committed Aug 9, 2024
1 parent fbe1a04 commit 00d066d
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 1 deletion.
1 change: 1 addition & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ enum ThrottleTarget {
SOURCE = 1;
MV = 2;
TABLE_WITH_SOURCE = 3;
CDC_TABLE = 4;
}

message ApplyThrottleRequest {
Expand Down
15 changes: 14 additions & 1 deletion src/frontend/src/handler/alter_streaming_rate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::bail;
use risingwave_pb::meta::PbThrottleTarget;
use risingwave_pb::meta::ThrottleTarget as PbThrottleTarget;
use risingwave_sqlparser::ast::ObjectName;

use super::{HandlerArgs, RwPgResponse};
Expand Down Expand Up @@ -72,6 +72,19 @@ pub async fn handle_alter_streaming_rate_limit(
};
(StatementType::ALTER_SOURCE, source_id)
}
PbThrottleTarget::CdcTable => {
let reader = session.env().catalog_reader().read_guard();
let (table, schema_name) =
reader.get_any_table_by_name(db_name, schema_path, &real_table_name)?;
if table.table_type != TableType::Table {
return Err(ErrorCode::InvalidInputSyntax(format!(
"\"{table_name}\" is not a table",
))
.into());
}
session.check_privilege_for_drop_alter(schema_name, &**table)?;
(StatementType::ALTER_TABLE, table.id.table_id)
}
_ => bail!("Unsupported throttle target: {:?}", kind),
};

Expand Down
12 changes: 12 additions & 0 deletions src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,18 @@ pub async fn handle(
)
.await
}
Statement::AlterTable {
name,
operation: AlterTableOperation::SetBackfillRateLimit { rate_limit },
} => {
alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
handler_args,
PbThrottleTarget::CdcTable,
name,
rate_limit,
)
.await
}
Statement::AlterIndex {
name,
operation: AlterIndexOperation::RenameIndex { index_name },
Expand Down
5 changes: 5 additions & 0 deletions src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ impl StreamManagerService for StreamServiceImpl {
.update_mv_rate_limit_by_table_id(TableId::from(request.id), request.rate)
.await?
}
ThrottleTarget::CdcTable => {
self.metadata_manager
.update_mv_rate_limit_by_table_id(TableId::from(request.id), request.rate)
.await?
}
ThrottleTarget::Unspecified => {
return Err(Status::invalid_argument("unspecified throttle target"))
}
Expand Down
4 changes: 4 additions & 0 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1354,6 +1354,10 @@ impl CatalogController {
|| (*fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0)
{
visit_stream_node(stream_node, |node| match node {
PbNodeBody::StreamCdcScan(node) => {
node.rate_limit = rate_limit;
found = true;
}
PbNodeBody::StreamScan(node) => {
node.rate_limit = rate_limit;
found = true;
Expand Down
5 changes: 5 additions & 0 deletions src/meta/src/manager/catalog/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1079,6 +1079,11 @@ impl FragmentManager {
for actor in &mut fragment.actors {
if let Some(node) = actor.nodes.as_mut() {
visit_stream_node(node, |node_body| match node_body {
// rate limit for cdc backfill
NodeBody::StreamCdcScan(ref mut node) => {
node.rate_limit = rate_limit;
actor_to_apply.push(actor.actor_id);
}
NodeBody::StreamScan(ref mut node) => {
node.rate_limit = rate_limit;
actor_to_apply.push(actor.actor_id);
Expand Down
7 changes: 7 additions & 0 deletions src/sqlparser/src/ast/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ pub enum AlterTableOperation {
SetSourceRateLimit {
rate_limit: i32,
},
/// SET BACKFILL_RATE_LIMIT TO <rate_limit>
SetBackfillRateLimit {
rate_limit: i32,
},
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -293,6 +297,9 @@ impl fmt::Display for AlterTableOperation {
AlterTableOperation::SetSourceRateLimit { rate_limit } => {
write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
}
AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/sqlparser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3092,6 +3092,8 @@ impl Parser<'_> {
}
} else if let Some(rate_limit) = self.parse_alter_source_rate_limit(true)? {
AlterTableOperation::SetSourceRateLimit { rate_limit }
} else if let Some(rate_limit) = self.parse_alter_backfill_rate_limit()? {
AlterTableOperation::SetBackfillRateLimit { rate_limit }
} else {
return self.expected("SCHEMA/PARALLELISM/SOURCE_RATE_LIMIT after SET");
}
Expand Down

0 comments on commit 00d066d

Please sign in to comment.