From 4f8bfc4481dcba8e6c3a38e69d1d6ea3e930fb73 Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 24 Jun 2024 14:31:02 +0800 Subject: [PATCH 1/9] refactor: refine error message and test for FORMAT UPSERT (#17397) Signed-off-by: xxchan Co-authored-by: Bugen Zhao --- Makefile.toml | 3 +- ci/Dockerfile | 4 +- ci/build-ci-image.sh | 2 +- ci/docker-compose.yml | 10 +- e2e_test/source/basic/inlcude_key_as.slt | 133 -------- .../kafka/avro/name_strategy.slt | 1 - .../source_inline/kafka/include_key_as.slt | 307 ++++++++++++++++++ scripts/source/prepare_ci_kafka.sh | 4 - src/frontend/src/handler/create_source.rs | 96 +++--- src/frontend/src/handler/create_table.rs | 6 +- src/sqlparser/src/ast/statement.rs | 23 +- 11 files changed, 389 insertions(+), 200 deletions(-) delete mode 100644 e2e_test/source/basic/inlcude_key_as.slt create mode 100644 e2e_test/source_inline/kafka/include_key_as.slt diff --git a/Makefile.toml b/Makefile.toml index 1e618b1aed6ba..cdbc49a490ff5 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -1294,7 +1294,7 @@ echo "All processes has exited." [tasks.slt] category = "RiseDev - Test - SQLLogicTest" -install_crate = { version = "0.20.1", crate_name = "sqllogictest-bin", binary = "sqllogictest", test_arg = [ +install_crate = { min_version = "0.20.6", crate_name = "sqllogictest-bin", binary = "sqllogictest", test_arg = [ "--help", ], install_command = "binstall" } dependencies = ["check-and-load-risedev-env-file"] @@ -1308,6 +1308,7 @@ SLT_PORT = "${RISEDEV_RW_FRONTEND_PORT}" SLT_DB = "dev" PATH = "${PWD}/e2e_test/commands:${PATH}" RUST_BACKTRACE = "0" # slt backtrace is useless +RUST_LOG = "sqllogictest=warn,sqllogictest::system_command=info" [tasks.slt-clean] category = "RiseDev - Test - SQLLogicTest" diff --git a/ci/Dockerfile b/ci/Dockerfile index 1e98d02fa522d..04e2154a93b17 100644 --- a/ci/Dockerfile +++ b/ci/Dockerfile @@ -70,14 +70,14 @@ ENV CARGO_REGISTRIES_CRATES_IO_PROTOCOL=sparse RUN curl -L --proto '=https' --tlsv1.2 -sSf https://raw.githubusercontent.com/cargo-bins/cargo-binstall/main/install-from-binstall-release.sh | bash RUN cargo binstall -y --no-symlinks cargo-llvm-cov cargo-nextest cargo-hakari cargo-sort cargo-cache cargo-audit \ cargo-make@0.37.9 \ - sqllogictest-bin@0.20.4 \ + sqllogictest-bin@0.20.6 \ sccache@0.7.4 \ && cargo cache -a \ && rm -rf "/root/.cargo/registry/index" \ && rm -rf "/root/.cargo/registry/cache" \ && rm -rf "/root/.cargo/git/db" RUN cargo install cargo-dylint@3.1.0 dylint-link@3.1.0 -RUN cargo uninstall cargo-binstall cargo-cache +RUN cargo uninstall cargo-cache # install risedev COPY < ( PRIMARY KEY ([rw_key | ]) ) + INCLUDE KEY [AS ] + WITH (...) + FORMAT UPSERT ENCODE JSON (...) + + +# upsert format must have a pk +statement error +CREATE TABLE upsert_students ( + "ID" INT, + "firstName" VARCHAR, + "lastName" VARCHAR, + age INT, + height REAL, + weight REAL +) +INCLUDE KEY AS rw_key +WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'test_include_key') +FORMAT UPSERT ENCODE JSON +---- +db error: ERROR: Failed to run the query + +Caused by: + Protocol error: Primary key must be specified to rw_key + +Hint: For FORMAT UPSERT ENCODE JSON, INCLUDE KEY must be specified and the key column must be used as primary key. +example: + CREATE TABLE ( PRIMARY KEY ([rw_key | ]) ) + INCLUDE KEY [AS ] + WITH (...) + FORMAT UPSERT ENCODE JSON (...) 
+ + +# upsert format pk must be the key column +statement error +CREATE TABLE upsert_students ( + "ID" INT primary key, + "firstName" VARCHAR, + "lastName" VARCHAR, + age INT, + height REAL, + weight REAL +) +INCLUDE KEY AS rw_key +WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'test_include_key') +FORMAT UPSERT ENCODE JSON +---- +db error: ERROR: Failed to run the query + +Caused by: + Protocol error: Only "rw_key" can be used as primary key + +Hint: For FORMAT UPSERT ENCODE JSON, INCLUDE KEY must be specified and the key column must be used as primary key. +example: + CREATE TABLE ( PRIMARY KEY ([rw_key | ]) ) + INCLUDE KEY [AS ] + WITH (...) + FORMAT UPSERT ENCODE JSON (...) + + +statement error +CREATE SOURCE upsert_students ( + primary key (rw_key), + "ID" INT, + "firstName" VARCHAR, + "lastName" VARCHAR, + age INT, + height REAL, + weight REAL, +) +INCLUDE KEY AS rw_key +WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'test_include_key') +FORMAT UPSERT ENCODE JSON +---- +db error: ERROR: Failed to run the query + +Caused by: + Bind error: can't CREATE SOURCE with FORMAT UPSERT + +Hint: use CREATE TABLE instead + +Hint: For FORMAT UPSERT ENCODE JSON, INCLUDE KEY must be specified and the key column must be used as primary key. +example: + CREATE TABLE ( PRIMARY KEY ([rw_key | ]) ) + INCLUDE KEY [AS ] + WITH (...) + FORMAT UPSERT ENCODE JSON (...) + + +statement ok +CREATE TABLE plain_students ( + "ID" INT, + "firstName" VARCHAR, + "lastName" VARCHAR, + age INT, + height REAL, + weight REAL, +) +INCLUDE KEY AS rw_key +WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'test_include_key') +FORMAT PLAIN ENCODE JSON + + +statement ok +CREATE TABLE upsert_students ( + primary key (rw_key), + "ID" INT, + "firstName" VARCHAR, + "lastName" VARCHAR, + age INT, + height REAL, + weight REAL, +) +INCLUDE KEY AS rw_key +WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'test_include_key') +FORMAT UPSERT ENCODE JSON + + +system ok +rpk topic delete 'test_additional_columns' || true; + +system ok +rpk topic create 'test_additional_columns' + +# Note: `sh` doesn't have {..} brace expansion +system ok +bash -c 'for i in {0..100}; do echo "key\$i:{\\"a\\": \$i}" | rpk topic produce test_additional_columns -f "%k:%v\\n" -H "header1=v1" -H "header2=v2"; done' + +statement ok +create table additional_columns (a int) +include key as key_col +include partition as partition_col +include offset as offset_col +include timestamp as timestamp_col +include header as header_col +WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'test_additional_columns') +FORMAT PLAIN ENCODE JSON + +# header with varchar type & non-exist header key +statement error +create table additional_columns_1 (a int) +include key as key_col +include partition as partition_col +include offset as offset_col +include timestamp 'header1' as timestamp_col +include header 'header1' as header_col_1 +include header 'header2' as header_col_2 +include header 'header2' varchar as header_col_3 +include header 'header3' as header_col_4 +WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'test_additional_columns') +FORMAT PLAIN ENCODE JSON +---- +db error: ERROR: Failed to run the query + +Caused by: + Protocol error: Only header column can have inner field, but got "timestamp" + + +# header with varchar type & non-exist header key +statement ok +create table additional_columns_1 (a int) +include key as key_col +include partition as partition_col +include offset as offset_col +include timestamp as 
timestamp_col +include header 'header1' as header_col_1 +include header 'header2' as header_col_2 +include header 'header2' varchar as header_col_3 +include header 'header3' as header_col_4 +WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'test_additional_columns') +FORMAT PLAIN ENCODE JSON + + +# Wait enough time to ensure SourceExecutor consumes all Kafka data. +sleep 3s + +query ? +select count(rw_key) from plain_students +---- +15 + +query ??????? +select * from plain_students order by ("ID", "firstName"); +---- +1 Ethan Martinez 18 6.1 180 \x7b224944223a20317d +1 John Doe 18 5.1 150 \x7b224944223a20317d +1 Olivia Hernandez 22 5.6 125 \x7b224944223a20317d +2 Emily Jackson 19 5.4 110 \x7b224944223a20327d +2 Sarah Smith 19 5.5 120 \x7b224944223a20327d +3 Ben Johnson 21 6 175 \x7b224944223a20337d +3 Noah Thompson 21 6.3 195 \x7b224944223a20337d +4 Emma Brown 20 5.3 130 \x7b224944223a20347d +5 Michael Williams 22 6.2 190 \x7b224944223a20357d +6 Leah Davis 18 5.7 140 \x7b224944223a20367d +7 Connor Wilson 19 5.9 160 \x7b224944223a20377d +8 Ava Garcia 21 5.2 115 \x7b224944223a20387d +9 Jacob Anderson 20 5.8 155 \x7b224944223a20397d +NULL NULL NULL NULL NULL NULL \x7b224944223a20377d +NULL NULL NULL NULL NULL NULL \x7b224944223a20387d + + +query ??????? +select * from upsert_students order by "ID"; +---- +1 Ethan Martinez 18 6.1 180 \x7b224944223a20317d +2 Emily Jackson 19 5.4 110 \x7b224944223a20327d +3 Noah Thompson 21 6.3 195 \x7b224944223a20337d +4 Emma Brown 20 5.3 130 \x7b224944223a20347d +5 Michael Williams 22 6.2 190 \x7b224944223a20357d +6 Leah Davis 18 5.7 140 \x7b224944223a20367d +9 Jacob Anderson 20 5.8 155 \x7b224944223a20397d + + +query ? +SELECT count(*) +FROM additional_columns +WHERE key_col IS NOT NULL + AND partition_col IS NOT NULL + AND offset_col IS NOT NULL + AND timestamp_col IS NOT NULL + AND header_col IS NOT NULL +---- +101 + + +query ?? +SELECT (header_col[1]).key AS key, (header_col[1]).value::text AS value +FROM additional_columns limit 1; +---- +header1 \x7631 + +query ???? +select header_col_1, header_col_2, header_col_3, header_col_4 from additional_columns_1 limit 1 +---- +\x7631 \x7632 v2 NULL + +statement ok +drop table upsert_students + +statement ok +drop table plain_students diff --git a/scripts/source/prepare_ci_kafka.sh b/scripts/source/prepare_ci_kafka.sh index 7bcb7871ab1c2..27c88e6e63af2 100755 --- a/scripts/source/prepare_ci_kafka.sh +++ b/scripts/source/prepare_ci_kafka.sh @@ -66,7 +66,3 @@ for filename in $kafka_data_files; do fi ) & done - -# test additional columns: produce messages with headers -ADDI_COLUMN_TOPIC="kafka_additional_columns" -for i in {0..100}; do echo "key$i:{\"a\": $i}" | kcat -P -b "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" -t ${ADDI_COLUMN_TOPIC} -K : -H "header1=v1" -H "header2=v2"; done diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index e5ead9d71a19b..4c5a8abdec005 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -750,6 +750,19 @@ pub(crate) fn bind_all_columns( } } +/// TODO: perhaps put the hint in notice is better. The error message format might be not that reliable. +fn hint_upsert(encode: &Encode) -> String { + format!( + r#"Hint: For FORMAT UPSERT ENCODE {encode:}, INCLUDE KEY must be specified and the key column must be used as primary key. +example: + CREATE TABLE ( PRIMARY KEY ([rw_key | ]) ) + INCLUDE KEY [AS ] + WITH (...) + FORMAT UPSERT ENCODE {encode:} (...) 
+"# + ) +} + /// Bind column from source. Add key column to table columns if necessary. /// Return `pk_names`. pub(crate) async fn bind_source_pk( @@ -760,7 +773,7 @@ pub(crate) async fn bind_source_pk( with_properties: &WithOptions, ) -> Result> { let sql_defined_pk = !sql_defined_pk_names.is_empty(); - let key_column_name: Option = { + let include_key_column_name: Option = { // iter columns to check if contains additional columns from key part // return the key column names if exists columns.iter().find_map(|catalog| { @@ -793,34 +806,36 @@ pub(crate) async fn bind_source_pk( // For all Upsert formats, we only accept one and only key column as primary key. // Additional KEY columns must be set in this case and must be primary key. (Format::Upsert, encode @ Encode::Json | encode @ Encode::Avro) => { - if let Some(ref key_column_name) = key_column_name + if let Some(ref key_column_name) = include_key_column_name && sql_defined_pk { - if sql_defined_pk_names.len() != 1 { - return Err(RwError::from(ProtocolError(format!( - "upsert {:?} supports only one primary key column ({}).", - encode, key_column_name - )))); - } + // pk is set. check if it's valid + // the column name have been converted to real value in `handle_addition_columns` // so we don't ignore ascii case here - if !key_column_name.eq(sql_defined_pk_names[0].as_str()) { + if sql_defined_pk_names.len() != 1 + || !key_column_name.eq(sql_defined_pk_names[0].as_str()) + { return Err(RwError::from(ProtocolError(format!( - "upsert {}'s key column {} not match with sql defined primary key {}", - encode, key_column_name, sql_defined_pk_names[0] + "Only \"{}\" can be used as primary key\n\n{}", + key_column_name, + hint_upsert(encode) )))); } sql_defined_pk_names } else { - return if key_column_name.is_none() { + // pk not set, or even key not included + return if let Some(include_key_column_name) = include_key_column_name { Err(RwError::from(ProtocolError(format!( - "INCLUDE KEY clause must be set for FORMAT UPSERT ENCODE {:?}", - encode + "Primary key must be specified to {}\n\n{}", + include_key_column_name, + hint_upsert(encode) )))) } else { Err(RwError::from(ProtocolError(format!( - "Primary key must be specified to {} when creating source with FORMAT UPSERT ENCODE {:?}", - key_column_name.unwrap(), encode)))) + "INCLUDE KEY clause not set\n\n{}", + hint_upsert(encode) + )))) }; } } @@ -1340,8 +1355,9 @@ pub fn bind_connector_props( } Ok(WithOptions::new(with_properties)) } + #[allow(clippy::too_many_arguments)] -pub async fn bind_create_source( +pub async fn bind_create_source_or_table_with_connector( handler_args: HandlerArgs, full_name: ObjectName, source_schema: ConnectorSchema, @@ -1364,9 +1380,25 @@ pub async fn bind_create_source( session.get_database_and_schema_id_for_create(schema_name.clone())?; if !is_create_source && with_properties.is_iceberg_connector() { - return Err( - ErrorCode::BindError("can't create table with iceberg connector".to_string()).into(), - ); + return Err(ErrorCode::BindError( + "can't CREATE TABLE with iceberg connector\n\nHint: use CREATE SOURCE instead" + .to_string(), + ) + .into()); + } + if is_create_source { + match source_schema.format { + Format::Upsert => { + return Err(ErrorCode::BindError(format!( + "can't CREATE SOURCE with FORMAT UPSERT\n\nHint: use CREATE TABLE instead\n\n{}", + hint_upsert(&source_schema.row_encode) + )) + .into()); + } + _ => { + // TODO: enhance error message for other formats + } + } } ensure_table_constraints_supported(&constraints)?; @@ -1522,7 +1554,7 @@ pub 
async fn handle_create_source( } let mut col_id_gen = ColumnIdGenerator::new_initial(); - let (source_catalog, database_id, schema_id) = bind_create_source( + let (source_catalog, database_id, schema_id) = bind_create_source_or_table_with_connector( handler_args.clone(), stmt.source_name, source_schema, @@ -1810,28 +1842,6 @@ pub mod tests { }; assert_eq!(columns, expect_columns); - // test derive include column for format upsert - let sql = "CREATE SOURCE s1 (v1 int) with (connector = 'kafka') format upsert encode json" - .to_string(); - match frontend.run_sql(sql).await { - Err(e) => { - assert_eq!( - e.to_string(), - "Protocol error: INCLUDE KEY clause must be set for FORMAT UPSERT ENCODE Json" - ) - } - _ => unreachable!(), - } - - let sql = "CREATE SOURCE s2 (v1 int) include key as _rw_kafka_key with (connector = 'kafka') format upsert encode json" - .to_string(); - match frontend.run_sql(sql).await { - Err(e) => { - assert_eq!(e.to_string(), "Protocol error: Primary key must be specified to _rw_kafka_key when creating source with FORMAT UPSERT ENCODE Json") - } - _ => unreachable!(), - } - let sql = "CREATE SOURCE s3 (v1 int) include timestamp 'header1' as header_col with (connector = 'kafka') format plain encode json" .to_string(); diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index c542762702053..de81115e628f1 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -55,8 +55,8 @@ use crate::catalog::{check_valid_column_name, ColumnId, DatabaseId, SchemaId}; use crate::error::{ErrorCode, Result, RwError}; use crate::expr::{Expr, ExprImpl, ExprRewriter, InlineNowProcTime}; use crate::handler::create_source::{ - bind_columns_from_source, bind_connector_props, bind_create_source, bind_source_watermark, - handle_addition_columns, UPSTREAM_SOURCE_KEY, + bind_columns_from_source, bind_connector_props, bind_create_source_or_table_with_connector, + bind_source_watermark, handle_addition_columns, UPSTREAM_SOURCE_KEY, }; use crate::handler::HandlerArgs; use crate::optimizer::plan_node::generic::{CdcScanOptions, SourceNodeKind}; @@ -483,7 +483,7 @@ pub(crate) async fn gen_create_table_plan_with_source( let (columns_from_resolve_source, source_info) = bind_columns_from_source(session, &source_schema, &with_properties).await?; - let (source_catalog, database_id, schema_id) = bind_create_source( + let (source_catalog, database_id, schema_id) = bind_create_source_or_table_with_connector( handler_args.clone(), table_name, source_schema, diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs index 0c92209471450..facedadeb5bc0 100644 --- a/src/sqlparser/src/ast/statement.rs +++ b/src/sqlparser/src/ast/statement.rs @@ -94,11 +94,16 @@ pub struct CreateSourceStatement { pub include_column_options: IncludeOption, } +/// FORMAT means how to get the operation(Insert/Delete) from the input. +/// +/// Check `CONNECTORS_COMPATIBLE_FORMATS` for what `FORMAT ... ENCODE ...` combinations are allowed. #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum Format { + /// The format is the same with RisingWave's internal representation. + /// Used internally for schema change Native, - // Keyword::NONE + /// for self-explanatory sources like iceberg, they have their own format, and should not be specified by user. 
None, // Keyword::DEBEZIUM Debezium, @@ -143,8 +148,8 @@ impl Format { "CANAL" => Format::Canal, "PLAIN" => Format::Plain, "UPSERT" => Format::Upsert, - "NATIVE" => Format::Native, // used internally for schema change - "NONE" => Format::None, // used by iceberg + "NATIVE" => Format::Native, + "NONE" => Format::None, _ => parser_err!( "expected CANAL | PROTOBUF | DEBEZIUM | MAXWELL | PLAIN | NATIVE | NONE after FORMAT" ), @@ -152,6 +157,7 @@ impl Format { } } +/// Check `CONNECTORS_COMPATIBLE_FORMATS` for what `FORMAT ... ENCODE ...` combinations are allowed. #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum Encode { @@ -160,8 +166,11 @@ pub enum Encode { Protobuf, // Keyword::PROTOBUF Json, // Keyword::JSON Bytes, // Keyword::BYTES - None, // Keyword::None - Text, // Keyword::TEXT + /// for self-explanatory sources like iceberg, they have their own format, and should not be specified by user. + None, + Text, // Keyword::TEXT + /// The encode is the same with RisingWave's internal representation. + /// Used internally for schema change Native, Template, } @@ -197,8 +206,8 @@ impl Encode { "PROTOBUF" => Encode::Protobuf, "JSON" => Encode::Json, "TEMPLATE" => Encode::Template, - "NATIVE" => Encode::Native, // used internally for schema change - "NONE" => Encode::None, // used by iceberg + "NATIVE" => Encode::Native, + "NONE" => Encode::None, _ => parser_err!( "expected AVRO | BYTES | CSV | PROTOBUF | JSON | NATIVE | TEMPLATE | NONE after Encode" ), From 165ef33e220b85c1d10fab55fe09f09195740bdc Mon Sep 17 00:00:00 2001 From: Wallace Date: Mon, 24 Jun 2024 14:44:47 +0800 Subject: [PATCH 2/9] fix(storage): fix reverse scan (#17217) Signed-off-by: Little-Wallace --- src/storage/src/mem_table.rs | 9 +- src/storage/src/memory.rs | 93 +++++++++-- src/stream/src/common/table/state_table.rs | 90 +++++++++-- .../executor/over_window/over_partition.rs | 147 +++++++++--------- 4 files changed, 235 insertions(+), 104 deletions(-) diff --git a/src/storage/src/mem_table.rs b/src/storage/src/mem_table.rs index 8562991943c1a..f7c5073a6a8cf 100644 --- a/src/storage/src/mem_table.rs +++ b/src/storage/src/mem_table.rs @@ -433,6 +433,7 @@ pub(crate) async fn merge_stream<'a>( inner_stream: impl Stream> + 'static, table_id: TableId, epoch: u64, + rev: bool, ) { let inner_stream = inner_stream.peekable(); pin_mut!(inner_stream); @@ -459,7 +460,11 @@ pub(crate) async fn merge_stream<'a>( } (Some(Ok((inner_key, _))), Some((mem_table_key, _))) => { debug_assert_eq!(inner_key.user_key.table_id, table_id); - match inner_key.user_key.table_key.cmp(mem_table_key) { + let mut ret = inner_key.user_key.table_key.cmp(mem_table_key); + if rev { + ret = ret.reverse(); + } + match ret { Ordering::Less => { // yield data from storage let (key, value) = inner_stream.next().await.unwrap()?; @@ -580,6 +585,7 @@ impl LocalStateStore for MemtableLocalState iter.into_stream(to_owned_item), self.table_id, self.epoch(), + false, )))) } } @@ -600,6 +606,7 @@ impl LocalStateStore for MemtableLocalState iter.into_stream(to_owned_item), self.table_id, self.epoch(), + true, )))) } } diff --git a/src/storage/src/memory.rs b/src/storage/src/memory.rs index a266fbc129cfd..261249d90ff8a 100644 --- a/src/storage/src/memory.rs +++ b/src/storage/src/memory.rs @@ -13,7 +13,7 @@ // limitations under the License. 
use std::cmp::Ordering; -use std::collections::{BTreeMap, BTreeSet}; +use std::collections::{BTreeMap, BTreeSet, VecDeque}; use std::ops::Bound::{Excluded, Included, Unbounded}; use std::ops::{Bound, RangeBounds}; use std::sync::{Arc, LazyLock}; @@ -337,7 +337,6 @@ pub mod sled { } mod batched_iter { - use itertools::Itertools; use super::*; @@ -378,13 +377,10 @@ mod batched_iter { Some(Self::BATCH_SIZE), )? } else { - self.inner - .range( - (self.range.0.clone(), self.range.1.clone()), - Some(Self::BATCH_SIZE), - )? - .into_iter() - .collect_vec() + self.inner.range( + (self.range.0.clone(), self.range.1.clone()), + Some(Self::BATCH_SIZE), + )? }; if let Some((last_key, _)) = batch.last() { @@ -609,7 +605,7 @@ impl RangeKvStateStore { impl StateStoreRead for RangeKvStateStore { type ChangeLogIter = RangeKvStateStoreChangeLogIter; type Iter = RangeKvStateStoreIter; - type RevIter = RangeKvStateStoreIter; + type RevIter = RangeKvStateStoreRevIter; #[allow(clippy::unused_async)] async fn get( @@ -654,7 +650,7 @@ impl StateStoreRead for RangeKvStateStore { epoch: u64, read_options: ReadOptions, ) -> StorageResult { - Ok(RangeKvStateStoreIter::new( + Ok(RangeKvStateStoreRevIter::new( batched_iter::Iter::new( self.inner.clone(), to_full_key_range(read_options.table_id, key_range), @@ -834,6 +830,81 @@ impl RangeKvStateStoreIter { } } +pub struct RangeKvStateStoreRevIter { + inner: batched_iter::Iter, + + epoch: HummockEpoch, + is_inclusive_epoch: bool, + + item_buffer: VecDeque, +} + +impl RangeKvStateStoreRevIter { + pub fn new( + inner: batched_iter::Iter, + epoch: HummockEpoch, + is_inclusive_epoch: bool, + ) -> Self { + Self { + inner, + epoch, + is_inclusive_epoch, + item_buffer: VecDeque::default(), + } + } +} + +impl StateStoreIter for RangeKvStateStoreRevIter { + #[allow(clippy::unused_async)] + async fn try_next(&mut self) -> StorageResult>> { + self.next_inner()?; + Ok(self + .item_buffer + .back() + .map(|(key, value)| (key.to_ref(), value.as_ref()))) + } +} + +impl RangeKvStateStoreRevIter { + fn next_inner(&mut self) -> StorageResult<()> { + self.item_buffer.pop_back(); + while let Some((key, value)) = self.inner.next()? 
{ + let epoch = key.epoch_with_gap.pure_epoch(); + if epoch > self.epoch { + continue; + } + if epoch == self.epoch && !self.is_inclusive_epoch { + continue; + } + + let v = match value { + Some(v) => v, + None => { + if let Some(last_key) = self.item_buffer.front() + && key.user_key.as_ref() == last_key.0.user_key.as_ref() + { + self.item_buffer.clear(); + } + continue; + } + }; + + if let Some(last_key) = self.item_buffer.front() { + if key.user_key.as_ref() != last_key.0.user_key.as_ref() { + self.item_buffer.push_front((key, v)); + break; + } else { + self.item_buffer.pop_front(); + self.item_buffer.push_front((key, v)); + } + } else { + self.item_buffer.push_front((key, v)); + } + } + Ok(()) + } +} + pub struct RangeKvStateStoreChangeLogIter { new_value_iter: RangeKvStateStoreIter, old_value_iter: RangeKvStateStoreIter, diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 1d1a5e2a6047f..98c8517cfbbc3 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -1359,6 +1359,12 @@ where } } +pub trait KeyedRowStream<'a>: Stream>> + 'a {} +impl<'a, T> KeyedRowStream<'a> for T where + T: Stream>> + 'a +{ +} + // Iterator functions impl< S, @@ -1382,7 +1388,7 @@ where vnode: VirtualNode, pk_range: &(Bound, Bound), prefetch_options: PrefetchOptions, - ) -> StreamExecutorResult> { + ) -> StreamExecutorResult> { Ok(deserialize_keyed_row_stream( self.iter_kv_with_pk_range(pk_range, vnode, prefetch_options) .await?, @@ -1427,6 +1433,27 @@ where Ok(self.local_store.iter(table_key_range, read_options).await?) } + async fn rev_iter_kv( + &self, + table_key_range: TableKeyRange, + prefix_hint: Option, + prefetch_options: PrefetchOptions, + ) -> StreamExecutorResult<::RevIter<'_>> { + let read_options = ReadOptions { + prefix_hint, + retention_seconds: self.table_option.retention_seconds, + table_id: self.table_id, + prefetch_options, + cache_policy: CachePolicy::Fill(CacheContext::Default), + ..Default::default() + }; + + Ok(self + .local_store + .rev_iter(table_key_range, read_options) + .await?) + } + /// This function scans rows from the relational table with specific `prefix` and `sub_range` under the same /// `vnode`. If `sub_range` is (Unbounded, Unbounded), it scans rows from the relational table with specific `pk_prefix`. /// `pk_prefix` is used to identify the exact vnode the scan should perform on. @@ -1435,7 +1462,28 @@ where pk_prefix: impl Row, sub_range: &(Bound, Bound), prefetch_options: PrefetchOptions, - ) -> StreamExecutorResult> { + ) -> StreamExecutorResult> { + self.iter_with_prefix_inner::(pk_prefix, sub_range, prefetch_options) + .await + } + + /// This function scans the table just like `iter_with_prefix`, but in reverse order. 
+ pub async fn rev_iter_with_prefix( + &self, + pk_prefix: impl Row, + sub_range: &(Bound, Bound), + prefetch_options: PrefetchOptions, + ) -> StreamExecutorResult> { + self.iter_with_prefix_inner::(pk_prefix, sub_range, prefetch_options) + .await + } + + async fn iter_with_prefix_inner( + &self, + pk_prefix: impl Row, + sub_range: &(Bound, Bound), + prefetch_options: PrefetchOptions, + ) -> StreamExecutorResult> { let prefix_serializer = self.pk_serde.prefix(pk_prefix.len()); let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer); @@ -1466,7 +1514,8 @@ where trace!( table_id = %self.table_id(), ?prefix_hint, ?pk_prefix, - ?pk_prefix_indices, + ?pk_prefix_indices, + iter_direction = if REVERSE { "reverse" } else { "forward" }, "storage_iter_with_prefix" ); @@ -1475,15 +1524,27 @@ where let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode); - Ok(deserialize_keyed_row_stream( - self.iter_kv( - memcomparable_range_with_vnode, - prefix_hint, - prefetch_options, - ) - .await?, - &self.row_serde, - )) + Ok(if REVERSE { + futures::future::Either::Left(deserialize_keyed_row_stream( + self.rev_iter_kv( + memcomparable_range_with_vnode, + prefix_hint, + prefetch_options, + ) + .await?, + &self.row_serde, + )) + } else { + futures::future::Either::Right(deserialize_keyed_row_stream( + self.iter_kv( + memcomparable_range_with_vnode, + prefix_hint, + prefetch_options, + ) + .await?, + &self.row_serde, + )) + }) } /// This function scans raw key-values from the relational table with specific `pk_range` under @@ -1592,13 +1653,10 @@ where } } -pub type KeyedRowStream<'a, S: StateStore, SD: ValueRowSerde + 'a> = - impl Stream>> + 'a; - fn deserialize_keyed_row_stream<'a>( iter: impl StateStoreIter + 'a, deserializer: &'a impl ValueRowSerde, -) -> impl Stream>> + 'a { +) -> impl KeyedRowStream<'a> { iter.into_stream(move |(key, value)| { Ok(KeyedRow::new( // TODO: may avoid clone the key when key is not needed diff --git a/src/stream/src/executor/over_window/over_partition.rs b/src/stream/src/executor/over_window/over_partition.rs index a00f86aff9082..f81ba9d89a50e 100644 --- a/src/stream/src/executor/over_window/over_partition.rs +++ b/src/stream/src/executor/over_window/over_partition.rs @@ -15,7 +15,7 @@ //! Types and functions that store or manipulate state/cache inside one single over window //! partition. -use std::collections::{BTreeMap, VecDeque}; +use std::collections::BTreeMap; use std::marker::PhantomData; use std::ops::{Bound, RangeInclusive}; @@ -861,37 +861,14 @@ impl<'a, S: StateStore> OverPartition<'a, S> { .await?; } - // TODO(rc): Uncomment the following to enable prefetching rows before the start of the - // range once we have STATE TABLE REVERSE ITERATOR. 
- // self.extend_cache_leftward_by_n(table, range.start()).await?; + // prefetch rows before the start of the range + self.extend_cache_leftward_by_n(table, range.start()) + .await?; // prefetch rows after the end of the range self.extend_cache_rightward_by_n(table, range.end()).await } - async fn extend_cache_by_range_inner( - &mut self, - table: &StateTable, - table_sub_range: (Bound, Bound), - ) -> StreamExecutorResult<()> { - let stream = table - .iter_with_prefix( - self.this_partition_key, - &table_sub_range, - PrefetchOptions::default(), - ) - .await?; - - #[for_await] - for row in stream { - let row: OwnedRow = row?.into_owned_row(); - let key = self.row_conv.row_to_state_key(&row)?; - self.range_cache.insert(CacheKey::from(key), row); - } - - Ok(()) - } - async fn extend_cache_leftward_by_n( &mut self, table: &StateTable, @@ -937,55 +914,6 @@ impl<'a, S: StateStore> OverPartition<'a, S> { Ok(()) } - async fn extend_cache_leftward_by_n_inner( - &mut self, - table: &StateTable, - range_to_exclusive: &StateKey, - ) -> StreamExecutorResult<()> { - let mut to_extend: VecDeque = VecDeque::with_capacity(MAGIC_BATCH_SIZE); - { - let sub_range = ( - Bound::::Unbounded, - Bound::Excluded( - self.row_conv - .state_key_to_table_sub_pk(range_to_exclusive)?, - ), - ); - let stream = table - .iter_with_prefix( - self.this_partition_key, - &sub_range, - PrefetchOptions::default(), - ) - .await?; - - #[for_await] - for row in stream { - let row: OwnedRow = row?.into_owned_row(); - - // For leftward extension, we now must iterate the table in order from the beginning - // of this partition and fill only the last n rows to the cache. - // TODO(rc): WE NEED STATE TABLE REVERSE ITERATOR!! - if to_extend.len() == MAGIC_BATCH_SIZE { - to_extend.pop_front(); - } - to_extend.push_back(row); - } - } - - let n_extended = to_extend.len(); - for row in to_extend { - let key = self.row_conv.row_to_state_key(&row)?; - self.range_cache.insert(CacheKey::from(key), row); - } - if n_extended < MAGIC_BATCH_SIZE && self.cache_real_len() > 0 { - // we reached the beginning of this partition in the table - self.range_cache.remove(&CacheKey::Smallest); - } - - Ok(()) - } - async fn extend_cache_rightward_by_n( &mut self, table: &StateTable, @@ -1031,6 +959,73 @@ impl<'a, S: StateStore> OverPartition<'a, S> { Ok(()) } + async fn extend_cache_by_range_inner( + &mut self, + table: &StateTable, + table_sub_range: (Bound, Bound), + ) -> StreamExecutorResult<()> { + let stream = table + .iter_with_prefix( + self.this_partition_key, + &table_sub_range, + PrefetchOptions::default(), + ) + .await?; + + #[for_await] + for row in stream { + let row: OwnedRow = row?.into_owned_row(); + let key = self.row_conv.row_to_state_key(&row)?; + self.range_cache.insert(CacheKey::from(key), row); + } + + Ok(()) + } + + async fn extend_cache_leftward_by_n_inner( + &mut self, + table: &StateTable, + range_to_exclusive: &StateKey, + ) -> StreamExecutorResult<()> { + let mut n_extended = 0usize; + { + let sub_range = ( + Bound::::Unbounded, + Bound::Excluded( + self.row_conv + .state_key_to_table_sub_pk(range_to_exclusive)?, + ), + ); + let rev_stream = table + .rev_iter_with_prefix( + self.this_partition_key, + &sub_range, + PrefetchOptions::default(), + ) + .await?; + + #[for_await] + for row in rev_stream { + let row: OwnedRow = row?.into_owned_row(); + + let key = self.row_conv.row_to_state_key(&row)?; + self.range_cache.insert(CacheKey::from(key), row); + + n_extended += 1; + if n_extended == MAGIC_BATCH_SIZE { + break; + } + } + } + + if 
n_extended < MAGIC_BATCH_SIZE && self.cache_real_len() > 0 { + // we reached the beginning of this partition in the table + self.range_cache.remove(&CacheKey::Smallest); + } + + Ok(()) + } + async fn extend_cache_rightward_by_n_inner( &mut self, table: &StateTable, From 7ed08b002282ca98e5b40392824ee10b26428f8e Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 24 Jun 2024 06:52:55 +0000 Subject: [PATCH 3/9] chore(deps): Bump the aws group across 1 directory with 2 updates (#17247) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.lock | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cbbb3db4da6a0..1e0d22c5867cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1690,9 +1690,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.6.1" +version = "1.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b7d790d553d163c7d80a4e06e2906bf24b9172c9ebe045fc3a274e9358ab7bb" +checksum = "4179bd8a1c943e1aceb46c5b9fc014a561bd6c35a2153e816ba29076ee49d245" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -1707,9 +1707,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.1.10" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b6764ba7e1c5ede1c9f9e4046645534f06c2581402461c559b481a420330a83" +checksum = "cfe321a6b21f5d8eabd0ade9c55d3d0335f3c3157fc2b3e87f05f34b539e4df5" dependencies = [ "base64-simd 0.8.0", "bytes", @@ -9795,7 +9795,7 @@ dependencies = [ "indoc", "libc", "memoffset", - "parking_lot 0.12.1", + "parking_lot 0.11.2", "portable-atomic", "pyo3-build-config", "pyo3-ffi", From 9cbb8c27accf16043ab4df8df5e51f2b60d26c7a Mon Sep 17 00:00:00 2001 From: congyi wang <58715567+wcy-fdu@users.noreply.github.com> Date: Mon, 24 Jun 2024 15:33:03 +0800 Subject: [PATCH 4/9] feat(connector): upgrade opendal to 0.47 for connector crate (#17406) --- Cargo.lock | 1 - src/connector/Cargo.toml | 10 +++++++--- src/connector/src/sink/iceberg/mock_catalog.rs | 4 ++-- .../source/filesystem/opendal_source/opendal_reader.rs | 2 +- 4 files changed, 10 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1e0d22c5867cb..9fc9ca7f523eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11021,7 +11021,6 @@ dependencies = [ "mysql_common", "nexmark", "num-bigint", - "opendal 0.45.1", "opendal 0.47.0", "openssl", "parking_lot 0.12.1", diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 2740a3a8754a4..17a00eb81735f 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -64,8 +64,6 @@ google-cloud-gax = "0.17.0" google-cloud-googleapis = { version = "0.13", features = ["pubsub", "bigquery"] } google-cloud-pubsub = "0.25" http = "0.2" -# TODO: remove this once the opendal version is updated to 0.47 -iceberg_opendal = { package = "opendal", version = "0.47" } icelake = { workspace = true } indexmap = { version = "2.2.6", features = ["serde"] } itertools = { workspace = true } @@ -83,7 +81,13 @@ mysql_common = { version = "0.32", default-features = false, features = [ ] } nexmark = { version = "0.2", features = ["serde"] } num-bigint = "0.4" -opendal = "0.45" +opendal = { version = "0.47", features = [ + "executors-tokio", + "services-fs", + "services-gcs", + "services-memory", + "services-s3", +] } openssl = "0.10" parking_lot = { workspace = true } paste = "1" diff 
--git a/src/connector/src/sink/iceberg/mock_catalog.rs b/src/connector/src/sink/iceberg/mock_catalog.rs index f61fb0b096bc7..f9d60965b1d30 100644 --- a/src/connector/src/sink/iceberg/mock_catalog.rs +++ b/src/connector/src/sink/iceberg/mock_catalog.rs @@ -16,11 +16,11 @@ use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; -use iceberg_opendal::services::Memory; -use iceberg_opendal::Operator; use icelake::catalog::{Catalog, UpdateTable}; use icelake::types::{Field, PartitionField, Schema, Struct, TableMetadata}; use icelake::{Table, TableIdentifier}; +use opendal::services::Memory; +use opendal::Operator; /// A mock catalog for iceberg used for plan test. pub struct MockCatalog; diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs index 07765df40a383..8d1085a094a12 100644 --- a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs +++ b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs @@ -109,7 +109,7 @@ impl OpendalReader { let object_name = split.name.clone(); let reader = op - .reader_with(&object_name) + .read_with(&object_name) .range(split.offset as u64..) .into_future() // Unlike `rustc`, `try_stream` seems require manual `into_future`. .await?; From 77433bd02df326dda374e5cf460fb4bd296a8a42 Mon Sep 17 00:00:00 2001 From: stonepage <40830455+st1page@users.noreply.github.com> Date: Mon, 24 Jun 2024 16:29:49 +0800 Subject: [PATCH 5/9] fix: append only table with connector having retention seconds (#17387) --- e2e_test/source/basic/ttl_table_with_con.slt | 32 ++++++++++++++++++++ src/frontend/src/handler/create_table.rs | 9 +----- src/frontend/src/handler/create_table_as.rs | 7 ----- src/frontend/src/utils/with_options.rs | 5 ++- 4 files changed, 37 insertions(+), 16 deletions(-) create mode 100644 e2e_test/source/basic/ttl_table_with_con.slt diff --git a/e2e_test/source/basic/ttl_table_with_con.slt b/e2e_test/source/basic/ttl_table_with_con.slt new file mode 100644 index 0000000000000..6a232bbdee2a2 --- /dev/null +++ b/e2e_test/source/basic/ttl_table_with_con.slt @@ -0,0 +1,32 @@ +statement ok +create table t (v1 int, v2 varchar) APPEND ONLY with ( + connector = 'kafka', + topic = 'kafka_1_partition_topic', + properties.bootstrap.server = 'message_queue:29092', + scan.startup.mode = 'earliest', + retention_seconds = 5 +) FORMAT PLAIN ENCODE JSON; + +statement ok +flush; + +# Wait enough time to ensure SourceExecutor consumes all Kafka data. 
+sleep 1s + +query IT rowsort +select * from t +---- +1 1 +2 22 +3 333 +4 4444 + +statement ok +select pg_sleep(10); + +query I +select * from t; +---- + +statement ok +drop table t; \ No newline at end of file diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index de81115e628f1..8d32382346c51 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -536,14 +536,12 @@ pub(crate) fn gen_create_table_plan( for c in &mut columns { c.column_desc.column_id = col_id_gen.generate(c.name()) } - let with_properties = context.with_options().inner().clone(); gen_create_table_plan_without_source( context, table_name, columns, column_defs, constraints, - with_properties, definition, source_watermarks, append_only, @@ -560,7 +558,6 @@ pub(crate) fn gen_create_table_plan_without_source( columns: Vec, column_defs: Vec, constraints: Vec, - with_properties: BTreeMap, definition: String, source_watermarks: Vec, append_only: bool, @@ -598,7 +595,6 @@ pub(crate) fn gen_create_table_plan_without_source( context.into(), name, columns, - with_properties, pk_column_ids, row_id_index, definition, @@ -629,7 +625,6 @@ fn gen_table_plan_with_source( context, source_catalog.name, source_catalog.columns, - source_catalog.with_properties.clone(), source_catalog.pk_col_ids, source_catalog.row_id_index, source_catalog.definition, @@ -649,7 +644,6 @@ fn gen_table_plan_inner( context: OptimizerContextRef, table_name: String, columns: Vec, - with_properties: BTreeMap, pk_column_ids: Vec, row_id_index: Option, definition: String, @@ -664,8 +658,7 @@ fn gen_table_plan_inner( schema_id: SchemaId, ) -> Result<(PlanRef, PbTable)> { let session = context.session_ctx().clone(); - let with_properties = WithOptions::new(with_properties); - let retention_seconds = with_properties.retention_seconds(); + let retention_seconds = context.with_options().retention_seconds(); let is_external_source = source_catalog.is_some(); let source_node: PlanRef = LogicalSource::new( source_catalog.map(|source| Rc::new(source.clone())), diff --git a/src/frontend/src/handler/create_table_as.rs b/src/frontend/src/handler/create_table_as.rs index bdd6e9d078e90..9a01d2919086e 100644 --- a/src/frontend/src/handler/create_table_as.rs +++ b/src/frontend/src/handler/create_table_as.rs @@ -90,19 +90,12 @@ pub async fn handle_create_as( let (graph, source, table) = { let context = OptimizerContext::from_handler_args(handler_args.clone()); - let properties = handler_args - .with_options - .inner() - .clone() - .into_iter() - .collect(); let (plan, table) = gen_create_table_plan_without_source( context, table_name.clone(), columns, vec![], vec![], - properties, "".to_owned(), // TODO: support `SHOW CREATE TABLE` for `CREATE TABLE AS` vec![], // No watermark should be defined in for `CREATE TABLE AS` append_only, diff --git a/src/frontend/src/utils/with_options.rs b/src/frontend/src/utils/with_options.rs index 92c65786afdeb..a6984d687bc74 100644 --- a/src/frontend/src/utils/with_options.rs +++ b/src/frontend/src/utils/with_options.rs @@ -82,7 +82,10 @@ impl WithOptions { pub fn into_connector_props(self) -> BTreeMap { self.inner .into_iter() - .filter(|(key, _)| key != OverwriteOptions::STREAMING_RATE_LIMIT_KEY) + .filter(|(key, _)| { + key != OverwriteOptions::STREAMING_RATE_LIMIT_KEY + && key != options::RETENTION_SECONDS + }) .collect() } From 8fe655f91fb26b083394a559689ae11a6f47dc1b Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 24 Jun 2024 17:28:32 +0800 Subject: 
[PATCH 6/9] refactor: minor refactor on source column desc (#17417) --- src/common/src/catalog/column.rs | 14 + .../src/parser/additional_columns.rs | 252 ++++++++---------- src/connector/src/parser/mod.rs | 1 + src/connector/src/source/manager.rs | 79 ++++-- src/connector/src/source/reader/desc.rs | 40 +-- src/frontend/src/handler/create_source.rs | 13 +- .../src/optimizer/plan_node/stream_source.rs | 6 +- .../optimizer/plan_node/stream_source_scan.rs | 7 +- 8 files changed, 206 insertions(+), 206 deletions(-) diff --git a/src/common/src/catalog/column.rs b/src/common/src/catalog/column.rs index bb6c8b7a39903..b3065defea2a2 100644 --- a/src/common/src/catalog/column.rs +++ b/src/common/src/catalog/column.rs @@ -324,6 +324,20 @@ pub struct ColumnCatalog { } impl ColumnCatalog { + pub fn visible(column_desc: ColumnDesc) -> Self { + Self { + column_desc, + is_hidden: false, + } + } + + pub fn hidden(column_desc: ColumnDesc) -> Self { + Self { + column_desc, + is_hidden: true, + } + } + /// Get the column catalog's is hidden. pub fn is_hidden(&self) -> bool { self.is_hidden diff --git a/src/connector/src/parser/additional_columns.rs b/src/connector/src/parser/additional_columns.rs index f50429f716073..54b1120d06438 100644 --- a/src/connector/src/parser/additional_columns.rs +++ b/src/connector/src/parser/additional_columns.rs @@ -114,7 +114,7 @@ pub fn gen_default_addition_col_name( }) } -pub fn build_additional_column_catalog( +pub fn build_additional_column_desc( column_id: ColumnId, connector_name: &str, additional_col_type: &str, @@ -123,7 +123,7 @@ pub fn build_additional_column_catalog( data_type: Option<&str>, reject_unknown_connector: bool, is_cdc_backfill_table: bool, -) -> ConnectorResult { +) -> ConnectorResult { let compatible_columns = match ( get_supported_additional_columns(connector_name, is_cdc_backfill_table), reject_unknown_connector, @@ -154,120 +154,94 @@ pub fn build_additional_column_catalog( ) }); - let catalog = match additional_col_type { - "key" => ColumnCatalog { - column_desc: ColumnDesc::named_with_additional_column( - column_name, - column_id, - DataType::Bytea, - AdditionalColumn { - column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})), - }, - ), - is_hidden: false, - }, - "timestamp" => ColumnCatalog { - column_desc: ColumnDesc::named_with_additional_column( - column_name, - column_id, - DataType::Timestamptz, - AdditionalColumn { - column_type: Some(AdditionalColumnType::Timestamp( - AdditionalColumnTimestamp {}, - )), - }, - ), - is_hidden: false, - }, - "partition" => ColumnCatalog { - column_desc: ColumnDesc::named_with_additional_column( - column_name, - column_id, - DataType::Varchar, - AdditionalColumn { - column_type: Some(AdditionalColumnType::Partition( - AdditionalColumnPartition {}, - )), - }, - ), - is_hidden: false, - }, - "offset" => ColumnCatalog { - column_desc: ColumnDesc::named_with_additional_column( - column_name, - column_id, - DataType::Varchar, - AdditionalColumn { - column_type: Some(AdditionalColumnType::Offset(AdditionalColumnOffset {})), - }, - ), - is_hidden: false, - }, - "file" => ColumnCatalog { - column_desc: ColumnDesc::named_with_additional_column( - column_name, - column_id, - DataType::Varchar, - AdditionalColumn { - column_type: Some(AdditionalColumnType::Filename(AdditionalColumnFilename {})), - }, - ), - is_hidden: false, - }, - "header" => build_header_catalog(column_id, &column_name, inner_field_name, data_type), - "database_name" => ColumnCatalog { - column_desc: 
ColumnDesc::named_with_additional_column( - column_name, - column_id, - DataType::Varchar, - AdditionalColumn { - column_type: Some(AdditionalColumnType::DatabaseName( - AdditionalDatabaseName {}, - )), - }, - ), - is_hidden: false, - }, - "schema_name" => ColumnCatalog { - column_desc: ColumnDesc::named_with_additional_column( - column_name, - column_id, - DataType::Varchar, - AdditionalColumn { - column_type: Some(AdditionalColumnType::SchemaName(AdditionalSchemaName {})), - }, - ), - is_hidden: false, - }, + let col_desc = match additional_col_type { + "key" => ColumnDesc::named_with_additional_column( + column_name, + column_id, + DataType::Bytea, + AdditionalColumn { + column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})), + }, + ), - "table_name" => ColumnCatalog { - column_desc: ColumnDesc::named_with_additional_column( - column_name, - column_id, - DataType::Varchar, - AdditionalColumn { - column_type: Some(AdditionalColumnType::TableName(AdditionalTableName {})), - }, - ), - is_hidden: false, - }, - "collection_name" => ColumnCatalog { - column_desc: ColumnDesc::named_with_additional_column( - column_name, - column_id, - DataType::Varchar, - AdditionalColumn { - column_type: Some(AdditionalColumnType::CollectionName( - AdditionalCollectionName {}, - )), - }, - ), - is_hidden: false, - }, + "timestamp" => ColumnDesc::named_with_additional_column( + column_name, + column_id, + DataType::Timestamptz, + AdditionalColumn { + column_type: Some(AdditionalColumnType::Timestamp( + AdditionalColumnTimestamp {}, + )), + }, + ), + "partition" => ColumnDesc::named_with_additional_column( + column_name, + column_id, + DataType::Varchar, + AdditionalColumn { + column_type: Some(AdditionalColumnType::Partition( + AdditionalColumnPartition {}, + )), + }, + ), + "offset" => ColumnDesc::named_with_additional_column( + column_name, + column_id, + DataType::Varchar, + AdditionalColumn { + column_type: Some(AdditionalColumnType::Offset(AdditionalColumnOffset {})), + }, + ), + + "file" => ColumnDesc::named_with_additional_column( + column_name, + column_id, + DataType::Varchar, + AdditionalColumn { + column_type: Some(AdditionalColumnType::Filename(AdditionalColumnFilename {})), + }, + ), + "header" => build_header_catalog(column_id, &column_name, inner_field_name, data_type), + "database_name" => ColumnDesc::named_with_additional_column( + column_name, + column_id, + DataType::Varchar, + AdditionalColumn { + column_type: Some(AdditionalColumnType::DatabaseName( + AdditionalDatabaseName {}, + )), + }, + ), + "schema_name" => ColumnDesc::named_with_additional_column( + column_name, + column_id, + DataType::Varchar, + AdditionalColumn { + column_type: Some(AdditionalColumnType::SchemaName(AdditionalSchemaName {})), + }, + ), + "table_name" => ColumnDesc::named_with_additional_column( + column_name, + column_id, + DataType::Varchar, + AdditionalColumn { + column_type: Some(AdditionalColumnType::TableName(AdditionalTableName {})), + }, + ), + "collection_name" => ColumnDesc::named_with_additional_column( + column_name, + column_id, + DataType::Varchar, + AdditionalColumn { + column_type: Some(AdditionalColumnType::CollectionName( + AdditionalCollectionName {}, + )), + }, + ), _ => unreachable!(), }; - Ok(catalog) + Ok(col_desc) } /// Utility function for adding partition and offset columns to the columns, if not specified by the user. 
@@ -278,7 +252,7 @@ pub fn build_additional_column_catalog( pub fn source_add_partition_offset_cols( columns: &[ColumnCatalog], connector_name: &str, -) -> ([bool; 2], [ColumnCatalog; 2]) { +) -> ([bool; 2], [ColumnDesc; 2]) { let mut columns_exist = [false; 2]; let mut last_column_id = max_column_id(columns); @@ -292,7 +266,7 @@ pub fn source_add_partition_offset_cols( last_column_id = last_column_id.next(); if compat_col_types.contains(col_type) { Some( - build_additional_column_catalog( + build_additional_column_desc( last_column_id, connector_name, col_type, @@ -313,13 +287,13 @@ pub fn source_add_partition_offset_cols( assert_eq!(additional_columns.len(), 2); use risingwave_pb::plan_common::additional_column::ColumnType; assert_matches::assert_matches!( - additional_columns[0].column_desc.additional_column, + additional_columns[0].additional_column, AdditionalColumn { column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)), } ); assert_matches::assert_matches!( - additional_columns[1].column_desc.additional_column, + additional_columns[1].additional_column, AdditionalColumn { column_type: Some(ColumnType::Offset(_)), } @@ -350,7 +324,7 @@ fn build_header_catalog( col_name: &str, inner_field_name: Option<&str>, data_type: Option<&str>, -) -> ColumnCatalog { +) -> ColumnDesc { if let Some(inner) = inner_field_name { let (data_type, pb_data_type) = { if let Some(type_name) = data_type { @@ -381,32 +355,26 @@ fn build_header_catalog( ) } }; - ColumnCatalog { - column_desc: ColumnDesc::named_with_additional_column( - col_name, - column_id, - data_type, - AdditionalColumn { - column_type: Some(AdditionalColumnType::HeaderInner(AdditionalColumnHeader { - inner_field: inner.to_string(), - data_type: Some(pb_data_type), - })), - }, - ), - is_hidden: false, - } + ColumnDesc::named_with_additional_column( + col_name, + column_id, + data_type, + AdditionalColumn { + column_type: Some(AdditionalColumnType::HeaderInner(AdditionalColumnHeader { + inner_field: inner.to_string(), + data_type: Some(pb_data_type), + })), + }, + ) } else { - ColumnCatalog { - column_desc: ColumnDesc::named_with_additional_column( - col_name, - column_id, - DataType::List(get_kafka_header_item_datatype().into()), - AdditionalColumn { - column_type: Some(AdditionalColumnType::Headers(AdditionalColumnHeaders {})), - }, - ), - is_hidden: false, - } + ColumnDesc::named_with_additional_column( + col_name, + column_id, + DataType::List(get_kafka_header_item_datatype().into()), + AdditionalColumn { + column_type: Some(AdditionalColumnType::Headers(AdditionalColumnHeaders {})), + }, + ) } } diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 26cf746b535dc..c11c833457a99 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -1043,6 +1043,7 @@ impl ParserConfig { #[derive(Debug, Clone, Default)] pub struct CommonParserConfig { + /// Note: this is created by `SourceDescBuilder::builder` pub rw_columns: Vec, } diff --git a/src/connector/src/source/manager.rs b/src/connector/src/source/manager.rs index a5584f6af83d5..731d0c4ff8ae8 100644 --- a/src/connector/src/source/manager.rs +++ b/src/connector/src/source/manager.rs @@ -21,26 +21,28 @@ use risingwave_common::catalog::{ use risingwave_common::types::DataType; use risingwave_pb::plan_common::{AdditionalColumn, ColumnDescVersion}; -/// `SourceColumnDesc` is used to describe a column in the Source and is used as the column -/// counterpart in `StreamScan` +/// `SourceColumnDesc` is used to describe a column 
in the Source. +/// +/// See the implementation of `From<&ColumnDesc>` for the difference between `SourceColumnDesc` and [`ColumnDesc`]. #[derive(Clone, Debug)] pub struct SourceColumnDesc { pub name: String, pub data_type: DataType, pub column_id: ColumnId, pub fields: Vec, + /// `additional_column` and `column_type` are orthogonal + /// `additional_column` is used to indicate the column is from which part of the message + /// `column_type` is used to indicate the type of the column, only used in cdc scenario + pub additional_column: AdditionalColumn, + // ------ + // Fields above are the same in `ColumnDesc`. + // Fields below are specific to `SourceColumnDesc`. + // ------ pub column_type: SourceColumnType, - /// `is_pk` is used to indicate whether the column is part of the primary key columns. pub is_pk: bool, - /// `is_hidden_addition_col` is used to indicate whether the column is a hidden addition column. pub is_hidden_addition_col: bool, - - /// `additional_column` and `column_type` are orthogonal - /// `additional_column` is used to indicate the column is from which part of the message - /// `column_type` is used to indicate the type of the column, only used in cdc scenario - pub additional_column: AdditionalColumn, } /// `SourceColumnType` is used to indicate the type of a column emitted by the Source. @@ -121,32 +123,63 @@ impl SourceColumnDesc { } impl From<&ColumnDesc> for SourceColumnDesc { - fn from(c: &ColumnDesc) -> Self { - let column_type = SourceColumnType::from_name(c.name.as_str()); + fn from( + ColumnDesc { + data_type, + column_id, + name, + field_descs, + additional_column, + // ignored fields below + generated_or_default_column, + type_name: _, + description: _, + version: _, + }: &ColumnDesc, + ) -> Self { + debug_assert!( + generated_or_default_column.is_none(), + "source column should not be generated or default: {:?}", + generated_or_default_column.as_ref().unwrap() + ); Self { - name: c.name.clone(), - data_type: c.data_type.clone(), - column_id: c.column_id, - fields: c.field_descs.clone(), - column_type, + name: name.clone(), + data_type: data_type.clone(), + column_id: *column_id, + fields: field_descs.clone(), + additional_column: additional_column.clone(), + // additional fields below + column_type: SourceColumnType::from_name(name), is_pk: false, is_hidden_addition_col: false, - additional_column: c.additional_column.clone(), } } } impl From<&SourceColumnDesc> for ColumnDesc { - fn from(s: &SourceColumnDesc) -> Self { + fn from( + SourceColumnDesc { + name, + data_type, + column_id, + fields, + additional_column, + // ignored fields below + column_type: _, + is_pk: _, + is_hidden_addition_col: _, + }: &SourceColumnDesc, + ) -> Self { ColumnDesc { - data_type: s.data_type.clone(), - column_id: s.column_id, - name: s.name.clone(), - field_descs: s.fields.clone(), + data_type: data_type.clone(), + column_id: *column_id, + name: name.clone(), + field_descs: fields.clone(), + additional_column: additional_column.clone(), + // additional fields below type_name: "".to_string(), generated_or_default_column: None, description: None, - additional_column: s.additional_column.clone(), version: ColumnDescVersion::Pr13707, } } diff --git a/src/connector/src/source/reader/desc.rs b/src/connector/src/source/reader/desc.rs index d9076e6a78fa9..44d2effe51606 100644 --- a/src/connector/src/source/reader/desc.rs +++ b/src/connector/src/source/reader/desc.rs @@ -15,9 +15,8 @@ use std::collections::BTreeMap; use std::sync::Arc; -use itertools::Itertools; use 
risingwave_common::bail; -use risingwave_common::catalog::{ColumnCatalog, ColumnDesc}; +use risingwave_common::catalog::ColumnCatalog; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_pb::catalog::PbStreamSourceInfo; use risingwave_pb::plan_common::PbColumnCatalog; @@ -53,7 +52,7 @@ pub struct FsSourceDesc { #[derive(Clone)] pub struct SourceDescBuilder { - columns: Vec, + columns: Vec, metrics: Arc, row_id_index: Option, with_properties: BTreeMap, @@ -63,7 +62,6 @@ pub struct SourceDescBuilder { } impl SourceDescBuilder { - #[allow(clippy::too_many_arguments)] pub fn new( columns: Vec, metrics: Arc, @@ -74,7 +72,7 @@ impl SourceDescBuilder { pk_indices: Vec, ) -> Self { Self { - columns, + columns: columns.into_iter().map(ColumnCatalog::from).collect(), metrics, row_id_index, with_properties, @@ -92,25 +90,18 @@ impl SourceDescBuilder { .get(UPSTREAM_SOURCE_KEY) .map(|s| s.to_lowercase()) .unwrap(); - let columns = self - .columns - .iter() - .map(|c| ColumnCatalog::from(c.clone())) - .collect_vec(); let (columns_exist, additional_columns) = - source_add_partition_offset_cols(&columns, &connector_name); + source_add_partition_offset_cols(&self.columns, &connector_name); let mut columns: Vec<_> = self .columns .iter() - .map(|c| SourceColumnDesc::from(&ColumnDesc::from(c.column_desc.as_ref().unwrap()))) + .map(|c| SourceColumnDesc::from(&c.column_desc)) .collect(); for (existed, c) in columns_exist.iter().zip_eq_fast(&additional_columns) { if !existed { - columns.push(SourceColumnDesc::hidden_addition_col_from_column_desc( - &c.column_desc, - )); + columns.push(SourceColumnDesc::hidden_addition_col_from_column_desc(c)); } } @@ -184,9 +175,8 @@ impl SourceDescBuilder { pub mod test_utils { use std::collections::BTreeMap; - use risingwave_common::catalog::{ColumnDesc, Schema}; + use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Schema}; use risingwave_pb::catalog::StreamSourceInfo; - use risingwave_pb::plan_common::ColumnCatalog; use super::{SourceDescBuilder, DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE}; @@ -201,16 +191,12 @@ pub mod test_utils { .fields .iter() .enumerate() - .map(|(i, f)| ColumnCatalog { - column_desc: Some( - ColumnDesc::named( - f.name.clone(), - (i as i32).into(), // use column index as column id - f.data_type.clone(), - ) - .to_protobuf(), - ), - is_hidden: false, + .map(|(i, f)| { + ColumnCatalog::visible(ColumnDesc::named( + f.name.clone(), + (i as i32).into(), // use column index as column id + f.data_type.clone(), + )) }) .collect(); SourceDescBuilder { diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 4c5a8abdec005..a271e9b48f617 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -29,7 +29,7 @@ use risingwave_common::catalog::{ }; use risingwave_common::types::DataType; use risingwave_connector::parser::additional_columns::{ - build_additional_column_catalog, get_supported_additional_columns, + build_additional_column_desc, get_supported_additional_columns, }; use risingwave_connector::parser::{ fetch_json_schema_and_map_to_columns, AvroParserConfig, DebeziumAvroParserConfig, @@ -616,7 +616,7 @@ pub fn handle_addition_columns( let data_type_name: Option = item .header_inner_expect_type .map(|dt| format!("{:?}", dt).to_lowercase()); - columns.push(build_additional_column_catalog( + let col = build_additional_column_desc( latest_col_id.next(), connector_name.as_str(), item.column_type.real_value().as_str(), @@ -625,7 +625,8 @@ pub fn 
handle_addition_columns( data_type_name.as_deref(), true, is_cdc_backfill_table, - )?); + )?; + columns.push(ColumnCatalog::visible(col)); } Ok(()) @@ -945,7 +946,7 @@ fn check_and_add_timestamp_column(with_properties: &WithOptions, columns: &mut V } // add a hidden column `_rw_kafka_timestamp` to each message from Kafka source - let mut catalog = build_additional_column_catalog( + let col = build_additional_column_desc( ColumnId::placeholder(), KAFKA_CONNECTOR, "timestamp", @@ -956,9 +957,7 @@ fn check_and_add_timestamp_column(with_properties: &WithOptions, columns: &mut V false, ) .unwrap(); - catalog.is_hidden = true; - - columns.push(catalog); + columns.push(ColumnCatalog::hidden(col)); } } diff --git a/src/frontend/src/optimizer/plan_node/stream_source.rs b/src/frontend/src/optimizer/plan_node/stream_source.rs index 865d444f8c154..40dbbc469f52e 100644 --- a/src/frontend/src/optimizer/plan_node/stream_source.rs +++ b/src/frontend/src/optimizer/plan_node/stream_source.rs @@ -17,6 +17,7 @@ use std::rc::Rc; use fixedbitset::FixedBitSet; use itertools::Itertools; use pretty_xmlish::{Pretty, XmlNode}; +use risingwave_common::catalog::ColumnCatalog; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_connector::parser::additional_columns::source_add_partition_offset_cols; use risingwave_pb::stream_plan::stream_node::PbNodeBody; @@ -50,10 +51,9 @@ impl StreamSource { &core.column_catalog, &source_catalog.connector_name(), ); - for (existed, mut c) in columns_exist.into_iter().zip_eq_fast(additional_columns) { - c.is_hidden = true; + for (existed, c) in columns_exist.into_iter().zip_eq_fast(additional_columns) { if !existed { - core.column_catalog.push(c); + core.column_catalog.push(ColumnCatalog::hidden(c)); } } } diff --git a/src/frontend/src/optimizer/plan_node/stream_source_scan.rs b/src/frontend/src/optimizer/plan_node/stream_source_scan.rs index 02f794fe55a9f..6d1d75d8e46aa 100644 --- a/src/frontend/src/optimizer/plan_node/stream_source_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_source_scan.rs @@ -17,7 +17,7 @@ use std::rc::Rc; use fixedbitset::FixedBitSet; use itertools::Itertools; use pretty_xmlish::{Pretty, XmlNode}; -use risingwave_common::catalog::Field; +use risingwave_common::catalog::{ColumnCatalog, Field}; use risingwave_common::types::DataType; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::sort_util::OrderType; @@ -62,10 +62,9 @@ impl StreamSourceScan { &core.column_catalog, &source_catalog.connector_name(), ); - for (existed, mut c) in columns_exist.into_iter().zip_eq_fast(additional_columns) { - c.is_hidden = true; + for (existed, c) in columns_exist.into_iter().zip_eq_fast(additional_columns) { if !existed { - core.column_catalog.push(c); + core.column_catalog.push(ColumnCatalog::hidden(c)); } } } From 613fb51132a4772d5f7ce1a321ad60d0092bb081 Mon Sep 17 00:00:00 2001 From: Dylan Date: Mon, 24 Jun 2024 17:30:46 +0800 Subject: [PATCH 7/9] fix(optimizer): fix apply topn transpose rule (#17386) --- .../batch/subquery/lateral_subquery.slt.part | 29 +++++++++++++++++++ .../testdata/input/lateral_subquery.yaml | 15 ++++++++++ .../testdata/output/lateral_subquery.yaml | 28 ++++++++++++++++++ .../rule/apply_topn_transpose_rule.rs | 4 ++- 4 files changed, 75 insertions(+), 1 deletion(-) diff --git a/e2e_test/batch/subquery/lateral_subquery.slt.part b/e2e_test/batch/subquery/lateral_subquery.slt.part index 04e93a3d397ce..98077487828e4 100644 --- a/e2e_test/batch/subquery/lateral_subquery.slt.part +++ 
b/e2e_test/batch/subquery/lateral_subquery.slt.part @@ -82,3 +82,32 @@ drop table all_sales; statement ok drop table salesperson; +statement ok +CREATE TABLE r(ts TIMESTAMPTZ, src_id int, dev_id int); + +statement ok +INSERT INTO r VALUES +('2024-06-20T19:00:22Z'::TIMESTAMPTZ, 2, 2), +('2024-06-20T19:00:22Z'::TIMESTAMPTZ, 1, 3), +('2024-06-20T19:00:23Z'::TIMESTAMPTZ, 1, 2), +('2024-06-20T19:00:24Z'::TIMESTAMPTZ, 2, 1), +('2024-06-20T19:00:24Z'::TIMESTAMPTZ, 1, 2), +('2024-06-20T19:00:25Z'::TIMESTAMPTZ, 2, 1); + +query TII rowsort +SELECT e.ts AS e_ts, d.* +FROM ( + SELECT '2024-06-20T19:01:00Z'::TIMESTAMPTZ ts, 1::INT AS src_id) e +JOIN LATERAL +( + SELECT DISTINCT ON(src_id, dev_id) * + FROM r + WHERE r.src_id = e.src_id AND r.ts <= e.ts + ORDER BY src_id, dev_id, ts DESC +)d on true; +---- +2024-06-20 19:01:00+00:00 2024-06-20 19:00:22+00:00 1 3 +2024-06-20 19:01:00+00:00 2024-06-20 19:00:24+00:00 1 2 + +statement ok +DROP TABLE r CASCADE; diff --git a/src/frontend/planner_test/tests/testdata/input/lateral_subquery.yaml b/src/frontend/planner_test/tests/testdata/input/lateral_subquery.yaml index 869bbdf6d7136..8b9126f18d641 100644 --- a/src/frontend/planner_test/tests/testdata/input/lateral_subquery.yaml +++ b/src/frontend/planner_test/tests/testdata/input/lateral_subquery.yaml @@ -119,3 +119,18 @@ ) AS b ON TRUE; expected_outputs: - stream_plan +- name: https://github.com/risingwavelabs/risingwave/issues/17382 + sql: | + CREATE TABLE r(ts TIMESTAMPTZ, src_id int, dev_id int); + SELECT e.ts AS e_ts, d.* + FROM ( + SELECT '2024-06-20T19:01:00Z'::TIMESTAMPTZ ts, 1::INT AS src_id) e + JOIN LATERAL + ( + SELECT DISTINCT ON(src_id, dev_id) * + FROM r + WHERE r.src_id = e.src_id AND r.ts <= e.ts + ORDER BY src_id, dev_id, ts DESC + )d on true; + expected_outputs: + - batch_plan diff --git a/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml b/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml index 39639b3ebb647..815890d6a73b8 100644 --- a/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml +++ b/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml @@ -242,3 +242,31 @@ └─StreamExchange { dist: HashShard(t1.c1, $expr3) } └─StreamProject { exprs: [t1.c1, t1.c2, t1.c3, AtTimeZone(t1.c4, 'UTC':Varchar) as $expr2, t1.c4::Date as $expr3, t1._row_id] } └─StreamTableScan { table: t1, columns: [t1.c1, t1.c2, t1.c3, t1.c4, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } +- name: https://github.com/risingwavelabs/risingwave/issues/17382 + sql: | + CREATE TABLE r(ts TIMESTAMPTZ, src_id int, dev_id int); + SELECT e.ts AS e_ts, d.* + FROM ( + SELECT '2024-06-20T19:01:00Z'::TIMESTAMPTZ ts, 1::INT AS src_id) e + JOIN LATERAL + ( + SELECT DISTINCT ON(src_id, dev_id) * + FROM r + WHERE r.src_id = e.src_id AND r.ts <= e.ts + ORDER BY src_id, dev_id, ts DESC + )d on true; + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchHashJoin { type: Inner, predicate: 1:Int32 IS NOT DISTINCT FROM 1:Int32 AND '2024-06-20 19:01:00+00:00':Timestamptz IS NOT DISTINCT FROM '2024-06-20 19:01:00+00:00':Timestamptz, output: ['2024-06-20 19:01:00+00:00':Timestamptz, r.ts, r.src_id, r.dev_id] } + ├─BatchExchange { order: [], dist: HashShard(1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz) } + │ └─BatchValues { rows: [['2024-06-20 19:01:00+00:00':Timestamptz, 1:Int32]] } + └─BatchExchange { order: [], dist: HashShard(1:Int32, '2024-06-20 
19:01:00+00:00':Timestamptz) } + └─BatchGroupTopN { order: [r.src_id ASC, r.dev_id ASC, r.ts DESC], limit: 1, offset: 0, group_key: [1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz, r.src_id, r.dev_id] } + └─BatchExchange { order: [], dist: HashShard(1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz, r.src_id, r.dev_id) } + └─BatchHashJoin { type: Inner, predicate: 1:Int32 = r.src_id AND (r.ts <= '2024-06-20 19:01:00+00:00':Timestamptz), output: all } + ├─BatchExchange { order: [], dist: HashShard(1:Int32) } + │ └─BatchHashAgg { group_key: [1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz], aggs: [] } + │ └─BatchExchange { order: [], dist: HashShard(1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz) } + │ └─BatchValues { rows: [[1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz]] } + └─BatchExchange { order: [], dist: HashShard(r.src_id) } + └─BatchScan { table: r, columns: [r.ts, r.src_id, r.dev_id], distribution: SomeShard } diff --git a/src/frontend/src/optimizer/rule/apply_topn_transpose_rule.rs b/src/frontend/src/optimizer/rule/apply_topn_transpose_rule.rs index 9a6884b33e9a2..dbdb180b1358c 100644 --- a/src/frontend/src/optimizer/rule/apply_topn_transpose_rule.rs +++ b/src/frontend/src/optimizer/rule/apply_topn_transpose_rule.rs @@ -49,7 +49,8 @@ impl Rule for ApplyTopNTransposeRule { apply.clone().decompose(); assert_eq!(join_type, JoinType::Inner); let topn: &LogicalTopN = right.as_logical_top_n()?; - let (topn_input, limit, offset, with_ties, mut order, group_key) = topn.clone().decompose(); + let (topn_input, limit, offset, with_ties, mut order, mut group_key) = + topn.clone().decompose(); let apply_left_len = left.schema().len(); @@ -73,6 +74,7 @@ impl Rule for ApplyTopNTransposeRule { .column_orders .iter_mut() .for_each(|ord| ord.column_index += apply_left_len); + group_key.iter_mut().for_each(|idx| *idx += apply_left_len); let new_group_key = (0..apply_left_len).chain(group_key).collect_vec(); LogicalTopN::new(new_apply, limit, offset, with_ties, order, new_group_key) }; From cdbd982d55613636672fe2f5524d819a9e992571 Mon Sep 17 00:00:00 2001 From: Noel Kwan <47273164+kwannoel@users.noreply.github.com> Date: Mon, 24 Jun 2024 19:00:42 +0800 Subject: [PATCH 8/9] refactor(meta): move `finish_streaming_job` into catalog manager (#17414) --- src/meta/src/manager/catalog/mod.rs | 61 +++++++++++++++++++++ src/meta/src/rpc/ddl_controller.rs | 83 ++--------------------------- 2 files changed, 64 insertions(+), 80 deletions(-) diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 3aeab64bef128..9ba9333cee8e1 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -1073,6 +1073,67 @@ impl CatalogManager { Ok(()) } + /// `finish_stream_job` finishes a stream job and clean some states. + pub async fn finish_stream_job( + &self, + mut stream_job: StreamingJob, + internal_tables: Vec, + ) -> MetaResult { + // 1. finish procedure. + let mut creating_internal_table_ids = internal_tables.iter().map(|t| t.id).collect_vec(); + + // Update the corresponding 'created_at' field. + stream_job.mark_created(); + + let version = match stream_job { + StreamingJob::MaterializedView(table) => { + creating_internal_table_ids.push(table.id); + self.finish_create_table_procedure(internal_tables, table) + .await? 
+ } + StreamingJob::Sink(sink, target_table) => { + let sink_id = sink.id; + + let mut version = self + .finish_create_sink_procedure(internal_tables, sink) + .await?; + + if let Some((table, source)) = target_table { + version = self + .finish_replace_table_procedure(&source, &table, None, Some(sink_id), None) + .await?; + } + + version + } + StreamingJob::Table(source, table, ..) => { + creating_internal_table_ids.push(table.id); + if let Some(source) = source { + self.finish_create_table_procedure_with_source(source, table, internal_tables) + .await? + } else { + self.finish_create_table_procedure(internal_tables, table) + .await? + } + } + StreamingJob::Index(index, table) => { + creating_internal_table_ids.push(table.id); + self.finish_create_index_procedure(internal_tables, index, table) + .await? + } + StreamingJob::Source(source) => { + self.finish_create_source_procedure(source, internal_tables) + .await? + } + }; + + // 2. unmark creating tables. + self.unmark_creating_tables(&creating_internal_table_ids, false) + .await; + + Ok(version) + } + /// This is used for both `CREATE TABLE` and `CREATE MATERIALIZED VIEW`. pub async fn finish_create_table_procedure( &self, diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index e1c6ebb0f32a8..cc5d7dd8971ad 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -1317,8 +1317,9 @@ impl DdlController { }; tracing::debug!(id = job_id, "finishing stream job"); - let version = self - .finish_stream_job(mgr, stream_job, internal_tables) + let version = mgr + .catalog_manager + .finish_stream_job(stream_job, internal_tables) .await?; tracing::debug!(id = job_id, "finished stream job"); @@ -1757,84 +1758,6 @@ impl DdlController { Ok(()) } - /// `finish_stream_job` finishes a stream job and clean some states. - async fn finish_stream_job( - &self, - mgr: &MetadataManagerV1, - mut stream_job: StreamingJob, - internal_tables: Vec
, - ) -> MetaResult { - // 1. finish procedure. - let mut creating_internal_table_ids = internal_tables.iter().map(|t| t.id).collect_vec(); - - // Update the corresponding 'created_at' field. - stream_job.mark_created(); - - let version = match stream_job { - StreamingJob::MaterializedView(table) => { - creating_internal_table_ids.push(table.id); - mgr.catalog_manager - .finish_create_table_procedure(internal_tables, table) - .await? - } - StreamingJob::Sink(sink, target_table) => { - let sink_id = sink.id; - - let mut version = mgr - .catalog_manager - .finish_create_sink_procedure(internal_tables, sink) - .await?; - - if let Some((table, source)) = target_table { - let streaming_job = - StreamingJob::Table(source, table, TableJobType::Unspecified); - - version = self - .finish_replace_table( - mgr.catalog_manager.clone(), - &streaming_job, - None, - Some(sink_id), - None, - ) - .await?; - } - - version - } - StreamingJob::Table(source, table, ..) => { - creating_internal_table_ids.push(table.id); - if let Some(source) = source { - mgr.catalog_manager - .finish_create_table_procedure_with_source(source, table, internal_tables) - .await? - } else { - mgr.catalog_manager - .finish_create_table_procedure(internal_tables, table) - .await? - } - } - StreamingJob::Index(index, table) => { - creating_internal_table_ids.push(table.id); - mgr.catalog_manager - .finish_create_index_procedure(internal_tables, index, table) - .await? - } - StreamingJob::Source(source) => { - mgr.catalog_manager - .finish_create_source_procedure(source, internal_tables) - .await? - } - }; - - // 2. unmark creating tables. - mgr.catalog_manager - .unmark_creating_tables(&creating_internal_table_ids, false) - .await; - - Ok(version) - } - async fn drop_table_inner( &self, source_id: Option, From da9bd03524e0e5635ad06dfb4b0aa6110ba3dee8 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Mon, 24 Jun 2024 22:35:30 +0800 Subject: [PATCH 9/9] feat(expr): streaming `generate_series` ends with `now()` (#17371) Signed-off-by: Richard Chien --- ci/scripts/e2e-test-parallel.sh | 8 +- e2e_test/streaming/now.slt | 48 +++ proto/stream_plan.proto | 12 + src/common/src/array/stream_chunk_builder.rs | 5 + src/common/src/util/epoch.rs | 7 +- .../input/generate_series_with_now.yaml | 30 ++ .../tests/testdata/output/expr.yaml | 6 +- .../output/generate_series_with_now.yaml | 35 ++ src/frontend/src/binder/expr/function.rs | 8 +- src/frontend/src/expr/mod.rs | 2 +- src/frontend/src/expr/type_inference/func.rs | 54 ++- .../src/optimizer/logical_optimization.rs | 11 + .../src/optimizer/plan_node/convert.rs | 5 - .../src/optimizer/plan_node/generic/mod.rs | 2 + .../src/optimizer/plan_node/generic/now.rs | 106 +++++ .../src/optimizer/plan_node/logical_now.rs | 33 +- .../plan_node/logical_project_set.rs | 12 +- .../src/optimizer/plan_node/stream_now.rs | 47 +- .../plan_visitor/cardinality_visitor.rs | 8 +- src/frontend/src/optimizer/rule/mod.rs | 2 + .../stream/filter_with_now_to_join_rule.rs | 4 +- .../stream/generate_series_with_now_rule.rs | 86 ++++ src/frontend/src/optimizer/rule/stream/mod.rs | 1 + src/stream/src/executor/mod.rs | 2 +- src/stream/src/executor/now.rs | 402 +++++++++++++++--- src/stream/src/from_proto/now.rs | 40 +- src/stream/src/lib.rs | 1 + 27 files changed, 838 insertions(+), 139 deletions(-) create mode 100644 e2e_test/streaming/now.slt create mode 100644 src/frontend/planner_test/tests/testdata/input/generate_series_with_now.yaml create mode 100644 
src/frontend/planner_test/tests/testdata/output/generate_series_with_now.yaml create mode 100644 src/frontend/src/optimizer/plan_node/generic/now.rs create mode 100644 src/frontend/src/optimizer/rule/stream/generate_series_with_now_rule.rs diff --git a/ci/scripts/e2e-test-parallel.sh b/ci/scripts/e2e-test-parallel.sh index 5f16a4c817871..c366ef07fc209 100755 --- a/ci/scripts/e2e-test-parallel.sh +++ b/ci/scripts/e2e-test-parallel.sh @@ -38,21 +38,21 @@ RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=i echo "--- e2e, ci-3streaming-2serving-3fe, streaming" RUST_LOG=$RUST_LOG \ risedev ci-start ci-3streaming-2serving-3fe -sqllogictest "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-streaming-${profile}" +sqllogictest "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-streaming-${profile}" --label "parallel" kill_cluster echo "--- e2e, ci-3streaming-2serving-3fe, batch" RUST_LOG=$RUST_LOG \ risedev ci-start ci-3streaming-2serving-3fe -sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-batch-ddl-${profile}" -sqllogictest "${host_args[@]}" -d dev './e2e_test/visibility_mode/*.slt' -j 16 --junit "parallel-batch-${profile}" +sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-batch-ddl-${profile}" --label "parallel" +sqllogictest "${host_args[@]}" -d dev './e2e_test/visibility_mode/*.slt' -j 16 --junit "parallel-batch-${profile}" --label "parallel" kill_cluster echo "--- e2e, ci-3streaming-2serving-3fe, generated" RUST_LOG=$RUST_LOG \ risedev ci-start ci-3streaming-2serving-3fe -sqllogictest "${host_args[@]}" -d dev './e2e_test/generated/**/*.slt' -j 16 --junit "parallel-generated-${profile}" +sqllogictest "${host_args[@]}" -d dev './e2e_test/generated/**/*.slt' -j 16 --junit "parallel-generated-${profile}" --label "parallel" kill_cluster diff --git a/e2e_test/streaming/now.slt b/e2e_test/streaming/now.slt new file mode 100644 index 0000000000000..ad086f2202e7a --- /dev/null +++ b/e2e_test/streaming/now.slt @@ -0,0 +1,48 @@ +# In madsim test, we cannot spawn process. +skipif madsim +# In parallel test, we cannot get the DB name. +skipif parallel +# TODO: Later if we introduce a new `now()`-like function that returns the time of statement execution, +# we'll be able to directly create MV without `./risedev psql` and so that we can remove these `skipif`. +system ok +./risedev psql -c " +create materialized view mv as +select * from generate_series( + to_timestamp($(date +%s)) - interval '10 second', + now(), + interval '1 second' +); +" + +skipif madsim +skipif parallel +statement ok +flush; + +skipif madsim +skipif parallel +query I +select count(*) >= 10 from mv; +---- +t + +skipif madsim +skipif parallel +sleep 2s + +skipif madsim +skipif parallel +statement ok +flush; + +skipif madsim +skipif parallel +query I +select count(*) >= 12 from mv; +---- +t + +skipif madsim +skipif parallel +statement ok +drop materialized view mv; diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 223a23813f68d..a43e34bde8df3 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -725,9 +725,21 @@ message RowIdGenNode { uint64 row_id_index = 1; } +message NowModeUpdateCurrent {} + +message NowModeGenerateSeries { + data.Datum start_timestamp = 1; + data.Datum interval = 2; +} + message NowNode { // Persists emitted 'now'. 
catalog.Table state_table = 1; + + oneof mode { + NowModeUpdateCurrent update_current = 101; + NowModeGenerateSeries generate_series = 102; + } } message ValuesNode { diff --git a/src/common/src/array/stream_chunk_builder.rs b/src/common/src/array/stream_chunk_builder.rs index a13cc3676792a..c44d313fffffe 100644 --- a/src/common/src/array/stream_chunk_builder.rs +++ b/src/common/src/array/stream_chunk_builder.rs @@ -104,6 +104,11 @@ impl StreamChunkBuilder { } } + /// Get the current number of rows in the builder. + pub fn size(&self) -> usize { + self.size + } + /// Append an iterator of output index and datum to the builder, return a chunk if the builder /// is full. /// diff --git a/src/common/src/util/epoch.rs b/src/common/src/util/epoch.rs index a067c689f669d..56dbdf6c54da9 100644 --- a/src/common/src/util/epoch.rs +++ b/src/common/src/util/epoch.rs @@ -174,10 +174,11 @@ impl EpochPair { Self::new(curr, curr - EPOCH_INC_MIN_STEP_FOR_TEST) } } -/// As most unit tests initializ a new epoch from a random value (e.g. 1, 2, 233 etc.), but the correct epoch in the system is a u64 with the last `EPOCH_AVAILABLE_BITS` bits set to 0. + +/// As most unit tests initialize a new epoch from a random value (e.g. 1, 2, 233 etc.), but the correct epoch in the system is a u64 with the last `EPOCH_AVAILABLE_BITS` bits set to 0. /// This method is to turn a a random epoch into a well shifted value. -pub const fn test_epoch(value: u64) -> u64 { - value << EPOCH_AVAILABLE_BITS +pub const fn test_epoch(value_millis: u64) -> u64 { + value_millis << EPOCH_AVAILABLE_BITS } /// There are numerous operations in our system's unit tests that involve incrementing or decrementing the epoch. diff --git a/src/frontend/planner_test/tests/testdata/input/generate_series_with_now.yaml b/src/frontend/planner_test/tests/testdata/input/generate_series_with_now.yaml new file mode 100644 index 0000000000000..e121aba41ff6f --- /dev/null +++ b/src/frontend/planner_test/tests/testdata/input/generate_series_with_now.yaml @@ -0,0 +1,30 @@ +- sql: | + select * from generate_series( + '2024-06-21 17:36:00'::timestamptz, + now(), + interval '1 hour' + ); + expected_outputs: + - logical_plan + - optimized_logical_plan_for_stream + - stream_plan +- sql: | + select * from generate_series( + '2024-06-21 17:36:00'::timestamp, -- `timestamp` type is not supported + now(), + interval '1 hour' + ); + expected_outputs: + - binder_error +- sql: | + select * from generate_series( + now() - interval '1 hour', + now(), + interval '1 hour' + ); + expected_outputs: + - stream_error +- sql: | + select * from unnest(array[now(), now()]); + expected_outputs: + - stream_error diff --git a/src/frontend/planner_test/tests/testdata/output/expr.yaml b/src/frontend/planner_test/tests/testdata/output/expr.yaml index 4ba572a54e600..f88f7c4d69b76 100644 --- a/src/frontend/planner_test/tests/testdata/output/expr.yaml +++ b/src/frontend/planner_test/tests/testdata/output/expr.yaml @@ -543,7 +543,7 @@ Failed to bind expression: v1 >= now() Caused by: - Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING` and `ON`. Found in clause: Some(GroupBy). Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information + Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING`, `ON` and `FROM`. Found in clause: Some(GroupBy). 
Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information - name: forbid now in select for stream sql: | create table t (v1 timestamp with time zone, v2 timestamp with time zone); @@ -552,7 +552,7 @@ Failed to bind expression: now() Caused by: - Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING` and `ON`. Found in clause: None. Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information + Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING`, `ON` and `FROM`. Found in clause: None. Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information - name: forbid now in agg filter for stream sql: | create table t (v1 timestamp with time zone, v2 int); @@ -561,7 +561,7 @@ Failed to bind expression: sum(v2) FILTER(WHERE v1 >= now()) Caused by: - Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING` and `ON`. Found in clause: Some(Filter). Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information + Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING`, `ON` and `FROM`. Found in clause: Some(Filter). Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information - name: typo pg_teminate_backend sql: | select pg_teminate_backend(1); diff --git a/src/frontend/planner_test/tests/testdata/output/generate_series_with_now.yaml b/src/frontend/planner_test/tests/testdata/output/generate_series_with_now.yaml new file mode 100644 index 0000000000000..4c8d71f987351 --- /dev/null +++ b/src/frontend/planner_test/tests/testdata/output/generate_series_with_now.yaml @@ -0,0 +1,35 @@ +# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information. +- sql: | + select * from generate_series( + '2024-06-21 17:36:00'::timestamptz, + now(), + interval '1 hour' + ); + logical_plan: |- + LogicalProject { exprs: [generate_series] } + └─LogicalTableFunction { table_function: GenerateSeries('2024-06-21 17:36:00':Varchar::Timestamptz, Now, '01:00:00':Interval) } + optimized_logical_plan_for_stream: 'LogicalNow { output: [ts] }' + stream_plan: |- + StreamMaterialize { columns: [generate_series], stream_key: [generate_series], pk_columns: [generate_series], pk_conflict: NoCheck, watermark_columns: [generate_series] } + └─StreamNow { output: [ts] } +- sql: | + select * from generate_series( + '2024-06-21 17:36:00'::timestamp, -- `timestamp` type is not supported + now(), + interval '1 hour' + ); + binder_error: function generate_series(timestamp without time zone, timestamp with time zone, interval) does not exist +- sql: | + select * from generate_series( + now() - interval '1 hour', + now(), + interval '1 hour' + ); + stream_error: |- + Not supported: General `now()` function in streaming queries + HINT: Streaming `now()` is currently only supported in GenerateSeries and TemporalFilter patterns. +- sql: | + select * from unnest(array[now(), now()]); + stream_error: |- + Not supported: General `now()` function in streaming queries + HINT: Streaming `now()` is currently only supported in GenerateSeries and TemporalFilter patterns. 
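For reference, the planner tests above correspond to the following end-user pattern; this is a minimal sketch, with the view name `ticks` chosen for illustration only (the e2e test `e2e_test/streaming/now.slt` in this patch uses the same shape with a shell-computed start timestamp):

    CREATE MATERIALIZED VIEW ticks AS
    SELECT * FROM generate_series(
        '2024-06-21 17:36:00'::timestamptz, -- start: must be a non-NULL constant
        now(),                              -- stop: streaming now(), advances on each barrier
        interval '1 hour'                   -- step: must be a non-NULL constant
    );

Per the `GenerateSeriesWithNowRule` added later in this patch, only `generate_series(<const timestamptz>, now(), <const interval>)` is rewritten into the `LogicalNow` / `StreamNow` plan shown in the expected output above, after which the executor keeps emitting new timestamps as `now()` advances past each step. Any other use of `now()` in `FROM` (for example the `unnest(array[now(), now()])` case above) still fails with the "General `now()` function in streaming queries" error.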
diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index 09c05a29695dc..b9a3b27825abf 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -1572,11 +1572,15 @@ impl Binder { if self.is_for_stream() && !matches!( self.context.clause, - Some(Clause::Where) | Some(Clause::Having) | Some(Clause::JoinOn) + Some(Clause::Where) + | Some(Clause::Having) + | Some(Clause::JoinOn) + | Some(Clause::From) ) { return Err(ErrorCode::InvalidInputSyntax(format!( - "For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING` and `ON`. Found in clause: {:?}. Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information", + "For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING`, `ON` and `FROM`. Found in clause: {:?}. \ + Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information", self.context.clause )) .into()); diff --git a/src/frontend/src/expr/mod.rs b/src/frontend/src/expr/mod.rs index 9dd5f7be1d53d..89142d0e9b237 100644 --- a/src/frontend/src/expr/mod.rs +++ b/src/frontend/src/expr/mod.rs @@ -413,7 +413,7 @@ macro_rules! impl_has_variant { }; } -impl_has_variant! {InputRef, Literal, FunctionCall, FunctionCallWithLambda, AggCall, Subquery, TableFunction, WindowFunction} +impl_has_variant! {InputRef, Literal, FunctionCall, FunctionCallWithLambda, AggCall, Subquery, TableFunction, WindowFunction, Now} #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct InequalityInputPair { diff --git a/src/frontend/src/expr/type_inference/func.rs b/src/frontend/src/expr/type_inference/func.rs index 9f28dfeb74c8c..0fecee8ab45c0 100644 --- a/src/frontend/src/expr/type_inference/func.rs +++ b/src/frontend/src/expr/type_inference/func.rs @@ -20,6 +20,7 @@ use risingwave_common::types::{DataType, StructType}; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_expr::aggregate::AggKind; pub use risingwave_expr::sig::*; +use risingwave_pb::expr::table_function::PbType as PbTableFuncType; use super::{align_types, cast_ok_base, CastContext}; use crate::error::{ErrorCode, Result}; @@ -36,13 +37,24 @@ pub fn infer_type_with_sigmap( sig_map: &FunctionRegistry, ) -> Result { // special cases - if let FuncName::Scalar(func_type) = func_name - && let Some(res) = infer_type_for_special(func_type, inputs).transpose() - { - return res; - } - if let FuncName::Aggregate(AggKind::Grouping) = func_name { - return Ok(DataType::Int32); + match &func_name { + FuncName::Scalar(func_type) => { + if let Some(res) = infer_type_for_special(*func_type, inputs).transpose() { + return res; + } + } + FuncName::Table(func_type) => { + if let Some(res) = infer_type_for_special_table_function(*func_type, inputs).transpose() + { + return res; + } + } + FuncName::Aggregate(agg_kind) => { + if *agg_kind == AggKind::Grouping { + return Ok(DataType::Int32); + } + } + _ => {} } let actuals = inputs @@ -634,6 +646,34 @@ fn infer_type_for_special( } } +fn infer_type_for_special_table_function( + func_type: PbTableFuncType, + inputs: &mut [ExprImpl], +) -> Result> { + match func_type { + PbTableFuncType::GenerateSeries => { + if inputs.len() < 3 { + // let signature map handle this + return Ok(None); + } + match ( + inputs[0].return_type(), + inputs[1].return_type(), + inputs[2].return_type(), + ) { + (DataType::Timestamptz, DataType::Timestamptz, DataType::Interval) => { + // This is to allow 
`generate_series('2024-06-20 00:00:00'::timestamptz, now(), interval '1 day')`, + // which in streaming mode will be further converted to `StreamNow`. + Ok(Some(DataType::Timestamptz)) + } + // let signature map handle the rest + _ => Ok(None), + } + } + _ => Ok(None), + } +} + /// From all available functions in `sig_map`, find and return the best matching `FuncSign` for the /// provided `func_name` and `inputs`. This not only support exact function signature match, but can /// also match `substr(varchar, smallint)` or even `substr(varchar, unknown)` to `substr(varchar, diff --git a/src/frontend/src/optimizer/logical_optimization.rs b/src/frontend/src/optimizer/logical_optimization.rs index d452626bb9418..931a645b3d680 100644 --- a/src/frontend/src/optimizer/logical_optimization.rs +++ b/src/frontend/src/optimizer/logical_optimization.rs @@ -118,6 +118,14 @@ static DAG_TO_TREE: LazyLock = LazyLock::new(|| { ) }); +static STREAM_GENERATE_SERIES_WITH_NOW: LazyLock = LazyLock::new(|| { + OptimizationStage::new( + "Convert GENERATE_SERIES Ends With NOW", + vec![GenerateSeriesWithNowRule::create()], + ApplyOrder::TopDown, + ) +}); + static TABLE_FUNCTION_TO_PROJECT_SET: LazyLock = LazyLock::new(|| { OptimizationStage::new( "Table Function To Project Set", @@ -572,6 +580,9 @@ impl LogicalOptimizer { } plan = plan.optimize_by_rules(&SET_OPERATION_MERGE); plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN); + // Convert `generate_series` ends with `now()` to a `Now` source. Only for streaming mode. + // Should be applied before converting table function to project set. + plan = plan.optimize_by_rules(&STREAM_GENERATE_SERIES_WITH_NOW); // In order to unnest a table function, we need to convert it into a `project_set` first. plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_PROJECT_SET); diff --git a/src/frontend/src/optimizer/plan_node/convert.rs b/src/frontend/src/optimizer/plan_node/convert.rs index db961b3b1e20a..6f98073304d8b 100644 --- a/src/frontend/src/optimizer/plan_node/convert.rs +++ b/src/frontend/src/optimizer/plan_node/convert.rs @@ -84,11 +84,6 @@ pub fn stream_enforce_eowc_requirement( } Ok(StreamEowcSort::new(plan, watermark_col_idx).into()) } - } else if !emit_on_window_close && plan.emit_on_window_close() { - Err(ErrorCode::InternalError( - "Some bad thing happened, the generated plan is not correct.".to_string(), - ) - .into()) } else { Ok(plan) } diff --git a/src/frontend/src/optimizer/plan_node/generic/mod.rs b/src/frontend/src/optimizer/plan_node/generic/mod.rs index 00db5730e8038..392f073371843 100644 --- a/src/frontend/src/optimizer/plan_node/generic/mod.rs +++ b/src/frontend/src/optimizer/plan_node/generic/mod.rs @@ -78,6 +78,8 @@ mod cte_ref; pub use cte_ref::*; mod recursive_union; pub use recursive_union::*; +mod now; +pub use now::*; pub trait DistillUnit { fn distill_with_name<'a>(&self, name: impl Into>) -> XmlNode<'a>; diff --git a/src/frontend/src/optimizer/plan_node/generic/now.rs b/src/frontend/src/optimizer/plan_node/generic/now.rs new file mode 100644 index 0000000000000..911217d064214 --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/generic/now.rs @@ -0,0 +1,106 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use educe::Educe; +use enum_as_inner::EnumAsInner; +use pretty_xmlish::{Pretty, Str, XmlNode}; +use risingwave_common::catalog::{Field, Schema}; +use risingwave_common::types::{DataType, Interval, Timestamptz}; + +use super::{DistillUnit, GenericPlanNode}; +use crate::optimizer::plan_node::utils::childless_record; +use crate::optimizer::property::FunctionalDependencySet; +use crate::OptimizerContextRef; + +#[derive(Debug, Clone, Educe)] +#[educe(PartialEq, Eq, Hash)] +pub struct Now { + #[educe(PartialEq(ignore))] + #[educe(Hash(ignore))] + pub ctx: OptimizerContextRef, + + pub mode: Mode, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, EnumAsInner)] +pub enum Mode { + /// Emit current timestamp on startup, update it on barrier. + UpdateCurrent, + /// Generate a series of timestamps starting from `start_timestamp` with `interval`. + /// Keep generating new timestamps on barrier. + GenerateSeries { + start_timestamp: Timestamptz, + interval: Interval, + }, +} + +impl GenericPlanNode for Now { + fn functional_dependency(&self) -> crate::optimizer::property::FunctionalDependencySet { + FunctionalDependencySet::new(1) // only one column and no dependency + } + + fn schema(&self) -> risingwave_common::catalog::Schema { + Schema::new(vec![Field { + data_type: DataType::Timestamptz, + name: String::from(if self.mode.is_update_current() { + "now" + } else { + "ts" + }), + sub_fields: vec![], + type_name: String::default(), + }]) + } + + fn stream_key(&self) -> Option> { + match self.mode { + Mode::UpdateCurrent => Some(vec![]), + Mode::GenerateSeries { .. 
} => Some(vec![0]), + } + } + + fn ctx(&self) -> OptimizerContextRef { + self.ctx.clone() + } +} + +impl Now { + pub fn update_current(ctx: OptimizerContextRef) -> Self { + Self::new_inner(ctx, Mode::UpdateCurrent) + } + + pub fn generate_series( + ctx: OptimizerContextRef, + start_timestamp: Timestamptz, + interval: Interval, + ) -> Self { + Self::new_inner( + ctx, + Mode::GenerateSeries { + start_timestamp, + interval, + }, + ) + } + + fn new_inner(ctx: OptimizerContextRef, mode: Mode) -> Self { + Self { ctx, mode } + } +} + +impl DistillUnit for Now { + fn distill_with_name<'a>(&self, name: impl Into>) -> XmlNode<'a> { + childless_record(name, vec![("mode", Pretty::debug(&self.mode))]) + } +} diff --git a/src/frontend/src/optimizer/plan_node/logical_now.rs b/src/frontend/src/optimizer/plan_node/logical_now.rs index f9c33eb3d9cc1..ea34037c8977a 100644 --- a/src/frontend/src/optimizer/plan_node/logical_now.rs +++ b/src/frontend/src/optimizer/plan_node/logical_now.rs @@ -14,10 +14,8 @@ use pretty_xmlish::XmlNode; use risingwave_common::bail; -use risingwave_common::catalog::{Field, Schema}; -use risingwave_common::types::DataType; -use super::generic::GenericPlanRef; +use super::generic::{self, GenericPlanRef, Mode}; use super::utils::{childless_record, Distill}; use super::{ ColPrunable, ColumnPruningContext, ExprRewritable, Logical, LogicalFilter, PlanBase, PlanRef, @@ -26,30 +24,25 @@ use super::{ use crate::error::Result; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::utils::column_names_pretty; -use crate::optimizer::property::FunctionalDependencySet; use crate::utils::ColIndexMapping; -use crate::OptimizerContextRef; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct LogicalNow { pub base: PlanBase, + core: generic::Now, } impl LogicalNow { - pub fn new(ctx: OptimizerContextRef) -> Self { - let schema = Schema::new(vec![Field { - data_type: DataType::Timestamptz, - name: String::from("now"), - sub_fields: vec![], - type_name: String::default(), - }]); - let base = PlanBase::new_logical( - ctx, - schema, - Some(vec![]), - FunctionalDependencySet::default(), - ); - Self { base } + pub fn new(core: generic::Now) -> Self { + let base = PlanBase::new_logical_with_core(&core); + Self { base, core } + } + + pub fn max_one_row(&self) -> bool { + match self.core.mode { + Mode::UpdateCurrent => true, + Mode::GenerateSeries { .. 
} => false, + } } } @@ -91,7 +84,7 @@ impl ToStream for LogicalNow { /// `to_stream` is equivalent to `to_stream_with_dist_required(RequiredDist::Any)` fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result { - Ok(StreamNow::new(self.clone(), self.ctx()).into()) + Ok(StreamNow::new(self.core.clone()).into()) } } diff --git a/src/frontend/src/optimizer/plan_node/logical_project_set.rs b/src/frontend/src/optimizer/plan_node/logical_project_set.rs index 6f31b35a21ee0..8f6966ece6c70 100644 --- a/src/frontend/src/optimizer/plan_node/logical_project_set.rs +++ b/src/frontend/src/optimizer/plan_node/logical_project_set.rs @@ -22,7 +22,7 @@ use super::{ LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamProjectSet, ToBatch, ToStream, }; -use crate::error::Result; +use crate::error::{ErrorCode, Result}; use crate::expr::{ collect_input_refs, Expr, ExprImpl, ExprRewriter, ExprVisitor, FunctionCall, InputRef, TableFunction, @@ -400,6 +400,16 @@ impl ToStream for LogicalProjectSet { // TODO: implement to_stream_with_dist_required like LogicalProject fn to_stream(&self, ctx: &mut ToStreamContext) -> Result { + if self.select_list().iter().any(|item| item.has_now()) { + // User may use `now()` in table function in a wrong way, because we allow `now()` in `FROM` clause. + return Err(ErrorCode::NotSupported( + "General `now()` function in streaming queries".to_string(), + "Streaming `now()` is currently only supported in GenerateSeries and TemporalFilter patterns." + .to_string(), + ) + .into()); + } + let new_input = self.input().to_stream(ctx)?; let mut new_logical = self.core.clone(); new_logical.input = new_input; diff --git a/src/frontend/src/optimizer/plan_node/stream_now.rs b/src/frontend/src/optimizer/plan_node/stream_now.rs index d27321c08d06b..22a0d2c5fb0fb 100644 --- a/src/frontend/src/optimizer/plan_node/stream_now.rs +++ b/src/frontend/src/optimizer/plan_node/stream_now.rs @@ -14,46 +14,39 @@ use fixedbitset::FixedBitSet; use pretty_xmlish::XmlNode; -use risingwave_common::catalog::{Field, Schema}; -use risingwave_common::types::DataType; +use risingwave_common::types::Datum; +use risingwave_common::util::value_encoding::DatumToProtoExt; +use risingwave_pb::stream_plan::now_node::PbMode as PbNowMode; use risingwave_pb::stream_plan::stream_node::NodeBody; -use risingwave_pb::stream_plan::NowNode; +use risingwave_pb::stream_plan::{PbNowModeGenerateSeries, PbNowModeUpdateCurrent, PbNowNode}; +use super::generic::Mode; use super::stream::prelude::*; use super::utils::{childless_record, Distill, TableCatalogBuilder}; -use super::{ExprRewritable, LogicalNow, PlanBase, StreamNode}; +use super::{generic, ExprRewritable, PlanBase, StreamNode}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::utils::column_names_pretty; -use crate::optimizer::property::{Distribution, FunctionalDependencySet}; +use crate::optimizer::property::Distribution; use crate::stream_fragmenter::BuildFragmentGraphState; -use crate::OptimizerContextRef; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamNow { pub base: PlanBase, + core: generic::Now, } impl StreamNow { - pub fn new(_logical: LogicalNow, ctx: OptimizerContextRef) -> Self { - let schema = Schema::new(vec![Field { - data_type: DataType::Timestamptz, - name: String::from("now"), - sub_fields: vec![], - type_name: String::default(), - }]); + pub fn new(core: generic::Now) -> Self { let mut watermark_columns = FixedBitSet::with_capacity(1); watermark_columns.set(0, true); - let 
base = PlanBase::new_stream( - ctx, - schema, - Some(vec![]), - FunctionalDependencySet::default(), + let base = PlanBase::new_stream_with_core( + &core, Distribution::Single, - false, - false, // TODO(rc): derive EOWC property from input + core.mode.is_generate_series(), // append only + core.mode.is_generate_series(), // emit on window close watermark_columns, ); - Self { base } + Self { base, core } } } @@ -83,8 +76,18 @@ impl StreamNode for StreamNow { let table_catalog = internal_table_catalog_builder .build(dist_keys, 0) .with_id(state.gen_table_id_wrapped()); - NodeBody::Now(NowNode { + NodeBody::Now(PbNowNode { state_table: Some(table_catalog.to_internal_table_prost()), + mode: Some(match &self.core.mode { + Mode::UpdateCurrent => PbNowMode::UpdateCurrent(PbNowModeUpdateCurrent {}), + Mode::GenerateSeries { + start_timestamp, + interval, + } => PbNowMode::GenerateSeries(PbNowModeGenerateSeries { + start_timestamp: Some(Datum::Some((*start_timestamp).into()).to_protobuf()), + interval: Some(Datum::Some((*interval).into()).to_protobuf()), + }), + }), }) } } diff --git a/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs b/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs index c81bf539a5713..07459b59b1d5f 100644 --- a/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs +++ b/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs @@ -174,8 +174,12 @@ impl PlanVisitor for CardinalityVisitor { } } - fn visit_logical_now(&mut self, _plan: &plan_node::LogicalNow) -> Cardinality { - 1.into() + fn visit_logical_now(&mut self, plan: &plan_node::LogicalNow) -> Cardinality { + if plan.max_one_row() { + 1.into() + } else { + Cardinality::unknown() + } } fn visit_logical_expand(&mut self, plan: &plan_node::LogicalExpand) -> Cardinality { diff --git a/src/frontend/src/optimizer/rule/mod.rs b/src/frontend/src/optimizer/rule/mod.rs index 9364a6f2b7f5b..fd06402c7497f 100644 --- a/src/frontend/src/optimizer/rule/mod.rs +++ b/src/frontend/src/optimizer/rule/mod.rs @@ -92,6 +92,7 @@ pub use top_n_on_index_rule::*; mod stream; pub use stream::bushy_tree_join_ordering_rule::*; pub use stream::filter_with_now_to_join_rule::*; +pub use stream::generate_series_with_now_rule::*; pub use stream::split_now_and_rule::*; pub use stream::split_now_or_rule::*; pub use stream::stream_project_merge_rule::*; @@ -203,6 +204,7 @@ macro_rules! 
for_all_rules { , { SplitNowAndRule } , { SplitNowOrRule } , { FilterWithNowToJoinRule } + , { GenerateSeriesWithNowRule } , { TopNOnIndexRule } , { TrivialProjectToValuesRule } , { UnionInputValuesMergeRule } diff --git a/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs b/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs index 498696589c81b..cbdb65b4528a5 100644 --- a/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs +++ b/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs @@ -18,7 +18,7 @@ use risingwave_pb::plan_common::JoinType; use crate::expr::{ try_derive_watermark, ExprRewriter, FunctionCall, InputRef, WatermarkDerivation, }; -use crate::optimizer::plan_node::generic::GenericPlanRef; +use crate::optimizer::plan_node::generic::{self, GenericPlanRef}; use crate::optimizer::plan_node::{LogicalFilter, LogicalJoin, LogicalNow}; use crate::optimizer::rule::{BoxedRule, Rule}; use crate::optimizer::PlanRef; @@ -63,7 +63,7 @@ impl Rule for FilterWithNowToJoinRule { for now_filter in now_filters { new_plan = LogicalJoin::new( new_plan, - LogicalNow::new(plan.ctx()).into(), + LogicalNow::new(generic::Now::update_current(plan.ctx())).into(), JoinType::LeftSemi, Condition { conjunctions: vec![now_filter.into()], diff --git a/src/frontend/src/optimizer/rule/stream/generate_series_with_now_rule.rs b/src/frontend/src/optimizer/rule/stream/generate_series_with_now_rule.rs new file mode 100644 index 0000000000000..665967f6660b0 --- /dev/null +++ b/src/frontend/src/optimizer/rule/stream/generate_series_with_now_rule.rs @@ -0,0 +1,86 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::types::DataType; +use risingwave_pb::expr::table_function::PbType as PbTableFuncType; + +use crate::expr::{Expr, ExprRewriter}; +use crate::optimizer::plan_node::{generic, LogicalNow}; +use crate::optimizer::rule::{BoxedRule, Rule}; +use crate::PlanRef; + +pub struct GenerateSeriesWithNowRule {} +impl Rule for GenerateSeriesWithNowRule { + fn apply(&self, plan: PlanRef) -> Option { + let ctx = plan.ctx(); + let table_func = plan.as_logical_table_function()?.table_function(); + + if !table_func.args.iter().any(|arg| arg.has_now()) { + return None; + } + + if !(table_func.function_type == PbTableFuncType::GenerateSeries + && table_func.args.len() == 3 + && table_func.args[0].return_type() == DataType::Timestamptz + && table_func.args[1].is_now() + && table_func.args[2].return_type() == DataType::Interval) + { + // only convert `generate_series(const timestamptz, now(), const interval)` + ctx.warn_to_user( + "`now()` is currently only supported in `generate_series(timestamptz, timestamptz, interval)` function as `stop`. \ + You may not using it correctly. Please kindly check the document." 
+ ); + return None; + } + + let start_timestamp = ctx + .session_timezone() + .rewrite_expr(table_func.args[0].clone()) + .try_fold_const() + .transpose() + .ok() + .flatten() + .flatten(); + let interval = ctx + .session_timezone() + .rewrite_expr(table_func.args[2].clone()) + .try_fold_const() + .transpose() + .ok() + .flatten() + .flatten(); + + if start_timestamp.is_none() || interval.is_none() { + ctx.warn_to_user( + "When using `generate_series` with `now()`, the `start` and `step` must be non-NULL constants", + ); + return None; + } + + Some( + LogicalNow::new(generic::Now::generate_series( + ctx, + start_timestamp.unwrap().into_timestamptz(), + interval.unwrap().into_interval(), + )) + .into(), + ) + } +} + +impl GenerateSeriesWithNowRule { + pub fn create() -> BoxedRule { + Box::new(Self {}) + } +} diff --git a/src/frontend/src/optimizer/rule/stream/mod.rs b/src/frontend/src/optimizer/rule/stream/mod.rs index cc86298e766e8..539d9048cff60 100644 --- a/src/frontend/src/optimizer/rule/stream/mod.rs +++ b/src/frontend/src/optimizer/rule/stream/mod.rs @@ -14,6 +14,7 @@ pub(crate) mod bushy_tree_join_ordering_rule; pub(crate) mod filter_with_now_to_join_rule; +pub(crate) mod generate_series_with_now_rule; pub(crate) mod split_now_and_rule; pub(crate) mod split_now_or_rule; pub(crate) mod stream_project_merge_rule; diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index 6d5c5867060de..a6a2152283668 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -129,7 +129,7 @@ pub use lookup_union::LookupUnionExecutor; pub use merge::MergeExecutor; pub use mview::*; pub use no_op::NoOpExecutor; -pub use now::NowExecutor; +pub use now::*; pub use over_window::*; pub use project::ProjectExecutor; pub use project_set::*; diff --git a/src/stream/src/executor/now.rs b/src/stream/src/executor/now.rs index 43316fef71094..049eee7f8c724 100644 --- a/src/stream/src/executor/now.rs +++ b/src/stream/src/executor/now.rs @@ -15,30 +15,65 @@ use std::ops::Bound; use std::ops::Bound::Unbounded; +use itertools::Itertools; use risingwave_common::array::Op; use risingwave_common::row; +use risingwave_common::types::{DefaultOrdered, Interval, Timestamptz, ToDatumRef}; +use risingwave_expr::capture_context; +use risingwave_expr::expr::{ + build_func_non_strict, EvalErrorReport, ExpressionBoxExt, InputRefExpression, + LiteralExpression, NonStrictExpression, +}; +use risingwave_expr::expr_context::TIME_ZONE; use tokio::sync::mpsc::UnboundedReceiver; use tokio_stream::wrappers::UnboundedReceiverStream; use crate::executor::prelude::*; +use crate::task::ActorEvalErrorReport; pub struct NowExecutor { data_types: Vec, + mode: NowMode, + eval_error_report: ActorEvalErrorReport, + /// Receiver of barrier channel. barrier_receiver: UnboundedReceiver, state_table: StateTable, } +pub enum NowMode { + /// Emit current timestamp on startup, update it on barrier. + UpdateCurrent, + /// Generate a series of timestamps starting from `start_timestamp` with `interval`. + /// Keep generating new timestamps on barrier. 
+ GenerateSeries { + start_timestamp: Timestamptz, + interval: Interval, + }, +} + +enum ModeVars { + UpdateCurrent, + GenerateSeries { + chunk_builder: StreamChunkBuilder, + add_interval_expr: NonStrictExpression, + }, +} + impl NowExecutor { pub fn new( data_types: Vec, + mode: NowMode, + eval_error_report: ActorEvalErrorReport, barrier_receiver: UnboundedReceiver, state_table: StateTable, ) -> Self { Self { data_types, + mode, + eval_error_report, barrier_receiver, state_table, } @@ -48,24 +83,43 @@ impl NowExecutor { async fn execute_inner(self) { let Self { data_types, + mode, + eval_error_report, barrier_receiver, mut state_table, } = self; + let max_chunk_size = crate::config::chunk_size(); + // Whether the executor is paused. let mut paused = false; // The last timestamp **sent** to the downstream. let mut last_timestamp: Datum = None; + // Whether the first barrier is handled and `last_timestamp` is initialized. let mut initialized = false; + let mut mode_vars = match &mode { + NowMode::UpdateCurrent => ModeVars::UpdateCurrent, + NowMode::GenerateSeries { interval, .. } => { + // in most cases there won't be more than one row except for the first time + let chunk_builder = StreamChunkBuilder::unlimited(data_types.clone(), Some(1)); + let add_interval_expr = + build_add_interval_expr_captured(*interval, eval_error_report)?; + ModeVars::GenerateSeries { + chunk_builder, + add_interval_expr, + } + } + }; + const MAX_MERGE_BARRIER_SIZE: usize = 64; #[for_await] for barriers in UnboundedReceiverStream::new(barrier_receiver).ready_chunks(MAX_MERGE_BARRIER_SIZE) { - let mut timestamp = None; + let mut curr_timestamp = None; if barriers.len() > 1 { warn!( "handle multiple barriers at once in now executor: {}", @@ -74,7 +128,7 @@ impl NowExecutor { } for barrier in barriers { if !initialized { - // Handle the first barrier. + // Handle the initial barrier. state_table.init_epoch(barrier.epoch); let state_row = { let sub_range: &(Bound, Bound) = @@ -97,7 +151,7 @@ impl NowExecutor { } // Extract timestamp from the current epoch. - timestamp = Some(barrier.get_curr_epoch().as_scalar()); + curr_timestamp = Some(barrier.get_curr_epoch().as_scalar()); // Update paused state. if let Some(mutation) = barrier.mutation.as_deref() { @@ -116,28 +170,94 @@ impl NowExecutor { continue; } - let stream_chunk = if last_timestamp.is_some() { - let last_row = row::once(&last_timestamp); - let row = row::once(×tamp); - state_table.update(last_row, row); + match (&mode, &mut mode_vars) { + (NowMode::UpdateCurrent, ModeVars::UpdateCurrent) => { + let chunk = if last_timestamp.is_some() { + let last_row = row::once(&last_timestamp); + let row = row::once(&curr_timestamp); + state_table.update(last_row, row); + + StreamChunk::from_rows( + &[(Op::Delete, last_row), (Op::Insert, row)], + &data_types, + ) + } else { + let row = row::once(&curr_timestamp); + state_table.insert(row); + + StreamChunk::from_rows(&[(Op::Insert, row)], &data_types) + }; - StreamChunk::from_rows(&[(Op::Delete, last_row), (Op::Insert, row)], &data_types) - } else { - let row = row::once(×tamp); - state_table.insert(row); + yield Message::Chunk(chunk); + last_timestamp.clone_from(&curr_timestamp) + } + ( + NowMode::GenerateSeries { + start_timestamp, .. + }, + ModeVars::GenerateSeries { + chunk_builder, + ref add_interval_expr, + }, + ) => { + if last_timestamp.is_none() { + // We haven't emit any timestamp yet. Let's emit the first one and populate the state table. 
+ let first = Some((*start_timestamp).into()); + let first_row = row::once(&first); + let _ = chunk_builder.append_row(Op::Insert, first_row); + state_table.insert(first_row); + last_timestamp = first; + } - StreamChunk::from_rows(&[(Op::Insert, row)], &data_types) - }; + // Now let's step through the timestamps from the last timestamp to the current timestamp. + // We use `last_row` as a temporary cursor to track the progress, and won't touch `last_timestamp` + // until the end of the loop, so that `last_timestamp` is always synced with the state table. + let mut last_row = OwnedRow::new(vec![last_timestamp.clone()]); + + loop { + if chunk_builder.size() >= max_chunk_size { + // Manually yield the chunk when size exceeds the limit. We don't want to use chunk builder + // with limited size here because the initial capacity can be too large for most cases. + // Basically only the first several chunks can potentially exceed the `max_chunk_size`. + if let Some(chunk) = chunk_builder.take() { + yield Message::Chunk(chunk); + } + } + + let next = add_interval_expr.eval_row_infallible(&last_row).await; + if DefaultOrdered(next.to_datum_ref()) + > DefaultOrdered(curr_timestamp.to_datum_ref()) + { + // We only increase the timestamp to the current timestamp. + break; + } + + let next_row = OwnedRow::new(vec![next]); + let _ = chunk_builder.append_row(Op::Insert, &next_row); + last_row = next_row; + } + + if let Some(chunk) = chunk_builder.take() { + yield Message::Chunk(chunk); + } - yield Message::Chunk(stream_chunk); + // Update the last timestamp. + state_table.update(row::once(&last_timestamp), &last_row); + last_timestamp = last_row + .into_inner() + .into_vec() + .into_iter() + .exactly_one() + .unwrap(); + } + _ => unreachable!(), + } yield Message::Watermark(Watermark::new( 0, DataType::Timestamptz, - timestamp.clone().unwrap(), + curr_timestamp.unwrap(), )); - - last_timestamp = timestamp; } } } @@ -148,36 +268,54 @@ impl Execute for NowExecutor { } } +#[capture_context(TIME_ZONE)] +pub fn build_add_interval_expr( + time_zone: &str, + interval: Interval, + eval_error_report: impl EvalErrorReport + 'static, +) -> risingwave_expr::Result { + let timestamptz_input = InputRefExpression::new(DataType::Timestamptz, 0); + let interval = LiteralExpression::new(DataType::Interval, Some(interval.into())); + let time_zone = LiteralExpression::new(DataType::Varchar, Some(time_zone.into())); + + use risingwave_pb::expr::expr_node::PbType as PbExprType; + build_func_non_strict( + PbExprType::AddWithTimeZone, + DataType::Timestamptz, + vec![ + timestamptz_input.boxed(), + interval.boxed(), + time_zone.boxed(), + ], + eval_error_report, + ) +} + #[cfg(test)] mod tests { - use risingwave_common::array::StreamChunk; use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId}; use risingwave_common::test_prelude::StreamChunkTestExt; - use risingwave_common::types::{DataType, ScalarImpl}; + use risingwave_common::types::test_utils::IntervalTestExt; + use risingwave_common::util::epoch::test_epoch; use risingwave_storage::memory::MemoryStateStore; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; - use super::NowExecutor; - use crate::common::table::state_table::StateTable; + use super::*; use crate::executor::test_utils::StreamExecutorTestExt; - use crate::executor::{ - Barrier, BoxedMessageStream, Execute, Mutation, StreamExecutorResult, Watermark, - }; #[tokio::test] async fn test_now() -> StreamExecutorResult<()> { let state_store = create_state_store(); - let (tx, mut now_executor) = 
create_executor(&state_store).await; + let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await; // Init barrier - tx.send(Barrier::with_prev_epoch_for_test(1 << 16, 1)) - .unwrap(); + tx.send(Barrier::new_test_barrier(test_epoch(1))).unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // Consume the data chunk - let chunk_msg = now_executor.next_unwrap_ready_chunk()?; + let chunk_msg = now.next_unwrap_ready_chunk()?; assert_eq!( chunk_msg.compact(), @@ -188,7 +326,7 @@ mod tests { ); // Consume the watermark - let watermark = now_executor.next_unwrap_ready_watermark()?; + let watermark = now.next_unwrap_ready_watermark()?; assert_eq!( watermark, @@ -199,14 +337,17 @@ mod tests { ) ); - tx.send(Barrier::with_prev_epoch_for_test(2 << 16, 1 << 16)) - .unwrap(); + tx.send(Barrier::with_prev_epoch_for_test( + test_epoch(2), + test_epoch(1), + )) + .unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // Consume the data chunk - let chunk_msg = now_executor.next_unwrap_ready_chunk()?; + let chunk_msg = now.next_unwrap_ready_chunk()?; assert_eq!( chunk_msg.compact(), @@ -218,7 +359,7 @@ mod tests { ); // Consume the watermark - let watermark = now_executor.next_unwrap_ready_watermark()?; + let watermark = now.next_unwrap_ready_watermark()?; assert_eq!( watermark, @@ -230,21 +371,25 @@ mod tests { ); // No more messages until the next barrier - now_executor.next_unwrap_pending(); + now.next_unwrap_pending(); // Recovery - drop((tx, now_executor)); - let (tx, mut now_executor) = create_executor(&state_store).await; - tx.send(Barrier::with_prev_epoch_for_test(3 << 16, 1 << 16)) - .unwrap(); + drop((tx, now)); + let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await; + tx.send(Barrier::with_prev_epoch_for_test( + test_epoch(3), + test_epoch(2), + )) + .unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // Consume the data chunk - let chunk_msg = now_executor.next_unwrap_ready_chunk()?; + let chunk_msg = now.next_unwrap_ready_chunk()?; assert_eq!( chunk_msg.compact(), + // the last chunk was not checkpointed so the deleted old value should be `001` StreamChunk::from_pretty( " TZ - 2021-04-01T00:00:00.001Z @@ -253,7 +398,7 @@ mod tests { ); // Consume the watermark - let watermark = now_executor.next_unwrap_ready_watermark()?; + let watermark = now.next_unwrap_ready_watermark()?; assert_eq!( watermark, @@ -265,28 +410,32 @@ mod tests { ); // Recovery with paused - drop((tx, now_executor)); - let (tx, mut now_executor) = create_executor(&state_store).await; - tx.send(Barrier::new_test_barrier(4 << 16).with_mutation(Mutation::Pause)) - .unwrap(); + drop((tx, now)); + let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await; + tx.send( + Barrier::with_prev_epoch_for_test(test_epoch(4), test_epoch(3)) + .with_mutation(Mutation::Pause), + ) + .unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // There should be no messages until `Resume` - now_executor.next_unwrap_pending(); + now.next_unwrap_pending(); // Resume barrier tx.send( - Barrier::with_prev_epoch_for_test(5 << 16, 4 << 16).with_mutation(Mutation::Resume), + Barrier::with_prev_epoch_for_test(test_epoch(5), test_epoch(4)) + .with_mutation(Mutation::Resume), ) .unwrap(); // Consume the barrier - 
now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // Consume the data chunk - let chunk_msg = now_executor.next_unwrap_ready_chunk()?; + let chunk_msg = now.next_unwrap_ready_chunk()?; assert_eq!( chunk_msg.compact(), StreamChunk::from_pretty( @@ -297,7 +446,7 @@ mod tests { ); // Consume the watermark - let watermark = now_executor.next_unwrap_ready_watermark()?; + let watermark = now.next_unwrap_ready_watermark()?; assert_eq!( watermark, @@ -314,29 +463,30 @@ mod tests { #[tokio::test] async fn test_now_start_with_paused() -> StreamExecutorResult<()> { let state_store = create_state_store(); - let (tx, mut now_executor) = create_executor(&state_store).await; + let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await; // Init barrier - tx.send(Barrier::with_prev_epoch_for_test(1 << 16, 1).with_mutation(Mutation::Pause)) + tx.send(Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Pause)) .unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // There should be no messages until `Resume` - now_executor.next_unwrap_pending(); + now.next_unwrap_pending(); // Resume barrier tx.send( - Barrier::with_prev_epoch_for_test(2 << 16, 1 << 16).with_mutation(Mutation::Resume), + Barrier::with_prev_epoch_for_test(test_epoch(2), test_epoch(1)) + .with_mutation(Mutation::Resume), ) .unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // Consume the data chunk - let chunk_msg = now_executor.next_unwrap_ready_chunk()?; + let chunk_msg = now.next_unwrap_ready_chunk()?; assert_eq!( chunk_msg.compact(), @@ -347,7 +497,7 @@ mod tests { ); // Consume the watermark - let watermark = now_executor.next_unwrap_ready_watermark()?; + let watermark = now.next_unwrap_ready_watermark()?; assert_eq!( watermark, @@ -359,7 +509,121 @@ mod tests { ); // No more messages until the next barrier - now_executor.next_unwrap_pending(); + now.next_unwrap_pending(); + + Ok(()) + } + + #[tokio::test] + async fn test_now_generate_series() -> StreamExecutorResult<()> { + TIME_ZONE::scope("UTC".to_string(), test_now_generate_series_inner()).await + } + + async fn test_now_generate_series_inner() -> StreamExecutorResult<()> { + let start_timestamp = Timestamptz::from_secs(1617235190).unwrap(); // 2021-03-31 23:59:50 UTC + let interval = Interval::from_millis(1000); // 1s interval + + let state_store = create_state_store(); + let (tx, mut now) = create_executor( + NowMode::GenerateSeries { + start_timestamp, + interval, + }, + &state_store, + ) + .await; + + // Init barrier + tx.send(Barrier::new_test_barrier(test_epoch(1000))) + .unwrap(); + now.next_unwrap_ready_barrier()?; + + // Initial timestamps + let chunk = now.next_unwrap_ready_chunk()?; + assert_eq!(chunk.cardinality(), 12); // seconds from 23:59:50 to 00:00:01 (inclusive) + + assert_eq!( + now.next_unwrap_ready_watermark()?, + Watermark::new( + 0, + DataType::Timestamptz, + ScalarImpl::Timestamptz("2021-04-01T00:00:01.000Z".parse().unwrap()) + ) + ); + + tx.send(Barrier::with_prev_epoch_for_test( + test_epoch(2000), + test_epoch(1000), + )) + .unwrap(); + tx.send(Barrier::with_prev_epoch_for_test( + test_epoch(3000), + test_epoch(2000), + )) + .unwrap(); + + now.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; + + let chunk = now.next_unwrap_ready_chunk()?; + assert_eq!( + chunk.compact(), + StreamChunk::from_pretty( + " TZ + + 2021-04-01T00:00:02.000Z + + 
2021-04-01T00:00:03.000Z" + ) + ); + + let watermark = now.next_unwrap_ready_watermark()?; + assert_eq!( + watermark, + Watermark::new( + 0, + DataType::Timestamptz, + ScalarImpl::Timestamptz("2021-04-01T00:00:03.000Z".parse().unwrap()) + ) + ); + + // Recovery + drop((tx, now)); + let (tx, mut now) = create_executor( + NowMode::GenerateSeries { + start_timestamp, + interval, + }, + &state_store, + ) + .await; + + tx.send(Barrier::with_prev_epoch_for_test( + test_epoch(4000), + test_epoch(3000), + )) + .unwrap(); + + now.next_unwrap_ready_barrier()?; + + let chunk = now.next_unwrap_ready_chunk()?; + assert_eq!( + chunk.compact(), + StreamChunk::from_pretty( + " TZ + + 2021-04-01T00:00:02.000Z + + 2021-04-01T00:00:03.000Z + + 2021-04-01T00:00:04.000Z" + ) + ); + + let watermark = now.next_unwrap_ready_watermark()?; + assert_eq!( + watermark, + Watermark::new( + 0, + DataType::Timestamptz, + ScalarImpl::Timestamptz("2021-04-01T00:00:04.000Z".parse().unwrap()) + ) + ); Ok(()) } @@ -369,6 +633,7 @@ mod tests { } async fn create_executor( + mode: NowMode, state_store: &MemoryStateStore, ) -> (UnboundedSender, BoxedMessageStream) { let table_id = TableId::new(1); @@ -384,8 +649,17 @@ mod tests { let (sender, barrier_receiver) = unbounded_channel(); - let now_executor = - NowExecutor::new(vec![DataType::Timestamptz], barrier_receiver, state_table); + let eval_error_report = ActorEvalErrorReport { + actor_context: ActorContext::for_test(123), + identity: "NowExecutor".into(), + }; + let now_executor = NowExecutor::new( + vec![DataType::Timestamptz], + mode, + eval_error_report, + barrier_receiver, + state_table, + ); (sender, now_executor.boxed().execute()) } } diff --git a/src/stream/src/from_proto/now.rs b/src/stream/src/from_proto/now.rs index a917cd136f3b4..9eac7caa13557 100644 --- a/src/stream/src/from_proto/now.rs +++ b/src/stream/src/from_proto/now.rs @@ -12,14 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_pb::stream_plan::NowNode; +use anyhow::Context; +use risingwave_common::types::{DataType, Datum}; +use risingwave_common::util::value_encoding::DatumFromProtoExt; +use risingwave_pb::stream_plan::now_node::PbMode as PbNowMode; +use risingwave_pb::stream_plan::{NowNode, PbNowModeGenerateSeries}; use risingwave_storage::StateStore; use tokio::sync::mpsc::unbounded_channel; use super::ExecutorBuilder; use crate::common::table::state_table::StateTable; use crate::error::StreamResult; -use crate::executor::{Executor, NowExecutor}; +use crate::executor::{Executor, NowExecutor, NowMode}; use crate::task::ExecutorParams; pub struct NowExecutorBuilder; @@ -37,11 +41,43 @@ impl ExecutorBuilder for NowExecutorBuilder { .create_actor_context .register_sender(params.actor_context.id, sender); + let mode = if let Ok(pb_mode) = node.get_mode() { + match pb_mode { + PbNowMode::UpdateCurrent(_) => NowMode::UpdateCurrent, + PbNowMode::GenerateSeries(PbNowModeGenerateSeries { + start_timestamp, + interval, + }) => { + let start_timestamp = Datum::from_protobuf( + start_timestamp.as_ref().unwrap(), + &DataType::Timestamptz, + ) + .context("`start_timestamp` field is not decodable")? + .context("`start_timestamp` field should not be NULL")? + .into_timestamptz(); + let interval = + Datum::from_protobuf(interval.as_ref().unwrap(), &DataType::Interval) + .context("`interval` field is not decodable")? + .context("`interval` field should not be NULL")? 
+ .into_interval(); + NowMode::GenerateSeries { + start_timestamp, + interval, + } + } + } + } else { + // default to `UpdateCurrent` for backward-compatibility + NowMode::UpdateCurrent + }; + let state_table = StateTable::from_table_catalog(node.get_state_table()?, store, None).await; let exec = NowExecutor::new( params.info.schema.data_types(), + mode, + params.eval_error_report, barrier_receiver, state_table, ); diff --git a/src/stream/src/lib.rs b/src/stream/src/lib.rs index 51a735af0f5a7..a199e4c8acb9f 100644 --- a/src/stream/src/lib.rs +++ b/src/stream/src/lib.rs @@ -40,6 +40,7 @@ #![feature(btree_cursors)] #![feature(assert_matches)] #![feature(try_blocks)] +#![feature(result_flattening)] // required by `capture_context` use std::sync::Arc;