Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 committed Nov 4, 2024
1 parent 4b105f7 commit 928f52d
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@
create table s(x int) append only;
explain create sink ss into t from s with (type = 'append-only');
explain_output: |
StreamProject { exprs: [s.x, Proctime as $expr1, (Proctime - '00:01:00':Interval) as $expr2, null:Serial, null:Timestamptz], output_watermarks: [$expr1, $expr2] }
StreamProject { exprs: [s.x, Proctime as $expr1, (Proctime - '00:01:00':Interval) as $expr2, null:Serial], output_watermarks: [$expr1, $expr2] }
└─StreamSink { type: append-only, columns: [x, s._row_id(hidden)] }
└─StreamTableScan { table: s, columns: [x, _row_id] }
8 changes: 5 additions & 3 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,19 +286,21 @@ pub async fn gen_sink_plan(
)));
}

let table_columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp();
let exprs = derive_default_column_project_for_sink(
&sink_catalog,
sink_plan.schema(),
table_catalog.columns(),
&table_columns_without_rw_timestamp,
user_specified_columns,
)?;

let logical_project = generic::Project::new(exprs, sink_plan);

sink_plan = StreamProject::new(logical_project).into();

let exprs =
LogicalSource::derive_output_exprs_from_generated_columns(table_catalog.columns())?;
let exprs = LogicalSource::derive_output_exprs_from_generated_columns(
&table_columns_without_rw_timestamp,
)?;

if let Some(exprs) = exprs {
let logical_project = generic::Project::new(exprs, sink_plan);
Expand Down

0 comments on commit 928f52d

Please sign in to comment.