Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat(cdc): rate limit for cdc backfill #17989

Merged
merged 4 commits into the base branch from the feature branch
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions e2e_test/source/cdc_inline/alter/cdc_backfill_rate_limit.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
control substitution on

# mysql env vars will be read from the `.risingwave/config/risedev-env` file

# Pin the MySQL server time zone to UTC so DATETIME values ingested by the
# CDC connector are deterministic across test environments.
system ok
mysql -e "
SET GLOBAL time_zone = '+00:00';
"

# Recreate a fresh `testdb2` database and seed `orders` with three rows;
# these rows are what the CDC backfill below is expected to snapshot.
system ok
mysql -e "
DROP DATABASE IF EXISTS testdb2;
CREATE DATABASE testdb2;
USE testdb2;
CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
"

# Create a shared MySQL CDC source; per-table backfill is started by the
# `CREATE TABLE ... FROM source` statement further down.
statement ok
create source mysql_source with (
connector = 'mysql-cdc',
hostname = '${MYSQL_HOST}',
port = '${MYSQL_TCP_PORT}',
username = 'root',
password = '${MYSQL_PWD}',
database.name = 'testdb2',
server.id = '5185'
);

# Set the session backfill rate limit to zero so the snapshot phase of the
# CDC backfill is fully throttled and ingests no rows.
statement ok
set backfill_rate_limit=0;

statement ok
create table my_orders (
order_id int,
order_date timestamp,
customer_name string,
price decimal,
product_id int,
order_status smallint,
PRIMARY KEY (order_id)
) from mysql_source table 'testdb2.orders';

sleep 3s

# With the rate limit at 0, the backfill must not have made any progress.
query I
select count(*) from my_orders;
----
0

# Raise the rate limit at runtime; the backfill is expected to resume and
# finish snapshotting the three seeded rows.
statement ok
ALTER TABLE my_orders SET backfill_rate_limit = 1000;

# wait for the ALTER DDL to take effect before checking progress
sleep 3s

query I
select count(*) from my_orders;
----
3

statement ok
drop table my_orders;

statement ok
drop source mysql_source cascade;
1 change: 1 addition & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ enum ThrottleTarget {
SOURCE = 1;
MV = 2;
TABLE_WITH_SOURCE = 3;
CDC_TABLE = 4;
}

message ApplyThrottleRequest {
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/cdc/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl<T: CdcSourceTypeTrait> SplitReader for CdcSplitReader<T> {
}
};
if !inited {
bail!("failed to start cdc connector");
bail!("failed to start cdc connector.\nHINT: increase `cdc_source_wait_streaming_start_timeout` session variable to a large value and retry.");
}
}
tracing::info!(?source_id, "cdc connector started");
Expand Down
15 changes: 14 additions & 1 deletion src/frontend/src/handler/alter_streaming_rate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::bail;
use risingwave_pb::meta::PbThrottleTarget;
use risingwave_pb::meta::ThrottleTarget as PbThrottleTarget;
use risingwave_sqlparser::ast::ObjectName;

use super::{HandlerArgs, RwPgResponse};
Expand Down Expand Up @@ -72,6 +72,19 @@ pub async fn handle_alter_streaming_rate_limit(
};
(StatementType::ALTER_SOURCE, source_id)
}
PbThrottleTarget::CdcTable => {
let reader = session.env().catalog_reader().read_guard();
let (table, schema_name) =
reader.get_any_table_by_name(db_name, schema_path, &real_table_name)?;
if table.table_type != TableType::Table {
return Err(ErrorCode::InvalidInputSyntax(format!(
"\"{table_name}\" is not a table",
))
.into());
}
session.check_privilege_for_drop_alter(schema_name, &**table)?;
(StatementType::ALTER_TABLE, table.id.table_id)
}
_ => bail!("Unsupported throttle target: {:?}", kind),
};

