diff --git a/ci/scripts/e2e-test-parallel-for-opendal.sh b/ci/scripts/e2e-test-parallel-for-opendal.sh index 606adcf929cd7..a6a6c89e41164 100755 --- a/ci/scripts/e2e-test-parallel-for-opendal.sh +++ b/ci/scripts/e2e-test-parallel-for-opendal.sh @@ -31,7 +31,7 @@ host_args=(-h localhost -p 4565 -h localhost -p 4566 -h localhost -p 4567) echo "--- e2e, ci-3cn-3fe-opendal-fs-backend, streaming" RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ risedev ci-start ci-3cn-3fe-opendal-fs-backend -sqllogictest "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-opendal-fs-backend-${profile}" +sqllogictest "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-opendal-fs-backend-${profile}" --label "parallel" echo "--- Kill cluster Streaming" risedev ci-kill @@ -41,8 +41,8 @@ rm -rf /tmp/rw_ci echo "--- e2e, ci-3cn-3fe-opendal-fs-backend, batch" RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ risedev ci-start ci-3cn-3fe-opendal-fs-backend -sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-opendal-fs-backend-ddl-${profile}" -sqllogictest "${host_args[@]}" -d dev './e2e_test/visibility_mode/*.slt' -j 16 --junit "parallel-opendal-fs-backend-batch-${profile}" +sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-opendal-fs-backend-ddl-${profile}" --label "parallel" +sqllogictest "${host_args[@]}" -d dev './e2e_test/visibility_mode/*.slt' -j 16 --junit "parallel-opendal-fs-backend-batch-${profile}" --label "parallel" echo "--- Kill cluster Batch" risedev ci-kill diff --git a/ci/scripts/e2e-test-parallel-in-memory.sh b/ci/scripts/e2e-test-parallel-in-memory.sh index fcde15644c2b6..f7e6292bed54e 100755 --- a/ci/scripts/e2e-test-parallel-in-memory.sh +++ b/ci/scripts/e2e-test-parallel-in-memory.sh @@ -28,15 +28,15 @@ host_args=(-h localhost -p 4565 -h localhost -p 4566 -h localhost -p 4567) echo "--- e2e, ci-3cn-3fe-in-memory, streaming" risedev ci-start ci-3cn-3fe-in-memory sqllogictest --version -sqllogictest "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-in-memory-streaming-${profile}" --label in-memory +sqllogictest "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-in-memory-streaming-${profile}" --label "in-memory" --label "parallel" echo "--- Kill cluster" risedev ci-kill echo "--- e2e, ci-3cn-3fe-in-memory, batch" risedev ci-start ci-3cn-3fe-in-memory -sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-in-memory-batch-ddl-${profile}" --label in-memory -sqllogictest "${host_args[@]}" -d dev './e2e_test/batch/**/*.slt' -j 16 --junit "parallel-in-memory-batch-${profile}" --label in-memory +sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-in-memory-batch-ddl-${profile}" --label "in-memory" --label "parallel" +sqllogictest "${host_args[@]}" -d dev './e2e_test/batch/**/*.slt' -j 16 --junit "parallel-in-memory-batch-${profile}" --label "in-memory" --label "parallel" echo "--- Kill cluster" risedev ci-kill diff --git a/ci/scripts/e2e-test-parallel.sh b/ci/scripts/e2e-test-parallel.sh index 5f16a4c817871..c366ef07fc209 100755 --- a/ci/scripts/e2e-test-parallel.sh +++ b/ci/scripts/e2e-test-parallel.sh @@ -38,21 +38,21 @@ RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=i echo "--- e2e, ci-3streaming-2serving-3fe, streaming" RUST_LOG=$RUST_LOG \ risedev ci-start ci-3streaming-2serving-3fe -sqllogictest "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-streaming-${profile}" +sqllogictest "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-streaming-${profile}" --label "parallel" kill_cluster echo "--- e2e, ci-3streaming-2serving-3fe, batch" RUST_LOG=$RUST_LOG \ risedev ci-start ci-3streaming-2serving-3fe -sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-batch-ddl-${profile}" -sqllogictest "${host_args[@]}" -d dev './e2e_test/visibility_mode/*.slt' -j 16 --junit "parallel-batch-${profile}" +sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-batch-ddl-${profile}" --label "parallel" +sqllogictest "${host_args[@]}" -d dev './e2e_test/visibility_mode/*.slt' -j 16 --junit "parallel-batch-${profile}" --label "parallel" kill_cluster echo "--- e2e, ci-3streaming-2serving-3fe, generated" RUST_LOG=$RUST_LOG \ risedev ci-start ci-3streaming-2serving-3fe -sqllogictest "${host_args[@]}" -d dev './e2e_test/generated/**/*.slt' -j 16 --junit "parallel-generated-${profile}" +sqllogictest "${host_args[@]}" -d dev './e2e_test/generated/**/*.slt' -j 16 --junit "parallel-generated-${profile}" --label "parallel" kill_cluster diff --git a/ci/scripts/run-e2e-test.sh b/ci/scripts/run-e2e-test.sh index 4736f4aa53a8a..c64b54ca25a43 100755 --- a/ci/scripts/run-e2e-test.sh +++ b/ci/scripts/run-e2e-test.sh @@ -80,6 +80,9 @@ RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=i cluster_start # Please make sure the regression is expected before increasing the timeout. sqllogictest -p 4566 -d dev './e2e_test/streaming/**/*.slt' --junit "streaming-${profile}" +if [[ "$mode" != "single-node" ]]; then + sqllogictest -p 4566 -d dev './e2e_test/streaming_now/**/*.slt' --junit "streaming-${profile}" +fi sqllogictest -p 4566 -d dev './e2e_test/backfill/sink/different_pk_and_dist_key.slt' echo "--- Kill cluster" diff --git a/e2e_test/batch/subquery/lateral_subquery.slt.part b/e2e_test/batch/subquery/lateral_subquery.slt.part index 04e93a3d397ce..98077487828e4 100644 --- a/e2e_test/batch/subquery/lateral_subquery.slt.part +++ b/e2e_test/batch/subquery/lateral_subquery.slt.part @@ -82,3 +82,32 @@ drop table all_sales; statement ok drop table salesperson; +statement ok +CREATE TABLE r(ts TIMESTAMPTZ, src_id int, dev_id int); + +statement ok +INSERT INTO r VALUES +('2024-06-20T19:00:22Z'::TIMESTAMPTZ, 2, 2), +('2024-06-20T19:00:22Z'::TIMESTAMPTZ, 1, 3), +('2024-06-20T19:00:23Z'::TIMESTAMPTZ, 1, 2), +('2024-06-20T19:00:24Z'::TIMESTAMPTZ, 2, 1), +('2024-06-20T19:00:24Z'::TIMESTAMPTZ, 1, 2), +('2024-06-20T19:00:25Z'::TIMESTAMPTZ, 2, 1); + +query TII rowsort +SELECT e.ts AS e_ts, d.* +FROM ( + SELECT '2024-06-20T19:01:00Z'::TIMESTAMPTZ ts, 1::INT AS src_id) e +JOIN LATERAL +( + SELECT DISTINCT ON(src_id, dev_id) * + FROM r + WHERE r.src_id = e.src_id AND r.ts <= e.ts + ORDER BY src_id, dev_id, ts DESC +)d on true; +---- +2024-06-20 19:01:00+00:00 2024-06-20 19:00:22+00:00 1 3 +2024-06-20 19:01:00+00:00 2024-06-20 19:00:24+00:00 1 2 + +statement ok +DROP TABLE r CASCADE; diff --git a/e2e_test/source_inline/kafka/include_key_as.slt b/e2e_test/source_inline/kafka/include_key_as.slt index 332e15bd54b03..919e47bbb4248 100644 --- a/e2e_test/source_inline/kafka/include_key_as.slt +++ b/e2e_test/source_inline/kafka/include_key_as.slt @@ -184,23 +184,11 @@ rpk topic create 'test_additional_columns' # Note: `sh` doesn't have {..} brace expansion system ok -bash -c 'for i in {0..100}; do echo "key\$i:{\\"a\\": \$i}" | rpk topic produce test_additional_columns -f "%k:%v\\n" -H "header1=v1" -H "header2=v2"; done' +bash -c 'for i in {0..10}; do echo "key\$i:{\\"a\\": \$i}" | rpk topic produce test_additional_columns -f "%k:%v\\n" -H "header1=v1" -H "header2=v2"; done' -statement ok -create table additional_columns (a int) -include key as key_col -include partition as partition_col -include offset as offset_col -include timestamp as timestamp_col -include header as header_col -WITH ( - ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, - topic = 'test_additional_columns') -FORMAT PLAIN ENCODE JSON -# header with varchar type & non-exist header key statement error -create table additional_columns_1 (a int) +create table additional_columns (a int) include key as key_col include partition as partition_col include offset as offset_col @@ -220,13 +208,13 @@ Caused by: Protocol error: Only header column can have inner field, but got "timestamp" -# header with varchar type & non-exist header key statement ok -create table additional_columns_1 (a int) +create table additional_columns (a int) include key as key_col include partition as partition_col include offset as offset_col include timestamp as timestamp_col +include header as header_col_combined include header 'header1' as header_col_1 include header 'header2' as header_col_2 include header 'header2' varchar as header_col_3 @@ -236,7 +224,6 @@ WITH ( topic = 'test_additional_columns') FORMAT PLAIN ENCODE JSON - # Wait enough time to ensure SourceExecutor consumes all Kafka data. sleep 3s @@ -284,19 +271,21 @@ WHERE key_col IS NOT NULL AND partition_col IS NOT NULL AND offset_col IS NOT NULL AND timestamp_col IS NOT NULL - AND header_col IS NOT NULL + AND header_col_combined IS NOT NULL ---- -101 +11 query ?? -SELECT (header_col[1]).key AS key, (header_col[1]).value::text AS value -FROM additional_columns limit 1; +WITH arr AS (SELECT header_col_combined FROM additional_columns), +unnested AS (SELECT unnest(header_col_combined) FROM arr) +select *, count(*) from unnested group by 1 order by 1; ---- -header1 \x7631 +(header1,"\\x7631") 11 +(header2,"\\x7632") 11 query ???? -select header_col_1, header_col_2, header_col_3, header_col_4 from additional_columns_1 limit 1 +select header_col_1, header_col_2, header_col_3, header_col_4 from additional_columns limit 1 ---- \x7631 \x7632 v2 NULL @@ -305,3 +294,6 @@ drop table upsert_students statement ok drop table plain_students + +statement ok +drop table additional_columns diff --git a/e2e_test/streaming_now/README b/e2e_test/streaming_now/README new file mode 100644 index 0000000000000..ad5d817e3c958 --- /dev/null +++ b/e2e_test/streaming_now/README @@ -0,0 +1 @@ +Because currently the e2e test for `generate_series(..., now(), ...)` relies on external command `./risedev psql`, it cannot be tested in parallel, madsim and single-node mode. Let's just place it in this special folder and manually run it in `run-e2e-test.sh` to avoid complexity. Later if we introduce a new `now()`-like function that returns the time of statement execution, we'll be able to get rid of the external command and direcly create MV instead. We will move this to `streaming` folder at that time. diff --git a/e2e_test/streaming_now/now.slt b/e2e_test/streaming_now/now.slt new file mode 100644 index 0000000000000..ea9f85a489786 --- /dev/null +++ b/e2e_test/streaming_now/now.slt @@ -0,0 +1,30 @@ +system ok +./risedev psql -c " +create materialized view mv as +select * from generate_series( + to_timestamp($(date +%s)) - interval '10 second', + now(), + interval '1 second' +); +" + +statement ok +flush; + +query I +select count(*) >= 10 from mv; +---- +t + +sleep 2s + +statement ok +flush; + +query I +select count(*) >= 12 from mv; +---- +t + +statement ok +drop materialized view mv; diff --git a/grafana/README.md b/grafana/README.md index ca3d4da00f9cc..8402421eabb65 100644 --- a/grafana/README.md +++ b/grafana/README.md @@ -1,6 +1,6 @@ # RisingWave Grafana Dashboard -The Grafana dashboard is generated with grafanalib. You'll need +The Grafana dashboard is generated with `grafanalib`. You'll need - Python - grafanalib @@ -29,18 +29,25 @@ And don't forget to include the generated `risingwave--dashboard.json` in t ./update.sh ``` -## Advanced Usage +## Multi-cluster Deployment -We can specify the source uid, dashboard uid, dashboard version, enable namespace filter and enable risingwave_name filter(used in multi-cluster deployment) via env variables. +The `generate.sh` supports multi-cluster deployment. The following environment variables are helpful: -For example, we can use the following query to generate dashboard json used in our benchmark cluster: +- `DASHBOARD_NAMESPACE_FILTER_ENABLED`: When set to `true`, a drop-down list will be added to the Grafana dashboard, and all Prometheus queries will be filtered by the selected namespace. +- `DASHBOARD_RISINGWAVE_NAME_FILTER_ENABLED`: When set to `true`, a drop-down list will be added to the Grafana dashboard, and all Prometheus queries will be filtered by the selected RisingWave name. This is useful when you have multiple RisingWave instances in the same namespace. +- `DASHBOARD_SOURCE_UID`: Set to the UID of your Prometheus source. +- `DASHBOARD_DYNAMIC_SOURCE`: Alternative to `DASHBOARD_SOURCE_UID`. When set to `true`, a drop-down list will be added to the Grafana dashboard to pick any one of the Prometheus sources. +- `DASHBOARD_UID`: Set to the UID of your Grafana dashboard. + +See more details in the `common.py` file. + +Examples: ```bash DASHBOARD_NAMESPACE_FILTER_ENABLED=true \ DASHBOARD_RISINGWAVE_NAME_FILTER_ENABLED=true \ DASHBOARD_SOURCE_UID= \ DASHBOARD_UID= \ -DASHBOARD_VERSION= \ ./generate.sh ``` @@ -51,6 +58,5 @@ DASHBOARD_NAMESPACE_FILTER_ENABLED=true \ DASHBOARD_RISINGWAVE_NAME_FILTER_ENABLED=true \ DASHBOARD_DYNAMIC_SOURCE=true \ DASHBOARD_UID= \ -DASHBOARD_VERSION= \ ./generate.sh ``` diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 223a23813f68d..a43e34bde8df3 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -725,9 +725,21 @@ message RowIdGenNode { uint64 row_id_index = 1; } +message NowModeUpdateCurrent {} + +message NowModeGenerateSeries { + data.Datum start_timestamp = 1; + data.Datum interval = 2; +} + message NowNode { // Persists emitted 'now'. catalog.Table state_table = 1; + + oneof mode { + NowModeUpdateCurrent update_current = 101; + NowModeGenerateSeries generate_series = 102; + } } message ValuesNode { diff --git a/src/bench/sink_bench/main.rs b/src/bench/sink_bench/main.rs index 1637b5df6c659..c063a8ca0cee7 100644 --- a/src/bench/sink_bench/main.rs +++ b/src/bench/sink_bench/main.rs @@ -209,7 +209,6 @@ impl MockDatagenSource { .unwrap(); let parser_config = ParserConfig { specific: SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Native, protocol_config: ProtocolProperties::Native, }, diff --git a/src/common/src/array/stream_chunk_builder.rs b/src/common/src/array/stream_chunk_builder.rs index a13cc3676792a..c44d313fffffe 100644 --- a/src/common/src/array/stream_chunk_builder.rs +++ b/src/common/src/array/stream_chunk_builder.rs @@ -104,6 +104,11 @@ impl StreamChunkBuilder { } } + /// Get the current number of rows in the builder. + pub fn size(&self) -> usize { + self.size + } + /// Append an iterator of output index and datum to the builder, return a chunk if the builder /// is full. /// diff --git a/src/common/src/util/epoch.rs b/src/common/src/util/epoch.rs index a067c689f669d..56dbdf6c54da9 100644 --- a/src/common/src/util/epoch.rs +++ b/src/common/src/util/epoch.rs @@ -174,10 +174,11 @@ impl EpochPair { Self::new(curr, curr - EPOCH_INC_MIN_STEP_FOR_TEST) } } -/// As most unit tests initializ a new epoch from a random value (e.g. 1, 2, 233 etc.), but the correct epoch in the system is a u64 with the last `EPOCH_AVAILABLE_BITS` bits set to 0. + +/// As most unit tests initialize a new epoch from a random value (e.g. 1, 2, 233 etc.), but the correct epoch in the system is a u64 with the last `EPOCH_AVAILABLE_BITS` bits set to 0. /// This method is to turn a a random epoch into a well shifted value. -pub const fn test_epoch(value: u64) -> u64 { - value << EPOCH_AVAILABLE_BITS +pub const fn test_epoch(value_millis: u64) -> u64 { + value_millis << EPOCH_AVAILABLE_BITS } /// There are numerous operations in our system's unit tests that involve incrementing or decrementing the epoch. diff --git a/src/connector/src/parser/bytes_parser.rs b/src/connector/src/parser/bytes_parser.rs index 5df7fa28118d3..eeccf17be7a27 100644 --- a/src/connector/src/parser/bytes_parser.rs +++ b/src/connector/src/parser/bytes_parser.rs @@ -63,7 +63,6 @@ mod tests { async fn test_bytes_parser(get_payload: fn() -> Vec>) { let descs = vec![SourceColumnDesc::simple("id", DataType::Bytea, 0.into())]; let props = SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Bytes(BytesProperties { column_name: None }), protocol_config: ProtocolProperties::Plain, }; diff --git a/src/connector/src/parser/debezium/debezium_parser.rs b/src/connector/src/parser/debezium/debezium_parser.rs index 817c2a788f2be..bc43e556cb4f6 100644 --- a/src/connector/src/parser/debezium/debezium_parser.rs +++ b/src/connector/src/parser/debezium/debezium_parser.rs @@ -19,7 +19,6 @@ use risingwave_common::bail; use super::simd_json_parser::DebeziumJsonAccessBuilder; use super::{DebeziumAvroAccessBuilder, DebeziumAvroParserConfig}; use crate::error::ConnectorResult; -use crate::extract_key_config; use crate::parser::unified::debezium::DebeziumChangeEvent; use crate::parser::unified::json::TimestamptzHandling; use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer; @@ -89,8 +88,8 @@ impl DebeziumParser { rw_columns: Vec, source_ctx: SourceContextRef, ) -> ConnectorResult { - let (key_config, key_type) = extract_key_config!(props); - let key_builder = build_accessor_builder(key_config, key_type).await?; + let key_builder = + build_accessor_builder(props.encoding_config.clone(), EncodingType::Key).await?; let payload_builder = build_accessor_builder(props.encoding_config, EncodingType::Value).await?; let debezium_props = if let ProtocolProperties::Debezium(props) = props.protocol_config { @@ -114,7 +113,6 @@ impl DebeziumParser { use crate::parser::JsonProperties; let props = SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Json(JsonProperties { use_schema_registry: false, timestamptz_handling: None, @@ -225,7 +223,6 @@ mod tests { .collect::>(); let props = SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Json(JsonProperties { use_schema_registry: false, timestamptz_handling: None, @@ -298,7 +295,6 @@ mod tests { .collect::>(); let props = SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Json(JsonProperties { use_schema_registry: false, timestamptz_handling: None, diff --git a/src/connector/src/parser/debezium/simd_json_parser.rs b/src/connector/src/parser/debezium/simd_json_parser.rs index 85399ed772768..f9738eb9e357e 100644 --- a/src/connector/src/parser/debezium/simd_json_parser.rs +++ b/src/connector/src/parser/debezium/simd_json_parser.rs @@ -130,7 +130,6 @@ mod tests { async fn build_parser(rw_columns: Vec) -> DebeziumParser { let props = SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Json(JsonProperties { use_schema_registry: false, timestamptz_handling: None, diff --git a/src/connector/src/parser/maxwell/simd_json_parser.rs b/src/connector/src/parser/maxwell/simd_json_parser.rs index 5db6cdd52e90c..e5bc7291ccf07 100644 --- a/src/connector/src/parser/maxwell/simd_json_parser.rs +++ b/src/connector/src/parser/maxwell/simd_json_parser.rs @@ -35,7 +35,6 @@ mod tests { ]; let props = SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Json(JsonProperties { use_schema_registry: false, timestamptz_handling: None, diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index c11c833457a99..bca1b727d66a8 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -1049,7 +1049,6 @@ pub struct CommonParserConfig { #[derive(Debug, Clone, Default)] pub struct SpecificParserConfig { - pub key_encoding_config: Option, pub encoding_config: EncodingProperties, pub protocol_config: ProtocolProperties, } @@ -1057,7 +1056,6 @@ pub struct SpecificParserConfig { impl SpecificParserConfig { // for test only pub const DEFAULT_PLAIN_JSON: SpecificParserConfig = SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Json(JsonProperties { use_schema_registry: false, timestamptz_handling: None, @@ -1278,7 +1276,6 @@ impl SpecificParserConfig { } }; Ok(Self { - key_encoding_config: None, encoding_config, protocol_config, }) diff --git a/src/connector/src/parser/util.rs b/src/connector/src/parser/util.rs index 30cb1fbf7d62e..590ba927854d3 100644 --- a/src/connector/src/parser/util.rs +++ b/src/connector/src/parser/util.rs @@ -92,22 +92,6 @@ macro_rules! only_parse_payload { }; } -// Extract encoding config and encoding type from ParserProperties -// for message key. -// -// Suppose (A, B) is the combination of key/payload combination: -// For (None, B), key should be the the key setting from B -// For (A, B), key should be the value setting from A -#[macro_export] -macro_rules! extract_key_config { - ($props:ident) => { - match $props.key_encoding_config { - Some(config) => (config, EncodingType::Value), - None => ($props.encoding_config.clone(), EncodingType::Key), - } - }; -} - /// Load raw bytes from: /// * local file, for on-premise or testing. /// * http/https, for common usage. diff --git a/src/connector/src/source/datagen/source/generator.rs b/src/connector/src/source/datagen/source/generator.rs index 5716631dff620..600efda2f6255 100644 --- a/src/connector/src/source/datagen/source/generator.rs +++ b/src/connector/src/source/datagen/source/generator.rs @@ -278,7 +278,6 @@ mod tests { use_schema_registry: false, timestamptz_handling: None, }), - key_encoding_config: None, }, data_types, rows_per_second, diff --git a/src/connector/src/source/datagen/source/reader.rs b/src/connector/src/source/datagen/source/reader.rs index d3115f504f32e..992343058e9ef 100644 --- a/src/connector/src/source/datagen/source/reader.rs +++ b/src/connector/src/source/datagen/source/reader.rs @@ -399,7 +399,6 @@ mod tests { state, ParserConfig { specific: SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Native, protocol_config: ProtocolProperties::Native, }, @@ -457,7 +456,6 @@ mod tests { }; let parser_config = ParserConfig { specific: SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Native, protocol_config: ProtocolProperties::Native, }, diff --git a/src/connector/src/source/filesystem/s3/source/reader.rs b/src/connector/src/source/filesystem/s3/source/reader.rs index c3c800d6a5317..0340e584d0b8f 100644 --- a/src/connector/src/source/filesystem/s3/source/reader.rs +++ b/src/connector/src/source/filesystem/s3/source/reader.rs @@ -289,7 +289,6 @@ mod tests { let config = ParserConfig { common: CommonParserConfig { rw_columns: descs }, specific: SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Csv(csv_config), protocol_config: ProtocolProperties::Plain, }, diff --git a/src/frontend/planner_test/tests/testdata/input/generate_series_with_now.yaml b/src/frontend/planner_test/tests/testdata/input/generate_series_with_now.yaml new file mode 100644 index 0000000000000..e121aba41ff6f --- /dev/null +++ b/src/frontend/planner_test/tests/testdata/input/generate_series_with_now.yaml @@ -0,0 +1,30 @@ +- sql: | + select * from generate_series( + '2024-06-21 17:36:00'::timestamptz, + now(), + interval '1 hour' + ); + expected_outputs: + - logical_plan + - optimized_logical_plan_for_stream + - stream_plan +- sql: | + select * from generate_series( + '2024-06-21 17:36:00'::timestamp, -- `timestamp` type is not supported + now(), + interval '1 hour' + ); + expected_outputs: + - binder_error +- sql: | + select * from generate_series( + now() - interval '1 hour', + now(), + interval '1 hour' + ); + expected_outputs: + - stream_error +- sql: | + select * from unnest(array[now(), now()]); + expected_outputs: + - stream_error diff --git a/src/frontend/planner_test/tests/testdata/input/lateral_subquery.yaml b/src/frontend/planner_test/tests/testdata/input/lateral_subquery.yaml index 869bbdf6d7136..8b9126f18d641 100644 --- a/src/frontend/planner_test/tests/testdata/input/lateral_subquery.yaml +++ b/src/frontend/planner_test/tests/testdata/input/lateral_subquery.yaml @@ -119,3 +119,18 @@ ) AS b ON TRUE; expected_outputs: - stream_plan +- name: https://github.com/risingwavelabs/risingwave/issues/17382 + sql: | + CREATE TABLE r(ts TIMESTAMPTZ, src_id int, dev_id int); + SELECT e.ts AS e_ts, d.* + FROM ( + SELECT '2024-06-20T19:01:00Z'::TIMESTAMPTZ ts, 1::INT AS src_id) e + JOIN LATERAL + ( + SELECT DISTINCT ON(src_id, dev_id) * + FROM r + WHERE r.src_id = e.src_id AND r.ts <= e.ts + ORDER BY src_id, dev_id, ts DESC + )d on true; + expected_outputs: + - batch_plan diff --git a/src/frontend/planner_test/tests/testdata/output/expr.yaml b/src/frontend/planner_test/tests/testdata/output/expr.yaml index 4ba572a54e600..f88f7c4d69b76 100644 --- a/src/frontend/planner_test/tests/testdata/output/expr.yaml +++ b/src/frontend/planner_test/tests/testdata/output/expr.yaml @@ -543,7 +543,7 @@ Failed to bind expression: v1 >= now() Caused by: - Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING` and `ON`. Found in clause: Some(GroupBy). Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information + Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING`, `ON` and `FROM`. Found in clause: Some(GroupBy). Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information - name: forbid now in select for stream sql: | create table t (v1 timestamp with time zone, v2 timestamp with time zone); @@ -552,7 +552,7 @@ Failed to bind expression: now() Caused by: - Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING` and `ON`. Found in clause: None. Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information + Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING`, `ON` and `FROM`. Found in clause: None. Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information - name: forbid now in agg filter for stream sql: | create table t (v1 timestamp with time zone, v2 int); @@ -561,7 +561,7 @@ Failed to bind expression: sum(v2) FILTER(WHERE v1 >= now()) Caused by: - Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING` and `ON`. Found in clause: Some(Filter). Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information + Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING`, `ON` and `FROM`. Found in clause: Some(Filter). Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information - name: typo pg_teminate_backend sql: | select pg_teminate_backend(1); diff --git a/src/frontend/planner_test/tests/testdata/output/generate_series_with_now.yaml b/src/frontend/planner_test/tests/testdata/output/generate_series_with_now.yaml new file mode 100644 index 0000000000000..4c8d71f987351 --- /dev/null +++ b/src/frontend/planner_test/tests/testdata/output/generate_series_with_now.yaml @@ -0,0 +1,35 @@ +# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information. +- sql: | + select * from generate_series( + '2024-06-21 17:36:00'::timestamptz, + now(), + interval '1 hour' + ); + logical_plan: |- + LogicalProject { exprs: [generate_series] } + └─LogicalTableFunction { table_function: GenerateSeries('2024-06-21 17:36:00':Varchar::Timestamptz, Now, '01:00:00':Interval) } + optimized_logical_plan_for_stream: 'LogicalNow { output: [ts] }' + stream_plan: |- + StreamMaterialize { columns: [generate_series], stream_key: [generate_series], pk_columns: [generate_series], pk_conflict: NoCheck, watermark_columns: [generate_series] } + └─StreamNow { output: [ts] } +- sql: | + select * from generate_series( + '2024-06-21 17:36:00'::timestamp, -- `timestamp` type is not supported + now(), + interval '1 hour' + ); + binder_error: function generate_series(timestamp without time zone, timestamp with time zone, interval) does not exist +- sql: | + select * from generate_series( + now() - interval '1 hour', + now(), + interval '1 hour' + ); + stream_error: |- + Not supported: General `now()` function in streaming queries + HINT: Streaming `now()` is currently only supported in GenerateSeries and TemporalFilter patterns. +- sql: | + select * from unnest(array[now(), now()]); + stream_error: |- + Not supported: General `now()` function in streaming queries + HINT: Streaming `now()` is currently only supported in GenerateSeries and TemporalFilter patterns. diff --git a/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml b/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml index 39639b3ebb647..815890d6a73b8 100644 --- a/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml +++ b/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml @@ -242,3 +242,31 @@ └─StreamExchange { dist: HashShard(t1.c1, $expr3) } └─StreamProject { exprs: [t1.c1, t1.c2, t1.c3, AtTimeZone(t1.c4, 'UTC':Varchar) as $expr2, t1.c4::Date as $expr3, t1._row_id] } └─StreamTableScan { table: t1, columns: [t1.c1, t1.c2, t1.c3, t1.c4, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } +- name: https://github.com/risingwavelabs/risingwave/issues/17382 + sql: | + CREATE TABLE r(ts TIMESTAMPTZ, src_id int, dev_id int); + SELECT e.ts AS e_ts, d.* + FROM ( + SELECT '2024-06-20T19:01:00Z'::TIMESTAMPTZ ts, 1::INT AS src_id) e + JOIN LATERAL + ( + SELECT DISTINCT ON(src_id, dev_id) * + FROM r + WHERE r.src_id = e.src_id AND r.ts <= e.ts + ORDER BY src_id, dev_id, ts DESC + )d on true; + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchHashJoin { type: Inner, predicate: 1:Int32 IS NOT DISTINCT FROM 1:Int32 AND '2024-06-20 19:01:00+00:00':Timestamptz IS NOT DISTINCT FROM '2024-06-20 19:01:00+00:00':Timestamptz, output: ['2024-06-20 19:01:00+00:00':Timestamptz, r.ts, r.src_id, r.dev_id] } + ├─BatchExchange { order: [], dist: HashShard(1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz) } + │ └─BatchValues { rows: [['2024-06-20 19:01:00+00:00':Timestamptz, 1:Int32]] } + └─BatchExchange { order: [], dist: HashShard(1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz) } + └─BatchGroupTopN { order: [r.src_id ASC, r.dev_id ASC, r.ts DESC], limit: 1, offset: 0, group_key: [1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz, r.src_id, r.dev_id] } + └─BatchExchange { order: [], dist: HashShard(1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz, r.src_id, r.dev_id) } + └─BatchHashJoin { type: Inner, predicate: 1:Int32 = r.src_id AND (r.ts <= '2024-06-20 19:01:00+00:00':Timestamptz), output: all } + ├─BatchExchange { order: [], dist: HashShard(1:Int32) } + │ └─BatchHashAgg { group_key: [1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz], aggs: [] } + │ └─BatchExchange { order: [], dist: HashShard(1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz) } + │ └─BatchValues { rows: [[1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz]] } + └─BatchExchange { order: [], dist: HashShard(r.src_id) } + └─BatchScan { table: r, columns: [r.ts, r.src_id, r.dev_id], distribution: SomeShard } diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index 09c05a29695dc..b9a3b27825abf 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -1572,11 +1572,15 @@ impl Binder { if self.is_for_stream() && !matches!( self.context.clause, - Some(Clause::Where) | Some(Clause::Having) | Some(Clause::JoinOn) + Some(Clause::Where) + | Some(Clause::Having) + | Some(Clause::JoinOn) + | Some(Clause::From) ) { return Err(ErrorCode::InvalidInputSyntax(format!( - "For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING` and `ON`. Found in clause: {:?}. Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information", + "For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING`, `ON` and `FROM`. Found in clause: {:?}. \ + Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information", self.context.clause )) .into()); diff --git a/src/frontend/src/expr/mod.rs b/src/frontend/src/expr/mod.rs index 9dd5f7be1d53d..89142d0e9b237 100644 --- a/src/frontend/src/expr/mod.rs +++ b/src/frontend/src/expr/mod.rs @@ -413,7 +413,7 @@ macro_rules! impl_has_variant { }; } -impl_has_variant! {InputRef, Literal, FunctionCall, FunctionCallWithLambda, AggCall, Subquery, TableFunction, WindowFunction} +impl_has_variant! {InputRef, Literal, FunctionCall, FunctionCallWithLambda, AggCall, Subquery, TableFunction, WindowFunction, Now} #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct InequalityInputPair { diff --git a/src/frontend/src/expr/type_inference/func.rs b/src/frontend/src/expr/type_inference/func.rs index 9f28dfeb74c8c..0fecee8ab45c0 100644 --- a/src/frontend/src/expr/type_inference/func.rs +++ b/src/frontend/src/expr/type_inference/func.rs @@ -20,6 +20,7 @@ use risingwave_common::types::{DataType, StructType}; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_expr::aggregate::AggKind; pub use risingwave_expr::sig::*; +use risingwave_pb::expr::table_function::PbType as PbTableFuncType; use super::{align_types, cast_ok_base, CastContext}; use crate::error::{ErrorCode, Result}; @@ -36,13 +37,24 @@ pub fn infer_type_with_sigmap( sig_map: &FunctionRegistry, ) -> Result { // special cases - if let FuncName::Scalar(func_type) = func_name - && let Some(res) = infer_type_for_special(func_type, inputs).transpose() - { - return res; - } - if let FuncName::Aggregate(AggKind::Grouping) = func_name { - return Ok(DataType::Int32); + match &func_name { + FuncName::Scalar(func_type) => { + if let Some(res) = infer_type_for_special(*func_type, inputs).transpose() { + return res; + } + } + FuncName::Table(func_type) => { + if let Some(res) = infer_type_for_special_table_function(*func_type, inputs).transpose() + { + return res; + } + } + FuncName::Aggregate(agg_kind) => { + if *agg_kind == AggKind::Grouping { + return Ok(DataType::Int32); + } + } + _ => {} } let actuals = inputs @@ -634,6 +646,34 @@ fn infer_type_for_special( } } +fn infer_type_for_special_table_function( + func_type: PbTableFuncType, + inputs: &mut [ExprImpl], +) -> Result> { + match func_type { + PbTableFuncType::GenerateSeries => { + if inputs.len() < 3 { + // let signature map handle this + return Ok(None); + } + match ( + inputs[0].return_type(), + inputs[1].return_type(), + inputs[2].return_type(), + ) { + (DataType::Timestamptz, DataType::Timestamptz, DataType::Interval) => { + // This is to allow `generate_series('2024-06-20 00:00:00'::timestamptz, now(), interval '1 day')`, + // which in streaming mode will be further converted to `StreamNow`. + Ok(Some(DataType::Timestamptz)) + } + // let signature map handle the rest + _ => Ok(None), + } + } + _ => Ok(None), + } +} + /// From all available functions in `sig_map`, find and return the best matching `FuncSign` for the /// provided `func_name` and `inputs`. This not only support exact function signature match, but can /// also match `substr(varchar, smallint)` or even `substr(varchar, unknown)` to `substr(varchar, diff --git a/src/frontend/src/optimizer/logical_optimization.rs b/src/frontend/src/optimizer/logical_optimization.rs index d452626bb9418..931a645b3d680 100644 --- a/src/frontend/src/optimizer/logical_optimization.rs +++ b/src/frontend/src/optimizer/logical_optimization.rs @@ -118,6 +118,14 @@ static DAG_TO_TREE: LazyLock = LazyLock::new(|| { ) }); +static STREAM_GENERATE_SERIES_WITH_NOW: LazyLock = LazyLock::new(|| { + OptimizationStage::new( + "Convert GENERATE_SERIES Ends With NOW", + vec![GenerateSeriesWithNowRule::create()], + ApplyOrder::TopDown, + ) +}); + static TABLE_FUNCTION_TO_PROJECT_SET: LazyLock = LazyLock::new(|| { OptimizationStage::new( "Table Function To Project Set", @@ -572,6 +580,9 @@ impl LogicalOptimizer { } plan = plan.optimize_by_rules(&SET_OPERATION_MERGE); plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN); + // Convert `generate_series` ends with `now()` to a `Now` source. Only for streaming mode. + // Should be applied before converting table function to project set. + plan = plan.optimize_by_rules(&STREAM_GENERATE_SERIES_WITH_NOW); // In order to unnest a table function, we need to convert it into a `project_set` first. plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_PROJECT_SET); diff --git a/src/frontend/src/optimizer/plan_node/convert.rs b/src/frontend/src/optimizer/plan_node/convert.rs index db961b3b1e20a..6f98073304d8b 100644 --- a/src/frontend/src/optimizer/plan_node/convert.rs +++ b/src/frontend/src/optimizer/plan_node/convert.rs @@ -84,11 +84,6 @@ pub fn stream_enforce_eowc_requirement( } Ok(StreamEowcSort::new(plan, watermark_col_idx).into()) } - } else if !emit_on_window_close && plan.emit_on_window_close() { - Err(ErrorCode::InternalError( - "Some bad thing happened, the generated plan is not correct.".to_string(), - ) - .into()) } else { Ok(plan) } diff --git a/src/frontend/src/optimizer/plan_node/generic/mod.rs b/src/frontend/src/optimizer/plan_node/generic/mod.rs index 00db5730e8038..392f073371843 100644 --- a/src/frontend/src/optimizer/plan_node/generic/mod.rs +++ b/src/frontend/src/optimizer/plan_node/generic/mod.rs @@ -78,6 +78,8 @@ mod cte_ref; pub use cte_ref::*; mod recursive_union; pub use recursive_union::*; +mod now; +pub use now::*; pub trait DistillUnit { fn distill_with_name<'a>(&self, name: impl Into>) -> XmlNode<'a>; diff --git a/src/frontend/src/optimizer/plan_node/generic/now.rs b/src/frontend/src/optimizer/plan_node/generic/now.rs new file mode 100644 index 0000000000000..911217d064214 --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/generic/now.rs @@ -0,0 +1,106 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use educe::Educe; +use enum_as_inner::EnumAsInner; +use pretty_xmlish::{Pretty, Str, XmlNode}; +use risingwave_common::catalog::{Field, Schema}; +use risingwave_common::types::{DataType, Interval, Timestamptz}; + +use super::{DistillUnit, GenericPlanNode}; +use crate::optimizer::plan_node::utils::childless_record; +use crate::optimizer::property::FunctionalDependencySet; +use crate::OptimizerContextRef; + +#[derive(Debug, Clone, Educe)] +#[educe(PartialEq, Eq, Hash)] +pub struct Now { + #[educe(PartialEq(ignore))] + #[educe(Hash(ignore))] + pub ctx: OptimizerContextRef, + + pub mode: Mode, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, EnumAsInner)] +pub enum Mode { + /// Emit current timestamp on startup, update it on barrier. + UpdateCurrent, + /// Generate a series of timestamps starting from `start_timestamp` with `interval`. + /// Keep generating new timestamps on barrier. + GenerateSeries { + start_timestamp: Timestamptz, + interval: Interval, + }, +} + +impl GenericPlanNode for Now { + fn functional_dependency(&self) -> crate::optimizer::property::FunctionalDependencySet { + FunctionalDependencySet::new(1) // only one column and no dependency + } + + fn schema(&self) -> risingwave_common::catalog::Schema { + Schema::new(vec![Field { + data_type: DataType::Timestamptz, + name: String::from(if self.mode.is_update_current() { + "now" + } else { + "ts" + }), + sub_fields: vec![], + type_name: String::default(), + }]) + } + + fn stream_key(&self) -> Option> { + match self.mode { + Mode::UpdateCurrent => Some(vec![]), + Mode::GenerateSeries { .. } => Some(vec![0]), + } + } + + fn ctx(&self) -> OptimizerContextRef { + self.ctx.clone() + } +} + +impl Now { + pub fn update_current(ctx: OptimizerContextRef) -> Self { + Self::new_inner(ctx, Mode::UpdateCurrent) + } + + pub fn generate_series( + ctx: OptimizerContextRef, + start_timestamp: Timestamptz, + interval: Interval, + ) -> Self { + Self::new_inner( + ctx, + Mode::GenerateSeries { + start_timestamp, + interval, + }, + ) + } + + fn new_inner(ctx: OptimizerContextRef, mode: Mode) -> Self { + Self { ctx, mode } + } +} + +impl DistillUnit for Now { + fn distill_with_name<'a>(&self, name: impl Into>) -> XmlNode<'a> { + childless_record(name, vec![("mode", Pretty::debug(&self.mode))]) + } +} diff --git a/src/frontend/src/optimizer/plan_node/logical_now.rs b/src/frontend/src/optimizer/plan_node/logical_now.rs index f9c33eb3d9cc1..ea34037c8977a 100644 --- a/src/frontend/src/optimizer/plan_node/logical_now.rs +++ b/src/frontend/src/optimizer/plan_node/logical_now.rs @@ -14,10 +14,8 @@ use pretty_xmlish::XmlNode; use risingwave_common::bail; -use risingwave_common::catalog::{Field, Schema}; -use risingwave_common::types::DataType; -use super::generic::GenericPlanRef; +use super::generic::{self, GenericPlanRef, Mode}; use super::utils::{childless_record, Distill}; use super::{ ColPrunable, ColumnPruningContext, ExprRewritable, Logical, LogicalFilter, PlanBase, PlanRef, @@ -26,30 +24,25 @@ use super::{ use crate::error::Result; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::utils::column_names_pretty; -use crate::optimizer::property::FunctionalDependencySet; use crate::utils::ColIndexMapping; -use crate::OptimizerContextRef; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct LogicalNow { pub base: PlanBase, + core: generic::Now, } impl LogicalNow { - pub fn new(ctx: OptimizerContextRef) -> Self { - let schema = Schema::new(vec![Field { - data_type: DataType::Timestamptz, - name: String::from("now"), - sub_fields: vec![], - type_name: String::default(), - }]); - let base = PlanBase::new_logical( - ctx, - schema, - Some(vec![]), - FunctionalDependencySet::default(), - ); - Self { base } + pub fn new(core: generic::Now) -> Self { + let base = PlanBase::new_logical_with_core(&core); + Self { base, core } + } + + pub fn max_one_row(&self) -> bool { + match self.core.mode { + Mode::UpdateCurrent => true, + Mode::GenerateSeries { .. } => false, + } } } @@ -91,7 +84,7 @@ impl ToStream for LogicalNow { /// `to_stream` is equivalent to `to_stream_with_dist_required(RequiredDist::Any)` fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result { - Ok(StreamNow::new(self.clone(), self.ctx()).into()) + Ok(StreamNow::new(self.core.clone()).into()) } } diff --git a/src/frontend/src/optimizer/plan_node/logical_project_set.rs b/src/frontend/src/optimizer/plan_node/logical_project_set.rs index 6f31b35a21ee0..8f6966ece6c70 100644 --- a/src/frontend/src/optimizer/plan_node/logical_project_set.rs +++ b/src/frontend/src/optimizer/plan_node/logical_project_set.rs @@ -22,7 +22,7 @@ use super::{ LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamProjectSet, ToBatch, ToStream, }; -use crate::error::Result; +use crate::error::{ErrorCode, Result}; use crate::expr::{ collect_input_refs, Expr, ExprImpl, ExprRewriter, ExprVisitor, FunctionCall, InputRef, TableFunction, @@ -400,6 +400,16 @@ impl ToStream for LogicalProjectSet { // TODO: implement to_stream_with_dist_required like LogicalProject fn to_stream(&self, ctx: &mut ToStreamContext) -> Result { + if self.select_list().iter().any(|item| item.has_now()) { + // User may use `now()` in table function in a wrong way, because we allow `now()` in `FROM` clause. + return Err(ErrorCode::NotSupported( + "General `now()` function in streaming queries".to_string(), + "Streaming `now()` is currently only supported in GenerateSeries and TemporalFilter patterns." + .to_string(), + ) + .into()); + } + let new_input = self.input().to_stream(ctx)?; let mut new_logical = self.core.clone(); new_logical.input = new_input; diff --git a/src/frontend/src/optimizer/plan_node/stream_now.rs b/src/frontend/src/optimizer/plan_node/stream_now.rs index d27321c08d06b..22a0d2c5fb0fb 100644 --- a/src/frontend/src/optimizer/plan_node/stream_now.rs +++ b/src/frontend/src/optimizer/plan_node/stream_now.rs @@ -14,46 +14,39 @@ use fixedbitset::FixedBitSet; use pretty_xmlish::XmlNode; -use risingwave_common::catalog::{Field, Schema}; -use risingwave_common::types::DataType; +use risingwave_common::types::Datum; +use risingwave_common::util::value_encoding::DatumToProtoExt; +use risingwave_pb::stream_plan::now_node::PbMode as PbNowMode; use risingwave_pb::stream_plan::stream_node::NodeBody; -use risingwave_pb::stream_plan::NowNode; +use risingwave_pb::stream_plan::{PbNowModeGenerateSeries, PbNowModeUpdateCurrent, PbNowNode}; +use super::generic::Mode; use super::stream::prelude::*; use super::utils::{childless_record, Distill, TableCatalogBuilder}; -use super::{ExprRewritable, LogicalNow, PlanBase, StreamNode}; +use super::{generic, ExprRewritable, PlanBase, StreamNode}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::utils::column_names_pretty; -use crate::optimizer::property::{Distribution, FunctionalDependencySet}; +use crate::optimizer::property::Distribution; use crate::stream_fragmenter::BuildFragmentGraphState; -use crate::OptimizerContextRef; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamNow { pub base: PlanBase, + core: generic::Now, } impl StreamNow { - pub fn new(_logical: LogicalNow, ctx: OptimizerContextRef) -> Self { - let schema = Schema::new(vec![Field { - data_type: DataType::Timestamptz, - name: String::from("now"), - sub_fields: vec![], - type_name: String::default(), - }]); + pub fn new(core: generic::Now) -> Self { let mut watermark_columns = FixedBitSet::with_capacity(1); watermark_columns.set(0, true); - let base = PlanBase::new_stream( - ctx, - schema, - Some(vec![]), - FunctionalDependencySet::default(), + let base = PlanBase::new_stream_with_core( + &core, Distribution::Single, - false, - false, // TODO(rc): derive EOWC property from input + core.mode.is_generate_series(), // append only + core.mode.is_generate_series(), // emit on window close watermark_columns, ); - Self { base } + Self { base, core } } } @@ -83,8 +76,18 @@ impl StreamNode for StreamNow { let table_catalog = internal_table_catalog_builder .build(dist_keys, 0) .with_id(state.gen_table_id_wrapped()); - NodeBody::Now(NowNode { + NodeBody::Now(PbNowNode { state_table: Some(table_catalog.to_internal_table_prost()), + mode: Some(match &self.core.mode { + Mode::UpdateCurrent => PbNowMode::UpdateCurrent(PbNowModeUpdateCurrent {}), + Mode::GenerateSeries { + start_timestamp, + interval, + } => PbNowMode::GenerateSeries(PbNowModeGenerateSeries { + start_timestamp: Some(Datum::Some((*start_timestamp).into()).to_protobuf()), + interval: Some(Datum::Some((*interval).into()).to_protobuf()), + }), + }), }) } } diff --git a/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs b/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs index c81bf539a5713..07459b59b1d5f 100644 --- a/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs +++ b/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs @@ -174,8 +174,12 @@ impl PlanVisitor for CardinalityVisitor { } } - fn visit_logical_now(&mut self, _plan: &plan_node::LogicalNow) -> Cardinality { - 1.into() + fn visit_logical_now(&mut self, plan: &plan_node::LogicalNow) -> Cardinality { + if plan.max_one_row() { + 1.into() + } else { + Cardinality::unknown() + } } fn visit_logical_expand(&mut self, plan: &plan_node::LogicalExpand) -> Cardinality { diff --git a/src/frontend/src/optimizer/rule/apply_topn_transpose_rule.rs b/src/frontend/src/optimizer/rule/apply_topn_transpose_rule.rs index 9a6884b33e9a2..dbdb180b1358c 100644 --- a/src/frontend/src/optimizer/rule/apply_topn_transpose_rule.rs +++ b/src/frontend/src/optimizer/rule/apply_topn_transpose_rule.rs @@ -49,7 +49,8 @@ impl Rule for ApplyTopNTransposeRule { apply.clone().decompose(); assert_eq!(join_type, JoinType::Inner); let topn: &LogicalTopN = right.as_logical_top_n()?; - let (topn_input, limit, offset, with_ties, mut order, group_key) = topn.clone().decompose(); + let (topn_input, limit, offset, with_ties, mut order, mut group_key) = + topn.clone().decompose(); let apply_left_len = left.schema().len(); @@ -73,6 +74,7 @@ impl Rule for ApplyTopNTransposeRule { .column_orders .iter_mut() .for_each(|ord| ord.column_index += apply_left_len); + group_key.iter_mut().for_each(|idx| *idx += apply_left_len); let new_group_key = (0..apply_left_len).chain(group_key).collect_vec(); LogicalTopN::new(new_apply, limit, offset, with_ties, order, new_group_key) }; diff --git a/src/frontend/src/optimizer/rule/mod.rs b/src/frontend/src/optimizer/rule/mod.rs index 9364a6f2b7f5b..fd06402c7497f 100644 --- a/src/frontend/src/optimizer/rule/mod.rs +++ b/src/frontend/src/optimizer/rule/mod.rs @@ -92,6 +92,7 @@ pub use top_n_on_index_rule::*; mod stream; pub use stream::bushy_tree_join_ordering_rule::*; pub use stream::filter_with_now_to_join_rule::*; +pub use stream::generate_series_with_now_rule::*; pub use stream::split_now_and_rule::*; pub use stream::split_now_or_rule::*; pub use stream::stream_project_merge_rule::*; @@ -203,6 +204,7 @@ macro_rules! for_all_rules { , { SplitNowAndRule } , { SplitNowOrRule } , { FilterWithNowToJoinRule } + , { GenerateSeriesWithNowRule } , { TopNOnIndexRule } , { TrivialProjectToValuesRule } , { UnionInputValuesMergeRule } diff --git a/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs b/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs index 498696589c81b..cbdb65b4528a5 100644 --- a/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs +++ b/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs @@ -18,7 +18,7 @@ use risingwave_pb::plan_common::JoinType; use crate::expr::{ try_derive_watermark, ExprRewriter, FunctionCall, InputRef, WatermarkDerivation, }; -use crate::optimizer::plan_node::generic::GenericPlanRef; +use crate::optimizer::plan_node::generic::{self, GenericPlanRef}; use crate::optimizer::plan_node::{LogicalFilter, LogicalJoin, LogicalNow}; use crate::optimizer::rule::{BoxedRule, Rule}; use crate::optimizer::PlanRef; @@ -63,7 +63,7 @@ impl Rule for FilterWithNowToJoinRule { for now_filter in now_filters { new_plan = LogicalJoin::new( new_plan, - LogicalNow::new(plan.ctx()).into(), + LogicalNow::new(generic::Now::update_current(plan.ctx())).into(), JoinType::LeftSemi, Condition { conjunctions: vec![now_filter.into()], diff --git a/src/frontend/src/optimizer/rule/stream/generate_series_with_now_rule.rs b/src/frontend/src/optimizer/rule/stream/generate_series_with_now_rule.rs new file mode 100644 index 0000000000000..665967f6660b0 --- /dev/null +++ b/src/frontend/src/optimizer/rule/stream/generate_series_with_now_rule.rs @@ -0,0 +1,86 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::types::DataType; +use risingwave_pb::expr::table_function::PbType as PbTableFuncType; + +use crate::expr::{Expr, ExprRewriter}; +use crate::optimizer::plan_node::{generic, LogicalNow}; +use crate::optimizer::rule::{BoxedRule, Rule}; +use crate::PlanRef; + +pub struct GenerateSeriesWithNowRule {} +impl Rule for GenerateSeriesWithNowRule { + fn apply(&self, plan: PlanRef) -> Option { + let ctx = plan.ctx(); + let table_func = plan.as_logical_table_function()?.table_function(); + + if !table_func.args.iter().any(|arg| arg.has_now()) { + return None; + } + + if !(table_func.function_type == PbTableFuncType::GenerateSeries + && table_func.args.len() == 3 + && table_func.args[0].return_type() == DataType::Timestamptz + && table_func.args[1].is_now() + && table_func.args[2].return_type() == DataType::Interval) + { + // only convert `generate_series(const timestamptz, now(), const interval)` + ctx.warn_to_user( + "`now()` is currently only supported in `generate_series(timestamptz, timestamptz, interval)` function as `stop`. \ + You may not using it correctly. Please kindly check the document." + ); + return None; + } + + let start_timestamp = ctx + .session_timezone() + .rewrite_expr(table_func.args[0].clone()) + .try_fold_const() + .transpose() + .ok() + .flatten() + .flatten(); + let interval = ctx + .session_timezone() + .rewrite_expr(table_func.args[2].clone()) + .try_fold_const() + .transpose() + .ok() + .flatten() + .flatten(); + + if start_timestamp.is_none() || interval.is_none() { + ctx.warn_to_user( + "When using `generate_series` with `now()`, the `start` and `step` must be non-NULL constants", + ); + return None; + } + + Some( + LogicalNow::new(generic::Now::generate_series( + ctx, + start_timestamp.unwrap().into_timestamptz(), + interval.unwrap().into_interval(), + )) + .into(), + ) + } +} + +impl GenerateSeriesWithNowRule { + pub fn create() -> BoxedRule { + Box::new(Self {}) + } +} diff --git a/src/frontend/src/optimizer/rule/stream/mod.rs b/src/frontend/src/optimizer/rule/stream/mod.rs index cc86298e766e8..539d9048cff60 100644 --- a/src/frontend/src/optimizer/rule/stream/mod.rs +++ b/src/frontend/src/optimizer/rule/stream/mod.rs @@ -14,6 +14,7 @@ pub(crate) mod bushy_tree_join_ordering_rule; pub(crate) mod filter_with_now_to_join_rule; +pub(crate) mod generate_series_with_now_rule; pub(crate) mod split_now_and_rule; pub(crate) mod split_now_or_rule; pub(crate) mod stream_project_merge_rule; diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 3aeab64bef128..9ba9333cee8e1 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -1073,6 +1073,67 @@ impl CatalogManager { Ok(()) } + /// `finish_stream_job` finishes a stream job and clean some states. + pub async fn finish_stream_job( + &self, + mut stream_job: StreamingJob, + internal_tables: Vec, + ) -> MetaResult { + // 1. finish procedure. + let mut creating_internal_table_ids = internal_tables.iter().map(|t| t.id).collect_vec(); + + // Update the corresponding 'created_at' field. + stream_job.mark_created(); + + let version = match stream_job { + StreamingJob::MaterializedView(table) => { + creating_internal_table_ids.push(table.id); + self.finish_create_table_procedure(internal_tables, table) + .await? + } + StreamingJob::Sink(sink, target_table) => { + let sink_id = sink.id; + + let mut version = self + .finish_create_sink_procedure(internal_tables, sink) + .await?; + + if let Some((table, source)) = target_table { + version = self + .finish_replace_table_procedure(&source, &table, None, Some(sink_id), None) + .await?; + } + + version + } + StreamingJob::Table(source, table, ..) => { + creating_internal_table_ids.push(table.id); + if let Some(source) = source { + self.finish_create_table_procedure_with_source(source, table, internal_tables) + .await? + } else { + self.finish_create_table_procedure(internal_tables, table) + .await? + } + } + StreamingJob::Index(index, table) => { + creating_internal_table_ids.push(table.id); + self.finish_create_index_procedure(internal_tables, index, table) + .await? + } + StreamingJob::Source(source) => { + self.finish_create_source_procedure(source, internal_tables) + .await? + } + }; + + // 2. unmark creating tables. + self.unmark_creating_tables(&creating_internal_table_ids, false) + .await; + + Ok(version) + } + /// This is used for both `CREATE TABLE` and `CREATE MATERIALIZED VIEW`. pub async fn finish_create_table_procedure( &self, diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index e1c6ebb0f32a8..cc5d7dd8971ad 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -1317,8 +1317,9 @@ impl DdlController { }; tracing::debug!(id = job_id, "finishing stream job"); - let version = self - .finish_stream_job(mgr, stream_job, internal_tables) + let version = mgr + .catalog_manager + .finish_stream_job(stream_job, internal_tables) .await?; tracing::debug!(id = job_id, "finished stream job"); @@ -1757,84 +1758,6 @@ impl DdlController { Ok(()) } - /// `finish_stream_job` finishes a stream job and clean some states. - async fn finish_stream_job( - &self, - mgr: &MetadataManagerV1, - mut stream_job: StreamingJob, - internal_tables: Vec
, - ) -> MetaResult { - // 1. finish procedure. - let mut creating_internal_table_ids = internal_tables.iter().map(|t| t.id).collect_vec(); - - // Update the corresponding 'created_at' field. - stream_job.mark_created(); - - let version = match stream_job { - StreamingJob::MaterializedView(table) => { - creating_internal_table_ids.push(table.id); - mgr.catalog_manager - .finish_create_table_procedure(internal_tables, table) - .await? - } - StreamingJob::Sink(sink, target_table) => { - let sink_id = sink.id; - - let mut version = mgr - .catalog_manager - .finish_create_sink_procedure(internal_tables, sink) - .await?; - - if let Some((table, source)) = target_table { - let streaming_job = - StreamingJob::Table(source, table, TableJobType::Unspecified); - - version = self - .finish_replace_table( - mgr.catalog_manager.clone(), - &streaming_job, - None, - Some(sink_id), - None, - ) - .await?; - } - - version - } - StreamingJob::Table(source, table, ..) => { - creating_internal_table_ids.push(table.id); - if let Some(source) = source { - mgr.catalog_manager - .finish_create_table_procedure_with_source(source, table, internal_tables) - .await? - } else { - mgr.catalog_manager - .finish_create_table_procedure(internal_tables, table) - .await? - } - } - StreamingJob::Index(index, table) => { - creating_internal_table_ids.push(table.id); - mgr.catalog_manager - .finish_create_index_procedure(internal_tables, index, table) - .await? - } - StreamingJob::Source(source) => { - mgr.catalog_manager - .finish_create_source_procedure(source, internal_tables) - .await? - } - }; - - // 2. unmark creating tables. - mgr.catalog_manager - .unmark_creating_tables(&creating_internal_table_ids, false) - .await; - - Ok(version) - } - async fn drop_table_inner( &self, source_id: Option, diff --git a/src/risedevtool/connector.toml b/src/risedevtool/connector.toml index abc23b67abdf8..535b4db843172 100644 --- a/src/risedevtool/connector.toml +++ b/src/risedevtool/connector.toml @@ -17,7 +17,7 @@ script = ''' set -euo pipefail -if !(command -v javac &> /dev/null && [[ "$(javac -version 2>&1 | awk '{print $2}')" =~ ^(11|17|19|20|21) ]]); then +if !(command -v javac &> /dev/null && [[ "$(javac -version 2>&1 | awk '{print $2}')" =~ ^(11|17|19|20|21|22) ]]); then echo "JDK 11+ is not installed. Please install JDK 11+ first." exit 1 fi diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index 168e619cc956a..4e564130accbb 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -641,7 +641,6 @@ impl CdcBackfillExecutor { #[try_stream(ok = Message, error = StreamExecutorError)] pub async fn transform_upstream(upstream: BoxedMessageStream, output_columns: &[ColumnDesc]) { let props = SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Json(JsonProperties { use_schema_registry: false, timestamptz_handling: None, diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index 6d5c5867060de..a6a2152283668 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -129,7 +129,7 @@ pub use lookup_union::LookupUnionExecutor; pub use merge::MergeExecutor; pub use mview::*; pub use no_op::NoOpExecutor; -pub use now::NowExecutor; +pub use now::*; pub use over_window::*; pub use project::ProjectExecutor; pub use project_set::*; diff --git a/src/stream/src/executor/now.rs b/src/stream/src/executor/now.rs index 43316fef71094..049eee7f8c724 100644 --- a/src/stream/src/executor/now.rs +++ b/src/stream/src/executor/now.rs @@ -15,30 +15,65 @@ use std::ops::Bound; use std::ops::Bound::Unbounded; +use itertools::Itertools; use risingwave_common::array::Op; use risingwave_common::row; +use risingwave_common::types::{DefaultOrdered, Interval, Timestamptz, ToDatumRef}; +use risingwave_expr::capture_context; +use risingwave_expr::expr::{ + build_func_non_strict, EvalErrorReport, ExpressionBoxExt, InputRefExpression, + LiteralExpression, NonStrictExpression, +}; +use risingwave_expr::expr_context::TIME_ZONE; use tokio::sync::mpsc::UnboundedReceiver; use tokio_stream::wrappers::UnboundedReceiverStream; use crate::executor::prelude::*; +use crate::task::ActorEvalErrorReport; pub struct NowExecutor { data_types: Vec, + mode: NowMode, + eval_error_report: ActorEvalErrorReport, + /// Receiver of barrier channel. barrier_receiver: UnboundedReceiver, state_table: StateTable, } +pub enum NowMode { + /// Emit current timestamp on startup, update it on barrier. + UpdateCurrent, + /// Generate a series of timestamps starting from `start_timestamp` with `interval`. + /// Keep generating new timestamps on barrier. + GenerateSeries { + start_timestamp: Timestamptz, + interval: Interval, + }, +} + +enum ModeVars { + UpdateCurrent, + GenerateSeries { + chunk_builder: StreamChunkBuilder, + add_interval_expr: NonStrictExpression, + }, +} + impl NowExecutor { pub fn new( data_types: Vec, + mode: NowMode, + eval_error_report: ActorEvalErrorReport, barrier_receiver: UnboundedReceiver, state_table: StateTable, ) -> Self { Self { data_types, + mode, + eval_error_report, barrier_receiver, state_table, } @@ -48,24 +83,43 @@ impl NowExecutor { async fn execute_inner(self) { let Self { data_types, + mode, + eval_error_report, barrier_receiver, mut state_table, } = self; + let max_chunk_size = crate::config::chunk_size(); + // Whether the executor is paused. let mut paused = false; // The last timestamp **sent** to the downstream. let mut last_timestamp: Datum = None; + // Whether the first barrier is handled and `last_timestamp` is initialized. let mut initialized = false; + let mut mode_vars = match &mode { + NowMode::UpdateCurrent => ModeVars::UpdateCurrent, + NowMode::GenerateSeries { interval, .. } => { + // in most cases there won't be more than one row except for the first time + let chunk_builder = StreamChunkBuilder::unlimited(data_types.clone(), Some(1)); + let add_interval_expr = + build_add_interval_expr_captured(*interval, eval_error_report)?; + ModeVars::GenerateSeries { + chunk_builder, + add_interval_expr, + } + } + }; + const MAX_MERGE_BARRIER_SIZE: usize = 64; #[for_await] for barriers in UnboundedReceiverStream::new(barrier_receiver).ready_chunks(MAX_MERGE_BARRIER_SIZE) { - let mut timestamp = None; + let mut curr_timestamp = None; if barriers.len() > 1 { warn!( "handle multiple barriers at once in now executor: {}", @@ -74,7 +128,7 @@ impl NowExecutor { } for barrier in barriers { if !initialized { - // Handle the first barrier. + // Handle the initial barrier. state_table.init_epoch(barrier.epoch); let state_row = { let sub_range: &(Bound, Bound) = @@ -97,7 +151,7 @@ impl NowExecutor { } // Extract timestamp from the current epoch. - timestamp = Some(barrier.get_curr_epoch().as_scalar()); + curr_timestamp = Some(barrier.get_curr_epoch().as_scalar()); // Update paused state. if let Some(mutation) = barrier.mutation.as_deref() { @@ -116,28 +170,94 @@ impl NowExecutor { continue; } - let stream_chunk = if last_timestamp.is_some() { - let last_row = row::once(&last_timestamp); - let row = row::once(×tamp); - state_table.update(last_row, row); + match (&mode, &mut mode_vars) { + (NowMode::UpdateCurrent, ModeVars::UpdateCurrent) => { + let chunk = if last_timestamp.is_some() { + let last_row = row::once(&last_timestamp); + let row = row::once(&curr_timestamp); + state_table.update(last_row, row); + + StreamChunk::from_rows( + &[(Op::Delete, last_row), (Op::Insert, row)], + &data_types, + ) + } else { + let row = row::once(&curr_timestamp); + state_table.insert(row); + + StreamChunk::from_rows(&[(Op::Insert, row)], &data_types) + }; - StreamChunk::from_rows(&[(Op::Delete, last_row), (Op::Insert, row)], &data_types) - } else { - let row = row::once(×tamp); - state_table.insert(row); + yield Message::Chunk(chunk); + last_timestamp.clone_from(&curr_timestamp) + } + ( + NowMode::GenerateSeries { + start_timestamp, .. + }, + ModeVars::GenerateSeries { + chunk_builder, + ref add_interval_expr, + }, + ) => { + if last_timestamp.is_none() { + // We haven't emit any timestamp yet. Let's emit the first one and populate the state table. + let first = Some((*start_timestamp).into()); + let first_row = row::once(&first); + let _ = chunk_builder.append_row(Op::Insert, first_row); + state_table.insert(first_row); + last_timestamp = first; + } - StreamChunk::from_rows(&[(Op::Insert, row)], &data_types) - }; + // Now let's step through the timestamps from the last timestamp to the current timestamp. + // We use `last_row` as a temporary cursor to track the progress, and won't touch `last_timestamp` + // until the end of the loop, so that `last_timestamp` is always synced with the state table. + let mut last_row = OwnedRow::new(vec![last_timestamp.clone()]); + + loop { + if chunk_builder.size() >= max_chunk_size { + // Manually yield the chunk when size exceeds the limit. We don't want to use chunk builder + // with limited size here because the initial capacity can be too large for most cases. + // Basically only the first several chunks can potentially exceed the `max_chunk_size`. + if let Some(chunk) = chunk_builder.take() { + yield Message::Chunk(chunk); + } + } + + let next = add_interval_expr.eval_row_infallible(&last_row).await; + if DefaultOrdered(next.to_datum_ref()) + > DefaultOrdered(curr_timestamp.to_datum_ref()) + { + // We only increase the timestamp to the current timestamp. + break; + } + + let next_row = OwnedRow::new(vec![next]); + let _ = chunk_builder.append_row(Op::Insert, &next_row); + last_row = next_row; + } + + if let Some(chunk) = chunk_builder.take() { + yield Message::Chunk(chunk); + } - yield Message::Chunk(stream_chunk); + // Update the last timestamp. + state_table.update(row::once(&last_timestamp), &last_row); + last_timestamp = last_row + .into_inner() + .into_vec() + .into_iter() + .exactly_one() + .unwrap(); + } + _ => unreachable!(), + } yield Message::Watermark(Watermark::new( 0, DataType::Timestamptz, - timestamp.clone().unwrap(), + curr_timestamp.unwrap(), )); - - last_timestamp = timestamp; } } } @@ -148,36 +268,54 @@ impl Execute for NowExecutor { } } +#[capture_context(TIME_ZONE)] +pub fn build_add_interval_expr( + time_zone: &str, + interval: Interval, + eval_error_report: impl EvalErrorReport + 'static, +) -> risingwave_expr::Result { + let timestamptz_input = InputRefExpression::new(DataType::Timestamptz, 0); + let interval = LiteralExpression::new(DataType::Interval, Some(interval.into())); + let time_zone = LiteralExpression::new(DataType::Varchar, Some(time_zone.into())); + + use risingwave_pb::expr::expr_node::PbType as PbExprType; + build_func_non_strict( + PbExprType::AddWithTimeZone, + DataType::Timestamptz, + vec![ + timestamptz_input.boxed(), + interval.boxed(), + time_zone.boxed(), + ], + eval_error_report, + ) +} + #[cfg(test)] mod tests { - use risingwave_common::array::StreamChunk; use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId}; use risingwave_common::test_prelude::StreamChunkTestExt; - use risingwave_common::types::{DataType, ScalarImpl}; + use risingwave_common::types::test_utils::IntervalTestExt; + use risingwave_common::util::epoch::test_epoch; use risingwave_storage::memory::MemoryStateStore; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; - use super::NowExecutor; - use crate::common::table::state_table::StateTable; + use super::*; use crate::executor::test_utils::StreamExecutorTestExt; - use crate::executor::{ - Barrier, BoxedMessageStream, Execute, Mutation, StreamExecutorResult, Watermark, - }; #[tokio::test] async fn test_now() -> StreamExecutorResult<()> { let state_store = create_state_store(); - let (tx, mut now_executor) = create_executor(&state_store).await; + let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await; // Init barrier - tx.send(Barrier::with_prev_epoch_for_test(1 << 16, 1)) - .unwrap(); + tx.send(Barrier::new_test_barrier(test_epoch(1))).unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // Consume the data chunk - let chunk_msg = now_executor.next_unwrap_ready_chunk()?; + let chunk_msg = now.next_unwrap_ready_chunk()?; assert_eq!( chunk_msg.compact(), @@ -188,7 +326,7 @@ mod tests { ); // Consume the watermark - let watermark = now_executor.next_unwrap_ready_watermark()?; + let watermark = now.next_unwrap_ready_watermark()?; assert_eq!( watermark, @@ -199,14 +337,17 @@ mod tests { ) ); - tx.send(Barrier::with_prev_epoch_for_test(2 << 16, 1 << 16)) - .unwrap(); + tx.send(Barrier::with_prev_epoch_for_test( + test_epoch(2), + test_epoch(1), + )) + .unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // Consume the data chunk - let chunk_msg = now_executor.next_unwrap_ready_chunk()?; + let chunk_msg = now.next_unwrap_ready_chunk()?; assert_eq!( chunk_msg.compact(), @@ -218,7 +359,7 @@ mod tests { ); // Consume the watermark - let watermark = now_executor.next_unwrap_ready_watermark()?; + let watermark = now.next_unwrap_ready_watermark()?; assert_eq!( watermark, @@ -230,21 +371,25 @@ mod tests { ); // No more messages until the next barrier - now_executor.next_unwrap_pending(); + now.next_unwrap_pending(); // Recovery - drop((tx, now_executor)); - let (tx, mut now_executor) = create_executor(&state_store).await; - tx.send(Barrier::with_prev_epoch_for_test(3 << 16, 1 << 16)) - .unwrap(); + drop((tx, now)); + let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await; + tx.send(Barrier::with_prev_epoch_for_test( + test_epoch(3), + test_epoch(2), + )) + .unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // Consume the data chunk - let chunk_msg = now_executor.next_unwrap_ready_chunk()?; + let chunk_msg = now.next_unwrap_ready_chunk()?; assert_eq!( chunk_msg.compact(), + // the last chunk was not checkpointed so the deleted old value should be `001` StreamChunk::from_pretty( " TZ - 2021-04-01T00:00:00.001Z @@ -253,7 +398,7 @@ mod tests { ); // Consume the watermark - let watermark = now_executor.next_unwrap_ready_watermark()?; + let watermark = now.next_unwrap_ready_watermark()?; assert_eq!( watermark, @@ -265,28 +410,32 @@ mod tests { ); // Recovery with paused - drop((tx, now_executor)); - let (tx, mut now_executor) = create_executor(&state_store).await; - tx.send(Barrier::new_test_barrier(4 << 16).with_mutation(Mutation::Pause)) - .unwrap(); + drop((tx, now)); + let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await; + tx.send( + Barrier::with_prev_epoch_for_test(test_epoch(4), test_epoch(3)) + .with_mutation(Mutation::Pause), + ) + .unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // There should be no messages until `Resume` - now_executor.next_unwrap_pending(); + now.next_unwrap_pending(); // Resume barrier tx.send( - Barrier::with_prev_epoch_for_test(5 << 16, 4 << 16).with_mutation(Mutation::Resume), + Barrier::with_prev_epoch_for_test(test_epoch(5), test_epoch(4)) + .with_mutation(Mutation::Resume), ) .unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // Consume the data chunk - let chunk_msg = now_executor.next_unwrap_ready_chunk()?; + let chunk_msg = now.next_unwrap_ready_chunk()?; assert_eq!( chunk_msg.compact(), StreamChunk::from_pretty( @@ -297,7 +446,7 @@ mod tests { ); // Consume the watermark - let watermark = now_executor.next_unwrap_ready_watermark()?; + let watermark = now.next_unwrap_ready_watermark()?; assert_eq!( watermark, @@ -314,29 +463,30 @@ mod tests { #[tokio::test] async fn test_now_start_with_paused() -> StreamExecutorResult<()> { let state_store = create_state_store(); - let (tx, mut now_executor) = create_executor(&state_store).await; + let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await; // Init barrier - tx.send(Barrier::with_prev_epoch_for_test(1 << 16, 1).with_mutation(Mutation::Pause)) + tx.send(Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Pause)) .unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // There should be no messages until `Resume` - now_executor.next_unwrap_pending(); + now.next_unwrap_pending(); // Resume barrier tx.send( - Barrier::with_prev_epoch_for_test(2 << 16, 1 << 16).with_mutation(Mutation::Resume), + Barrier::with_prev_epoch_for_test(test_epoch(2), test_epoch(1)) + .with_mutation(Mutation::Resume), ) .unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // Consume the data chunk - let chunk_msg = now_executor.next_unwrap_ready_chunk()?; + let chunk_msg = now.next_unwrap_ready_chunk()?; assert_eq!( chunk_msg.compact(), @@ -347,7 +497,7 @@ mod tests { ); // Consume the watermark - let watermark = now_executor.next_unwrap_ready_watermark()?; + let watermark = now.next_unwrap_ready_watermark()?; assert_eq!( watermark, @@ -359,7 +509,121 @@ mod tests { ); // No more messages until the next barrier - now_executor.next_unwrap_pending(); + now.next_unwrap_pending(); + + Ok(()) + } + + #[tokio::test] + async fn test_now_generate_series() -> StreamExecutorResult<()> { + TIME_ZONE::scope("UTC".to_string(), test_now_generate_series_inner()).await + } + + async fn test_now_generate_series_inner() -> StreamExecutorResult<()> { + let start_timestamp = Timestamptz::from_secs(1617235190).unwrap(); // 2021-03-31 23:59:50 UTC + let interval = Interval::from_millis(1000); // 1s interval + + let state_store = create_state_store(); + let (tx, mut now) = create_executor( + NowMode::GenerateSeries { + start_timestamp, + interval, + }, + &state_store, + ) + .await; + + // Init barrier + tx.send(Barrier::new_test_barrier(test_epoch(1000))) + .unwrap(); + now.next_unwrap_ready_barrier()?; + + // Initial timestamps + let chunk = now.next_unwrap_ready_chunk()?; + assert_eq!(chunk.cardinality(), 12); // seconds from 23:59:50 to 00:00:01 (inclusive) + + assert_eq!( + now.next_unwrap_ready_watermark()?, + Watermark::new( + 0, + DataType::Timestamptz, + ScalarImpl::Timestamptz("2021-04-01T00:00:01.000Z".parse().unwrap()) + ) + ); + + tx.send(Barrier::with_prev_epoch_for_test( + test_epoch(2000), + test_epoch(1000), + )) + .unwrap(); + tx.send(Barrier::with_prev_epoch_for_test( + test_epoch(3000), + test_epoch(2000), + )) + .unwrap(); + + now.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; + + let chunk = now.next_unwrap_ready_chunk()?; + assert_eq!( + chunk.compact(), + StreamChunk::from_pretty( + " TZ + + 2021-04-01T00:00:02.000Z + + 2021-04-01T00:00:03.000Z" + ) + ); + + let watermark = now.next_unwrap_ready_watermark()?; + assert_eq!( + watermark, + Watermark::new( + 0, + DataType::Timestamptz, + ScalarImpl::Timestamptz("2021-04-01T00:00:03.000Z".parse().unwrap()) + ) + ); + + // Recovery + drop((tx, now)); + let (tx, mut now) = create_executor( + NowMode::GenerateSeries { + start_timestamp, + interval, + }, + &state_store, + ) + .await; + + tx.send(Barrier::with_prev_epoch_for_test( + test_epoch(4000), + test_epoch(3000), + )) + .unwrap(); + + now.next_unwrap_ready_barrier()?; + + let chunk = now.next_unwrap_ready_chunk()?; + assert_eq!( + chunk.compact(), + StreamChunk::from_pretty( + " TZ + + 2021-04-01T00:00:02.000Z + + 2021-04-01T00:00:03.000Z + + 2021-04-01T00:00:04.000Z" + ) + ); + + let watermark = now.next_unwrap_ready_watermark()?; + assert_eq!( + watermark, + Watermark::new( + 0, + DataType::Timestamptz, + ScalarImpl::Timestamptz("2021-04-01T00:00:04.000Z".parse().unwrap()) + ) + ); Ok(()) } @@ -369,6 +633,7 @@ mod tests { } async fn create_executor( + mode: NowMode, state_store: &MemoryStateStore, ) -> (UnboundedSender, BoxedMessageStream) { let table_id = TableId::new(1); @@ -384,8 +649,17 @@ mod tests { let (sender, barrier_receiver) = unbounded_channel(); - let now_executor = - NowExecutor::new(vec![DataType::Timestamptz], barrier_receiver, state_table); + let eval_error_report = ActorEvalErrorReport { + actor_context: ActorContext::for_test(123), + identity: "NowExecutor".into(), + }; + let now_executor = NowExecutor::new( + vec![DataType::Timestamptz], + mode, + eval_error_report, + barrier_receiver, + state_table, + ); (sender, now_executor.boxed().execute()) } } diff --git a/src/stream/src/from_proto/now.rs b/src/stream/src/from_proto/now.rs index a917cd136f3b4..9eac7caa13557 100644 --- a/src/stream/src/from_proto/now.rs +++ b/src/stream/src/from_proto/now.rs @@ -12,14 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_pb::stream_plan::NowNode; +use anyhow::Context; +use risingwave_common::types::{DataType, Datum}; +use risingwave_common::util::value_encoding::DatumFromProtoExt; +use risingwave_pb::stream_plan::now_node::PbMode as PbNowMode; +use risingwave_pb::stream_plan::{NowNode, PbNowModeGenerateSeries}; use risingwave_storage::StateStore; use tokio::sync::mpsc::unbounded_channel; use super::ExecutorBuilder; use crate::common::table::state_table::StateTable; use crate::error::StreamResult; -use crate::executor::{Executor, NowExecutor}; +use crate::executor::{Executor, NowExecutor, NowMode}; use crate::task::ExecutorParams; pub struct NowExecutorBuilder; @@ -37,11 +41,43 @@ impl ExecutorBuilder for NowExecutorBuilder { .create_actor_context .register_sender(params.actor_context.id, sender); + let mode = if let Ok(pb_mode) = node.get_mode() { + match pb_mode { + PbNowMode::UpdateCurrent(_) => NowMode::UpdateCurrent, + PbNowMode::GenerateSeries(PbNowModeGenerateSeries { + start_timestamp, + interval, + }) => { + let start_timestamp = Datum::from_protobuf( + start_timestamp.as_ref().unwrap(), + &DataType::Timestamptz, + ) + .context("`start_timestamp` field is not decodable")? + .context("`start_timestamp` field should not be NULL")? + .into_timestamptz(); + let interval = + Datum::from_protobuf(interval.as_ref().unwrap(), &DataType::Interval) + .context("`interval` field is not decodable")? + .context("`interval` field should not be NULL")? + .into_interval(); + NowMode::GenerateSeries { + start_timestamp, + interval, + } + } + } + } else { + // default to `UpdateCurrent` for backward-compatibility + NowMode::UpdateCurrent + }; + let state_table = StateTable::from_table_catalog(node.get_state_table()?, store, None).await; let exec = NowExecutor::new( params.info.schema.data_types(), + mode, + params.eval_error_report, barrier_receiver, state_table, ); diff --git a/src/stream/src/lib.rs b/src/stream/src/lib.rs index 51a735af0f5a7..a199e4c8acb9f 100644 --- a/src/stream/src/lib.rs +++ b/src/stream/src/lib.rs @@ -40,6 +40,7 @@ #![feature(btree_cursors)] #![feature(assert_matches)] #![feature(try_blocks)] +#![feature(result_flattening)] // required by `capture_context` use std::sync::Arc; diff --git a/src/tests/simulation/src/slt.rs b/src/tests/simulation/src/slt.rs index e2fdfb6b54bb8..943d9bffcf4ca 100644 --- a/src/tests/simulation/src/slt.rs +++ b/src/tests/simulation/src/slt.rs @@ -20,7 +20,7 @@ use anyhow::{bail, Result}; use itertools::Itertools; use rand::{thread_rng, Rng, SeedableRng}; use rand_chacha::ChaChaRng; -use sqllogictest::{ParallelTestError, QueryExpect, Record, StatementExpect}; +use sqllogictest::{Condition, ParallelTestError, QueryExpect, Record, StatementExpect}; use crate::client::RisingWave; use crate::cluster::{Cluster, KillOpts}; @@ -274,6 +274,11 @@ pub async fn run_slt_task( } = &record && matches!(cmd, SqlCmd::CreateMaterializedView { .. }) && !manual_background_ddl_enabled + && conditions.iter().all(|c| { + *c != Condition::SkipIf { + label: "madsim".to_string(), + } + }) { let background_ddl_setting = rng.gen_bool(background_ddl_rate); let set_background_ddl = Record::Statement {