Skip to content

Commit

Permalink
fix LogicalNow column pruning
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Jun 26, 2024
1 parent 7cbb3a9 commit 91e740b
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,27 @@
select * from unnest(array[now(), now()]);
expected_outputs:
- stream_error
- sql: |
select 1::int as constant from generate_series(
'2024-06-21 17:36:00'::timestamptz,
now(),
interval '1 hour'
);
expected_outputs:
- logical_plan
- optimized_logical_plan_for_stream
- stream_plan
- sql: |
select
extract('year' from t) as year,
extract('month' from t) as month,
1::int as constant
from generate_series(
'2024-01-01 00:00:00+00'::timestamptz,
now(),
interval '1 month'
) as s(t);
expected_outputs:
- logical_plan
- optimized_logical_plan_for_stream
- stream_plan
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,36 @@
stream_error: |-
Not supported: General `now()` function in streaming queries
HINT: Streaming `now()` is currently only supported in GenerateSeries and TemporalFilter patterns.
- sql: |
select 1::int as constant from generate_series(
'2024-06-21 17:36:00'::timestamptz,
now(),
interval '1 hour'
);
logical_plan: |-
LogicalProject { exprs: [1:Int32] }
└─LogicalTableFunction { table_function: GenerateSeries('2024-06-21 17:36:00':Varchar::Timestamptz, Now, '01:00:00':Interval) }
optimized_logical_plan_for_stream: 'LogicalValues { rows: [], schema: Schema { fields: [1:Int32:Int32] } }'
stream_plan: |-
StreamMaterialize { columns: [constant, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
└─StreamValues { rows: [] }
- sql: |
select
extract('year' from t) as year,
extract('month' from t) as month,
1::int as constant
from generate_series(
'2024-01-01 00:00:00+00'::timestamptz,
now(),
interval '1 month'
) as s(t);
logical_plan: |-
LogicalProject { exprs: [Extract('YEAR':Varchar, generate_series) as $expr1, Extract('MONTH':Varchar, generate_series) as $expr2, 1:Int32] }
└─LogicalTableFunction { table_function: GenerateSeries('2024-01-01 00:00:00+00:00':Timestamptz, Now, '1 mon':Interval) }
optimized_logical_plan_for_stream: |-
LogicalProject { exprs: [Extract('YEAR':Varchar, ts) as $expr1, Extract('MONTH':Varchar, ts) as $expr2, 1:Int32] }
└─LogicalNow { output: [ts] }
stream_plan: |-
StreamMaterialize { columns: [year, month, constant, ts(hidden)], stream_key: [ts], pk_columns: [ts], pk_conflict: NoCheck, watermark_columns: [ts(hidden)] }
└─StreamProject { exprs: [Extract('YEAR':Varchar, ts, 'UTC':Varchar) as $expr1, Extract('MONTH':Varchar, ts, 'UTC':Varchar) as $expr2, 1:Int32, ts], output_watermarks: [ts] }
└─StreamNow { output: [ts] }
15 changes: 11 additions & 4 deletions src/frontend/src/optimizer/plan_node/logical_now.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@

use pretty_xmlish::XmlNode;
use risingwave_common::bail;
use risingwave_common::catalog::Schema;

use super::generic::{self, GenericPlanRef, Mode};
use super::utils::{childless_record, Distill};
use super::{
ColPrunable, ColumnPruningContext, ExprRewritable, Logical, LogicalFilter, PlanBase, PlanRef,
PredicatePushdown, RewriteStreamContext, StreamNow, ToBatch, ToStream, ToStreamContext,
ColPrunable, ColumnPruningContext, ExprRewritable, Logical, LogicalFilter, LogicalValues,
PlanBase, PlanRef, PredicatePushdown, RewriteStreamContext, StreamNow, ToBatch, ToStream,
ToStreamContext,
};
use crate::error::Result;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
Expand Down Expand Up @@ -96,7 +98,12 @@ impl ToBatch for LogicalNow {

/// The trait for column pruning, only logical plan node will use it, though all plan node impl it.
impl ColPrunable for LogicalNow {
fn prune_col(&self, _required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
self.clone().into()
fn prune_col(&self, required_cols: &[usize], _: &mut ColumnPruningContext) -> PlanRef {
if required_cols.is_empty() {
LogicalValues::new(vec![], Schema::empty().clone(), self.ctx()).into()
} else {
assert_eq!(required_cols, &[0], "we only output one column");
self.clone().into()
}
}
}

0 comments on commit 91e740b

Please sign in to comment.