Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): support sink rate limit for external sink #19660

Merged
merged 13 commits into from
Dec 23, 2024
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion ci/scripts/deterministic-recovery-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ ${EXTRA_ARGS:-} \
2> $LOGDIR/recovery-background-ddl-{}.log && rm $LOGDIR/recovery-background-ddl-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-2fe-1meta, recovery, ddl"
seq "$TEST_NUM" | parallel 'MADSIM_TEST_SEED={} ./risingwave_simulation \
seq "$TEST_NUM" | parallel --tmpdir .risingwave 'MADSIM_TEST_SEED={} ./risingwave_simulation \
--kill \
--kill-rate=${KILL_RATE} \
--background-ddl-rate=${BACKGROUND_DDL_RATE} \
Expand Down
1 change: 1 addition & 0 deletions e2e_test/batch/catalog/pg_settings.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ user server_encoding
user server_version
user server_version_num
user sink_decouple
user sink_rate_limit
user source_rate_limit
user standard_conforming_strings
user statement_timeout
Expand Down
49 changes: 49 additions & 0 deletions e2e_test/sink/rate_limit.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Setup: a base table and a blackhole sink that reads from it.
statement ok
CREATE TABLE t1(v1 int, v2 int);

statement ok
CREATE SINK s1 AS select * from t1 WITH (
connector = 'blackhole'
);

# The session-level SINK_RATE_LIMIT variable accepts a positive value, 0, and `default`.
statement ok
SET SINK_RATE_LIMIT TO 100;

statement ok
SET SINK_RATE_LIMIT TO 0;

statement ok
SET SINK_RATE_LIMIT TO default;

# The per-sink ALTER SINK ... SET SINK_RATE_LIMIT accepts the same three forms on s1.
statement ok
ALTER SINK s1 SET SINK_RATE_LIMIT = 1000;

statement ok
ALTER SINK s1 SET SINK_RATE_LIMIT = 0;

statement ok
ALTER SINK s1 SET SINK_RATE_LIMIT = default;

# s2 is a sink into table t2 rather than a connector sink.
statement ok
create table t2 (v1 int primary key, v2 int);

statement ok
create sink s2 into t2 as select v1, v2 from t1;

# Altering the rate limit of the sink-into-table s2 is expected to be rejected.
statement error
ALTER SINK s2 SET SINK_RATE_LIMIT = 0;

# Cleanup: drop the sinks before the tables they reference.
statement ok
DROP SINK s2;

statement ok
DROP SINK s1;

statement ok
DROP TABLE t2;

statement ok
DROP TABLE t1;

statement ok
FLUSH;
1 change: 1 addition & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ enum ThrottleTarget {
TABLE_WITH_SOURCE = 3;
CDC_TABLE = 4;
TABLE_DML = 5;
SINK = 6;
}

message ApplyThrottleRequest {
Expand Down
1 change: 1 addition & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ message SinkNode {
// A sink with a kv log store should have a table.
catalog.Table table = 2;
SinkLogStoreType log_store_type = 3;
optional uint32 rate_limit = 4;
}

message ProjectNode {
Expand Down
19 changes: 19 additions & 0 deletions src/common/src/array/stream_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,25 @@ impl StreamChunk {
data: self.data.with_visibility(vis),
}
}

/// Derives the number of rate-limit permits this chunk consumes under `limit`.
///
/// Returns the chunk's capacity, except when the chunk is exactly one row larger than
/// `limit` and its second-to-last op is `U-`: in that case `capacity - 1` (i.e. `limit`)
/// is returned so the permits consumed stay within the limit (max burst).
pub fn compute_rate_limit_chunk_permits(&self, limit: usize) -> usize {
    let chunk_size = self.capacity();
    // Note we have to check if the 2nd last is `U-` to be consistent with `StreamChunkBuilder`.
    // If something inconsistent happens in the stream, we may not have `U+` after this `U-`.
    let ends_with_update = chunk_size >= 2 && self.ops()[chunk_size - 2].is_update_delete();
    // `ends_with_update` implies `chunk_size >= 2`, so `chunk_size - 1` cannot underflow.
    // Comparing against `limit` directly (instead of `limit + 1`) also avoids an
    // arithmetic overflow when `limit == usize::MAX`.
    if ends_with_update && chunk_size - 1 == limit {
        // The chunk size exceeds the limit only because of the trailing `Update` pair,
        // so charge one permit less.
        chunk_size - 1
    } else {
        chunk_size
    }
}
}

impl Deref for StreamChunk {
Expand Down
7 changes: 7 additions & 0 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type SessionConfigResult<T> = std::result::Result<T, SessionConfigError>;
const DISABLE_BACKFILL_RATE_LIMIT: i32 = -1;
const DISABLE_SOURCE_RATE_LIMIT: i32 = -1;
const DISABLE_DML_RATE_LIMIT: i32 = -1;
const DISABLE_SINK_RATE_LIMIT: i32 = -1;

/// Default to bypass cluster limits iff in debug mode.
const BYPASS_CLUSTER_LIMITS: bool = cfg!(debug_assertions);
Expand Down Expand Up @@ -289,6 +290,12 @@ pub struct SessionConfig {
#[parameter(default = DISABLE_DML_RATE_LIMIT)]
dml_rate_limit: i32,

/// Set sink rate limit (rows per second) for each parallelism for external sink.
/// If set to -1, disable rate limit.
/// If set to 0, this pauses the sink.
#[parameter(default = DISABLE_SINK_RATE_LIMIT)]
sink_rate_limit: i32,

/// Cache policy for partition cache in streaming over window.
/// Can be "full", "recent", "`recent_first_n`" or "`recent_last_n`".
#[serde_as(as = "DisplayFromStr")]
Expand Down
6 changes: 6 additions & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ csv = "1.3"
deltalake = { workspace = true }
duration-str = "0.11.2"
easy-ext = "1"
either = "1"
elasticsearch = { version = "8.15.0-alpha.1", features = ["rustls-tls"] }
enum-as-inner = "0.6"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
Expand All @@ -60,6 +61,11 @@ google-cloud-bigquery = { version = "0.13.0", features = ["auth"] }
google-cloud-gax = "0.19.0"
google-cloud-googleapis = { version = "0.15", features = ["pubsub", "bigquery"] }
google-cloud-pubsub = "0.29"
governor = { version = "0.6", default-features = false, features = [
"std",
"dashmap",
"jitter",
] }
http = "0.2"
iceberg = { workspace = true }
iceberg-catalog-glue = { workspace = true }
Expand Down
Loading
Loading