Skip to content

Commit

Permalink
Merge branch 'main' into xxh/fix-mqtt-source
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored Oct 24, 2024
2 parents f176ff2 + e6f830b commit 7ff35f6
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -554,3 +554,33 @@
expected_outputs:
- batch_plan
- stream_plan
- name: improve join transpose rule to handle join with output indices
sql: |
with rawdata as (
select 'first' as w, '{"x":{"value":123},"y":{"value":[1,2,3]},"z":{"value":[{"a":4,"b":5},{"a":6,"b":7}]}}'::jsonb as rawjson
union all
select 'second', '{"x":{"value":456},"y":{"value":[7,8,9]},"z":{"value":[{"a":0,"b":1},{"a":2,"b":3}]}}'::jsonb
)
select
array(
select
array(
select 1
from jsonb_array_elements(
case
when jsonb_typeof(o.value->'value') = 'array' then o.value->'value'
else '[]'::jsonb
end
) with ordinality as x(v, i), jsonb_each (
case
when jsonb_typeof(v) = 'object' then v
else '{}'::jsonb
end
) as j
)
from jsonb_each(rawjson) AS o
where jsonb_typeof(o.value) = 'object' and o.value ? 'value'
) as kv
from rawdata
expected_outputs:
- optimized_logical_plan_for_batch
Original file line number Diff line number Diff line change
Expand Up @@ -2122,3 +2122,74 @@
│ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard(t6.l) }
└─StreamTableScan { table: t6, columns: [t6.k, t6.l, t6._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t6._row_id], pk: [_row_id], dist: UpstreamHashShard(t6._row_id) }
- name: improve join transpose rule to handle join with output indices
sql: |
with rawdata as (
select 'first' as w, '{"x":{"value":123},"y":{"value":[1,2,3]},"z":{"value":[{"a":4,"b":5},{"a":6,"b":7}]}}'::jsonb as rawjson
union all
select 'second', '{"x":{"value":456},"y":{"value":[7,8,9]},"z":{"value":[{"a":0,"b":1},{"a":2,"b":3}]}}'::jsonb
)
select
array(
select
array(
select 1
from jsonb_array_elements(
case
when jsonb_typeof(o.value->'value') = 'array' then o.value->'value'
else '[]'::jsonb
end
) with ordinality as x(v, i), jsonb_each (
case
when jsonb_typeof(v) = 'object' then v
else '{}'::jsonb
end
) as j
)
from jsonb_each(rawjson) AS o
where jsonb_typeof(o.value) = 'object' and o.value ? 'value'
) as kv
from rawdata
optimized_logical_plan_for_batch: |-
LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom('{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, '{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb), output: [$expr6] }
├─LogicalValues { rows: [['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], ['{"x": {"value": 456}, "y": {"value": [7, 8, 9]}, "z": {"value": [{"a": 0, "b": 1}, {"a": 2, "b": 3}]}}':Jsonb]], schema: Schema { fields: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb:Jsonb] } }
└─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, Coalesce(array_agg($expr5) filter(IsNotNull(1:Int32)), ARRAY[]:List(List(Int32))) as $expr6] }
└─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], aggs: [array_agg($expr5) filter(IsNotNull(1:Int32))] }
└─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom('{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, '{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb), output: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr5, 1:Int32] }
├─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], aggs: [] }
│ └─LogicalValues { rows: [['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], ['{"x": {"value": 456}, "y": {"value": [7, 8, 9]}, "z": {"value": [{"a": 0, "b": 1}, {"a": 2, "b": 3}]}}':Jsonb]], schema: Schema { fields: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb:Jsonb] } }
└─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr5, 1:Int32] }
└─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom($expr1, $expr2) AND IsNotDistinctFrom('{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, '{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb), output: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr5] }
├─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, Field(JsonbEach($0), 1:Int32) as $expr1] }
│ └─LogicalFilter { predicate: (JsonbTypeof(Field(JsonbEach($0), 1:Int32)) = 'object':Varchar) AND JsonbExists(Field(JsonbEach($0), 1:Int32), 'value':Varchar) }
│ └─LogicalProjectSet { select_list: [$0, JsonbEach($0)] }
│ └─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], aggs: [] }
│ └─LogicalValues { rows: [['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], ['{"x": {"value": 456}, "y": {"value": [7, 8, 9]}, "z": {"value": [{"a": 0, "b": 1}, {"a": 2, "b": 3}]}}':Jsonb]], schema: Schema { fields: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb:Jsonb] } }
└─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr2, Coalesce(array_agg(1:Int32) filter(IsNotNull(1:Int32)), ARRAY[]:List(Int32)) as $expr5] }
└─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr2], aggs: [array_agg(1:Int32) filter(IsNotNull(1:Int32))] }
└─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom($expr2, $expr3) AND IsNotDistinctFrom('{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, '{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb), output: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr2, 1:Int32, 1:Int32] }
├─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr2], aggs: [] }
│ └─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, Field(JsonbEach($0), 1:Int32) as $expr2] }
│ └─LogicalFilter { predicate: (JsonbTypeof(Field(JsonbEach($0), 1:Int32)) = 'object':Varchar) AND JsonbExists(Field(JsonbEach($0), 1:Int32), 'value':Varchar) }
│ └─LogicalProjectSet { select_list: [$0, JsonbEach($0)] }
│ └─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], aggs: [] }
│ └─LogicalValues { rows: [['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], ['{"x": {"value": 456}, "y": {"value": [7, 8, 9]}, "z": {"value": [{"a": 0, "b": 1}, {"a": 2, "b": 3}]}}':Jsonb]], schema: Schema { fields: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb:Jsonb] } }
└─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr3, 1:Int32, 1:Int32] }
└─LogicalJoin { type: Inner, on: IsNotDistinctFrom(JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb)), JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb))) AND IsNotDistinctFrom(JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb)), JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb))) AND IsNotDistinctFrom($expr3, $expr4) AND IsNotDistinctFrom('{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, '{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb), output: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr3] }
├─LogicalProjectSet { select_list: [$0, $1, JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb))] }
│ └─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr3], aggs: [] }
│ └─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, Field(JsonbEach($0), 1:Int32) as $expr3] }
│ └─LogicalFilter { predicate: (JsonbTypeof(Field(JsonbEach($0), 1:Int32)) = 'object':Varchar) AND JsonbExists(Field(JsonbEach($0), 1:Int32), 'value':Varchar) }
│ └─LogicalProjectSet { select_list: [$0, JsonbEach($0)] }
│ └─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], aggs: [] }
│ └─LogicalValues { rows: [['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], ['{"x": {"value": 456}, "y": {"value": [7, 8, 9]}, "z": {"value": [{"a": 0, "b": 1}, {"a": 2, "b": 3}]}}':Jsonb]], schema: Schema { fields: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb:Jsonb] } }
└─LogicalProjectSet { select_list: [$0, $1, $2, $3, JsonbEach(Case((JsonbTypeof($3) = 'object':Varchar), $3, '{}':Jsonb))] }
└─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr4, JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb)), JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb))] }
└─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr4, JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb))], aggs: [] }
└─LogicalProjectSet { select_list: [$0, $1, JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb))] }
└─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr4], aggs: [] }
└─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, Field(JsonbEach($0), 1:Int32) as $expr4] }
└─LogicalFilter { predicate: (JsonbTypeof(Field(JsonbEach($0), 1:Int32)) = 'object':Varchar) AND JsonbExists(Field(JsonbEach($0), 1:Int32), 'value':Varchar) }
└─LogicalProjectSet { select_list: [$0, JsonbEach($0)] }
└─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], aggs: [] }
└─LogicalValues { rows: [['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], ['{"x": {"value": 456}, "y": {"value": [7, 8, 9]}, "z": {"value": [{"a": 0, "b": 1}, {"a": 2, "b": 3}]}}':Jsonb]], schema: Schema { fields: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb:Jsonb] } }
14 changes: 2 additions & 12 deletions src/frontend/src/optimizer/logical_optimization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,12 +199,7 @@ static GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE: LazyLock<OptimizationStage> =
LazyLock::new(|| {
OptimizationStage::new(
"General Unnesting(Translate Apply)",
vec![
TranslateApplyRule::create(true),
// Separate the project from a join if necessary because `ApplyJoinTransposeRule`
// can't handle a join with `output_indices`.
ProjectJoinSeparateRule::create(),
],
vec![TranslateApplyRule::create(true)],
ApplyOrder::TopDown,
)
});
Expand All @@ -213,12 +208,7 @@ static GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE: LazyLock<OptimizationStage>
LazyLock::new(|| {
OptimizationStage::new(
"General Unnesting(Translate Apply)",
vec![
TranslateApplyRule::create(false),
// Separate the project from a join if necessary because `ApplyJoinTransposeRule`
// can't handle a join with `output_indices`.
ProjectJoinSeparateRule::create(),
],
vec![TranslateApplyRule::create(false)],
ApplyOrder::TopDown,
)
});
Expand Down
16 changes: 11 additions & 5 deletions src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use crate::expr::{
InputRef,
};
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{LogicalApply, LogicalFilter, LogicalJoin, PlanTreeNodeBinary};
use crate::optimizer::plan_node::{
LogicalApply, LogicalFilter, LogicalJoin, PlanTreeNode, PlanTreeNodeBinary,
};
use crate::optimizer::plan_visitor::{ExprCorrelatedIdFinder, PlanCorrelatedIdFinder};
use crate::optimizer::rule::apply_offset_rewriter::ApplyCorrelatedIndicesConverter;
use crate::optimizer::PlanRef;
Expand Down Expand Up @@ -122,10 +124,14 @@ impl Rule for ApplyJoinTransposeRule {
return None;
}

assert!(
join.output_indices_are_trivial(),
"ApplyJoinTransposeRule requires the join containing no output indices, so make sure ProjectJoinSeparateRule is always applied before this rule"
);
// ApplyJoinTransposeRule requires the join containing no output indices, so make sure ProjectJoinSeparateRule is always applied before this rule.
// As this rule will be applied until we reach the fixed point, if the join has output indices, apply ProjectJoinSeparateRule first and return is safety.
if !join.output_indices_are_trivial() {
let new_apply_right = crate::optimizer::rule::ProjectJoinSeparateRule::create()
.apply(join.clone().into())
.unwrap();
return Some(apply.clone_with_inputs(&[apply_left, new_apply_right]));
}

let (push_left, push_right) = match join.join_type() {
// `LeftSemi`, `LeftAnti`, `LeftOuter` can only push to left side if it's right side has
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl ProjectJoinSeparateRule {
impl Rule for ProjectJoinSeparateRule {
fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
let join = plan.as_logical_join()?;
if join.is_full_out() {
if join.output_indices_are_trivial() {
None
} else {
let (left, right, on, join_type, output_indices) = join.clone().decompose();
Expand Down
2 changes: 1 addition & 1 deletion src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ impl StreamManagerService for StreamServiceImpl {
let dependencies = self
.metadata_manager
.catalog_controller
.list_object_dependencies()
.list_created_object_dependencies()
.await?;

Ok(Response::new(ListObjectDependenciesResponse {
Expand Down
Loading

0 comments on commit 7ff35f6

Please sign in to comment.