diff --git a/src/frontend/planner_test/tests/testdata/output/sink_into_table.yaml b/src/frontend/planner_test/tests/testdata/output/sink_into_table.yaml index e47581447d8b5..1fc6df6613a98 100644 --- a/src/frontend/planner_test/tests/testdata/output/sink_into_table.yaml +++ b/src/frontend/planner_test/tests/testdata/output/sink_into_table.yaml @@ -9,6 +9,6 @@ create table s(x int) append only; explain create sink ss into t from s with (type = 'append-only'); explain_output: | - StreamProject { exprs: [s.x, Proctime as $expr1, (Proctime - '00:01:00':Interval) as $expr2, null:Serial, null:Timestamptz], output_watermarks: [$expr1, $expr2] } + StreamProject { exprs: [s.x, Proctime as $expr1, (Proctime - '00:01:00':Interval) as $expr2, null:Serial], output_watermarks: [$expr1, $expr2] } └─StreamSink { type: append-only, columns: [x, s._row_id(hidden)] } └─StreamTableScan { table: s, columns: [x, _row_id] } diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 9d8887016d88d..5fef576697bf6 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -286,10 +286,11 @@ pub async fn gen_sink_plan( ))); } + let table_columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp(); let exprs = derive_default_column_project_for_sink( &sink_catalog, sink_plan.schema(), - table_catalog.columns(), + &table_columns_without_rw_timestamp, user_specified_columns, )?; @@ -297,8 +298,9 @@ pub async fn gen_sink_plan( sink_plan = StreamProject::new(logical_project).into(); - let exprs = - LogicalSource::derive_output_exprs_from_generated_columns(table_catalog.columns())?; + let exprs = LogicalSource::derive_output_exprs_from_generated_columns( + &table_columns_without_rw_timestamp, + )?; if let Some(exprs) = exprs { let logical_project = generic::Project::new(exprs, sink_plan);