Skip to content

Commit

Permalink
feat: (with Semi-join)support equal condition in temporal filter with…
Browse files Browse the repository at this point in the history
… complex expression (#14098)
  • Loading branch information
st1page authored Dec 22, 2023
1 parent f01eeb5 commit 4bab13a
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@
expected_outputs:
- stream_plan
- stream_dist_plan
- name: Temporal filter with equal condition
sql: |-
create table t1 (ts timestamp with time zone);
select * from t1 where date_trunc('week', now()) = date_trunc('week',ts);
expected_outputs:
- stream_plan
- stream_dist_plan
- name: Temporal filter with `now()` in upper bound on append only table
sql: |-
create table t1 (ts timestamp with time zone) APPEND ONLY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,75 @@
├── distribution key: [ 1 ]
└── read pk prefix len hint: 1
- name: Temporal filter with equal condition
sql: |-
create table t1 (ts timestamp with time zone);
select * from t1 where date_trunc('week', now()) = date_trunc('week',ts);
stream_plan: |-
StreamMaterialize { columns: [ts, t1._row_id(hidden), $expr1(hidden)], stream_key: [t1._row_id, $expr1], pk_columns: [t1._row_id, $expr1], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(t1._row_id, $expr1) }
└─StreamHashJoin { type: LeftSemi, predicate: $expr1 = $expr2, output: [t1.ts, t1._row_id, $expr1] }
├─StreamExchange { dist: HashShard($expr1) }
│ └─StreamProject { exprs: [t1.ts, DateTrunc('week':Varchar, t1.ts, 'UTC':Varchar) as $expr1, t1._row_id] }
│ └─StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard($expr2) }
└─StreamProject { exprs: [DateTrunc('week':Varchar, now, 'UTC':Varchar) as $expr2], output_watermarks: [$expr2] }
└─StreamNow { output: [now] }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [ts, t1._row_id(hidden), $expr1(hidden)], stream_key: [t1._row_id, $expr1], pk_columns: [t1._row_id, $expr1], pk_conflict: NoCheck }
├── materialized table: 4294967294
└── StreamExchange Hash([1, 2]) from 1
Fragment 1
StreamHashJoin { type: LeftSemi, predicate: $expr1 = $expr2, output: [t1.ts, t1._row_id, $expr1] }
├── left table: 0
├── right table: 2
├── left degree table: 1
├── right degree table: 3
├── StreamExchange Hash([1]) from 2
└── StreamExchange Hash([0]) from 3
Fragment 2
StreamProject { exprs: [t1.ts, DateTrunc('week':Varchar, t1.ts, 'UTC':Varchar) as $expr1, t1._row_id] }
└── StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } { state table: 4 }
├── Upstream
└── BatchPlanNode
Fragment 3
StreamProject { exprs: [DateTrunc('week':Varchar, now, 'UTC':Varchar) as $expr2], output_watermarks: [$expr2] }
└── StreamNow { output: [now] } { state table: 5 }
Table 0
├── columns: [ t1_ts, $expr1, t1__row_id ]
├── primary key: [ $1 ASC, $2 ASC ]
├── value indices: [ 0, 1, 2 ]
├── distribution key: [ 1 ]
└── read pk prefix len hint: 1
Table 1 { columns: [ $expr1, t1__row_id, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 2 { columns: [ $expr2 ], primary key: [ $0 ASC ], value indices: [ 0 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 3 { columns: [ $expr2, _degree ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 4
├── columns: [ vnode, _row_id, t1_backfill_finished, t1_row_count ]
├── primary key: [ $0 ASC ]
├── value indices: [ 1, 2, 3 ]
├── distribution key: [ 0 ]
├── read pk prefix len hint: 1
└── vnode column idx: 0
Table 5 { columns: [ now ], primary key: [], value indices: [ 0 ], distribution key: [], read pk prefix len hint: 0 }
Table 4294967294
├── columns: [ ts, t1._row_id, $expr1 ]
├── primary key: [ $1 ASC, $2 ASC ]
├── value indices: [ 0, 1, 2 ]
├── distribution key: [ 1, 2 ]
└── read pk prefix len hint: 2
- name: Temporal filter with `now()` in upper bound on append only table
sql: |-
create table t1 (ts timestamp with time zone) APPEND ONLY;
Expand Down
10 changes: 10 additions & 0 deletions src/frontend/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,16 @@ impl ExprImpl {
None
}
}
ty @ ExprType::Equal => {
let (_, op1, op2) = function_call.clone().decompose_as_binary();
if op1.count_nows() == 0 && op1.has_input_ref() && op2.count_nows() > 0 {
Some((op1, ty, op2))
} else if op2.count_nows() == 0 && op2.has_input_ref() && op1.count_nows() > 0 {
Some((op2, Self::reverse_comparison(ty), op1))
} else {
None
}
}
_ => None,
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,12 @@ impl Rule for FilterWithNowToJoinRule {
if let Some((input_expr, cmp, now_expr)) = expr.as_now_comparison_cond() {
let now_expr = rewriter.rewrite_expr(now_expr);

// as a sanity check, ensure that this expression will derive a watermark
// on the output of the now executor
debug_assert_eq!(
try_derive_watermark(&now_expr),
WatermarkDerivation::Watermark(lhs_len)
);

now_filters.push(FunctionCall::new(cmp, vec![input_expr, now_expr]).unwrap());
// ensure that this expression will derive a watermark
if try_derive_watermark(&now_expr) != WatermarkDerivation::Watermark(lhs_len) {
remainder.push(expr.clone());
} else {
now_filters.push(FunctionCall::new(cmp, vec![input_expr, now_expr]).unwrap());
}
} else {
remainder.push(expr.clone());
}
Expand Down

0 comments on commit 4bab13a

Please sign in to comment.