diff --git a/ci/scripts/run-e2e-test.sh b/ci/scripts/run-e2e-test.sh index eeed357487198..48bd5c8d580c3 100755 --- a/ci/scripts/run-e2e-test.sh +++ b/ci/scripts/run-e2e-test.sh @@ -76,6 +76,7 @@ cluster_start sqllogictest -p 4566 -d dev './e2e_test/ddl/**/*.slt' --junit "batch-ddl-${profile}" sqllogictest -p 4566 -d dev './e2e_test/background_ddl/basic.slt' --junit "batch-ddl-${profile}" sqllogictest -p 4566 -d dev './e2e_test/visibility_mode/*.slt' --junit "batch-${profile}" +sqllogictest -p 4566 -d dev './e2e_test/ttl/ttl.slt' sqllogictest -p 4566 -d dev './e2e_test/database/prepare.slt' sqllogictest -p 4566 -d test './e2e_test/database/test.slt' diff --git a/e2e_test/ddl/table/table.slt.part b/e2e_test/ddl/table/table.slt.part index 2e7c744ba2536..d814a5beee10c 100644 --- a/e2e_test/ddl/table/table.slt.part +++ b/e2e_test/ddl/table/table.slt.part @@ -258,4 +258,4 @@ statement ok drop table t1; statement ok -drop table t2; +drop table t2; \ No newline at end of file diff --git a/e2e_test/ttl/ttl.slt b/e2e_test/ttl/ttl.slt new file mode 100644 index 0000000000000..8361e19365edb --- /dev/null +++ b/e2e_test/ttl/ttl.slt @@ -0,0 +1,38 @@ +statement error +create table t(v int) with (retention_seconds = 5); + +statement ok +create table t(v int) APPEND ONLY with (retention_seconds = 5); + +statement ok +create index i on t(v); + +statement ok +insert into t values(1); + +statement ok +flush; + +query I +select * from t; +---- +1 + +query I +select * from i; +---- +1 + +statement ok +select pg_sleep(10); + +query I +select * from t; +---- + +query I +select * from i; +---- + +statement ok +drop table t; \ No newline at end of file diff --git a/src/frontend/src/handler/create_index.rs b/src/frontend/src/handler/create_index.rs index c687f56fad8e8..e1b263a64db18 100644 --- a/src/frontend/src/handler/create_index.rs +++ b/src/frontend/src/handler/create_index.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::{HashMap, HashSet}; +use std::num::NonZeroU32; use std::rc::Rc; use std::sync::Arc; @@ -323,6 +324,7 @@ fn assemble_materialize( // LogicalScan(table_desc) let definition = context.normalized_sql().to_owned(); + let retention_seconds = table_catalog.retention_seconds.and_then(NonZeroU32::new); let logical_scan = LogicalScan::create( table_name, @@ -395,7 +397,7 @@ fn assemble_materialize( project_required_cols, out_names, ) - .gen_index_plan(index_name, definition) + .gen_index_plan(index_name, definition, retention_seconds) } pub async fn handle_create_index( diff --git a/src/frontend/src/handler/create_mv.rs b/src/frontend/src/handler/create_mv.rs index a6a48a2c2dba9..94ff4eebbc66e 100644 --- a/src/frontend/src/handler/create_mv.rs +++ b/src/frontend/src/handler/create_mv.rs @@ -282,7 +282,7 @@ pub mod tests { let frontend = LocalFrontend::new(Default::default()).await; frontend.run_sql(sql).await.unwrap(); - let sql = "create materialized view mv1 with (ttl = 300) as select t1.country from t1"; + let sql = "create materialized view mv1 as select t1.country from t1"; frontend.run_sql(sql).await.unwrap(); let session = frontend.session_ref(); diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 7fc757b71b6b7..493cfe967f3d3 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -660,6 +660,7 @@ fn gen_table_plan_inner( let mut with_properties = WithOptions::new(with_properties); let connection_id = resolve_privatelink_in_with_option(&mut with_properties, &schema_name, &session)?; + let retention_seconds = with_properties.retention_seconds(); let is_external_source = source_info.is_some(); @@ -724,6 +725,14 @@ fn gen_table_plan_inner( .into()); } + if !append_only && retention_seconds.is_some() { + return Err(ErrorCode::NotSupported( + "Defining retention seconds on table requires the table to be append only.".to_owned(), + "Use the key words `APPEND ONLY`".to_owned(), + ) + .into()); + } + let materialize = plan_root.gen_table_plan( context, name, @@ -735,6 +744,7 @@ fn gen_table_plan_inner( watermark_descs, version, is_external_source, + retention_seconds, )?; let mut table = materialize.table().to_prost(schema_id, database_id); @@ -856,6 +866,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_source( vec![], Some(col_id_gen.into_version()), true, + None, )?; let mut table = materialize.table().to_prost(schema_id, database_id); diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index f222a41fb48e7..6d3f5a1d73722 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -1,5 +1,20 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + use std::assert_matches::assert_matches; use std::collections::HashMap; +use std::num::NonZeroU32; // Copyright 2024 RisingWave Labs // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -524,6 +539,7 @@ impl PlanRoot { watermark_descs: Vec, version: Option, with_external_source: bool, + retention_seconds: Option, ) -> Result { let stream_plan = self.gen_optimized_stream_plan(false)?; @@ -725,6 +741,7 @@ impl PlanRoot { pk_column_indices, row_id_index, version, + retention_seconds, ) } @@ -748,6 +765,7 @@ impl PlanRoot { definition, TableType::MaterializedView, cardinality, + None, ) } @@ -756,6 +774,7 @@ impl PlanRoot { &mut self, index_name: String, definition: String, + retention_seconds: Option, ) -> Result { let cardinality = self.compute_cardinality(); let stream_plan = self.gen_optimized_stream_plan(false)?; @@ -770,6 +789,7 @@ impl PlanRoot { definition, TableType::Index, cardinality, + retention_seconds, ) } diff --git a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs index 8bfecb64b03d5..6aa63bbdb6cef 100644 --- a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs @@ -69,7 +69,6 @@ impl StreamCdcTableScan { &self, state: &mut BuildFragmentGraphState, ) -> TableCatalog { - let _properties = self.ctx().with_options().internal_table_subset(); let mut catalog_builder = TableCatalogBuilder::default(); let upstream_schema = &self.core.get_table_columns(); diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index f2acbcf9d258c..792eaf4d7a068 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::assert_matches::assert_matches; +use std::num::NonZeroU32; use fixedbitset::FixedBitSet; use itertools::Itertools; @@ -76,6 +77,7 @@ impl StreamMaterialize { definition: String, table_type: TableType, cardinality: Cardinality, + retention_seconds: Option, ) -> Result { let input = Self::rewrite_input(input, user_distributed_by, table_type)?; // the hidden column name might refer some expr id @@ -94,6 +96,7 @@ impl StreamMaterialize { table_type, None, cardinality, + retention_seconds, )?; Ok(Self::new(input, table)) @@ -116,6 +119,7 @@ impl StreamMaterialize { pk_column_indices: Vec, row_id_index: Option, version: Option, + retention_seconds: Option, ) -> Result { let input = Self::rewrite_input(input, user_distributed_by, TableType::Table)?; @@ -131,6 +135,7 @@ impl StreamMaterialize { TableType::Table, version, Cardinality::unknown(), // unknown cardinality for tables + retention_seconds, )?; Ok(Self::new(input, table)) @@ -200,12 +205,12 @@ impl StreamMaterialize { table_type: TableType, version: Option, cardinality: Cardinality, + retention_seconds: Option, ) -> Result { let input = rewritten_input; let value_indices = (0..columns.len()).collect_vec(); let distribution_key = input.distribution().dist_column_indices().to_vec(); - let _properties = input.ctx().with_options().internal_table_subset(); // TODO: remove this let append_only = input.append_only(); let watermark_columns = input.watermark_columns().clone(); @@ -254,8 +259,7 @@ impl StreamMaterialize { incoming_sinks: vec![], initialized_at_cluster_version: None, created_at_cluster_version: None, - // TODO: https://github.com/risingwavelabs/risingwave/issues/14791 - retention_seconds: None, + retention_seconds: retention_seconds.map(|i| i.into()), }) } diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs index e9a624d7f3ceb..99165dcd657c9 100644 --- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs @@ -155,7 +155,6 @@ impl StreamTableScan { &self, state: &mut BuildFragmentGraphState, ) -> TableCatalog { - let _properties = self.ctx().with_options().internal_table_subset(); let mut catalog_builder = TableCatalogBuilder::default(); let upstream_schema = &self.core.get_table_columns(); diff --git a/src/frontend/src/utils/overwrite_options.rs b/src/frontend/src/utils/overwrite_options.rs index 697da264ab7e4..8d4ff2febfd22 100644 --- a/src/frontend/src/utils/overwrite_options.rs +++ b/src/frontend/src/utils/overwrite_options.rs @@ -17,13 +17,10 @@ use crate::handler::HandlerArgs; #[derive(Debug, Clone, Default)] pub struct OverwriteOptions { pub streaming_rate_limit: Option, - // ttl has been deprecated - pub ttl: Option, } impl OverwriteOptions { const STREAMING_RATE_LIMIT_KEY: &'static str = "streaming_rate_limit"; - const TTL_KEY: &'static str = "ttl"; pub fn new(args: &mut HandlerArgs) -> Self { let streaming_rate_limit = { @@ -41,15 +38,8 @@ impl OverwriteOptions { .map(|limit| limit.get() as u32) } }; - let ttl = args - .with_options - .inner_mut() - .remove(Self::TTL_KEY) - // FIXME(tabVersion): validate the value - .map(|x| x.parse::().unwrap()); Self { streaming_rate_limit, - ttl, } } } diff --git a/src/frontend/src/utils/with_options.rs b/src/frontend/src/utils/with_options.rs index 184596a37a3f8..633bcf29354f5 100644 --- a/src/frontend/src/utils/with_options.rs +++ b/src/frontend/src/utils/with_options.rs @@ -103,13 +103,6 @@ impl WithOptions { Self { inner } } - /// Get the subset of the options for internal table catalogs. - /// - /// Currently only `retention_seconds` is included. - pub fn internal_table_subset(&self) -> Self { - self.subset([options::RETENTION_SECONDS]) - } - pub fn value_eq_ignore_case(&self, key: &str, val: &str) -> bool { if let Some(inner_val) = self.inner.get(key) { if inner_val.eq_ignore_ascii_case(val) {