diff --git a/Cargo.lock b/Cargo.lock index 368de65395b56..cd2d38f532fd6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11417,6 +11417,7 @@ dependencies = [ "arrow-udf-wasm", "async-trait", "auto_enums", + "bytes", "chrono", "chrono-tz 0.9.0", "criterion", diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index 1f264957f3953..17dc713818ff0 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -881,24 +881,25 @@ steps: timeout_in_minutes: 10 retry: *auto-retry - - label: "end-to-end cassandra sink test" - key: "e2e-cassandra-sink-tests" - command: "ci/scripts/e2e-cassandra-sink-test.sh -p ci-release" - if: | - !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null - || build.pull_request.labels includes "ci/run-e2e-cassandra-sink-tests" - || build.env("CI_STEPS") =~ /(^|,)e2e-cassandra-sink-tests?(,|$$)/ - depends_on: - - "build" - - "build-other" - plugins: - - docker-compose#v5.1.0: - run: sink-test-env - config: ci/docker-compose.yml - mount-buildkite-agent: true - - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 10 - retry: *auto-retry +# FIXME(xxhZs): https://github.com/risingwavelabs/risingwave/issues/17855 +# - label: "end-to-end cassandra sink test" +# key: "e2e-cassandra-sink-tests" +# command: "ci/scripts/e2e-cassandra-sink-test.sh -p ci-release" +# if: | +# !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null +# || build.pull_request.labels includes "ci/run-e2e-cassandra-sink-tests" +# || build.env("CI_STEPS") =~ /(^|,)e2e-cassandra-sink-tests?(,|$$)/ +# depends_on: +# - "build" +# - "build-other" +# plugins: +# - docker-compose#v5.1.0: +# run: sink-test-env +# config: ci/docker-compose.yml +# mount-buildkite-agent: true +# - ./ci/plugins/upload-failure-logs +# timeout_in_minutes: 10 +# retry: *auto-retry - label: "end-to-end clickhouse sink test" key: "e2e-clickhouse-sink-tests" diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 929dd23c2b65f..9f1f3652acaee 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -59,6 +59,7 @@ services: # If ENABLE_TELEMETRY is not set, telemetry will start by default ENABLE_TELEMETRY: ${ENABLE_TELEMETRY:-true} RW_TELEMETRY_TYPE: ${RW_TELEMETRY_TYPE:-"docker-compose"} + RW_SECRET_STORE_PRIVATE_KEY_HEX: ${RW_SECRET_STORE_PRIVATE_KEY_HEX:-0123456789abcdef} container_name: risingwave-standalone healthcheck: test: diff --git a/e2e_test/backfill/sink/create_sink.slt b/e2e_test/backfill/sink/create_sink.slt index 016e3bcb2049b..1eea929f3d50b 100644 --- a/e2e_test/backfill/sink/create_sink.slt +++ b/e2e_test/backfill/sink/create_sink.slt @@ -5,7 +5,7 @@ statement ok create table t (v1 int); statement ok -SET STREAMING_RATE_LIMIT = 500; +SET BACKFILL_RATE_LIMIT = 500; # Should finish in 20s statement ok diff --git a/e2e_test/backfill/sink/different_pk_and_dist_key.slt b/e2e_test/backfill/sink/different_pk_and_dist_key.slt index bc8256b28e62a..41c99f315b153 100644 --- a/e2e_test/backfill/sink/different_pk_and_dist_key.slt +++ b/e2e_test/backfill/sink/different_pk_and_dist_key.slt @@ -18,7 +18,7 @@ statement ok create materialized view m1 as select t.v1, t.v2, t.v3 from t join t2 using(v1); statement ok -set streaming_rate_limit = 1; +set backfill_rate_limit = 1; statement ok set background_ddl = true; diff --git a/e2e_test/background_ddl/basic.slt b/e2e_test/background_ddl/basic.slt index b0be505eb1dd9..3c98b6943610e 100644 --- a/e2e_test/background_ddl/basic.slt +++ b/e2e_test/background_ddl/basic.slt @@ -14,7 +14,7 @@ statement ok FLUSH; statement ok -SET STREAMING_RATE_LIMIT=10000; +SET BACKFILL_RATE_LIMIT=10000; statement ok CREATE MATERIALIZED VIEW m1 as SELECT * FROM t; diff --git a/e2e_test/background_ddl/sim/basic.slt b/e2e_test/background_ddl/sim/basic.slt index 35f5814fe8b4f..40cb606410130 100644 --- a/e2e_test/background_ddl/sim/basic.slt +++ b/e2e_test/background_ddl/sim/basic.slt @@ -14,7 +14,7 @@ statement ok FLUSH; statement ok -SET STREAMING_RATE_LIMIT=4000; +SET BACKFILL_RATE_LIMIT=4000; statement ok CREATE MATERIALIZED VIEW m1 as SELECT * FROM t; diff --git a/e2e_test/batch/catalog/pg_settings.slt.part b/e2e_test/batch/catalog/pg_settings.slt.part index f405cc71c2c0d..5744cd0362cbf 100644 --- a/e2e_test/batch/catalog/pg_settings.slt.part +++ b/e2e_test/batch/catalog/pg_settings.slt.part @@ -17,6 +17,7 @@ postmaster license_key postmaster max_concurrent_creating_streaming_jobs postmaster pause_on_next_bootstrap user application_name +user backfill_rate_limit user background_ddl user batch_enable_distributed_dml user batch_parallelism @@ -52,10 +53,10 @@ user server_encoding user server_version user server_version_num user sink_decouple +user source_rate_limit user standard_conforming_strings user statement_timeout user streaming_parallelism -user streaming_rate_limit user streaming_use_arrangement_backfill user synchronize_seqscans user timezone diff --git a/e2e_test/ddl/drop/drop_creating_mv.slt b/e2e_test/ddl/drop/drop_creating_mv.slt index 621ac216d4ec0..e9d8423f3cdec 100644 --- a/e2e_test/ddl/drop/drop_creating_mv.slt +++ b/e2e_test/ddl/drop/drop_creating_mv.slt @@ -8,7 +8,7 @@ statement ok flush; statement ok -set streaming_rate_limit=1; +set backfill_rate_limit=1; ############## Test drop foreground mv onlyif can-use-recover @@ -61,7 +61,7 @@ drop materialized view m1; ############## Make sure the mv can still be successfully created later. statement ok -set streaming_rate_limit=default; +set backfill_rate_limit=default; statement ok set background_ddl=false; diff --git a/e2e_test/ddl/secret.slt b/e2e_test/ddl/secret.slt index 7c11e2e6245a3..62a5a09add668 100644 --- a/e2e_test/ddl/secret.slt +++ b/e2e_test/ddl/secret.slt @@ -1,3 +1,33 @@ +statement ok +ALTER SYSTEM SET license_key TO ''; + +statement error +create secret secret_1 with ( + backend = 'fake-backend' +) as 'demo_secret'; +---- +db error: ERROR: Failed to run the query + +Caused by: + feature SecretManagement is only available for tier Paid and above, while the current tier is Free + +Hint: You may want to set a license key with `ALTER SYSTEM SET license_key = '...';` command. + + +statement error +drop secret secret_1; +---- +db error: ERROR: Failed to run the query + +Caused by: + feature SecretManagement is only available for tier Paid and above, while the current tier is Free + +Hint: You may want to set a license key with `ALTER SYSTEM SET license_key = '...';` command. + + +statement ok +ALTER SYSTEM SET license_key TO DEFAULT; + statement error secret backend "fake-backend" is not supported create secret secret_1 with ( backend = 'fake-backend' diff --git a/e2e_test/ddl/throttle.slt b/e2e_test/ddl/throttle.slt index 9b6c2f053bf63..e8a33b0bfb433 100644 --- a/e2e_test/ddl/throttle.slt +++ b/e2e_test/ddl/throttle.slt @@ -1,20 +1,22 @@ -# streaming_rate_limit also applies to create sink and create source, please refer to -# e2e_test/source/basic/kafka.slt and e2e_test/sink/kafka/create_sink.slt for this part +# streaming_rate_limit applies to create source, please refer to +# e2e_test/source/basic/kafka.slt. +# backfill_rate_limit applies to create sink, please refer to +# e2e_test/sink/kafka/create_sink.slt. statement ok create table t1 (v1 int); # tracked in https://github.com/risingwavelabs/risingwave/issues/13474 -# create with duplicate streaming_rate_limit +# create with duplicate backfill_rate_limit statement error Duplicated option -create materialized view mv1 with (streaming_rate_limit = 1000, streaming_rate_limit = 2000) as select * from t1; +create materialized view mv1 with (backfill_rate_limit = 1000, backfill_rate_limit = 2000) as select * from t1; # create with unknown fields statement error unexpected options in WITH clause -create materialized view mv1 with (streaming_rate_limit = 1000, unknown_field = 2000) as select * from t1; +create materialized view mv1 with (backfill_rate_limit = 1000, unknown_field = 2000) as select * from t1; statement ok -create materialized view mv1 with (streaming_rate_limit = 1000) as select * from t1; +create materialized view mv1 with (backfill_rate_limit = 1000) as select * from t1; statement ok drop materialized view mv1; diff --git a/e2e_test/sink/kafka/create_sink.slt b/e2e_test/sink/kafka/create_sink.slt index be4c39c7cb1c6..338465c471af9 100644 --- a/e2e_test/sink/kafka/create_sink.slt +++ b/e2e_test/sink/kafka/create_sink.slt @@ -153,7 +153,7 @@ create sink multiple_pk_throttle from t_kafka with ( topic = 'test-rw-sink-debezium', type = 'debezium', primary_key = 'id,v_varchar', - streaming_rate_limit = 200 + backfill_rate_limit = 200 ); statement ok @@ -165,7 +165,7 @@ create sink multiple_pk_throttle_1 topic = 'test-rw-sink-debezium', type = 'debezium', primary_key = 'id,v_varchar', - streaming_rate_limit = 200 + backfill_rate_limit = 200 ); statement ok diff --git a/e2e_test/slow_tests/backfill/rate_limit/slow-udf.slt b/e2e_test/slow_tests/backfill/rate_limit/slow-udf.slt index a2b1a6fc63130..adf133ff7f3e7 100644 --- a/e2e_test/slow_tests/backfill/rate_limit/slow-udf.slt +++ b/e2e_test/slow_tests/backfill/rate_limit/slow-udf.slt @@ -5,7 +5,7 @@ statement ok insert into t select 2 from generate_series(1, 1000000); statement ok -set streaming_rate_limit=1; +set backfill_rate_limit=1; statement ok set background_ddl=true; @@ -25,7 +25,7 @@ statement ok set background_ddl = false; statement ok -set streaming_rate_limit=default; +set backfill_rate_limit=default; statement ok flush; diff --git a/e2e_test/slow_tests/udf/always_retry_python.slt b/e2e_test/slow_tests/udf/always_retry_python.slt index 78bf926c32986..18184846f272a 100644 --- a/e2e_test/slow_tests/udf/always_retry_python.slt +++ b/e2e_test/slow_tests/udf/always_retry_python.slt @@ -21,7 +21,7 @@ statement ok flush; statement ok -SET STREAMING_RATE_LIMIT=1; +SET BACKFILL_RATE_LIMIT=1; statement ok SET BACKGROUND_DDL=true; @@ -57,7 +57,7 @@ SELECT count(*) FROM mv_always_retry where s1 is NULL; # t statement ok -SET STREAMING_RATE_LIMIT TO DEFAULT; +SET BACKFILL_RATE_LIMIT TO DEFAULT; statement ok SET BACKGROUND_DDL=false; diff --git a/e2e_test/source/basic/alter/rate_limit_source_kafka.slt b/e2e_test/source/basic/alter/rate_limit_source_kafka.slt index 7d991083345a1..e76cf72c0220a 100644 --- a/e2e_test/source/basic/alter/rate_limit_source_kafka.slt +++ b/e2e_test/source/basic/alter/rate_limit_source_kafka.slt @@ -44,7 +44,7 @@ sleep 3s ############## Create MV on source statement ok -SET STREAMING_RATE_LIMIT=0; +SET SOURCE_RATE_LIMIT=0; statement ok create materialized view rl_mv1 as select count(*) from kafka_source; @@ -56,7 +56,7 @@ statement ok create materialized view rl_mv3 as select count(*) from kafka_source; statement ok -SET STREAMING_RATE_LIMIT=default; +SET SOURCE_RATE_LIMIT=default; ############## MVs should have 0 records, since source has (rate_limit = 0) @@ -82,11 +82,11 @@ select * from rl_mv3; skipif in-memory query I -alter source kafka_source set streaming_rate_limit to 1000; +alter source kafka_source set source_rate_limit to 1000; skipif in-memory query I -alter source kafka_source set streaming_rate_limit to default; +alter source kafka_source set source_rate_limit to default; skipif in-memory sleep 3s diff --git a/e2e_test/source/basic/alter/rate_limit_table_kafka.slt b/e2e_test/source/basic/alter/rate_limit_table_kafka.slt index bf1fd6672d6ea..d93687799cac7 100644 --- a/e2e_test/source/basic/alter/rate_limit_table_kafka.slt +++ b/e2e_test/source/basic/alter/rate_limit_table_kafka.slt @@ -27,7 +27,7 @@ create table kafka_source (v1 int) with ( topic = 'kafka_source', properties.bootstrap.server = 'message_queue:29092', scan.startup.mode = 'earliest', - streaming_rate_limit = 0 + source_rate_limit = 0 ) FORMAT PLAIN ENCODE JSON statement ok @@ -61,11 +61,11 @@ select count(*) from kafka_source; skipif in-memory query I -alter table kafka_source set streaming_rate_limit to 1000; +alter table kafka_source set source_rate_limit to 1000; skipif in-memory query I -alter table kafka_source set streaming_rate_limit to default; +alter table kafka_source set source_rate_limit to default; skipif in-memory sleep 3s diff --git a/e2e_test/source/basic/kafka.slt b/e2e_test/source/basic/kafka.slt index dee5de1cbb539..40e9b46036112 100644 --- a/e2e_test/source/basic/kafka.slt +++ b/e2e_test/source/basic/kafka.slt @@ -391,7 +391,7 @@ create table s29 (id bytea) with ( topic = 'kafka_source_format_bytes', properties.bootstrap.server = 'message_queue:29092', scan.startup.mode = 'earliest', - streaming_rate_limit = 200 + source_rate_limit = 200 ) FORMAT PLAIN ENCODE BYTES statement ok diff --git a/e2e_test/streaming/aggregate/approx_percentile.slt b/e2e_test/streaming/aggregate/approx_percentile.slt new file mode 100644 index 0000000000000..efc377f8aed48 --- /dev/null +++ b/e2e_test/streaming/aggregate/approx_percentile.slt @@ -0,0 +1,104 @@ +# Single phase approx percentile +statement ok +create table t(p_col double, grp_col int); + +statement ok +insert into t select a, 1 from generate_series(-1000, 1000) t(a); + +statement ok +flush; + +query I + select + approx_percentile(0.01, 0.01) within group (order by p_col) as p01, + approx_percentile(0.1, 0.01) within group (order by p_col) as p10, + approx_percentile(0.5, 0.01) within group (order by p_col) as p50, + approx_percentile(0.9, 0.01) within group (order by p_col) as p90, + approx_percentile(0.99, 0.01) within group (order by p_col) as p99 + from t group by grp_col; +---- +-982.5779489474152 -804.4614206837127 0 804.4614206837127 982.5779489474152 + +query I +select + percentile_disc(0.01) within group (order by p_col) as p01, + percentile_disc(0.1) within group (order by p_col) as p10, + percentile_disc(0.5) within group (order by p_col) as p50, + percentile_disc(0.9) within group (order by p_col) as p90, + percentile_disc(0.99) within group (order by p_col) as p99 +from t group by grp_col; +---- +-980 -800 0 800 980 + +statement ok +create materialized view m1 as + select + approx_percentile(0.01, 0.01) within group (order by p_col) as p01, + approx_percentile(0.1, 0.01) within group (order by p_col) as p10, + approx_percentile(0.5, 0.01) within group (order by p_col) as p50, + approx_percentile(0.9, 0.01) within group (order by p_col) as p90, + approx_percentile(0.99, 0.01) within group (order by p_col) as p99 + from t group by grp_col; + +query I +select * from m1; +---- +-982.5779489474152 -804.4614206837127 0 804.4614206837127 982.5779489474152 + +# Test state encode / decode +onlyif can-use-recover +statement ok +recover; + +onlyif can-use-recover +sleep 10s + +query I +select * from m1; +---- +-982.5779489474152 -804.4614206837127 0 804.4614206837127 982.5779489474152 + +# Test state encode / decode +onlyif can-use-recover +statement ok +recover; + +onlyif can-use-recover +sleep 10s + +query I +select * from m1; +---- +-982.5779489474152 -804.4614206837127 0 804.4614206837127 982.5779489474152 + +# Test 0 self.from_int16_array(array.as_any().downcast_ref().unwrap()), Int32 => self.from_int32_array(array.as_any().downcast_ref().unwrap()), Int64 => self.from_int64_array(array.as_any().downcast_ref().unwrap()), + Decimal128(_, _) => self.from_decimal128_array(array.as_any().downcast_ref().unwrap()), Decimal256(_, _) => self.from_int256_array(array.as_any().downcast_ref().unwrap()), Float32 => self.from_float32_array(array.as_any().downcast_ref().unwrap()), Float64 => self.from_float64_array(array.as_any().downcast_ref().unwrap()), @@ -602,6 +603,13 @@ pub trait FromArrow { Ok(ArrayImpl::Int256(array.into())) } + fn from_decimal128_array( + &self, + array: &arrow_array::Decimal128Array, + ) -> Result { + Ok(ArrayImpl::Decimal(array.try_into()?)) + } + fn from_float32_array( &self, array: &arrow_array::Float32Array, diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index 8fbfd2cd6e969..16cc337c6f46a 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -54,7 +54,8 @@ type SessionConfigResult = std::result::Result; // NOTE(kwannoel): We declare it separately as a constant, // otherwise seems like it can't infer the type of -1 when written inline. -const DISABLE_STREAMING_RATE_LIMIT: i32 = -1; +const DISABLE_BACKFILL_RATE_LIMIT: i32 = -1; +const DISABLE_SOURCE_RATE_LIMIT: i32 = -1; #[serde_as] /// This is the Session Config of RisingWave. @@ -253,11 +254,17 @@ pub struct SessionConfig { #[parameter(default = STANDARD_CONFORMING_STRINGS)] standard_conforming_strings: String, + /// Set streaming rate limit (rows per second) for each parallelism for mv / source / sink backfilling + /// If set to -1, disable rate limit. + /// If set to 0, this pauses the snapshot read / source read. + #[parameter(default = DISABLE_BACKFILL_RATE_LIMIT)] + backfill_rate_limit: i32, + /// Set streaming rate limit (rows per second) for each parallelism for mv / source backfilling, source reads. /// If set to -1, disable rate limit. /// If set to 0, this pauses the snapshot read / source read. - #[parameter(default = DISABLE_STREAMING_RATE_LIMIT)] - streaming_rate_limit: i32, + #[parameter(default = DISABLE_SOURCE_RATE_LIMIT)] + source_rate_limit: i32, /// Cache policy for partition cache in streaming over window. /// Can be "full", "recent", "`recent_first_n`" or "`recent_last_n`". diff --git a/src/expr/core/src/aggregate/def.rs b/src/expr/core/src/aggregate/def.rs index d35077698da67..b050f8039e1c6 100644 --- a/src/expr/core/src/aggregate/def.rs +++ b/src/expr/core/src/aggregate/def.rs @@ -425,7 +425,8 @@ pub mod agg_kinds { | PbAggKind::BoolAnd | PbAggKind::BoolOr | PbAggKind::ApproxCountDistinct - | PbAggKind::InternalLastSeenValue, + | PbAggKind::InternalLastSeenValue + | PbAggKind::ApproxPercentile, ) | AggKind::UserDefined(_) }; } diff --git a/src/expr/impl/Cargo.toml b/src/expr/impl/Cargo.toml index 0f69c91e34162..bb97e63f795fb 100644 --- a/src/expr/impl/Cargo.toml +++ b/src/expr/impl/Cargo.toml @@ -36,6 +36,7 @@ arrow-udf-python = { workspace = true, optional = true } arrow-udf-wasm = { workspace = true, optional = true } async-trait = "0.1" auto_enums = { workspace = true } +bytes = "1" chrono = { version = "0.4", default-features = false, features = [ "clock", "std", diff --git a/src/expr/impl/src/aggregate/approx_percentile.rs b/src/expr/impl/src/aggregate/approx_percentile.rs index 7f9ae3fb4a539..720aaebeb5935 100644 --- a/src/expr/impl/src/aggregate/approx_percentile.rs +++ b/src/expr/impl/src/aggregate/approx_percentile.rs @@ -12,32 +12,108 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::BTreeMap; +use std::mem::size_of; use std::ops::Range; +use bytes::{Buf, Bytes}; use risingwave_common::array::*; +use risingwave_common::row::Row; use risingwave_common::types::*; use risingwave_common_estimate_size::EstimateSize; use risingwave_expr::aggregate::{AggCall, AggStateDyn, AggregateFunction, AggregateState}; use risingwave_expr::{build_aggregate, Result}; -#[build_aggregate("approx_percentile(float8) -> float8")] +/// TODO(kwannoel): for single phase agg, we can actually support `UDDSketch`. +/// For two phase agg, we still use `DDSketch`. +/// Then we also need to store the `relative_error` of the sketch, so we can report it +/// in an internal table, if it changes. +#[build_aggregate("approx_percentile(float8) -> float8", state = "bytea")] fn build(agg: &AggCall) -> Result> { - let fraction = agg.direct_args[0] + let quantile = agg.direct_args[0] .literal() - .map(|x| (*x.as_float64()).into()); - Ok(Box::new(ApproxPercentile { fraction })) + .map(|x| (*x.as_float64()).into()) + .unwrap(); + let relative_error: f64 = agg.direct_args[1] + .literal() + .map(|x| (*x.as_float64()).into()) + .unwrap(); + let base = (1.0 + relative_error) / (1.0 - relative_error); + Ok(Box::new(ApproxPercentile { quantile, base })) } #[allow(dead_code)] pub struct ApproxPercentile { - fraction: Option, + quantile: f64, + base: f64, } -#[derive(Debug, Default, EstimateSize)] -struct State(Vec); +type BucketCount = u64; +type BucketId = i32; +type Count = u64; + +#[derive(Debug, Default)] +struct State { + count: BucketCount, + pos_buckets: BTreeMap, + zeros: Count, + neg_buckets: BTreeMap, +} + +impl EstimateSize for State { + fn estimated_heap_size(&self) -> usize { + let count_size = size_of::(); + let pos_buckets_size = + self.pos_buckets.len() * (size_of::() + size_of::()); + let zero_bucket_size = size_of::(); + let neg_buckets_size = + self.neg_buckets.len() * (size_of::() + size_of::()); + count_size + pos_buckets_size + zero_bucket_size + neg_buckets_size + } +} impl AggStateDyn for State {} +impl ApproxPercentile { + fn add_datum(&self, state: &mut State, op: Op, datum: DatumRef<'_>) { + if let Some(value) = datum { + let prim_value = value.into_float64().into_inner(); + let (non_neg, abs_value) = if prim_value < 0.0 { + (false, -prim_value) + } else { + (true, prim_value) + }; + let bucket_id = abs_value.log(self.base).ceil() as BucketId; + match op { + Op::Delete | Op::UpdateDelete => { + if abs_value == 0.0 { + state.zeros -= 1; + } else if non_neg { + let count = state.pos_buckets.entry(bucket_id).or_insert(0); + *count -= 1; + } else { + let count = state.neg_buckets.entry(bucket_id).or_insert(0); + *count -= 1; + } + state.count -= 1; + } + Op::Insert | Op::UpdateInsert => { + if abs_value == 0.0 { + state.zeros += 1; + } else if non_neg { + let count = state.pos_buckets.entry(bucket_id).or_insert(0); + *count += 1; + } else { + let count = state.neg_buckets.entry(bucket_id).or_insert(0); + *count += 1; + } + state.count += 1; + } + } + }; + } +} + #[async_trait::async_trait] impl AggregateFunction for ApproxPercentile { fn return_type(&self) -> DataType { @@ -45,23 +121,101 @@ impl AggregateFunction for ApproxPercentile { } fn create_state(&self) -> Result { - todo!() + Ok(AggregateState::Any(Box::::default())) } - async fn update(&self, _state: &mut AggregateState, _input: &StreamChunk) -> Result<()> { - todo!() + async fn update(&self, state: &mut AggregateState, input: &StreamChunk) -> Result<()> { + let state: &mut State = state.downcast_mut(); + for (op, row) in input.rows() { + let datum = row.datum_at(0); + self.add_datum(state, op, datum); + } + Ok(()) } async fn update_range( &self, - _state: &mut AggregateState, - _input: &StreamChunk, - _range: Range, + state: &mut AggregateState, + input: &StreamChunk, + range: Range, ) -> Result<()> { - todo!() + let state = state.downcast_mut(); + for (op, row) in input.rows_in(range) { + self.add_datum(state, op, row.datum_at(0)); + } + Ok(()) + } + + // TODO(kwannoel): Instead of iterating over all buckets, we can maintain the + // approximate quantile bucket on the fly. + async fn get_result(&self, state: &AggregateState) -> Result { + let state = state.downcast_ref::(); + let quantile_count = (state.count as f64 * self.quantile) as u64; + let mut acc_count = 0; + for (bucket_id, count) in state.neg_buckets.iter().rev() { + acc_count += count; + if acc_count > quantile_count { + // approx value = -2 * y^i / (y + 1) + let approx_percentile = -2.0 * self.base.powi(*bucket_id) / (self.base + 1.0); + let approx_percentile = ScalarImpl::Float64(approx_percentile.into()); + return Ok(Datum::from(approx_percentile)); + } + } + acc_count += state.zeros; + if acc_count > quantile_count { + return Ok(Datum::from(ScalarImpl::Float64(0.0.into()))); + } + for (bucket_id, count) in &state.pos_buckets { + acc_count += count; + if acc_count > quantile_count { + // approx value = 2 * y^i / (y + 1) + let approx_percentile = 2.0 * self.base.powi(*bucket_id) / (self.base + 1.0); + let approx_percentile = ScalarImpl::Float64(approx_percentile.into()); + return Ok(Datum::from(approx_percentile)); + } + } + return Ok(None); + } + + fn encode_state(&self, state: &AggregateState) -> Result { + let state = state.downcast_ref::(); + let mut encoded_state = Vec::with_capacity(state.estimated_heap_size()); + encoded_state.extend_from_slice(&state.count.to_be_bytes()); + encoded_state.extend_from_slice(&state.zeros.to_be_bytes()); + let neg_buckets_size = state.neg_buckets.len() as u64; + encoded_state.extend_from_slice(&neg_buckets_size.to_be_bytes()); + for (bucket_id, count) in &state.neg_buckets { + encoded_state.extend_from_slice(&bucket_id.to_be_bytes()); + encoded_state.extend_from_slice(&count.to_be_bytes()); + } + for (bucket_id, count) in &state.pos_buckets { + encoded_state.extend_from_slice(&bucket_id.to_be_bytes()); + encoded_state.extend_from_slice(&count.to_be_bytes()); + } + let encoded_scalar = ScalarImpl::Bytea(encoded_state.into()); + Ok(Datum::from(encoded_scalar)) } - async fn get_result(&self, _state: &AggregateState) -> Result { - todo!() + fn decode_state(&self, datum: Datum) -> Result { + let mut state = State::default(); + let Some(scalar_state) = datum else { + return Ok(AggregateState::Any(Box::new(state))); + }; + let encoded_state: Box<[u8]> = scalar_state.into_bytea(); + let mut buf = Bytes::from(encoded_state); + state.count = buf.get_u64(); + state.zeros = buf.get_u64(); + let neg_buckets_size = buf.get_u64(); + for _ in 0..neg_buckets_size { + let bucket_id = buf.get_i32(); + let count = buf.get_u64(); + state.neg_buckets.insert(bucket_id, count); + } + while !buf.is_empty() { + let bucket_id = buf.get_i32(); + let count = buf.get_u64(); + state.pos_buckets.insert(bucket_id, count); + } + Ok(AggregateState::Any(Box::new(state))) } } diff --git a/src/frontend/src/handler/alter_streaming_rate_limit.rs b/src/frontend/src/handler/alter_streaming_rate_limit.rs index e5273b85d57b8..d255da31a52ed 100644 --- a/src/frontend/src/handler/alter_streaming_rate_limit.rs +++ b/src/frontend/src/handler/alter_streaming_rate_limit.rs @@ -68,7 +68,7 @@ pub async fn handle_alter_streaming_rate_limit( let source_id = if let Some(id) = table.associated_source_id { id.table_id() } else { - bail!("ALTER STREAMING_RATE_LIMIT is not for table without source") + bail!("ALTER SOURCE_RATE_LIMIT is not for table without source") }; (StatementType::ALTER_SOURCE, source_id) } diff --git a/src/frontend/src/handler/create_secret.rs b/src/frontend/src/handler/create_secret.rs index 8e3e56f324b44..9810751361be3 100644 --- a/src/frontend/src/handler/create_secret.rs +++ b/src/frontend/src/handler/create_secret.rs @@ -15,6 +15,7 @@ use pgwire::pg_response::{PgResponse, StatementType}; use prost::Message; use risingwave_common::bail_not_implemented; +use risingwave_common::license::Feature; use risingwave_sqlparser::ast::{CreateSecretStatement, SqlOption, Value}; use crate::error::{ErrorCode, Result}; @@ -30,6 +31,10 @@ pub async fn handle_create_secret( handler_args: HandlerArgs, stmt: CreateSecretStatement, ) -> Result { + Feature::SecretManagement + .check_available() + .map_err(|e| anyhow::anyhow!(e))?; + let session = handler_args.session.clone(); let db_name = session.database(); let (schema_name, connection_name) = diff --git a/src/frontend/src/handler/drop_secret.rs b/src/frontend/src/handler/drop_secret.rs index 4001cc99cfd8f..eff4b35224b8b 100644 --- a/src/frontend/src/handler/drop_secret.rs +++ b/src/frontend/src/handler/drop_secret.rs @@ -13,6 +13,7 @@ // limitations under the License. use pgwire::pg_response::StatementType; +use risingwave_common::license::Feature; use risingwave_sqlparser::ast::ObjectName; use crate::catalog::root_catalog::SchemaPath; @@ -25,6 +26,10 @@ pub async fn handle_drop_secret( secret_name: ObjectName, if_exists: bool, ) -> Result { + Feature::SecretManagement + .check_available() + .map_err(|e| anyhow::anyhow!(e))?; + let session = handler_args.session; let db_name = session.database(); let (schema_name, secret_name) = Binder::resolve_schema_qualified_name(db_name, secret_name)?; diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index f8beeedb19438..c3dcf0ce59768 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -697,7 +697,7 @@ pub async fn handle( } => alter_table_with_sr::handle_refresh_schema(handler_args, name).await, Statement::AlterTable { name, - operation: AlterTableOperation::SetStreamingRateLimit { rate_limit }, + operation: AlterTableOperation::SetSourceRateLimit { rate_limit }, } => { alter_streaming_rate_limit::handle_alter_streaming_rate_limit( handler_args, @@ -814,7 +814,7 @@ pub async fn handle( Statement::AlterView { materialized, name, - operation: AlterViewOperation::SetStreamingRateLimit { rate_limit }, + operation: AlterViewOperation::SetBackfillRateLimit { rate_limit }, } if materialized => { alter_streaming_rate_limit::handle_alter_streaming_rate_limit( handler_args, @@ -946,7 +946,7 @@ pub async fn handle( } => alter_source_with_sr::handler_refresh_schema(handler_args, name).await, Statement::AlterSource { name, - operation: AlterSourceOperation::SetStreamingRateLimit { rate_limit }, + operation: AlterSourceOperation::SetSourceRateLimit { rate_limit }, } => { alter_streaming_rate_limit::handle_alter_streaming_rate_limit( handler_args, diff --git a/src/frontend/src/optimizer/plan_node/generic/agg.rs b/src/frontend/src/optimizer/plan_node/generic/agg.rs index c0f41b8d82c70..d1ca0177261da 100644 --- a/src/frontend/src/optimizer/plan_node/generic/agg.rs +++ b/src/frontend/src/optimizer/plan_node/generic/agg.rs @@ -136,7 +136,6 @@ impl Agg { self.agg_calls.iter().all(|c| { matches!(c.agg_kind, agg_kinds::single_value_state!()) || (matches!(c.agg_kind, agg_kinds::single_value_state_iff_in_append_only!() if stream_input_append_only)) - || (matches!(c.agg_kind, AggKind::Builtin(PbAggKind::ApproxPercentile))) }) } diff --git a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs index a7aef5195ea5a..406ebdc8bfae5 100644 --- a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs @@ -256,7 +256,7 @@ impl StreamCdcTableScan { // The table desc used by backfill executor state_table: Some(catalog), cdc_table_desc: Some(self.core.cdc_table_desc.to_protobuf()), - rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit, + rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit, disable_backfill: options.disable_backfill, options: Some(options), }); diff --git a/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs b/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs index 08516631dc75b..3f8c89d7def1a 100644 --- a/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs +++ b/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs @@ -119,7 +119,7 @@ impl StreamNode for StreamFsFetch { .map(|c| c.to_protobuf()) .collect_vec(), with_properties, - rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit, + rate_limit: self.base.ctx().overwrite_options().source_rate_limit, secret_refs, } }); diff --git a/src/frontend/src/optimizer/plan_node/stream_source.rs b/src/frontend/src/optimizer/plan_node/stream_source.rs index 980df7911c7f3..159748bbdc18a 100644 --- a/src/frontend/src/optimizer/plan_node/stream_source.rs +++ b/src/frontend/src/optimizer/plan_node/stream_source.rs @@ -112,7 +112,7 @@ impl StreamNode for StreamSource { .map(|c| c.to_protobuf()) .collect_vec(), with_properties, - rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit, + rate_limit: self.base.ctx().overwrite_options().source_rate_limit, secret_refs, } }); diff --git a/src/frontend/src/optimizer/plan_node/stream_source_scan.rs b/src/frontend/src/optimizer/plan_node/stream_source_scan.rs index 47689c1038790..83c79259952b2 100644 --- a/src/frontend/src/optimizer/plan_node/stream_source_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_source_scan.rs @@ -158,7 +158,7 @@ impl StreamSourceScan { .map(|c| c.to_protobuf()) .collect_vec(), with_properties, - rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit, + rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit, secret_refs, }; diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs index 2255194dbee64..8a85a0c324dca 100644 --- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs @@ -320,7 +320,7 @@ impl StreamTableScan { table_desc: Some(self.core.table_desc.try_to_protobuf()?), state_table: Some(catalog), arrangement_table, - rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit, + rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit, ..Default::default() }); diff --git a/src/frontend/src/utils/overwrite_options.rs b/src/frontend/src/utils/overwrite_options.rs index 904518331234f..185f1b80b154b 100644 --- a/src/frontend/src/utils/overwrite_options.rs +++ b/src/frontend/src/utils/overwrite_options.rs @@ -16,20 +16,34 @@ use crate::handler::HandlerArgs; #[derive(Debug, Clone, Default)] pub struct OverwriteOptions { - pub streaming_rate_limit: Option, + pub source_rate_limit: Option, + pub backfill_rate_limit: Option, } impl OverwriteOptions { - pub(crate) const STREAMING_RATE_LIMIT_KEY: &'static str = "streaming_rate_limit"; + pub(crate) const BACKFILL_RATE_LIMIT_KEY: &'static str = "backfill_rate_limit"; + pub(crate) const SOURCE_RATE_LIMIT_KEY: &'static str = "source_rate_limit"; pub fn new(args: &mut HandlerArgs) -> Self { - let streaming_rate_limit = { - // CREATE MATERIALIZED VIEW m1 WITH (rate_limit = N) ... - if let Some(x) = args.with_options.remove(Self::STREAMING_RATE_LIMIT_KEY) { + let source_rate_limit = { + if let Some(x) = args.with_options.remove(Self::SOURCE_RATE_LIMIT_KEY) { // FIXME(tabVersion): validate the value Some(x.parse::().unwrap()) } else { - let rate_limit = args.session.config().streaming_rate_limit(); + let rate_limit = args.session.config().source_rate_limit(); + if rate_limit < 0 { + None + } else { + Some(rate_limit as u32) + } + } + }; + let backfill_rate_limit = { + if let Some(x) = args.with_options.remove(Self::BACKFILL_RATE_LIMIT_KEY) { + // FIXME(tabVersion): validate the value + Some(x.parse::().unwrap()) + } else { + let rate_limit = args.session.config().backfill_rate_limit(); if rate_limit < 0 { None } else { @@ -38,7 +52,8 @@ impl OverwriteOptions { } }; Self { - streaming_rate_limit, + source_rate_limit, + backfill_rate_limit, } } } diff --git a/src/frontend/src/utils/with_options.rs b/src/frontend/src/utils/with_options.rs index 2456dceb9fb19..e306103c02e39 100644 --- a/src/frontend/src/utils/with_options.rs +++ b/src/frontend/src/utils/with_options.rs @@ -89,8 +89,7 @@ impl WithOptions { .inner .into_iter() .filter(|(key, _)| { - key != OverwriteOptions::STREAMING_RATE_LIMIT_KEY - && key != options::RETENTION_SECONDS + key != OverwriteOptions::SOURCE_RATE_LIMIT_KEY && key != options::RETENTION_SECONDS }) .collect(); diff --git a/src/license/src/feature.rs b/src/license/src/feature.rs index ecfa9199f78d9..186e15c998ae4 100644 --- a/src/license/src/feature.rs +++ b/src/license/src/feature.rs @@ -46,6 +46,7 @@ macro_rules! for_all_features { { TestPaid, Paid, "A dummy feature that's only available on paid tier for testing purposes." }, { TimeTravel, Paid, "Query historical data within the retention period."}, { GlueSchemaRegistry, Paid, "Use Schema Registry from AWS Glue rather than Confluent." }, + { SecretManagement, Paid, "Secret management." }, } }; } diff --git a/src/meta/src/manager/diagnose.rs b/src/meta/src/manager/diagnose.rs index 1d6fcdd7f8fef..238f1bc4e6d00 100644 --- a/src/meta/src/manager/diagnose.rs +++ b/src/meta/src/manager/diagnose.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::cmp::{Ordering, Reverse}; -use std::collections::BinaryHeap; +use std::collections::{BTreeMap, BinaryHeap}; use std::fmt::Write; use std::sync::Arc; @@ -22,6 +22,8 @@ use prometheus_http_query::response::Data::Vector; use risingwave_common::types::Timestamptz; use risingwave_common::util::StackTraceResponseExt; use risingwave_hummock_sdk::level::Level; +use risingwave_meta_model_v2::table::TableType; +use risingwave_pb::catalog::table::OptionalAssociatedSourceId; use risingwave_pb::common::WorkerType; use risingwave_pb::meta::event_log::Event; use risingwave_pb::meta::EventLog; @@ -32,7 +34,8 @@ use thiserror_ext::AsReport; use crate::hummock::HummockManagerRef; use crate::manager::event_log::EventLogMangerRef; -use crate::manager::MetadataManager; +use crate::manager::{MetadataManager, MetadataManagerV2}; +use crate::MetaResult; pub type DiagnoseCommandRef = Arc; @@ -88,7 +91,15 @@ impl DiagnoseCommand { async fn write_catalog(&self, s: &mut String) { match &self.metadata_manager { MetadataManager::V1(_) => self.write_catalog_v1(s).await, - MetadataManager::V2(_) => self.write_catalog_v2(s).await, + MetadataManager::V2(mgr) => { + self.write_catalog_v2(s).await; + let _ = self.write_table_definition(mgr, s).await.inspect_err(|e| { + tracing::warn!( + error = e.to_report_string(), + "failed to display table definition" + ) + }); + } } } @@ -458,7 +469,6 @@ impl DiagnoseCommand { let top_k = 10; let mut top_tombstone_delete_sst = BinaryHeap::with_capacity(top_k); - let mut top_range_delete_sst = BinaryHeap::with_capacity(top_k); for compaction_group in version.levels.values() { let mut visit_level = |level: &Level| { sst_num += level.table_infos.len(); @@ -474,15 +484,6 @@ impl DiagnoseCommand { delete_ratio: tombstone_delete_ratio, }; top_k_sstables(top_k, &mut top_tombstone_delete_sst, e); - - let range_delete_ratio = - sst.range_tombstone_count * 10000 / sst.total_key_count; - let e = SstableSort { - compaction_group_id: compaction_group.group_id, - sst_id: sst.sst_id, - delete_ratio: range_delete_ratio, - }; - top_k_sstables(top_k, &mut top_range_delete_sst, e); } }; let Some(ref l0) = compaction_group.l0 else { @@ -522,8 +523,6 @@ impl DiagnoseCommand { let _ = writeln!(s, "top tombstone delete ratio"); let _ = writeln!(s, "{}", format_table(top_tombstone_delete_sst)); let _ = writeln!(s); - let _ = writeln!(s, "top range delete ratio"); - let _ = writeln!(s, "{}", format_table(top_range_delete_sst)); let _ = writeln!(s); self.write_storage_prometheus(s).await; @@ -678,6 +677,114 @@ impl DiagnoseCommand { write!(s, "{}", all.output()).unwrap(); } + + async fn write_table_definition( + &self, + mgr: &MetadataManagerV2, + s: &mut String, + ) -> MetaResult<()> { + let sources = mgr + .catalog_controller + .list_sources() + .await? + .into_iter() + .map(|s| { + // The usage of secrets suggests that it's safe to display the definition. + let redact = if !s.get_secret_refs().is_empty() { + false + } else { + !s.get_info() + .map(|i| !i.get_format_encode_secret_refs().is_empty()) + .unwrap_or(false) + }; + (s.id, (s.name, s.schema_id, s.definition, redact)) + }) + .collect::>(); + let tables = mgr + .catalog_controller + .list_tables_by_type(TableType::Table) + .await? + .into_iter() + .map(|t| { + let redact = + if let Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id)) = + t.optional_associated_source_id + { + sources.get(&source_id).map(|s| s.3).unwrap_or(true) + } else { + false + }; + (t.id, (t.name, t.schema_id, t.definition, redact)) + }) + .collect::>(); + let mvs = mgr + .catalog_controller + .list_tables_by_type(TableType::MaterializedView) + .await? + .into_iter() + .map(|t| (t.id, (t.name, t.schema_id, t.definition, false))) + .collect::>(); + let indexes = mgr + .catalog_controller + .list_tables_by_type(TableType::Index) + .await? + .into_iter() + .map(|t| (t.id, (t.name, t.schema_id, t.definition, false))) + .collect::>(); + let sinks = mgr + .catalog_controller + .list_sinks() + .await? + .into_iter() + .map(|s| { + // The usage of secrets suggests that it's safe to display the definition. + let redact = if !s.get_secret_refs().is_empty() { + false + } else { + !s.format_desc + .map(|i| !i.get_secret_refs().is_empty()) + .unwrap_or(false) + }; + (s.id, (s.name, s.schema_id, s.definition, redact)) + }) + .collect::>(); + let catalogs = [ + ("SOURCE", sources), + ("TABLE", tables), + ("MATERIALIZED VIEW", mvs), + ("INDEX", indexes), + ("SINK", sinks), + ]; + for (title, items) in catalogs { + use comfy_table::{Row, Table}; + let mut table = Table::new(); + table.set_header({ + let mut row = Row::new(); + row.add_cell("id".into()); + row.add_cell("name".into()); + row.add_cell("schema_id".into()); + row.add_cell("definition".into()); + row + }); + for (id, (name, schema_id, definition, redact)) in items { + let mut row = Row::new(); + let may_redact = if redact { + "[REDACTED]".into() + } else { + definition + }; + row.add_cell(id.into()); + row.add_cell(name.into()); + row.add_cell(schema_id.into()); + row.add_cell(may_redact.into()); + table.add_row(row); + } + let _ = writeln!(s); + let _ = writeln!(s, "{title}"); + let _ = writeln!(s, "{table}"); + } + Ok(()) + } } #[cfg_attr(coverage, coverage(off))] diff --git a/src/risedevtool/src/task/schema_registry_service.rs b/src/risedevtool/src/task/schema_registry_service.rs index 95a84a562ff07..0f37c732452ce 100644 --- a/src/risedevtool/src/task/schema_registry_service.rs +++ b/src/risedevtool/src/task/schema_registry_service.rs @@ -42,6 +42,9 @@ impl DockerServiceConfig for SchemaRegistryConfig { panic!("More than one Kafka is not supported yet"); } let kafka = &kafka[0]; + if kafka.user_managed { + panic!("user-managed Kafka with docker Schema Registry is not supported yet. Please make them both or neither user-managed."); + } vec![ ("SCHEMA_REGISTRY_HOST_NAME".to_owned(), self.address.clone()), ( diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs index 92683f42e742a..6ea385df950fc 100644 --- a/src/sqlparser/src/ast/ddl.rs +++ b/src/sqlparser/src/ast/ddl.rs @@ -102,8 +102,8 @@ pub enum AlterTableOperation { deferred: bool, }, RefreshSchema, - /// `SET STREAMING_RATE_LIMIT TO ` - SetStreamingRateLimit { + /// `SET SOURCE_RATE_LIMIT TO ` + SetSourceRateLimit { rate_limit: i32, }, } @@ -138,8 +138,8 @@ pub enum AlterViewOperation { parallelism: SetVariableValue, deferred: bool, }, - /// `SET STREAMING_RATE_LIMIT TO ` - SetStreamingRateLimit { + /// `SET BACKFILL_RATE_LIMIT TO ` + SetBackfillRateLimit { rate_limit: i32, }, } @@ -180,7 +180,7 @@ pub enum AlterSourceOperation { SetSchema { new_schema_name: ObjectName }, FormatEncode { connector_schema: ConnectorSchema }, RefreshSchema, - SetStreamingRateLimit { rate_limit: i32 }, + SetSourceRateLimit { rate_limit: i32 }, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -290,8 +290,8 @@ impl fmt::Display for AlterTableOperation { AlterTableOperation::RefreshSchema => { write!(f, "REFRESH SCHEMA") } - AlterTableOperation::SetStreamingRateLimit { rate_limit } => { - write!(f, "SET STREAMING_RATE_LIMIT TO {}", rate_limit) + AlterTableOperation::SetSourceRateLimit { rate_limit } => { + write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit) } } } @@ -341,8 +341,8 @@ impl fmt::Display for AlterViewOperation { if *deferred { " DEFERRED" } else { "" } ) } - AlterViewOperation::SetStreamingRateLimit { rate_limit } => { - write!(f, "SET STREAMING_RATE_LIMIT TO {}", rate_limit) + AlterViewOperation::SetBackfillRateLimit { rate_limit } => { + write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit) } } } @@ -412,8 +412,8 @@ impl fmt::Display for AlterSourceOperation { AlterSourceOperation::RefreshSchema => { write!(f, "REFRESH SCHEMA") } - AlterSourceOperation::SetStreamingRateLimit { rate_limit } => { - write!(f, "SET STREAMING_RATE_LIMIT TO {}", rate_limit) + AlterSourceOperation::SetSourceRateLimit { rate_limit } => { + write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit) } } } diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs index 561b602298f11..768301544ef21 100644 --- a/src/sqlparser/src/keywords.rs +++ b/src/sqlparser/src/keywords.rs @@ -489,7 +489,6 @@ define_keywords!( STDDEV_SAMP, STDIN, STORED, - STREAMING_RATE_LIMIT, STRING, STRUCT, SUBMULTISET, diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 449db7864f2b2..1fe1b16e9601c 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -1812,6 +1812,18 @@ impl Parser<'_> { self.expected(expected) } + /// Check if the expected match is the next token. + /// The equality check is case-insensitive. + pub fn parse_word(&mut self, expected: &str) -> bool { + match self.peek_token().token { + Token::Word(w) if w.value.to_uppercase() == expected => { + self.next_token(); + true + } + _ => false, + } + } + /// Look for an expected keyword and consume it if it exists #[must_use] pub fn parse_keyword(&mut self, expected: Keyword) -> bool { @@ -3078,10 +3090,10 @@ impl Parser<'_> { parallelism: value, deferred, } - } else if let Some(rate_limit) = self.parse_alter_streaming_rate_limit()? { - AlterTableOperation::SetStreamingRateLimit { rate_limit } + } else if let Some(rate_limit) = self.parse_alter_source_rate_limit(true)? { + AlterTableOperation::SetSourceRateLimit { rate_limit } } else { - return self.expected("SCHEMA/PARALLELISM/STREAMING_RATE_LIMIT after SET"); + return self.expected("SCHEMA/PARALLELISM/SOURCE_RATE_LIMIT after SET"); } } else if self.parse_keyword(Keyword::DROP) { let _ = self.parse_keyword(Keyword::COLUMN); @@ -3133,14 +3145,37 @@ impl Parser<'_> { }) } - /// STREAMING_RATE_LIMIT = default | NUMBER - /// STREAMING_RATE_LIMIT TO default | NUMBER - pub fn parse_alter_streaming_rate_limit(&mut self) -> PResult> { - if !self.parse_keyword(Keyword::STREAMING_RATE_LIMIT) { + /// BACKFILL_RATE_LIMIT = default | NUMBER + /// BACKFILL_RATE_LIMIT TO default | NUMBER + pub fn parse_alter_backfill_rate_limit(&mut self) -> PResult> { + if !self.parse_word("BACKFILL_RATE_LIMIT") { + return Ok(None); + } + if self.expect_keyword(Keyword::TO).is_err() && self.expect_token(&Token::Eq).is_err() { + return self.expected("TO or = after ALTER TABLE SET BACKFILL_RATE_LIMIT"); + } + let rate_limit = if self.parse_keyword(Keyword::DEFAULT) { + -1 + } else { + let s = self.parse_number_value()?; + if let Ok(n) = s.parse::() { + n + } else { + return self.expected("number or DEFAULT"); + } + }; + Ok(Some(rate_limit)) + } + + /// SOURCE_RATE_LIMIT = default | NUMBER + /// SOURCE_RATE_LIMIT TO default | NUMBER + pub fn parse_alter_source_rate_limit(&mut self, is_table: bool) -> PResult> { + if !self.parse_word("SOURCE_RATE_LIMIT") { return Ok(None); } if self.expect_keyword(Keyword::TO).is_err() && self.expect_token(&Token::Eq).is_err() { - return self.expected("TO or = after ALTER TABLE SET STREAMING_RATE_LIMIT"); + let ddl = if is_table { "TABLE" } else { "SOURCE" }; + return self.expected(&format!("TO or = after ALTER {ddl} SET SOURCE_RATE_LIMIT")); } let rate_limit = if self.parse_keyword(Keyword::DEFAULT) { -1 @@ -3229,11 +3264,11 @@ impl Parser<'_> { deferred, } } else if materialized - && let Some(rate_limit) = self.parse_alter_streaming_rate_limit()? + && let Some(rate_limit) = self.parse_alter_backfill_rate_limit()? { - AlterViewOperation::SetStreamingRateLimit { rate_limit } + AlterViewOperation::SetBackfillRateLimit { rate_limit } } else { - return self.expected("SCHEMA/PARALLELISM/STREAMING_RATE_LIMIT after SET"); + return self.expected("SCHEMA/PARALLELISM/BACKFILL_RATE_LIMIT after SET"); } } else { return self.expected(&format!( @@ -3354,8 +3389,8 @@ impl Parser<'_> { AlterSourceOperation::SetSchema { new_schema_name: schema_name, } - } else if let Some(rate_limit) = self.parse_alter_streaming_rate_limit()? { - AlterSourceOperation::SetStreamingRateLimit { rate_limit } + } else if let Some(rate_limit) = self.parse_alter_source_rate_limit(false)? { + AlterSourceOperation::SetSourceRateLimit { rate_limit } } else { return self.expected("SCHEMA after SET"); } @@ -3369,7 +3404,7 @@ impl Parser<'_> { AlterSourceOperation::RefreshSchema } else { return self.expected( - "RENAME, ADD COLUMN, OWNER TO, SET or STREAMING_RATE_LIMIT after ALTER SOURCE", + "RENAME, ADD COLUMN, OWNER TO, SET or SOURCE_RATE_LIMIT after ALTER SOURCE", ); }; diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 726fc3ccad556..ca4fea1fdde47 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -362,20 +362,32 @@ impl SinkExecutor { } else { chunks }; - let chunks = if re_construct_with_sink_pk { - StreamChunkCompactor::new(down_stream_pk.clone(), chunks) + if re_construct_with_sink_pk { + let chunks = StreamChunkCompactor::new(down_stream_pk.clone(), chunks) .reconstructed_compacted_chunks( chunk_size, input_data_types.clone(), sink_type != SinkType::ForceAppendOnly, - ) + ); + for c in chunks { + yield Message::Chunk(c); + } } else { - chunks + let mut chunk_builder = + StreamChunkBuilder::new(chunk_size, input_data_types.clone()); + for chunk in chunks { + for (op, row) in chunk.rows() { + if let Some(c) = chunk_builder.append_row(op, row) { + yield Message::Chunk(c); + } + } + } + + if let Some(c) = chunk_builder.take() { + yield Message::Chunk(c); + } }; - for c in chunks { - yield Message::Chunk(c); - } if let Some(w) = mem::take(&mut watermark) { yield Message::Watermark(w) } diff --git a/src/tests/simulation/tests/integration_tests/backfill_tests.rs b/src/tests/simulation/tests/integration_tests/backfill_tests.rs index 1d8f7bb727101..fb7cb3db6fb8b 100644 --- a/src/tests/simulation/tests/integration_tests/backfill_tests.rs +++ b/src/tests/simulation/tests/integration_tests/backfill_tests.rs @@ -167,7 +167,7 @@ async fn test_arrangement_backfill_replication() -> Result<()> { session .run("SET STREAMING_USE_ARRANGEMENT_BACKFILL=true") .await?; - session.run("SET STREAMING_RATE_LIMIT=30").await?; + session.run("SET BACKFILL_RATE_LIMIT=30").await?; session .run("create materialized view m1 as select * from t") .await?; @@ -251,7 +251,7 @@ async fn test_arrangement_backfill_progress() -> Result<()> { // Create arrangement backfill with rate limit session.run("SET STREAMING_PARALLELISM=1").await?; session.run("SET BACKGROUND_DDL=true").await?; - session.run("SET STREAMING_RATE_LIMIT=1").await?; + session.run("SET BACKFILL_RATE_LIMIT=1").await?; session .run("CREATE MATERIALIZED VIEW m1 AS SELECT * FROM t") .await?; @@ -311,7 +311,7 @@ async fn test_enable_arrangement_backfill() -> Result<()> { async fn test_recovery_cancels_foreground_ddl() -> Result<()> { let mut cluster = Cluster::start(Configuration::enable_arrangement_backfill()).await?; let mut session = cluster.start_session(); - session.run("SET STREAMING_RATE_LIMIT=1").await?; + session.run("SET BACKFILL_RATE_LIMIT=1").await?; session.run("CREATE TABLE t(v1 int);").await?; session .run("INSERT INTO t select * from generate_series(1, 100000);") diff --git a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs index 80ff76a3f1020..b3776bcd57b64 100644 --- a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs +++ b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs @@ -28,9 +28,9 @@ const DROP_TABLE: &str = "DROP TABLE t;"; const SEED_TABLE_500: &str = "INSERT INTO t SELECT generate_series FROM generate_series(1, 500);"; const SEED_TABLE_100: &str = "INSERT INTO t SELECT generate_series FROM generate_series(1, 100);"; const SET_BACKGROUND_DDL: &str = "SET BACKGROUND_DDL=true;"; -const SET_RATE_LIMIT_2: &str = "SET STREAMING_RATE_LIMIT=2;"; -const SET_RATE_LIMIT_1: &str = "SET STREAMING_RATE_LIMIT=1;"; -const RESET_RATE_LIMIT: &str = "SET STREAMING_RATE_LIMIT=DEFAULT;"; +const SET_RATE_LIMIT_2: &str = "SET BACKFILL_RATE_LIMIT=2;"; +const SET_RATE_LIMIT_1: &str = "SET BACKFILL_RATE_LIMIT=1;"; +const RESET_RATE_LIMIT: &str = "SET BACKFILL_RATE_LIMIT=DEFAULT;"; const CREATE_MV1: &str = "CREATE MATERIALIZED VIEW mv1 as SELECT * FROM t;"; const DROP_MV1: &str = "DROP MATERIALIZED VIEW mv1;"; const WAIT: &str = "WAIT;";