Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(frontend): ban jsonb in aggregation stream key #14442

Merged
merged 5 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
- name: jsonb in group by
sql: |
create table t1 (v1 jsonb, v2 int);
create table t2 (v3 jsonb, v4 int);
select v2 from t1 group by v2, v1;
expected_outputs:
- stream_error
- name: jsonb in union
sql: |
create table t1 (v1 jsonb, v2 int);
create table t2 (v3 jsonb, v4 int);
select v1, v2 from t1 union select v3, v4 from t2;
expected_outputs:
- stream_error
- name: jsonb in distinct
sql: |
create table t1 (v1 jsonb, v2 int);
select distinct v1 from t1;
expected_outputs:
- stream_error
- name: jsonb in TopN by group
sql: |
create table t1 (v1 jsonb, v2 int);
SELECT v1 FROM (
SELECT v1, rank() OVER (PARTITION BY v1 ORDER BY v2) AS rank
FROM t1)
WHERE rank <= 2;
expected_outputs:
- stream_error
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.
- name: jsonb in group by
sql: |
create table t1 (v1 jsonb, v2 int);
create table t2 (v3 jsonb, v4 int);
select v2 from t1 group by v2, v1;
stream_error: |-
Not supported: Column t1.v1 should not be in the aggregation group key because it has data type Jsonb
HINT: Use a large Jsonb as part of the stream key is unexpected. If you are sure your Jsonb is small, use `set XXX true`
- name: jsonb in union
sql: |
create table t1 (v1 jsonb, v2 int);
create table t2 (v3 jsonb, v4 int);
select v1, v2 from t1 union select v3, v4 from t2;
stream_error: |-
Not supported: Column t1.v1 should not be in the union because it has data type Jsonb
HINT: Use a large Jsonb as part of the stream key is unexpected. If you are sure your Jsonb is small, use `set XXX true`
- name: jsonb in distinct
sql: |
create table t1 (v1 jsonb, v2 int);
select distinct v1 from t1;
stream_error: |-
Not supported: Column t1.v1 should not be in the aggregation group key because it has data type Jsonb
HINT: Use a large Jsonb as part of the stream key is unexpected. If you are sure your Jsonb is small, use `set XXX true`
- name: jsonb in TopN by group
sql: |
create table t1 (v1 jsonb, v2 int);
SELECT v1 FROM (
SELECT v1, rank() OVER (PARTITION BY v1 ORDER BY v2) AS rank
FROM t1)
WHERE rank <= 2;
stream_error: |-
Not supported: Column t1.v1 should not be in the over window partition key because it has data type Jsonb
HINT: Use a large Jsonb as part of the stream key is unexpected. If you are sure your Jsonb is small, use `set XXX true`
8 changes: 7 additions & 1 deletion src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ use self::plan_node::{
};
#[cfg(debug_assertions)]
use self::plan_visitor::InputRefValidator;
use self::plan_visitor::{has_batch_exchange, CardinalityVisitor};
use self::plan_visitor::{has_batch_exchange, CardinalityVisitor, StreamKeyChecker};
use self::property::{Cardinality, RequiredDist};
use self::rule::*;
use crate::catalog::table_catalog::{TableType, TableVersion};
Expand Down Expand Up @@ -376,6 +376,12 @@ impl PlanRoot {

let plan = match self.plan.convention() {
Convention::Logical => {
if let Some(err) = StreamKeyChecker.visit(self.plan.clone()) {
return Err(ErrorCode::NotSupported(
err,
"Use a large Jsonb as part of the stream key is unexpected. If you are sure your Jsonb is small, use `set XXX true`".to_string(),
).into());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should check the stream key at least after the logical optimization phase because after column pruning, some unnecessary columns check could be skipped.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The plan nodes after optimization can be different from the plan after binding and cause inaccurate error messages.

The columns checked are group by order by partition by, which is unlikely to be pruned.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But for union, it seems hard to say? e.g. select a, b, c from ( select * from t1 union all select * from t2)

Copy link
Contributor

@st1page st1page Jan 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean `union` here ? actually I am not sure about the behavior here... can we prune the column here? in other words, I consider they are not equalevant ```SQL SELECT a,b FROM ( SELECT distinct a,b,c FROM t ) t;

SELECT distinct a,b FROM FROM t;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed an issue about union all

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agg can be created during subquery unnesting. See the sample query here: #7981 (comment)

Btw is join key a concern as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this or only handles user explicitly write agg queries. We need a fallback to handle other cases.

Yes, join can be a concern.

let plan = self.gen_optimized_logical_plan_for_stream()?;

let (plan, out_col_change) = {
Expand Down
16 changes: 10 additions & 6 deletions src/frontend/src/optimizer/plan_node/generic/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub(crate) fn has_repeated_element(slice: &[usize]) -> bool {
(1..slice.len()).any(|i| slice[i..].contains(&slice[i - 1]))
}

impl<PlanRef> Join<PlanRef> {
impl<PlanRef: GenericPlanRef> Join<PlanRef> {
pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
self.on = self.on.clone().rewrite_expr(r);
}
Expand All @@ -55,6 +55,13 @@ impl<PlanRef> Join<PlanRef> {
self.on.visit_expr(v);
}

pub fn eq_indexes(&self) -> Vec<(usize, usize)> {
let left_len = self.left.schema().len();
let right_len = self.right.schema().len();
let eq_predicate = EqJoinPredicate::create(left_len, right_len, self.on.clone());
eq_predicate.eq_indexes()
}

pub fn new(
left: PlanRef,
right: PlanRef,
Expand Down Expand Up @@ -169,10 +176,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Join<PlanRef> {
}

fn stream_key(&self) -> Option<Vec<usize>> {
let left_len = self.left.schema().len();
let right_len = self.right.schema().len();
let eq_predicate = EqJoinPredicate::create(left_len, right_len, self.on.clone());

let eq_indexes = self.eq_indexes();
let left_pk = self.left.stream_key()?;
let right_pk = self.right.stream_key()?;
let l2i = self.l2i_col_mapping();
Expand All @@ -197,7 +201,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Join<PlanRef> {

let either_or_both = self.add_which_join_key_to_pk();

for (lk, rk) in eq_predicate.eq_indexes() {
for (lk, rk) in eq_indexes {
match either_or_both {
EitherOrBoth::Left(_) => {
// Remove right-side join-key column it from pk_indices.
Expand Down
5 changes: 5 additions & 0 deletions src/frontend/src/optimizer/plan_node/logical_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ impl LogicalJoin {
self.core.join_type
}

/// Get the eq join key of the logical join.
pub fn eq_indexes(&self) -> Vec<(usize, usize)> {
self.core.eq_indexes()
}

/// Get the output indices of the logical join.
pub fn output_indices(&self) -> &Vec<usize> {
&self.core.output_indices
Expand Down
129 changes: 129 additions & 0 deletions src/frontend/src/optimizer/plan_visitor/jsonb_stream_key_checker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::FieldDisplay;
use risingwave_common::types::DataType;

use super::{DefaultBehavior, Merge};
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{PlanNode, *};
use crate::optimizer::plan_visitor::PlanVisitor;

#[derive(Debug, Clone, Default)]
pub struct StreamKeyChecker;

impl StreamKeyChecker {
fn visit_inputs(&mut self, plan: &impl PlanNode) -> Option<String> {
let results = plan.inputs().into_iter().map(|input| self.visit(input));
Self::default_behavior().apply(results)
}
}

impl PlanVisitor for StreamKeyChecker {
type Result = Option<String>;

type DefaultBehavior = impl DefaultBehavior<Self::Result>;

fn default_behavior() -> Self::DefaultBehavior {
Merge(|a: Option<String>, b| a.or(b))
}

fn visit_logical_dedup(&mut self, plan: &LogicalDedup) -> Self::Result {
let input = plan.input();
let schema = input.schema();
let data_types = schema.data_types();
for idx in plan.dedup_cols() {
if data_types[*idx] == DataType::Jsonb {
return Some(format!(
"Column {} should not be in the distinct key because it has data type Jsonb",
FieldDisplay(&schema[*idx])
));
}
}
self.visit_inputs(plan)
}

fn visit_logical_top_n(&mut self, plan: &LogicalTopN) -> Self::Result {
let input = plan.input();
let schema = input.schema();
let data_types = schema.data_types();
for idx in plan.group_key() {
if data_types[*idx] == DataType::Jsonb {
return Some(format!(
"Column {} should not be in the TopN group key because it has data type Jsonb",
FieldDisplay(&schema[*idx])
));
}
}
for idx in plan
.topn_order()
.column_orders
.iter()
.map(|c| c.column_index)
{
if data_types[idx] == DataType::Jsonb {
return Some(format!(
"Column {} should not be in the TopN order key because it has data type Jsonb",
FieldDisplay(&schema[idx])
));
}
}
self.visit_inputs(plan)
}

fn visit_logical_union(&mut self, plan: &LogicalUnion) -> Self::Result {
for field in &plan.inputs()[0].schema().fields {
if field.data_type() == DataType::Jsonb {
return Some(format!(
"Column {} should not be in the union because it has data type Jsonb",
FieldDisplay(field)
));
}
}
self.visit_inputs(plan)
}

fn visit_logical_agg(&mut self, plan: &LogicalAgg) -> Self::Result {
let input = plan.input();
let schema = input.schema();
let data_types = schema.data_types();
for idx in plan.group_key().indices() {
if data_types[idx] == DataType::Jsonb {
return Some(format!("Column {} should not be in the aggregation group key because it has data type Jsonb", FieldDisplay(&schema[idx])));
yuhao-su marked this conversation as resolved.
Show resolved Hide resolved
}
}
self.visit_inputs(plan)
}

fn visit_logical_over_window(&mut self, plan: &LogicalOverWindow) -> Self::Result {
let input = plan.input();
let schema = input.schema();
let data_types = schema.data_types();

for func in plan.window_functions() {
for idx in func.partition_by.iter().map(|e| e.index()) {
if data_types[idx] == DataType::Jsonb {
return Some(format!("Column {} should not be in the over window partition key because it has data type Jsonb", FieldDisplay(&schema[idx])));
}
}

for idx in func.order_by.iter().map(|c| c.column_index) {
if data_types[idx] == DataType::Jsonb {
return Some(format!("Column {} should not be in the over window order by key because it has data type Jsonb", FieldDisplay(&schema[idx])));
}
}
}
self.visit_inputs(plan)
}
}
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/plan_visitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ mod side_effect_visitor;
pub use side_effect_visitor::*;
mod cardinality_visitor;
pub use cardinality_visitor::*;
mod jsonb_stream_key_checker;
pub use jsonb_stream_key_checker::*;

use crate::for_all_plan_nodes;
use crate::optimizer::plan_node::*;
Expand Down
Loading