diff --git a/e2e_test/source/basic/ttl_table_with_con.slt b/e2e_test/source/basic/ttl_table_with_con.slt new file mode 100644 index 0000000000000..6a232bbdee2a2 --- /dev/null +++ b/e2e_test/source/basic/ttl_table_with_con.slt @@ -0,0 +1,32 @@ +statement ok +create table t (v1 int, v2 varchar) APPEND ONLY with ( + connector = 'kafka', + topic = 'kafka_1_partition_topic', + properties.bootstrap.server = 'message_queue:29092', + scan.startup.mode = 'earliest', + retention_seconds = 5 +) FORMAT PLAIN ENCODE JSON; + +statement ok +flush; + +# Wait enough time to ensure SourceExecutor consumes all Kafka data. +sleep 1s + +query IT rowsort +select * from t +---- +1 1 +2 22 +3 333 +4 4444 + +statement ok +select pg_sleep(10); + +query I +select * from t; +---- + +statement ok +drop table t; \ No newline at end of file diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index c542762702053..00440cdae37de 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -536,14 +536,12 @@ pub(crate) fn gen_create_table_plan( for c in &mut columns { c.column_desc.column_id = col_id_gen.generate(c.name()) } - let with_properties = context.with_options().inner().clone(); gen_create_table_plan_without_source( context, table_name, columns, column_defs, constraints, - with_properties, definition, source_watermarks, append_only, @@ -560,7 +558,6 @@ pub(crate) fn gen_create_table_plan_without_source( columns: Vec, column_defs: Vec, constraints: Vec, - with_properties: BTreeMap, definition: String, source_watermarks: Vec, append_only: bool, @@ -598,7 +595,6 @@ pub(crate) fn gen_create_table_plan_without_source( context.into(), name, columns, - with_properties, pk_column_ids, row_id_index, definition, @@ -629,7 +625,6 @@ fn gen_table_plan_with_source( context, source_catalog.name, source_catalog.columns, - source_catalog.with_properties.clone(), source_catalog.pk_col_ids, source_catalog.row_id_index, source_catalog.definition, @@ -649,7 +644,6 @@ fn gen_table_plan_inner( context: OptimizerContextRef, table_name: String, columns: Vec, - with_properties: BTreeMap, pk_column_ids: Vec, row_id_index: Option, definition: String, @@ -664,8 +658,7 @@ fn gen_table_plan_inner( schema_id: SchemaId, ) -> Result<(PlanRef, PbTable)> { let session = context.session_ctx().clone(); - let with_properties = WithOptions::new(with_properties); - let retention_seconds = with_properties.retention_seconds(); + let retention_seconds = context.with_options().retention_seconds(); let is_external_source = source_catalog.is_some(); let source_node: PlanRef = LogicalSource::new( source_catalog.map(|source| Rc::new(source.clone())), diff --git a/src/frontend/src/handler/create_table_as.rs b/src/frontend/src/handler/create_table_as.rs index bdd6e9d078e90..9a01d2919086e 100644 --- a/src/frontend/src/handler/create_table_as.rs +++ b/src/frontend/src/handler/create_table_as.rs @@ -90,19 +90,12 @@ pub async fn handle_create_as( let (graph, source, table) = { let context = OptimizerContext::from_handler_args(handler_args.clone()); - let properties = handler_args - .with_options - .inner() - .clone() - .into_iter() - .collect(); let (plan, table) = gen_create_table_plan_without_source( context, table_name.clone(), columns, vec![], vec![], - properties, "".to_owned(), // TODO: support `SHOW CREATE TABLE` for `CREATE TABLE AS` vec![], // No watermark should be defined in for `CREATE TABLE AS` append_only, diff --git a/src/frontend/src/utils/with_options.rs b/src/frontend/src/utils/with_options.rs index 92c65786afdeb..a6984d687bc74 100644 --- a/src/frontend/src/utils/with_options.rs +++ b/src/frontend/src/utils/with_options.rs @@ -82,7 +82,10 @@ impl WithOptions { pub fn into_connector_props(self) -> BTreeMap { self.inner .into_iter() - .filter(|(key, _)| key != OverwriteOptions::STREAMING_RATE_LIMIT_KEY) + .filter(|(key, _)| { + key != OverwriteOptions::STREAMING_RATE_LIMIT_KEY + && key != options::RETENTION_SECONDS + }) .collect() }