Expand Down
12 changes: 12 additions & 0 deletions src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,18 @@ pub async fn handle(
)
.await
}
Statement::AlterTable {
name,
operation: AlterTableOperation::SetBackfillRateLimit { rate_limit },
} => {
alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
handler_args,
PbThrottleTarget::CdcTable,
name,
rate_limit,
)
.await
}
Statement::AlterIndex {
name,
operation: AlterIndexOperation::RenameIndex { index_name },
Expand Down
5 changes: 5 additions & 0 deletions src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ impl StreamManagerService for StreamServiceImpl {
.update_mv_rate_limit_by_table_id(TableId::from(request.id), request.rate)
.await?
}
ThrottleTarget::CdcTable => {
self.metadata_manager
.update_mv_rate_limit_by_table_id(TableId::from(request.id), request.rate)
.await?
}
ThrottleTarget::Unspecified => {
return Err(Status::invalid_argument("unspecified throttle target"))
}
Expand Down
4 changes: 4 additions & 0 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1354,6 +1354,10 @@ impl CatalogController {
|| (*fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0)
{
visit_stream_node(stream_node, |node| match node {
PbNodeBody::StreamCdcScan(node) => {
node.rate_limit = rate_limit;
found = true;
}
PbNodeBody::StreamScan(node) => {
node.rate_limit = rate_limit;
found = true;
Expand Down
5 changes: 5 additions & 0 deletions src/meta/src/manager/catalog/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1079,6 +1079,11 @@ impl FragmentManager {
for actor in &mut fragment.actors {
if let Some(node) = actor.nodes.as_mut() {
visit_stream_node(node, |node_body| match node_body {
// rate limit for cdc backfill
NodeBody::StreamCdcScan(ref mut node) => {
StrikeW marked this conversation as resolved.
Show resolved Hide resolved
node.rate_limit = rate_limit;
actor_to_apply.push(actor.actor_id);
}
NodeBody::StreamScan(ref mut node) => {
node.rate_limit = rate_limit;
actor_to_apply.push(actor.actor_id);
Expand Down
7 changes: 7 additions & 0 deletions src/sqlparser/src/ast/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ pub enum AlterTableOperation {
SetSourceRateLimit {
rate_limit: i32,
},
/// SET BACKFILL_RATE_LIMIT TO <rate_limit>
SetBackfillRateLimit {
rate_limit: i32,
},
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -293,6 +297,9 @@ impl fmt::Display for AlterTableOperation {
AlterTableOperation::SetSourceRateLimit { rate_limit } => {
write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
}
AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/sqlparser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3092,6 +3092,8 @@ impl Parser<'_> {
}
} else if let Some(rate_limit) = self.parse_alter_source_rate_limit(true)? {
AlterTableOperation::SetSourceRateLimit { rate_limit }
} else if let Some(rate_limit) = self.parse_alter_backfill_rate_limit()? {
AlterTableOperation::SetBackfillRateLimit { rate_limit }
} else {
return self.expected("SCHEMA/PARALLELISM/SOURCE_RATE_LIMIT after SET");
}
Expand Down
21 changes: 20 additions & 1 deletion src/stream/src/executor/backfill/cdc/cdc_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
let first_barrier = expect_first_barrier(&mut upstream).await?;

let mut is_snapshot_paused = first_barrier.is_pause_on_startup();
let mut rate_limit_to_zero = self.rate_limit_rps.is_some_and(|val| val == 0);

// Check whether this parallelism has been assigned splits,
// if not, we should bypass the backfill directly.
Expand Down Expand Up @@ -343,7 +344,23 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
&& *new_rate_limit != self.rate_limit_rps
{
self.rate_limit_rps = *new_rate_limit;
// rebuild the new reader stream with new rate limit
rate_limit_to_zero = self
.rate_limit_rps
.is_some_and(|val| val == 0);

// update and persist current backfill progress without draining the buffered upstream chunks
state_impl
.mutate_state(
current_pk_pos.clone(),
last_binlog_offset.clone(),
total_snapshot_row_count,
false,
)
.await?;
state_impl.commit_state(barrier.epoch).await?;
yield Message::Barrier(barrier);

// rebuild the snapshot stream with new rate limit
continue 'backfill_loop;
}
}
Expand Down Expand Up @@ -497,7 +514,9 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
// It maybe a cancellation bug of the mysql driver.
let (_, mut snapshot_stream) = backfill_stream.into_inner();

// skip consuming the snapshot stream if it is paused or the rate limit is 0
if !is_snapshot_paused
&& !rate_limit_to_zero
&& let Some(msg) = snapshot_stream
.next()
.instrument_await("consume_snapshot_stream_once")
Expand Down
Loading