From 85926ce5319dafcd78c2e8dca5b8e3ac66ecc75a Mon Sep 17 00:00:00 2001 From: aceforeverd Date: Mon, 4 Dec 2023 16:06:47 +0800 Subject: [PATCH] feat(online): union all query (#3590) * feat(online): union query * refactor: group_and_sort_optimized Re-impl column to source info in physical node, support multiple result source info, previously it is one result only. Multiple source result is required for SetOperationNode, like union all, every input node may produce the target column. --- cases/plan/create.yaml | 196 ++++---- cases/plan/error_request_query.yaml | 22 - cases/plan/simple_query.yaml | 85 ++-- cases/plan/union_query.yaml | 303 +++++++++++- cases/query/union_query.yml | 449 ++++++++++++++++++ hybridse/include/codec/row.h | 4 + hybridse/include/node/node_base.h | 2 +- hybridse/include/node/node_enum.h | 13 +- hybridse/include/node/node_manager.h | 2 - hybridse/include/node/plan_node.h | 34 +- hybridse/include/node/sql_node.h | 30 +- hybridse/include/vm/catalog.h | 8 +- hybridse/include/vm/physical_op.h | 66 ++- hybridse/include/vm/schemas_context.h | 25 +- hybridse/src/node/node_manager.cc | 6 +- hybridse/src/node/plan_node.cc | 34 +- hybridse/src/node/sql_node.cc | 46 +- hybridse/src/node/sql_node_test.cc | 2 +- .../physical/group_and_sort_optimized.cc | 247 ++++++---- .../physical/group_and_sort_optimized.h | 12 +- hybridse/src/plan/planner.cc | 95 ++-- hybridse/src/plan/planner.h | 6 +- hybridse/src/planv2/ast_node_converter.cc | 73 +-- hybridse/src/planv2/ast_node_converter.h | 1 - .../src/planv2/ast_node_converter_test.cc | 2 + hybridse/src/planv2/planner_v2_test.cc | 18 +- hybridse/src/testing/engine_test_base.cc | 2 + hybridse/src/vm/catalog_wrapper.cc | 140 ++++++ hybridse/src/vm/catalog_wrapper.h | 148 ++++++ hybridse/src/vm/internal/node_helper.cc | 4 +- hybridse/src/vm/mem_catalog.cc | 17 + hybridse/src/vm/physical_op.cc | 128 ++++- hybridse/src/vm/runner.cc | 74 +++ hybridse/src/vm/runner.h | 73 +-- hybridse/src/vm/runner_builder.cc | 27 +- 
hybridse/src/vm/schemas_context.cc | 10 +- hybridse/src/vm/transform.cc | 41 +- hybridse/src/vm/transform.h | 2 + src/sdk/sql_sdk_test.h | 2 + 39 files changed, 1902 insertions(+), 547 deletions(-) delete mode 100644 cases/plan/error_request_query.yaml create mode 100644 cases/query/union_query.yml diff --git a/cases/plan/create.yaml b/cases/plan/create.yaml index f1076934391..66bb1ee548c 100644 --- a/cases/plan/create.yaml +++ b/cases/plan/create.yaml @@ -437,44 +437,45 @@ cases: | +-is_constant: 0 +-inner_node_list[list]: +-0: - +-node[kQuery]: kQueryUnion - +-union_type: ALL UNION - +-left: - | +-node[kQuery]: kQuerySelect - | +-distinct_opt: false - | +-where_expr: null - | +-group_expr_list: null - | +-having_expr: null - | +-order_expr_list: null - | +-limit: null - | +-select_list[list]: - | | +-0: - | | +-node[kResTarget] - | | +-val: - | | | +-expr[primary] - | | | +-value: 1 - | | | +-type: int32 - | | +-name: - | +-tableref_list: [] - | +-window_list: [] - +-right: - +-node[kQuery]: kQuerySelect - +-distinct_opt: false - +-where_expr: null - +-group_expr_list: null - +-having_expr: null - +-order_expr_list: null - +-limit: null - +-select_list[list]: - | +-0: - | +-node[kResTarget] - | +-val: - | | +-expr[primary] - | | +-value: 2 - | | +-type: int32 - | +-name: - +-tableref_list: [] - +-window_list: [] + +-node[kQuery]: kQuerySetOperation + +-operator: UNION ALL + +-inputs[list]: + +-0: + | +-node[kQuery]: kQuerySelect + | +-distinct_opt: false + | +-where_expr: null + | +-group_expr_list: null + | +-having_expr: null + | +-order_expr_list: null + | +-limit: null + | +-select_list[list]: + | | +-0: + | | +-node[kResTarget] + | | +-val: + | | | +-expr[primary] + | | | +-value: 1 + | | | +-type: int32 + | | +-name: + | +-tableref_list: [] + | +-window_list: [] + +-1: + +-node[kQuery]: kQuerySelect + +-distinct_opt: false + +-where_expr: null + +-group_expr_list: null + +-having_expr: null + +-order_expr_list: null + +-limit: null + +-select_list[list]: + 
| +-0: + | +-node[kResTarget] + | +-val: + | | +-expr[primary] + | | +-value: 2 + | | +-type: int32 + | +-name: + +-tableref_list: [] + +-window_list: [] - id: 21_create_procedure desc: create procedure, parameters and union distinct query @@ -515,68 +516,63 @@ cases: | +-is_constant: 0 +-inner_node_list[list]: +-0: - +-node[kQuery]: kQueryUnion - +-union_type: DISTINCT UNION - +-left: - | +-node[kQuery]: kQueryUnion - | +-union_type: DISTINCT UNION - | +-left: - | | +-node[kQuery]: kQuerySelect - | | +-distinct_opt: false - | | +-where_expr: null - | | +-group_expr_list: null - | | +-having_expr: null - | | +-order_expr_list: null - | | +-limit: null - | | +-select_list[list]: - | | | +-0: - | | | +-node[kResTarget] - | | | +-val: - | | | | +-expr[primary] - | | | | +-value: 1 - | | | | +-type: int32 - | | | +-name: - | | +-tableref_list: [] - | | +-window_list: [] - | +-right: - | +-node[kQuery]: kQuerySelect - | +-distinct_opt: false - | +-where_expr: null - | +-group_expr_list: null - | +-having_expr: null - | +-order_expr_list: null - | +-limit: null - | +-select_list[list]: - | | +-0: - | | +-node[kResTarget] - | | +-val: - | | | +-expr[primary] - | | | +-value: 2 - | | | +-type: int32 - | | +-name: - | +-tableref_list: [] - | +-window_list: [] - +-right: - +-node[kQuery]: kQuerySelect - +-distinct_opt: false - +-where_expr: null - +-group_expr_list: null - +-having_expr: null - +-order_expr_list: null - +-limit: null - +-select_list[list]: - | +-0: - | +-node[kResTarget] - | +-val: - | | +-expr[primary] - | | +-value: 3 - | | +-type: int32 - | +-name: - +-tableref_list: [] - +-window_list: [] - - - + +-node[kQuery]: kQuerySetOperation + +-operator: UNION DISTINCT + +-inputs[list]: + +-0: + | +-node[kQuery]: kQuerySelect + | +-distinct_opt: false + | +-where_expr: null + | +-group_expr_list: null + | +-having_expr: null + | +-order_expr_list: null + | +-limit: null + | +-select_list[list]: + | | +-0: + | | +-node[kResTarget] + | | +-val: + | | | 
+-expr[primary] + | | | +-value: 1 + | | | +-type: int32 + | | +-name: + | +-tableref_list: [] + | +-window_list: [] + +-1: + | +-node[kQuery]: kQuerySelect + | +-distinct_opt: false + | +-where_expr: null + | +-group_expr_list: null + | +-having_expr: null + | +-order_expr_list: null + | +-limit: null + | +-select_list[list]: + | | +-0: + | | +-node[kResTarget] + | | +-val: + | | | +-expr[primary] + | | | +-value: 2 + | | | +-type: int32 + | | +-name: + | +-tableref_list: [] + | +-window_list: [] + +-2: + +-node[kQuery]: kQuerySelect + +-distinct_opt: false + +-where_expr: null + +-group_expr_list: null + +-having_expr: null + +-order_expr_list: null + +-limit: null + +-select_list[list]: + | +-0: + | +-node[kResTarget] + | +-val: + | | +-expr[primary] + | | +-value: 3 + | | +-type: int32 + | +-name: + +-tableref_list: [] + +-window_list: [] - id: 22 desc: create index 1 diff --git a/cases/plan/error_request_query.yaml b/cases/plan/error_request_query.yaml deleted file mode 100644 index 2339ebda71a..00000000000 --- a/cases/plan/error_request_query.yaml +++ /dev/null @@ -1,22 +0,0 @@ -# Copyright 2021 4Paradigm -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -cases: - - id: 0 - desc: resolve请求主表失败 - mode: batch-unsupport - sql: | - select col1, col2 from t1 union all select c1 + c2 as col1, col2 from t2; - expect: - msg: Non-support kUnionPlan Op in online serving diff --git a/cases/plan/simple_query.yaml b/cases/plan/simple_query.yaml index 20c7d8fc042..66cc542fbc0 100644 --- a/cases/plan/simple_query.yaml +++ b/cases/plan/simple_query.yaml @@ -466,53 +466,54 @@ cases: SELECT * FROM t1 UNION ALL SELECT * FROM t3 CONFIG ( zone = 'middle' ); expect: node_tree_str: | - +-node[kQuery]: kQueryUnion + +-node[kQuery]: kQuerySetOperation +-config_options: | +-zone: | +-expr[primary] | +-value: middle | +-type: string - +-union_type: ALL UNION - +-left: - | +-node[kQuery]: kQuerySelect - | +-distinct_opt: false - | +-where_expr: null - | +-group_expr_list: null - | +-having_expr: null - | +-order_expr_list: null - | +-limit: null - | +-select_list[list]: - | | +-0: - | | +-node[kResTarget] - | | +-val: - | | | +-expr[all] - | | +-name: - | +-tableref_list[list]: - | | +-0: - | | +-node[kTableRef]: kTable - | | +-table: t1 - | | +-alias: - | +-window_list: [] - +-right: - +-node[kQuery]: kQuerySelect - +-distinct_opt: false - +-where_expr: null - +-group_expr_list: null - +-having_expr: null - +-order_expr_list: null - +-limit: null - +-select_list[list]: - | +-0: - | +-node[kResTarget] - | +-val: - | | +-expr[all] - | +-name: - +-tableref_list[list]: - | +-0: - | +-node[kTableRef]: kTable - | +-table: t3 - | +-alias: - +-window_list: [] + +-operator: UNION ALL + +-inputs[list]: + +-0: + | +-node[kQuery]: kQuerySelect + | +-distinct_opt: false + | +-where_expr: null + | +-group_expr_list: null + | +-having_expr: null + | +-order_expr_list: null + | +-limit: null + | +-select_list[list]: + | | +-0: + | | +-node[kResTarget] + | | +-val: + | | | +-expr[all] + | | +-name: + | +-tableref_list[list]: + | | +-0: + | | +-node[kTableRef]: kTable + | | +-table: t1 + | | +-alias: + | +-window_list: [] + +-1: + +-node[kQuery]: 
kQuerySelect + +-distinct_opt: false + +-where_expr: null + +-group_expr_list: null + +-having_expr: null + +-order_expr_list: null + +-limit: null + +-select_list[list]: + | +-0: + | +-node[kResTarget] + | +-val: + | | +-expr[all] + | +-name: + +-tableref_list[list]: + | +-0: + | +-node[kTableRef]: kTable + | +-table: t3 + | +-alias: + +-window_list: [] - id: array desc: | array query diff --git a/cases/plan/union_query.yaml b/cases/plan/union_query.yaml index 9f6ac186775..2326ba1b2b0 100644 --- a/cases/plan/union_query.yaml +++ b/cases/plan/union_query.yaml @@ -13,29 +13,298 @@ # limitations under the License. cases: + - id: 0 + desc: DISTINCT UNION t1 t2 + sql: SELECT * FROM t1 UNION DISTINCT SELECT * FROM t2 UNION DISTINCT SELECT * FROM t3; + expect: + node_tree_str: | + +-node[kQuery]: kQuerySetOperation + +-operator: UNION DISTINCT + +-inputs[list]: + +-0: + | +-node[kQuery]: kQuerySelect + | +-distinct_opt: false + | +-where_expr: null + | +-group_expr_list: null + | +-having_expr: null + | +-order_expr_list: null + | +-limit: null + | +-select_list[list]: + | | +-0: + | | +-node[kResTarget] + | | +-val: + | | | +-expr[all] + | | +-name: + | +-tableref_list[list]: + | | +-0: + | | +-node[kTableRef]: kTable + | | +-table: t1 + | | +-alias: + | +-window_list: [] + +-1: + | +-node[kQuery]: kQuerySelect + | +-distinct_opt: false + | +-where_expr: null + | +-group_expr_list: null + | +-having_expr: null + | +-order_expr_list: null + | +-limit: null + | +-select_list[list]: + | | +-0: + | | +-node[kResTarget] + | | +-val: + | | | +-expr[all] + | | +-name: + | +-tableref_list[list]: + | | +-0: + | | +-node[kTableRef]: kTable + | | +-table: t2 + | | +-alias: + | +-window_list: [] + +-2: + +-node[kQuery]: kQuerySelect + +-distinct_opt: false + +-where_expr: null + +-group_expr_list: null + +-having_expr: null + +-order_expr_list: null + +-limit: null + +-select_list[list]: + | +-0: + | +-node[kResTarget] + | +-val: + | | +-expr[all] + | +-name: + 
+-tableref_list[list]: + | +-0: + | +-node[kTableRef]: kTable + | +-table: t3 + | +-alias: + +-window_list: [] - id: 1 - desc: 简单UNION两张表 - mode: request-unsupport + desc: UNION ALL t1 t2 sql: SELECT * FROM t1 UNION ALL SELECT * FROM t2; + expect: + node_tree_str: | + +-node[kQuery]: kQuerySetOperation + +-operator: UNION ALL + +-inputs[list]: + +-0: + | +-node[kQuery]: kQuerySelect + | +-distinct_opt: false + | +-where_expr: null + | +-group_expr_list: null + | +-having_expr: null + | +-order_expr_list: null + | +-limit: null + | +-select_list[list]: + | | +-0: + | | +-node[kResTarget] + | | +-val: + | | | +-expr[all] + | | +-name: + | +-tableref_list[list]: + | | +-0: + | | +-node[kTableRef]: kTable + | | +-table: t1 + | | +-alias: + | +-window_list: [] + +-1: + +-node[kQuery]: kQuerySelect + +-distinct_opt: false + +-where_expr: null + +-group_expr_list: null + +-having_expr: null + +-order_expr_list: null + +-limit: null + +-select_list[list]: + | +-0: + | +-node[kResTarget] + | +-val: + | | +-expr[all] + | +-name: + +-tableref_list[list]: + | +-0: + | +-node[kTableRef]: kTable + | +-table: t2 + | +-alias: + +-window_list: [] + plan_tree_str: | + +-[kQueryPlan] + +-[kSetOperationPlan] + +-operator: UNION ALL + +-children[list]: + +-[kQueryPlan] + +-[kProjectPlan] + +-table: t1 + +-project_list_vec[list]: + +-[kProjectList] + +-projects on table [list]: + +-[kProjectNode] + +-[0]*: * + +-[kTablePlan] + +-table: t1 + +-[kQueryPlan] + +-[kProjectPlan] + +-table: t2 + +-project_list_vec[list]: + +-[kProjectList] + +-projects on table [list]: + +-[kProjectNode] + +-[0]*: * + +-[kTablePlan] + +-table: t2 - id: 2 - desc: DISTINCT UNION t1 t2 - mode: request-unsupport - sql: SELECT * FROM t1 UNION DISTINCT SELECT * FROM t2; + desc: UNION must follow by ALL or DISTINCT + sql: SELECT * FROM t1 UNION select * FROM t2; + expect: + success: false + msg: | + Syntax error: Expected keyword ALL or keyword DISTINCT but got keyword SELECT [at 1:24] + SELECT * FROM t1 UNION 
select * FROM t2; + ^ - id: 3 - desc: UNION ALL t1 t2 - mode: request-unsupport - sql: SELECT * FROM t1 UNION ALL SELECT * FROM t2; + desc: UNION with exact DINSTINCT TYPE + sql: SELECT * FROM t1 UNION ALL select * FROM t2 UNION DISTINCT SELECT * FROM t3; + expect: + success: false + msg: | + Syntax error: Different set operations cannot be used in the same query without using parentheses for grouping [at 1:45] + SELECT * FROM t1 UNION ALL select * FROM t2 UNION DISTINCT SELECT * FROM t3; + ^ - id: 4 - desc: UNION ALL t1 t2 t3 - mode: request-unsupport - sql: SELECT * FROM t1 UNION ALL SELECT * FROM t2 UNION SELECT * FROM t3; + desc: for query with order by or limit, must quote with parantheses + sql: SELECT * FROM t1 LIMIT 10 UNION ALL select * FROM t2 + expect: + success: false + msg: | + Syntax error: Expected end of input but got keyword UNION [at 1:27] + SELECT * FROM t1 LIMIT 10 UNION ALL select * FROM t2 + ^ - id: 5 - desc: 两个拼表子查询UNION - mode: request-unsupport - sql: SELECT * FROM t1 left join t2 on t1.col1 = t2.col2 UNION ALL SELECT * FROM t3 UNION SELECT * FROM t4; + sql: (SELECT * FROM t1 LIMIT 10) UNION ALL (select * FROM t2 UNION DISTINCT select * from t3) + expect: + node_tree_str: | + +-node[kQuery]: kQuerySetOperation + +-operator: UNION ALL + +-inputs[list]: + +-0: + | +-node[kQuery]: kQuerySelect + | +-distinct_opt: false + | +-where_expr: null + | +-group_expr_list: null + | +-having_expr: null + | +-order_expr_list: null + | +-limit: + | | +-node[kLimit] + | | +-limit_cnt: 10 + | +-select_list[list]: + | | +-0: + | | +-node[kResTarget] + | | +-val: + | | | +-expr[all] + | | +-name: + | +-tableref_list[list]: + | | +-0: + | | +-node[kTableRef]: kTable + | | +-table: t1 + | | +-alias: + | +-window_list: [] + +-1: + +-node[kQuery]: kQuerySetOperation + +-operator: UNION DISTINCT + +-inputs[list]: + +-0: + | +-node[kQuery]: kQuerySelect + | +-distinct_opt: false + | +-where_expr: null + | +-group_expr_list: null + | +-having_expr: null + | 
+-order_expr_list: null + | +-limit: null + | +-select_list[list]: + | | +-0: + | | +-node[kResTarget] + | | +-val: + | | | +-expr[all] + | | +-name: + | +-tableref_list[list]: + | | +-0: + | | +-node[kTableRef]: kTable + | | +-table: t2 + | | +-alias: + | +-window_list: [] + +-1: + +-node[kQuery]: kQuerySelect + +-distinct_opt: false + +-where_expr: null + +-group_expr_list: null + +-having_expr: null + +-order_expr_list: null + +-limit: null + +-select_list[list]: + | +-0: + | +-node[kResTarget] + | +-val: + | | +-expr[all] + | +-name: + +-tableref_list[list]: + | +-0: + | +-node[kTableRef]: kTable + | +-table: t3 + | +-alias: + +-window_list: [] + plan_tree_str: | + +-[kQueryPlan] + +-[kSetOperationPlan] + +-operator: UNION ALL + +-children[list]: + +-[kQueryPlan] + +-[kLimitPlan] + +-limit_cnt: 10 + +-[kProjectPlan] + +-table: t1 + +-project_list_vec[list]: + +-[kProjectList] + +-projects on table [list]: + +-[kProjectNode] + +-[0]*: * + +-[kTablePlan] + +-table: t1 + +-[kQueryPlan] + +-[kSetOperationPlan] + +-operator: UNION DISTINCT + +-children[list]: + +-[kQueryPlan] + +-[kProjectPlan] + +-table: t2 + +-project_list_vec[list]: + +-[kProjectList] + +-projects on table [list]: + +-[kProjectNode] + +-[0]*: * + +-[kTablePlan] + +-table: t2 + +-[kQueryPlan] + +-[kProjectPlan] + +-table: t3 + +-project_list_vec[list]: + +-[kProjectList] + +-projects on table [list]: + +-[kProjectNode] + +-[0]*: * + +-[kTablePlan] + +-table: t3 - id: 6 - desc: 两个复杂子查询UNION + desc: config option only append at end of query statement, not expression sql: | - SELECT sum(COL1) as col1sum, * FROM t1 where col2 > 10 group by COL1, COL2 having col1sum > 0 order by - COL1+COL2 limit 10 UNION ALL SELECT sum(COL1) as col1sum, * FROM t1 group by COL1, COL2 having sum(COL1) > 0; + (SELECT * FROM t1 LIMIT 10 config ( c = 'c')) + UNION SELECT * FROM t2 + config (c = 'c') + expect: + success: false + msg: | + Syntax error: Expected ")" but got keyword CONFIG [at 1:28] + (SELECT * FROM t1 LIMIT 
10 config ( c = 'c')) + ^ diff --git a/cases/query/union_query.yml b/cases/query/union_query.yml new file mode 100644 index 00000000000..1ad6ab41e27 --- /dev/null +++ b/cases/query/union_query.yml @@ -0,0 +1,449 @@ +cases: + - id: 0 + inputs: + - name: t1 + columns: ["id string","c2 int","c4 timestamp"] + indexs: ["index1:id:c4"] + rows: + - ["aa",20,1000] + - ["bb",30,1000] + - ["cc",40,1000] + - ["dd",50,1000] + - name: t2 + columns: ["c1 string", "val int", "c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",1, 2000] + - ["bb",2, 3000] + - ["cc",3, 3000] + - name: t3 + columns: ["c1 string", "val int", "c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",4, 4000] + - ["bb",5, 2000] + - ["cc",6, 3000] + - ["dd",7, 3000] + # Key + TS 完全相同情况下, SQL 语义上顺序是 undefined. + # 在线模式实现上遵守此隐式规则: 更右边的 排在输出临时表的更后面, 因此首先被 LAST JOIN 选中. + # 此规则不具有一致性, 离线模式不一定遵守. + # Segment 迭代器是从 TS 最大 -> 最小 + sql: | + select t1.id as c1, tx.c1 as tx1, tx.val as val + from t1 last join ( + select * from t2 union all select * from t3 + ) tx + on t1.id = tx.c1 + batch_plan: | + SIMPLE_PROJECT(sources=(t1.id -> c1, tx.c1 -> tx1, tx.val)) + JOIN(type=LastJoin, condition=, left_keys=(), right_keys=(), index_keys=(t1.id)) + DATA_PROVIDER(table=t1) + RENAME(name=tx) + SET_OPERATION(UNION ALL) + DATA_PROVIDER(type=Partition, table=t2, index=index1) + DATA_PROVIDER(type=Partition, table=t3, index=index1) + cluster_request_plan: | + SIMPLE_PROJECT(sources=(t1.id -> c1, tx.c1 -> tx1, tx.val)) + REQUEST_JOIN(type=kJoinTypeConcat) + DATA_PROVIDER(request=t1) + REQUEST_JOIN(OUTPUT_RIGHT_ONLY, type=LastJoin, condition=, left_keys=(), right_keys=(), index_keys=(#4)) + SIMPLE_PROJECT(sources=(#4 -> t1.id)) + DATA_PROVIDER(request=t1) + RENAME(name=tx) + SET_OPERATION(UNION ALL) + DATA_PROVIDER(type=Partition, table=t2, index=index1) + DATA_PROVIDER(type=Partition, table=t3, index=index1) + expect: + order: c1 + columns: ["c1 string", "tx1 string", "val int"] + data: | + aa, aa, 4 + bb, bb, 2 + 
cc, cc, 6 + dd, dd, 7 + - id: 1 + desc: select project over union + inputs: + - name: t1 + columns: ["id string","c2 int","c4 timestamp"] + indexs: ["index1:id:c4"] + rows: + - ["aa",20,1000] + - ["bb",30,1000] + - ["cc",40,1000] + - ["dd",50,1000] + - name: t2 + columns: ["c1 string", "val int", "c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",1, 2000] + - ["bb",2, 3000] + - ["cc",3, 3000] + - name: t3 + columns: ["c1 string", "val int", "c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",4, 4000] + - ["bb",5, 2000] + - ["cc",6, 3000] + - ["dd",7, 3000] + sql: | + select t1.id as c1, tx.key as tx1, tx.val as val + from t1 last join ( + select c1 as key, val, c4 from + (select * from t2 union all select * from t3) + ) tx + on t1.id = tx.key + batch_plan: | + SIMPLE_PROJECT(sources=(t1.id -> c1, tx.key -> tx1, tx.val)) + JOIN(type=LastJoin, condition=, left_keys=(), right_keys=(), index_keys=(t1.id)) + DATA_PROVIDER(table=t1) + RENAME(name=tx) + SIMPLE_PROJECT(sources=(c1 -> key, val, c4)) + SET_OPERATION(UNION ALL) + DATA_PROVIDER(type=Partition, table=t2, index=index1) + DATA_PROVIDER(type=Partition, table=t3, index=index1) + cluster_request_plan: | + SIMPLE_PROJECT(sources=(t1.id -> c1, tx.key -> tx1, tx.val)) + REQUEST_JOIN(type=kJoinTypeConcat) + DATA_PROVIDER(request=t1) + REQUEST_JOIN(OUTPUT_RIGHT_ONLY, type=LastJoin, condition=, left_keys=(), right_keys=(), index_keys=(#4)) + SIMPLE_PROJECT(sources=(#4 -> t1.id)) + DATA_PROVIDER(request=t1) + RENAME(name=tx) + SIMPLE_PROJECT(sources=(c1 -> key, val, c4)) + SET_OPERATION(UNION ALL) + DATA_PROVIDER(type=Partition, table=t2, index=index1) + DATA_PROVIDER(type=Partition, table=t3, index=index1) + expect: + order: c1 + columns: ["c1 string", "tx1 string", "val int"] + data: | + aa, aa, 4 + bb, bb, 2 + cc, cc, 6 + dd, dd, 7 + - id: 2 + desc: lastjoin(union(filter(t2), t3)) + mode: request-unsupport + inputs: + - name: t1 + columns: ["id string","c2 int","c4 timestamp"] + indexs: 
["index1:id:c4"] + rows: + - ["aa",20,1000] + - ["bb",30,1000] + - ["cc",40,1000] + - ["dd",50,1000] + - name: t2 + columns: ["c1 string", "val int", "c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",1, 2000] + - ["bb",2, 3000] + - ["cc",3, 3000] + - name: t3 + columns: ["c1 string", "val int", "c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",4, 4000] + - ["bb",5, 2000] + - ["cc",6, 3000] + - ["dd",7, 3000] + # FILTER optimzed t2, LASTJOIN optimzed t3, optimized index key not match + sql: | + select t1.id as c1, tx.c1 as tx1, tx.val as val + from t1 last join ( + select * from t2 where c1 = 'aa' union all select * from t3 + ) tx + order by tx.c4 + on t1.id = tx.c1 + batch_plan: | + SIMPLE_PROJECT(sources=(t1.id -> c1, tx.c1 -> tx1, tx.val)) + JOIN(type=LastJoin, right_sort=(tx.c4 ASC), condition=, left_keys=(t1.id), right_keys=(tx.c1), index_keys=) + DATA_PROVIDER(table=t1) + RENAME(name=tx) + SET_OPERATION(UNION ALL) + FILTER_BY(condition=, left_keys=(), right_keys=(), index_keys=(aa)) + DATA_PROVIDER(type=Partition, table=t2, index=index1) + DATA_PROVIDER(table=t3) + expect: + order: c1 + columns: ["c1 string", "tx1 string", "val int"] + data: | + aa, aa, 4 + bb, bb, 5 + cc, cc, 6 + dd, dd, 7 + - id: 3 + desc: lastjoin(filter(union) + inputs: + - name: t1 + columns: ["id string","c2 int","c4 timestamp"] + indexs: ["index1:id:c4"] + rows: + - ["aa",20,1000] + - ["bb",30,1000] + - ["cc",40,1000] + - ["dd",50,1000] + - name: t2 + columns: ["c1 string", "val int", "c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",1, 2000] + - ["bb",2, 3000] + - ["cc",3, 3000] + - name: t3 + columns: ["c1 string", "val int", "c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",4, 4000] + - ["bb",5, 2000] + - ["cc",6, 3000] + - ["dd",7, 3000] + sql: | + select t1.id as c1, tx.c1 as tx1, tx.val as val + from t1 last join ( + select * from + (select * from t2 union all select * from t3) + where val < 5 + ) tx + on t1.id = tx.c1 + batch_plan: | + 
SIMPLE_PROJECT(sources=(t1.id -> c1, tx.c1 -> tx1, tx.val)) + JOIN(type=LastJoin, condition=, left_keys=(), right_keys=(), index_keys=(t1.id)) + DATA_PROVIDER(table=t1) + RENAME(name=tx) + FILTER_BY(condition=val < 5, left_keys=, right_keys=, index_keys=) + SET_OPERATION(UNION ALL) + DATA_PROVIDER(type=Partition, table=t2, index=index1) + DATA_PROVIDER(type=Partition, table=t3, index=index1) + cluster_request_plan: | + SIMPLE_PROJECT(sources=(t1.id -> c1, tx.c1 -> tx1, tx.val)) + REQUEST_JOIN(type=kJoinTypeConcat) + DATA_PROVIDER(request=t1) + REQUEST_JOIN(OUTPUT_RIGHT_ONLY, type=LastJoin, condition=, left_keys=(), right_keys=(), index_keys=(#4)) + SIMPLE_PROJECT(sources=(#4 -> t1.id)) + DATA_PROVIDER(request=t1) + RENAME(name=tx) + FILTER_BY(condition=val < 5, left_keys=, right_keys=, index_keys=) + SET_OPERATION(UNION ALL) + DATA_PROVIDER(type=Partition, table=t2, index=index1) + DATA_PROVIDER(type=Partition, table=t3, index=index1) + expect: + order: c1 + columns: ["c1 string", "tx1 string", "val int"] + data: | + aa, aa, 4 + bb, bb, 2 + cc, cc, 3 + dd, NULL, NULL + - id: 4 + desc: lastjoin(filter(union) + inputs: + - name: t1 + columns: ["id string","c2 int","c4 timestamp"] + indexs: ["index1:id:c4"] + rows: + - ["aa",20,1000] + - ["bb",30,1000] + - ["cc",40,1000] + - ["dd",50,1000] + - name: t2 + columns: ["c1 string", "val int", "c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",1, 2000] + - ["bb",2, 3000] + - ["cc",3, 3000] + - name: t3 + columns: ["c1 string", "val int", "c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",4, 4000] + - ["bb",5, 2000] + - ["cc",6, 3000] + - ["dd",7, 3000] + sql: | + select t1.id as c1, tx.c1 as tx1, tx.val as val + from t1 last join ( + select * from + (select * from t2 union all select * from t3) + where c1 = "bb" + ) tx + order by tx.c4 + on t1.id = tx.c1 + batch_plan: | + SIMPLE_PROJECT(sources=(t1.id -> c1, tx.c1 -> tx1, tx.val)) + JOIN(type=LastJoin, right_sort=(ASC), condition=, left_keys=(t1.id), 
right_keys=(tx.c1), index_keys=) + DATA_PROVIDER(table=t1) + RENAME(name=tx) + FILTER_BY(condition=, left_keys=(), right_keys=(), index_keys=(bb)) + SET_OPERATION(UNION ALL) + DATA_PROVIDER(type=Partition, table=t2, index=index1) + DATA_PROVIDER(type=Partition, table=t3, index=index1) + request_plan: | + SIMPLE_PROJECT(sources=(t1.id -> c1, tx.c1 -> tx1, tx.val)) + REQUEST_JOIN(type=LastJoin, right_sort=(ASC), condition=, left_keys=(t1.id), right_keys=(tx.c1), index_keys=) + DATA_PROVIDER(request=t1) + RENAME(name=tx) + FILTER_BY(condition=, left_keys=(), right_keys=(), index_keys=(bb)) + SET_OPERATION(UNION ALL) + DATA_PROVIDER(type=Partition, table=t2, index=index1) + DATA_PROVIDER(type=Partition, table=t3, index=index1) + expect: + order: c1 + columns: ["c1 string", "tx1 string", "val int"] + data: | + aa, NULL, NULL + bb, bb, 2 + cc, NULL, NULL + dd, NULL, NULL + - id: 5 + desc: union(filter(t2), filter(t3)) + inputs: + - name: t0 + columns: ["c1 string","id int","c4 timestamp"] + indexs: ["index1:id:c4"] + rows: + - ["aa",20,1000] + - ["bb",30,1000] + - name: t1 + columns: ["id string","c2 int","c4 timestamp"] + indexs: ["index1:id:c4", "index2:c2:c4"] + rows: + - ["aa",20,1000] + - ["bb",30,1000] + - name: t2 + columns: ["c1 string", "val int", "c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",1, 2000] + - ["bb",2, 3000] + - ["cc",3, 3000] + - name: t3 + columns: ["c1 string", "val int", "c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",4, 4000] + - ["bb",5, 2000] + - ["cc",6, 3000] + - ["dd",7, 3000] + sql: | + select t0.id, tr.* + from t0 + last join ( + select t1.id as c1, t1.c2, tx.c1 as tx1, tx.val as val + from t1 left join ( + select * from t2 where val != 1 union all select * from t3 where val < 6 + ) tx + on t1.id = tx.c1 + ) tr + on t0.id = tr.c2 and tr.val > 4 + request_plan: | + SIMPLE_PROJECT(sources=(t0.id, tr.c1, tr.c2, tr.tx1, tr.val)) + REQUEST_JOIN(type=LastJoin, condition=tr.val > 4, left_keys=(), right_keys=(), 
index_keys=(t0.id)) + DATA_PROVIDER(request=t0) + RENAME(name=tr) + SIMPLE_PROJECT(sources=(t1.id -> c1, t1.c2, tx.c1 -> tx1, tx.val)) + REQUEST_JOIN(type=LeftJoin, condition=, left_keys=(), right_keys=(), index_keys=(t1.id)) + DATA_PROVIDER(type=Partition, table=t1, index=index2) + RENAME(name=tx) + SET_OPERATION(UNION ALL) + FILTER_BY(condition=val != 1, left_keys=, right_keys=, index_keys=) + DATA_PROVIDER(type=Partition, table=t2, index=index1) + FILTER_BY(condition=val < 6, left_keys=, right_keys=, index_keys=) + DATA_PROVIDER(type=Partition, table=t3, index=index1) + expect: + order: id + columns: ["id int", "c1 string", "c2 int", "tx1 string", "val int"] + data: | + 20, NULL, NULL, NULL, NULL + 30, bb, 30, bb, 5 + - id: 6 + desc: nested union + inputs: + - name: t1 + columns: ["id string","c2 int","c4 timestamp"] + indexs: ["index1:id:c4"] + rows: + - ["aa",20,1000] + - ["bb",30,1000] + - ["cc",40,1000] + - ["dd",50,1000] + - name: t2 + columns: ["c1 string", "val int", "c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",1, 2000] + - ["bb",2, 3000] + - ["cc",3, 3000] + - name: t3 + columns: ["c1 string", "val int", "c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",4, 4000] + - ["bb",5, 2000] + - ["cc",6, 3000] + - ["dd",7, 3000] + - name: t4 + columns: ["c1 string", "val int", "c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["bb",8, 10000] + - ["cc",9, 10000] + sql: | + select t1.id as c1, tx.c1 as tx1, tx.val as val + from t1 last join ( + select * from t2 union all + (select * from t3 where val < 6 union all select * from t4) + ) tx + on t1.id = tx.c1 + request_plan: | + SIMPLE_PROJECT(sources=(t1.id -> c1, tx.c1 -> tx1, tx.val)) + REQUEST_JOIN(type=LastJoin, condition=, left_keys=(), right_keys=(), index_keys=(t1.id)) + DATA_PROVIDER(request=t1) + RENAME(name=tx) + SET_OPERATION(UNION ALL) + DATA_PROVIDER(type=Partition, table=t2, index=index1) + SET_OPERATION(UNION ALL) + FILTER_BY(condition=val < 6, left_keys=, right_keys=, 
index_keys=) + DATA_PROVIDER(type=Partition, table=t3, index=index1) + DATA_PROVIDER(type=Partition, table=t4, index=index1) + expect: + order: c1 + columns: ["c1 string", "tx1 string", "val int"] + data: | + aa, aa, 4 + bb, bb, 8 + cc, cc, 9 + dd, NULL, NULL + + # unsupported all modes + - id: 7 + desc: window over union + mode: rtidb-unsupport + inputs: + - name: t2 + columns: ["c1 string", "val int", "c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",1, 2000] + - ["bb",2, 3000] + - ["cc",3, 3000] + - name: t3 + columns: ["c1 string", "val int", "c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",4, 4000] + - ["bb",5, 2000] + - ["cc",6, 3000] + - ["dd",7, 3000] + sql: | + select c1, sum(val) over w as agg + from ( + select * from t2 union all select * from t3 + ) tx + window w as (partition by c1 order by c4 + rows_range between 2s preceding and current row) + diff --git a/hybridse/include/codec/row.h b/hybridse/include/codec/row.h index 69158d41e85..09ba4741090 100644 --- a/hybridse/include/codec/row.h +++ b/hybridse/include/codec/row.h @@ -66,6 +66,10 @@ class Row { // > 0 iff "*this" > "b" int compare(const Row &b) const; + friend bool operator<(const Row &lhs, const Row &rhs) { return lhs.compare(rhs) < 0; } + friend bool operator>(const Row &lhs, const Row &rhs) { return lhs.compare(rhs) > 0; } + friend bool operator==(const Row &lhs, const Row &rhs) { return lhs.compare(rhs) == 0; } + int8_t **GetRowPtrs() const; int32_t GetRowPtrCnt() const; diff --git a/hybridse/include/node/node_base.h b/hybridse/include/node/node_base.h index 97ba5cf8e49..8aa678c90a8 100644 --- a/hybridse/include/node/node_base.h +++ b/hybridse/include/node/node_base.h @@ -89,7 +89,7 @@ class NodeBase : public base::FeBaseObject { template bool EqualsOverride(const T* other, Pred&& pred) const { auto lhs = dynamic_cast(this); - if (lhs != nullptr) { + if (lhs == nullptr) { return false; } diff --git a/hybridse/include/node/node_enum.h 
b/hybridse/include/node/node_enum.h index baa3bdb2afe..7c9ebf0ecbe 100644 --- a/hybridse/include/node/node_enum.h +++ b/hybridse/include/node/node_enum.h @@ -112,9 +112,9 @@ enum TableRefType { }; enum QueryType { - kQuerySelect, + kQuerySelect = 0, kQuerySub, - kQueryUnion, + kQuerySetOperation, }; enum ExprType { kExprUnknow = -1, @@ -255,8 +255,11 @@ enum JoinType { kJoinTypeCross, // AKA commma join }; -enum UnionType { kUnionTypeDistinct, kUnionTypeAll }; - +enum class SetOperationType { + UNION, + EXCEPT, + INTERSECT, +}; enum CmdType { kCmdCreateDatabase = 0, kCmdUseDatabase, @@ -304,7 +307,7 @@ enum PlanType { kPlanTypeFilter, kPlanTypeTable, kPlanTypeJoin, - kPlanTypeUnion, + kPlanTypeSetOperation, kPlanTypeSort, kPlanTypeGroup, kPlanTypeDistinct, diff --git a/hybridse/include/node/node_manager.h b/hybridse/include/node/node_manager.h index e70f0a59564..6949faf6f88 100644 --- a/hybridse/include/node/node_manager.h +++ b/hybridse/include/node/node_manager.h @@ -104,8 +104,6 @@ class NodeManager { ExprListNode *group_expr_list, ExprNode *having_expr, ExprNode *order_expr_list, SqlNodeList *window_list_ptr, SqlNode *limit_ptr); - QueryNode *MakeUnionQueryNode(QueryNode *left, QueryNode *right, - bool is_all); TableRefNode *MakeTableNode(const std::string &name, const std::string &alias); TableRefNode *MakeTableNode(const std::string& db, diff --git a/hybridse/include/node/plan_node.h b/hybridse/include/node/plan_node.h index 096ae86aae1..3085b27c699 100644 --- a/hybridse/include/node/plan_node.h +++ b/hybridse/include/node/plan_node.h @@ -151,16 +151,6 @@ class JoinPlanNode : public BinaryPlanNode { const ExprNode *condition_; }; -class UnionPlanNode : public BinaryPlanNode { - public: - UnionPlanNode(PlanNode *left, PlanNode *right, bool is_all) - : BinaryPlanNode(kPlanTypeUnion, left, right), is_all(is_all) {} - void Print(std::ostream &output, const std::string &org_tab) const override; - virtual bool Equals(const PlanNode *that) const; - const bool 
is_all; - std::shared_ptr config_options_; -}; - class CrossProductPlanNode : public BinaryPlanNode { public: CrossProductPlanNode(PlanNode *left, PlanNode *right) : BinaryPlanNode(kPlanTypeJoin, left, right) {} @@ -204,6 +194,7 @@ class ProjectPlanNode : public UnaryPlanNode { class QueryPlanNode : public UnaryPlanNode { public: + QueryPlanNode() : UnaryPlanNode(kPlanTypeQuery) {} explicit QueryPlanNode(PlanNode *node) : UnaryPlanNode(node, kPlanTypeQuery) {} ~QueryPlanNode() {} void Print(std::ostream &output, const std::string &org_tab) const override; @@ -213,6 +204,29 @@ class QueryPlanNode : public UnaryPlanNode { std::shared_ptr config_options_; }; +class SetOperationPlanNode : public MultiChildPlanNode { + public: + SetOperationPlanNode(SetOperationType type, absl::Span input, bool distinct) + : MultiChildPlanNode(kPlanTypeSetOperation), op_type_(type), inputs_(input), distinct_(distinct) { + for (auto n : input) { + AddChild(n); + } + } + ~SetOperationPlanNode() override {} + + const absl::Span &inputs() const { return inputs_; } + SetOperationType op_type() const { return op_type_; } + bool distinct() const { return distinct_; } + + void Print(std::ostream &output, const std::string &org_tab) const override; + bool Equals(const PlanNode *that) const override; + + private: + SetOperationType op_type_; + absl::Span inputs_; + bool distinct_ = false; +}; + class WithClauseEntryPlanNode : public UnaryPlanNode { public: WithClauseEntryPlanNode(std::string alias, QueryPlanNode *query) diff --git a/hybridse/include/node/sql_node.h b/hybridse/include/node/sql_node.h index 30f7a6cc34a..8d641ad8283 100644 --- a/hybridse/include/node/sql_node.h +++ b/hybridse/include/node/sql_node.h @@ -55,6 +55,8 @@ std::string NameOfSqlNodeType(const SqlNodeType &type); absl::string_view CmdTypeName(const CmdType type); +std::string SetOperatorName(SetOperationType type, bool dis); + inline const std::string ExplainTypeName(const ExplainType &explain_type) { switch (explain_type) 
{ case kExplainLogical: @@ -177,8 +179,8 @@ inline const std::string QueryTypeName(const QueryType &type) { switch (type) { case kQuerySelect: return "kQuerySelect"; - case kQueryUnion: - return "kQueryUnion"; + case kQuerySetOperation: + return "kQuerySetOperation"; case kQuerySub: return "kQuerySub"; default: { @@ -782,15 +784,23 @@ class SelectQueryNode : public QueryNode { bool last_item) const; }; -class UnionQueryNode : public QueryNode { +class SetOperationNode : public QueryNode { public: - UnionQueryNode(const QueryNode *left, const QueryNode *right, bool is_all) - : QueryNode(kQueryUnion), left_(left), right_(right), is_all_(is_all) {} - void Print(std::ostream &output, const std::string &org_tab) const; - virtual bool Equals(const SqlNode *node) const; - const QueryNode *left_; - const QueryNode *right_; - const bool is_all_; + SetOperationNode(SetOperationType type, absl::Span input, bool distinct) + : QueryNode(kQuerySetOperation), op_type_(type), inputs_(input), distinct_(distinct) {} + ~SetOperationNode() override {} + + const absl::Span &inputs() const { return inputs_; } + SetOperationType op_type() const { return op_type_; } + bool distinct() const { return distinct_; } + + void Print(std::ostream &output, const std::string &org_tab) const override; + bool Equals(const SqlNode *node) const override; + + private: + SetOperationType op_type_; + absl::Span inputs_; + bool distinct_ = false; }; class ParameterExpr : public ExprNode { diff --git a/hybridse/include/vm/catalog.h b/hybridse/include/vm/catalog.h index 4bd007645bd..62b429b0292 100644 --- a/hybridse/include/vm/catalog.h +++ b/hybridse/include/vm/catalog.h @@ -257,6 +257,8 @@ class TableHandler : public DataHandler { virtual std::shared_ptr GetTablet(const std::string& index_name, const std::vector& pks) { return std::shared_ptr(); } + + static std::shared_ptr Cast(std::shared_ptr in); }; /// \brief A table dataset's error handler, representing a error table @@ -335,9 +337,7 @@ class 
PartitionHandler : public TableHandler { // Return null by default RowIterator* GetRawIterator() override { return nullptr; } - std::unique_ptr GetWindowIterator(const std::string& idx_name) override { - return std::unique_ptr(); - } + using TableHandler::GetWindowIterator; /// Return WindowIterator to iterate datasets /// segment-by-segment. @@ -366,6 +366,8 @@ class PartitionHandler : public TableHandler { const std::string GetHandlerTypeName() override { return "PartitionHandler"; } + + static std::shared_ptr Cast(std::shared_ptr in); }; /// \brief A wrapper of table handler which is used as a asynchronous row diff --git a/hybridse/include/vm/physical_op.h b/hybridse/include/vm/physical_op.h index dd51c73bfd1..03b0be96e07 100644 --- a/hybridse/include/vm/physical_op.h +++ b/hybridse/include/vm/physical_op.h @@ -47,7 +47,7 @@ enum PhysicalOpType { kPhysicalOpRename, kPhysicalOpDistinct, kPhysicalOpJoin, - kPhysicalOpUnion, + kPhysicalOpSetOperation, kPhysicalOpWindow, kPhysicalOpIndexSeek, kPhysicalOpRequestUnion, @@ -153,6 +153,7 @@ class FnComponent { // Sort Component can only handle single order expressions class Sort : public FnComponent { public: + Sort() = default; explicit Sort(const node::OrderByNode *orders) : orders_(orders) {} virtual ~Sort() {} @@ -184,6 +185,8 @@ class Sort : public FnComponent { base::Status ReplaceExpr(const passes::ExprReplacer &replacer, node::NodeManager *nm, Sort *out) const; + Sort ShadowCopy() { return Sort(orders_); } + const node::OrderByNode *orders_; }; @@ -285,7 +288,6 @@ class Key : public FnComponent { const node::ExprListNode *keys() const { return keys_; } void set_keys(const node::ExprListNode *keys) { keys_ = keys; } - const node::ExprListNode *PhysicalProjectNode() const { return keys_; } const std::string FnDetail() const { return "keys=" + fn_info_.fn_name(); } void ResolvedRelatedColumns( @@ -296,6 +298,8 @@ class Key : public FnComponent { base::Status ReplaceExpr(const passes::ExprReplacer &replacer, 
node::NodeManager *nm, Key *out) const; + Key ShadowCopy() { return Key(keys_); } + const node::ExprListNode *keys_; }; @@ -384,11 +388,18 @@ class PhysicalOpNode : public node::NodeBase { // get a list of PhysicalOpNodes that current node depends on virtual std::vector GetDependents() const; + // trace from column ID in current node, to possible producre nodes (one level down) + // multiple results may returned, indicating column of ID `col_id` may comes from any of the result list + virtual absl::StatusOr TraceColID(size_t col_id) const; + + virtual absl::StatusOr TraceColID(absl::string_view col_name) const; + + absl::StatusOr TraceLastDescendants(size_t col_id) const; + const std::vector &GetProducers() const { return producers_; } std::vector &producers() { return producers_; } - void UpdateProducer(int i, PhysicalOpNode *producer); void AddProducer(PhysicalOpNode *producer) { producers_.push_back(producer); @@ -445,7 +456,7 @@ class PhysicalOpNode : public node::NodeBase { // limit always >= 0 so it is safe to do that int32_t GetLimitCntValue() const { return limit_cnt_.value_or(-1); } - bool IsSameSchema(const vm::Schema &schema, const vm::Schema &exp_schema) const; + static bool IsSameSchema(const codec::Schema* schema, const codec::Schema* exp_schema); // `lhs` schema contains `rhs` and is start with `rhs` schema // @@ -834,7 +845,6 @@ class PhysicalGroupAggrerationNode : public PhysicalProjectNode { Key group_; }; -class PhysicalUnionNode; class PhysicalJoinNode; class WindowOp { @@ -1410,23 +1420,41 @@ class PhysicalRequestJoinNode : public PhysicalBinaryNode { } }; -class PhysicalUnionNode : public PhysicalBinaryNode { +class PhysicalSetOperationNode : public PhysicalOpNode { public: - PhysicalUnionNode(PhysicalOpNode *left, PhysicalOpNode *right, bool is_all) - : PhysicalBinaryNode(left, right, kPhysicalOpUnion, true), - is_all_(is_all) { - output_type_ = kSchemaTypeTable; + PhysicalSetOperationNode(node::SetOperationType type, absl::Span inputs, bool 
distinct) + : PhysicalOpNode(kPhysicalOpSetOperation, false), op_type_(type), distinct_(distinct) { + for (auto n : inputs) { + AddProducer(n); + } + bool group_optimized = true; + for (auto n : producers_) { + if (n-> GetOutputType() != kSchemaTypeGroup) { + group_optimized = false; + break; + } + } + + if (group_optimized && op_type_ == node::SetOperationType::UNION) { + output_type_ = kSchemaTypeGroup; + } else { + output_type_ = kSchemaTypeTable; + } } - virtual ~PhysicalUnionNode() {} - base::Status InitSchema(PhysicalPlanContext *) override; - virtual void Print(std::ostream &output, const std::string &tab) const; + ~PhysicalSetOperationNode() override {} - base::Status WithNewChildren(node::NodeManager *nm, - const std::vector &children, + base::Status InitSchema(PhysicalPlanContext *) override; + void Print(std::ostream &output, const std::string &tab) const override; + base::Status WithNewChildren(node::NodeManager *nm, const std::vector &children, PhysicalOpNode **out) override; - const bool is_all_; - static PhysicalUnionNode *CastFrom(PhysicalOpNode *node); + absl::StatusOr TraceColID(size_t col_id) const override; + + absl::StatusOr TraceColID(absl::string_view col_name) const override; + + node::SetOperationType op_type_; + const bool distinct_ = false; + static PhysicalSetOperationNode *CastFrom(PhysicalOpNode *node); }; class PhysicalPostRequestUnionNode : public PhysicalBinaryNode { @@ -1515,9 +1543,7 @@ class PhysicalRequestUnionNode : public PhysicalBinaryNode { << "Fail to add window union : producer is empty or null"; return false; } - if (output_request_row() && - !IsSameSchema(*node->GetOutputSchema(), - *producers_[0]->GetOutputSchema())) { + if (output_request_row() && !IsSameSchema(node->GetOutputSchema(), producers_[0]->GetOutputSchema())) { LOG(WARNING) << "Union Table and window input schema aren't consistent"; return false; diff --git a/hybridse/include/vm/schemas_context.h b/hybridse/include/vm/schemas_context.h index 
b2e68d9477a..d5c8536addd 100644 --- a/hybridse/include/vm/schemas_context.h +++ b/hybridse/include/vm/schemas_context.h @@ -79,6 +79,13 @@ class SchemaSource { std::vector source_child_column_ids_; }; +// backtrace info for column in current node to producer nodes +// [(producer index, producer column id)] +typedef std::vector> ColProducerTraceInfo; + +// [(source node, source column id)] +typedef std::vector> ColLastDescendantTraceInfo; + /** * Utility context to resolve column spec into detailed column information. * This class should be explicitly initialized with schema source list info @@ -140,16 +147,13 @@ class SchemasContext { /** * Resolve source column by relation name and column name recursively. - * If it can be resolved in current node, `child_path_id` is -1, - * else `child_path_id` is the index of the child which the column + * If it can't be resolved in current node, `child_path_idx` is -1, + * otherwise `child_path_idx` is the index of the child which the column * is resolved from. */ - base::Status ResolveColumnID(const std::string& db_name, - const std::string& relation_name, - const std::string& column_name, - size_t* column_id, int* child_path_idx, - size_t* child_column_id, - size_t* source_column_id, + base::Status ResolveColumnID(const std::string& db_name, const std::string& relation_name, + const std::string& column_name, size_t* column_id, int* child_path_idx, + size_t* child_column_id, size_t* source_column_id, const PhysicalOpNode** source_node) const; /** @@ -244,6 +248,9 @@ class SchemasContext { void BuildTrivial(const std::vector& schemas); void BuildTrivial(const std::string& default_db, const std::vector& tables); + // {db}.{table}({col_name}:{col_type}, ...) 
+ std::string ReadableString() const { return ""; } + std::string DebugString() const; friend std::ostream& operator<<(std::ostream& os, const SchemasContext& sc) { return os << sc.DebugString(); } @@ -266,7 +273,7 @@ class SchemasContext { std::map>> column_name_map_; // child source mapping - // child idx -> (child column id -> column idx) + // child idx -> (child column id (in child node) -> column id (in current node)) std::map> child_source_map_; // schema source parts diff --git a/hybridse/src/node/node_manager.cc b/hybridse/src/node/node_manager.cc index f60ba20d6b2..86d51249e19 100644 --- a/hybridse/src/node/node_manager.cc +++ b/hybridse/src/node/node_manager.cc @@ -19,6 +19,7 @@ #include #include #include +#include "node/sql_node.h" namespace hybridse { namespace node { @@ -43,11 +44,6 @@ QueryNode *NodeManager::MakeSelectQueryNode(bool is_distinct, SqlNodeList *selec return node_ptr; } -QueryNode *NodeManager::MakeUnionQueryNode(QueryNode *left, QueryNode *right, bool is_all) { - UnionQueryNode *node_ptr = new UnionQueryNode(left, right, is_all); - RegisterNode(node_ptr); - return node_ptr; -} TableRefNode *NodeManager::MakeTableNode(const std::string &name, const std::string &alias) { return MakeTableNode("", name, alias); } diff --git a/hybridse/src/node/plan_node.cc b/hybridse/src/node/plan_node.cc index 9a613c8d0a4..f601696e605 100644 --- a/hybridse/src/node/plan_node.cc +++ b/hybridse/src/node/plan_node.cc @@ -42,7 +42,7 @@ bool PlanNode::Equals(const PlanNode *that) const { if (nullptr == that || type_ != that->type_) { return false; } - return PlanListEquals(this->children_, that->children_); + return type_ == that->GetType() && PlanListEquals(this->children_, that->children_); } void PlanNode::PrintChildren(std::ostream &output, const std::string &tab) const { @@ -184,8 +184,8 @@ std::string NameOfPlanNodeType(const PlanType &type) { return std::string("kTablePlan"); case kPlanTypeJoin: return "kJoinPlan"; - case kPlanTypeUnion: - return 
"kUnionPlan"; + case kPlanTypeSetOperation: + return "kSetOperationPlan"; case kPlanTypeSort: return "kSortPlan"; case kPlanTypeGroup: @@ -638,33 +638,17 @@ bool JoinPlanNode::Equals(const PlanNode *node) const { BinaryPlanNode::Equals(that); } -void UnionPlanNode::Print(std::ostream &output, - const std::string &org_tab) const { +void SetOperationPlanNode::Print(std::ostream &output, const std::string &org_tab) const { PlanNode::Print(output, org_tab); output << "\n"; - std::string tab = org_tab + INDENT; - PrintValue(output, tab, is_all ? "ALL" : "DISTINCT", "union_type", false); - if (config_options_ != nullptr) { - output << "\n"; - PrintValue(output, tab, config_options_.get(), "config_options", false); - } + PrintValue(output, org_tab + INDENT, SetOperatorName(op_type_, distinct_), "operator", false); output << "\n"; PrintChildren(output, org_tab); } -bool UnionPlanNode::Equals(const PlanNode *node) const { - if (nullptr == node) { - return false; - } - - if (this == node) { - return true; - } - - if (type_ != node->type_) { - return false; - } - const UnionPlanNode *that = dynamic_cast(node); - return this->is_all == that->is_all && BinaryPlanNode::Equals(that); +bool SetOperationPlanNode::Equals(const PlanNode *node) const { + auto casted = dynamic_cast(node); + return MultiChildPlanNode::Equals(node) && casted && op_type_ == casted->op_type() && + distinct_ == casted->distinct_; } void QueryPlanNode::Print(std::ostream &output, const std::string &org_tab) const { diff --git a/hybridse/src/node/sql_node.cc b/hybridse/src/node/sql_node.cc index a0e8e0bec8f..9114bad2d53 100644 --- a/hybridse/src/node/sql_node.cc +++ b/hybridse/src/node/sql_node.cc @@ -1216,6 +1216,18 @@ absl::string_view CmdTypeName(const CmdType type) { return "undefined cmd type"; } +std::string SetOperatorName(SetOperationType type, bool dis) { + std::string distinct = dis ? 
"DISTINCT" : "ALL"; + switch (type) { + case SetOperationType::UNION: + return "UNION " + distinct; + case SetOperationType::EXCEPT: + return "EXCEPT " + distinct; + case SetOperationType::INTERSECT: + return "INTERSECT " + distinct; + } +} + std::string DataTypeName(DataType type) { auto &map = GetDataTypeNamesMap(); auto it = map.find(type); @@ -2027,23 +2039,6 @@ bool JoinNode::Equals(const SqlNode *node) const { ExprEquals(this->orders_, that->orders_) && SqlEquals(this->left_, that->right_); } -void UnionQueryNode::Print(std::ostream &output, const std::string &org_tab) const { - QueryNode::Print(output, org_tab); - const std::string tab = org_tab + INDENT + SPACE_ED; - output << "\n"; - PrintValue(output, tab, is_all_ ? "ALL UNION" : "DISTINCT UNION", "union_type", false); - output << "\n"; - PrintSqlNode(output, tab, left_, "left", false); - output << "\n"; - PrintSqlNode(output, tab, right_, "right", true); -} -bool UnionQueryNode::Equals(const SqlNode *node) const { - if (!QueryNode::Equals(node)) { - return false; - } - const UnionQueryNode *that = dynamic_cast(node); - return this->is_all_ && that->is_all_ && SqlEquals(this->left_, that->right_); -} void QueryExpr::Print(std::ostream &output, const std::string &org_tab) const { ExprNode::Print(output, org_tab); const std::string tab = org_tab + INDENT + SPACE_ED; @@ -2734,5 +2729,22 @@ std::string DropPathAction::DebugString() const { return absl::Substitute("DropPathAction ($0)", target_); } +bool SetOperationNode::Equals(const SqlNode *node) const { + auto *rhs = dynamic_cast(node); + return this->QueryNode::Equals(node) && this->op_type() == rhs->op_type() && this->distinct() == rhs->distinct() && + absl::c_equal(this->inputs(), rhs->inputs(), + [](const QueryNode *const l, const QueryNode *const r) { return SqlEquals(l, r); }); +} +void SetOperationNode::Print(std::ostream &output, const std::string &org_tab) const { + QueryNode::Print(output, org_tab); + output << "\n"; + PrintValue(output, org_tab 
+ INDENT, SetOperatorName(op_type_, distinct_), "operator", false); + output << "\n" << org_tab + INDENT << SPACE_ST << "inputs[list]:"; + for (size_t i = 0; i < inputs().size(); i++) { + auto node = inputs()[i]; + output << "\n"; + PrintSqlNode(output, org_tab + INDENT + INDENT, node, std::to_string(i), i + 1 == inputs().size()); + } +} } // namespace node } // namespace hybridse diff --git a/hybridse/src/node/sql_node_test.cc b/hybridse/src/node/sql_node_test.cc index 227cb80dcea..e2938656dcc 100644 --- a/hybridse/src/node/sql_node_test.cc +++ b/hybridse/src/node/sql_node_test.cc @@ -918,7 +918,7 @@ TEST_F(SqlNodeTest, ColumnIdTest) { TEST_F(SqlNodeTest, QueryTypeNameTest) { ASSERT_EQ("kQuerySelect", node::QueryTypeName(node::kQuerySelect)); - ASSERT_EQ("kQueryUnion", node::QueryTypeName(node::kQueryUnion)); + ASSERT_EQ("kQuerySetOperation", node::QueryTypeName(node::kQuerySetOperation)); ASSERT_EQ("kQuerySub", node::QueryTypeName(node::kQuerySub)); } diff --git a/hybridse/src/passes/physical/group_and_sort_optimized.cc b/hybridse/src/passes/physical/group_and_sort_optimized.cc index 2d51b336167..d88369856fd 100644 --- a/hybridse/src/passes/physical/group_and_sort_optimized.cc +++ b/hybridse/src/passes/physical/group_and_sort_optimized.cc @@ -48,8 +48,15 @@ using hybridse::vm::PhysicalSimpleProjectNode; using hybridse::vm::PhysicalWindowAggrerationNode; using hybridse::vm::ProjectType; -static bool ResolveColumnToSourceColumnName(const node::ColumnRefNode* col, const SchemasContext* schemas_ctx, - std::string* db, std::string* table, std::string* source_col); +template +T* ShadowCopy(T* in, std::vector* ct) { + if (in != nullptr) { + auto copy = in->ShadowCopy(); + ct->push_back(copy); + return &ct->back(); + } + return nullptr; +} // ExprNode may be resolving under different SchemasContext later (say one of its descendants context), // with column name etc it may not able to resvole since a column rename may happen in SimpleProject node. 
@@ -63,18 +70,39 @@ absl::Status GroupAndSortOptimized::BuildExprCache(const node::ExprNode* node, c case node::kExprColumnRef: { auto ref = dynamic_cast(node); - if (expr_cache_.find(ref) != expr_cache_.end()) { - break; + auto& ref_map = expr_cache_[ref]; + + auto db = ref->GetDBName(); + auto rel = ref->GetRelationName(); + auto col_name = ref->GetColumnName(); + size_t column_id; + Status status = sc->ResolveColumnID(db, rel, col_name, &column_id); + if (!status.isOK()) { + return absl::NotFoundError(absl::StrCat("Illegal index column: ", ref->GetExprString())); } - std::string source_col; - std::string source_db; - std::string source_table; - if (!ResolveColumnToSourceColumnName(ref, sc, &source_db, &source_table, &source_col)) { - return absl::InternalError(absl::StrCat("unable to resolve ", ref->GetExprString())); + auto descendants = sc->GetRoot()->TraceLastDescendants(column_id); + if (!descendants.ok()) { + return descendants.status(); } - expr_cache_.emplace(ref, SrcColInfo{source_col, source_table, source_db}); + for (auto& entry : descendants.value()) { + if (entry.first->GetOpType() != vm::kPhysicalOpDataProvider) { + continue; + } + auto* data_node = dynamic_cast(entry.first); + std::string source_col, source_table, source_db; + auto s = data_node->schemas_ctx()->ResolveDbTableColumnByID(entry.second, &source_db, &source_table, + &source_col); + if (!s.isOK()) { + return absl::NotFoundError(s.msg); + } + if (ref_map.find(data_node) != ref_map.end()) { + return absl::AlreadyExistsError( + absl::StrCat("node ", entry.first->GetTreeString(), " already exists")); + } + ref_map[data_node] = {source_col, source_table, source_db}; + } break; } default: @@ -171,11 +199,9 @@ bool GroupAndSortOptimized::Transform(PhysicalOpNode* in, PhysicalOpNode* new_producer; if (!union_op->instance_not_in_window()) { - if (KeysAndOrderFilterOptimized( - union_op->schemas_ctx(), union_op->GetProducer(1), - &union_op->window_.partition_, - &union_op->window_.index_key_, 
&union_op->window_.sort_, - &new_producer)) { + if (KeysAndOrderFilterOptimized(union_op->GetProducer(1)->schemas_ctx(), union_op->GetProducer(1), + &union_op->window_.partition_, &union_op->window_.index_key_, + &union_op->window_.sort_, &new_producer)) { if (!ResetProducer(plan_ctx_, union_op, 1, new_producer)) { return false; } @@ -278,7 +304,7 @@ bool GroupAndSortOptimized::Transform(PhysicalOpNode* in, return false; } -bool GroupAndSortOptimized::KeysOptimized(const SchemasContext* root_schemas_ctx, +bool GroupAndSortOptimized::KeysOptimized(const vm::SchemasContext* root_schemas_ctx, PhysicalOpNode* in, Key* left_key, Key* index_key, @@ -298,23 +324,29 @@ bool GroupAndSortOptimized::KeysOptimized(const SchemasContext* root_schemas_ctx optimize_info_ = nullptr; }; - auto s = BuildExprCache(left_key->keys(), root_schemas_ctx); - if (!s.ok()) { - return false; - } - s = BuildExprCache(index_key->keys(), root_schemas_ctx); + auto s = BuildExprCache(index_key->keys(), root_schemas_ctx); if (!s.ok()) { + LOG(WARNING) << s; return false; } if (right_key != nullptr) { s = BuildExprCache(right_key->keys(), root_schemas_ctx); if (!s.ok()) { + LOG(WARNING) << s; + return false; + } + } else { + // build cache from left only right_key is empty + auto s = BuildExprCache(left_key->keys(), root_schemas_ctx); + if (!s.ok()) { + LOG(WARNING) << s; return false; } } if (sort != nullptr) { s = BuildExprCache(sort->orders(), root_schemas_ctx); if (!s.ok()) { + LOG(WARNING) << s; return false; } } @@ -349,13 +381,12 @@ bool GroupAndSortOptimized::KeysOptimizedImpl(const SchemasContext* root_schemas if (DataProviderType::kProviderTypeTable == scan_op->provider_type_ || DataProviderType::kProviderTypePartition == scan_op->provider_type_) { - auto* table_node = dynamic_cast(scan_op); if (optimize_info_) { if (optimize_info_->left_key == left_key && optimize_info_->index_key == index_key && optimize_info_->right_key == right_key && optimize_info_->sort_key == sort) { if 
(optimize_info_->optimized != nullptr && - table_node->GetDb() == optimize_info_->optimized->GetDb() && - table_node->GetName() == optimize_info_->optimized->GetName()) { + scan_op->GetDb() == optimize_info_->optimized->GetDb() && + scan_op->GetName() == optimize_info_->optimized->GetName()) { *new_in = optimize_info_->optimized; return true; } @@ -371,8 +402,8 @@ bool GroupAndSortOptimized::KeysOptimizedImpl(const SchemasContext* root_schemas if (DataProviderType::kProviderTypeTable == scan_op->provider_type_) { // Apply key columns and order column optimization with all indexes binding to scan_op->table_handler_ // Return false if fail to find an appropriate index - if (!TransformKeysAndOrderExpr(root_schemas_ctx, right_partition, - nullptr == sort ? nullptr : sort->orders_, scan_op->table_handler_, + if (!TransformKeysAndOrderExpr( right_partition, + nullptr == sort ? nullptr : sort->orders_, scan_op, &index_name, &bitmap)) { return false; } @@ -386,8 +417,8 @@ bool GroupAndSortOptimized::KeysOptimizedImpl(const SchemasContext* root_schemas index_name = partition_op->index_name_; // Apply key columns and order column optimization with given index name // Return false if given index do not match the keys and order column - if (!TransformKeysAndOrderExpr(root_schemas_ctx, right_partition, - nullptr == sort ? nullptr : sort->orders_, scan_op->table_handler_, + if (!TransformKeysAndOrderExpr( right_partition, + nullptr == sort ? 
nullptr : sort->orders_, scan_op, &index_name, &bitmap)) { return false; } @@ -621,6 +652,91 @@ bool GroupAndSortOptimized::KeysOptimizedImpl(const SchemasContext* root_schemas } *new_in = new_union; return true; + } else if (PhysicalOpType::kPhysicalOpSetOperation == in->GetOpType()) { + auto set_op = dynamic_cast(in); + // keys optimize for each inputs for set operation + std::vector opt_inputs; + opt_inputs.reserve(in->GetProducerCnt()); + bool opt_all = true; + Key* left_key_opt = nullptr; + Key* index_key_opt = nullptr; + Key* right_key_opt = nullptr; + Sort* sort_opt = nullptr; + std::vector alloca_keys; + alloca_keys.reserve(3 * in->GetProducerCnt()); + std::vector alloca_sort; + alloca_sort.reserve(in->GetProducerCnt()); + + for (size_t i = 0; i < in->GetProducerCnt(); i++) { + auto n = in->GetProducer(i); + // expr_cache_.clear(); + // optimize_info_ = nullptr; + PhysicalOpNode* optimized = nullptr; + // copy keys + auto left_key_cp = ShadowCopy(left_key, &alloca_keys); + auto index_key_cp = ShadowCopy(index_key, &alloca_keys); + auto right_key_cp = ShadowCopy(right_key, &alloca_keys); + auto sort_cp = ShadowCopy(sort, &alloca_sort); + + if (!KeysOptimizedImpl(n->schemas_ctx(), n, left_key_cp, index_key_cp, right_key_cp, sort_cp, &optimized)) { + LOG(WARNING) << "unable to optimize operation set input: " << n->GetTreeString(); + opt_all = false; + } + opt_inputs.push_back(optimized == nullptr ? 
n : optimized); + + if (i == 0) { + left_key_opt = left_key_cp; + index_key_opt = index_key_cp; + right_key_opt = right_key_cp; + sort_opt = sort_cp; + } else { + // check all optimized keys equals + if (!node::SqlEquals(left_key_opt->keys(), left_key_cp->keys())) { + LOG(WARNING) << "[optimizing set operation] optimized left keys not equal: " + << node::ExprString(left_key_opt->keys()) << " vs " + << node::ExprString(left_key_cp->keys()); + return false; + } + if (!node::SqlEquals(index_key_opt->keys(), index_key_cp->keys())) { + LOG(WARNING) << "[optimizing set operation] optimized index keys not equal: " + << node::ExprString(index_key_opt->keys()) << " vs " + << node::ExprString(index_key_cp->keys()); + return false; + } + if (right_key_opt && !node::SqlEquals(right_key_opt->keys(), right_key_cp->keys())) { + LOG(WARNING) << "[optimizing set operation] optimized right keys not equal: " + << node::ExprString(right_key_opt->keys()) << " vs " + << node::ExprString(right_key_cp->keys()); + return false; + } + if (sort_opt && !node::SqlEquals(sort_opt->orders(), sort_cp->orders())) { + LOG(WARNING) << "[optimizing set operation] optimized order keys not equal: " + << node::ExprString(sort_opt->orders()) << " vs " + << node::ExprString(sort_cp->orders()); + return false; + } + } + } + if (opt_all) { + // write keys + left_key->set_keys(left_key_opt->keys()); + index_key->set_keys(index_key_opt->keys()); + if (right_key && right_key_opt) { + right_key->set_keys(right_key_opt->keys()); + } + if (sort && sort_opt) { + sort->set_orders(sort_opt->orders()); + } + vm::PhysicalSetOperationNode* opt_set = nullptr; + if (!plan_ctx_ + ->CreateOp(&opt_set, set_op->op_type_, opt_inputs, set_op->distinct_) + .isOK()) { + return false; + } + *new_in = opt_set; + } + + return opt_all; } return false; } @@ -690,11 +806,8 @@ bool GroupAndSortOptimized::KeyAndOrderOptimized( sort, new_in); } -bool GroupAndSortOptimized::TransformKeysAndOrderExpr(const SchemasContext* root_schemas_ctx, 
- const node::ExprListNode* groups, - const node::OrderByNode* order, - std::shared_ptr table_handler, - std::string* index_name, +bool GroupAndSortOptimized::TransformKeysAndOrderExpr(const node::ExprListNode* groups, const node::OrderByNode* order, + vm::PhysicalDataProviderNode* data_node, std::string* index_name, IndexBitMap* output_bitmap) { if (nullptr == groups || nullptr == output_bitmap || nullptr == index_name) { DLOG(WARNING) << "fail to transform keys expr : key expr or output " @@ -717,13 +830,18 @@ bool GroupAndSortOptimized::TransformKeysAndOrderExpr(const SchemasContext* root switch (group->expr_type_) { case node::kExprColumnRef: { auto column = dynamic_cast(group); - auto op = expr_cache_.find(column); - if (op == expr_cache_.end()) { + auto oop = expr_cache_.find(column); + if (oop == expr_cache_.end()) { return false; } - if (table_handler->GetName() != op->second.tb_name || - table_handler->GetDatabase() != op->second.db_name) { + auto op = oop->second.find(data_node); + if (op == oop->second.end()) { + return false; + } + + if (data_node-> table_handler_->GetName() != op->second.tb_name || + data_node->table_handler_->GetDatabase() != op->second.db_name) { return false; } @@ -742,13 +860,18 @@ bool GroupAndSortOptimized::TransformKeysAndOrderExpr(const SchemasContext* root auto expr = order->GetOrderExpressionExpr(i); if (nullptr != expr && expr->GetExprType() == node::kExprColumnRef) { auto column = dynamic_cast(expr); - auto op = expr_cache_.find(column); - if (op == expr_cache_.end()) { + auto oop = expr_cache_.find(column); + if (oop == expr_cache_.end()) { + return false; + } + + auto op = oop->second.find(data_node); + if (op == oop->second.end()) { return false; } - if (table_handler->GetName() != op->second.tb_name || - table_handler->GetDatabase() != op->second.db_name) { + if (data_node->table_handler_->GetName() != op->second.tb_name || + data_node->table_handler_->GetDatabase() != op->second.db_name) { return false; } @@ -763,7 
+886,7 @@ bool GroupAndSortOptimized::TransformKeysAndOrderExpr(const SchemasContext* root IndexBitMap match_bitmap; // internal structure for MatchBestIndex, initially turn every bit true IndexBitMap state_bitmap(std::vector>(columns.size(), std::make_optional(0))); - if (!MatchBestIndex(columns, order_columns, table_handler, &state_bitmap, index_name, &match_bitmap)) { + if (!MatchBestIndex(columns, order_columns, data_node->table_handler_, &state_bitmap, index_name, &match_bitmap)) { return false; } if (match_bitmap.bitmap.size() != columns.size()) { @@ -900,45 +1023,5 @@ bool GroupAndSortOptimized::MatchBestIndex(const std::vector& colum return succ; } -/** - * Resolve column reference to possible source table's column name - */ -static bool ResolveColumnToSourceColumnName(const node::ColumnRefNode* col, const SchemasContext* schemas_ctx, - std::string* src_db, std::string* src_table, std::string* src_col) { - auto db = col->GetDBName(); - auto rel = col->GetRelationName(); - auto col_name = col->GetColumnName(); - size_t column_id; - int path_idx; - size_t child_column_id; - size_t source_column_id; - const PhysicalOpNode* source; - Status status = schemas_ctx->ResolveColumnID(db, rel, col_name, &column_id, &path_idx, &child_column_id, - &source_column_id, &source); - - if (!status.isOK()) { - LOG(WARNING) << "Illegal index column: " << col->GetExprString(); - return false; - } - if (source == nullptr || - source->GetOpType() != PhysicalOpType::kPhysicalOpDataProvider) { - LOG(WARNING) << "Index column is not from any source table: " - << col->GetExprString(); - return false; - } - - std::string sc_db, sc_table, sc_column; - status = source->schemas_ctx()->ResolveDbTableColumnByID(source_column_id, &sc_db, &sc_table, &sc_column); - if (!status.isOK()) { - LOG(WARNING) << "Illegal source column id #" << source_column_id << " for index column " - << col->GetExprString(); - return false; - } - *src_db = sc_db; - *src_table = sc_table; - *src_col = sc_column; - 
return true; -} - } // namespace passes } // namespace hybridse diff --git a/hybridse/src/passes/physical/group_and_sort_optimized.h b/hybridse/src/passes/physical/group_and_sort_optimized.h index 2e50571b29d..69b0fcd2b36 100644 --- a/hybridse/src/passes/physical/group_and_sort_optimized.h +++ b/hybridse/src/passes/physical/group_and_sort_optimized.h @@ -107,7 +107,7 @@ class GroupAndSortOptimized : public TransformUpPysicalPass { private: bool Transform(PhysicalOpNode* in, PhysicalOpNode** output); - bool KeysOptimized(const SchemasContext* root_schemas_ctx, PhysicalOpNode* in, Key* left_key, Key* index_key, + bool KeysOptimized(const vm::SchemasContext* root_schemas_ctx, PhysicalOpNode* in, Key* left_key, Key* index_key, Key* right_key, Sort* sort, PhysicalOpNode** new_in); bool KeysOptimizedImpl(const SchemasContext* root_schemas_ctx, PhysicalOpNode* in, Key* left_key, Key* index_key, @@ -138,11 +138,8 @@ class GroupAndSortOptimized : public TransformUpPysicalPass { PhysicalOpNode* in, Key* group, PhysicalOpNode** new_in); - bool TransformKeysAndOrderExpr(const SchemasContext* schemas_ctx, - const node::ExprListNode* groups, - const node::OrderByNode* order, - std::shared_ptr table_handler, - std::string* index, + bool TransformKeysAndOrderExpr(const node::ExprListNode* groups, const node::OrderByNode* order, + vm::PhysicalDataProviderNode* data_node, std::string* index, IndexBitMap* best_bitmap); bool MatchBestIndex(const std::vector& columns, const std::vector& order_columns, @@ -159,7 +156,8 @@ class GroupAndSortOptimized : public TransformUpPysicalPass { // Map ExprNode to source column name // A source column name is the column name in string that refers to a physical table, // only one table got optimized each time - std::unordered_map expr_cache_; + std::unordered_map> + expr_cache_; std::unique_ptr optimize_info_; }; diff --git a/hybridse/src/plan/planner.cc b/hybridse/src/plan/planner.cc index fc350d1ffb6..164dba11f2b 100644 --- 
a/hybridse/src/plan/planner.cc +++ b/hybridse/src/plan/planner.cc @@ -60,29 +60,56 @@ Planner::Planner(node::NodeManager *manager, const bool is_batch_mode, const boo } } -base::Status Planner::CreateQueryPlan(const node::QueryNode *root, PlanNode **plan_tree) { +base::Status Planner::CreateQueryPlan(const node::QueryNode *root, node::QueryPlanNode **plan_tree) { CHECK_TRUE(nullptr != root, common::kPlanError, "can not create query plan node with null query node"); + + auto out = node_manager_->MakeNode(); + + if (!root->with_clauses_.empty()) { + auto with_list = node_manager_->MakeList(); + for (auto q : root->with_clauses_) { + node::QueryPlanNode *with = nullptr; + // CHECK_TRUE(q->query_->query_type_ == node::kQuerySelect, common::kPlanError, + // "only support select query as with clause entry"); + CHECK_STATUS(CreateQueryPlan(q->query_, &with)); + + auto with_entry = node_manager_->MakeNode(q->alias_, with); + + with_list->data_.push_back(with_entry); + } + out->with_clauses_ = absl::MakeSpan(with_list->data_); + } + + if (root->config_options_ != nullptr) { + out->config_options_ = root->config_options_; + } + switch (root->query_type_) { case node::kQuerySelect: { - node::QueryPlanNode* plan = nullptr; - CHECK_STATUS(CreateSelectQueryPlan(dynamic_cast(root), &plan)); - *plan_tree = plan; + node::PlanNode* query_input = nullptr; + CHECK_STATUS(CreateSelectQueryPlan(dynamic_cast(root), &query_input)); + out->AddChild(query_input); break; } - case node::kQueryUnion: - CHECK_STATUS(CreateUnionQueryPlan(dynamic_cast(root), plan_tree)); + case node::kQuerySetOperation: { + node::SetOperationPlanNode* un = nullptr; + CHECK_STATUS(CreateSetOperationPlan(dynamic_cast(root), &un)); + out->AddChild(un); break; + } default: { FAIL_STATUS(common::kPlanError, "can not create query plan node with invalid query type " + node::QueryTypeName(root->query_type_)); } } + + *plan_tree = out; return base::Status::OK(); } // TODO(chenjing): refactor SELECT query logical plan // 
Deal with group by clause, order clause, having clause in physical plan instead of logical plan, since we need // schema context for column resolve. -base::Status Planner::CreateSelectQueryPlan(const node::SelectQueryNode *root, node::QueryPlanNode **plan_tree) { +base::Status Planner::CreateSelectQueryPlan(const node::SelectQueryNode *root, node::PlanNode **plan_tree) { const node::NodePointVector &table_ref_list = nullptr == root->GetTableRefList() ? std::vector() : root->GetTableRefList()->GetList(); std::vector relation_nodes; @@ -314,46 +341,24 @@ base::Status Planner::CreateSelectQueryPlan(const node::SelectQueryNode *root, n current_node = node_manager_->MakeLimitPlanNode(current_node, limit_ptr->GetLimitCount()); } - auto out = node_manager_->MakeNode(current_node); - - if (!root->with_clauses_.empty()) { - auto with_list = node_manager_->MakeList(); - for (auto q : root->with_clauses_) { - node::QueryPlanNode *with = nullptr; - CHECK_TRUE(q->query_->query_type_ == node::kQuerySelect, common::kPlanError, - "only support select query as with clause entry"); - CHECK_STATUS(CreateSelectQueryPlan(dynamic_cast(q->query_), &with)); - - auto with_entry = node_manager_->MakeNode(q->alias_, with); - - with_list->data_.push_back(with_entry); - } - out->with_clauses_ = absl::MakeSpan(with_list->data_); - } - - if (root->config_options_ != nullptr) { - out->config_options_ = root->config_options_; - } - *plan_tree = out; + *plan_tree = current_node; return base::Status::OK(); } -base::Status Planner::CreateUnionQueryPlan(const node::UnionQueryNode *root, PlanNode **plan_tree) { +base::Status Planner::CreateSetOperationPlan(const node::SetOperationNode *root, node::SetOperationPlanNode **plan_tree) { CHECK_TRUE(nullptr != root, common::kPlanError, "can not create query plan node with null query node") - node::PlanNode *left_plan = nullptr; - node::PlanNode *right_plan = nullptr; - CHECK_STATUS(CreateQueryPlan(root->left_, &left_plan), common::kPlanError, - "can not 
create union query plan left query") - CHECK_STATUS(CreateQueryPlan(root->right_, &right_plan), common::kPlanError, - "can not create union query plan right query") - auto* res = node_manager_->MakeNode(left_plan, right_plan, root->is_all_); - if (root->config_options_ != nullptr) { - res->config_options_ = root->config_options_; + auto list = node_manager_->MakeList(); + for (auto n : root->inputs()) { + node::QueryPlanNode* query = nullptr; + CHECK_STATUS(CreateQueryPlan(n, &query)); + list->data_.push_back(query); } - *plan_tree = res; + auto span = absl::MakeSpan(list->data_); + *plan_tree = node_manager_->MakeNode(root->op_type(), span, root->distinct()); return base::Status::OK(); } + base::Status Planner::CheckWindowFrame(const node::WindowDefNode *w_ptr) { CHECK_TRUE(nullptr != w_ptr->GetFrame(), common::kPlanError, "fail to create project list node: frame can't be unbound ") @@ -430,7 +435,7 @@ base::Status Planner::CreateCreateFunctionPlanNode(const node::CreateFunctionNod base::Status Planner::CreateSelectIntoPlanNode(const node::SelectIntoNode *root, node::PlanNode **output) { CHECK_TRUE(nullptr != root, common::kPlanError, "fail to create select into plan with null node"); - PlanNode *query = nullptr; + node::QueryPlanNode *query = nullptr; CHECK_STATUS(CreateQueryPlan(root->Query(), &query)) *output = node_manager_->MakeSelectIntoPlanNode(query, root->QueryStr(), root->OutFile(), root->Options(), root->ConfigOptions()); @@ -524,6 +529,7 @@ base::Status Planner::ValidateOnlineServingOp(node::PlanNode *node) { case node::kPlanTypeWindow: case node::kPlanTypeQuery: case node::kPlanTypeFilter: + case node::kPlanTypeSetOperation: case node::kPlanTypeJoin: { break; } @@ -593,6 +599,7 @@ base::Status Planner::ValidateClusterOnlineTrainingOp(node::PlanNode *node) { case node::kPlanTypeRename: case node::kPlanTypeLimit: case node::kPlanTypeFilter: + case node::kPlanTypeSetOperation: case node::kPlanTypeQuery: { break; } @@ -618,7 +625,7 @@ base::Status 
Planner::PrepareRequestTable(node::PlanNode *node, std::vectortype_) { case node::kPlanTypeJoin: - case node::kPlanTypeUnion: { + case node::kPlanTypeSetOperation: { auto binary_op = dynamic_cast(node); CHECK_TRUE(nullptr != binary_op->GetLeft(), common::kPlanError, "Left child of ", node->GetTypeName(), " is null") @@ -663,7 +670,7 @@ base::Status SimplePlanner::CreatePlanTree(const NodePointVector &parser_trees, for (auto parser_tree : parser_trees) { switch (parser_tree->GetType()) { case node::kQuery: { - PlanNode *query_plan = nullptr; + node::QueryPlanNode *query_plan = nullptr; CHECK_STATUS(CreateQueryPlan(dynamic_cast(parser_tree), &query_plan)); if (!is_batch_mode_) { @@ -917,7 +924,9 @@ base::Status Planner::CreateTableReferencePlanNode(const node::TableRefNode *roo } case node::kRefQuery: { const node::QueryRefNode *sub_query_node = dynamic_cast(root); - CHECK_STATUS(CreateQueryPlan(sub_query_node->query_, &plan_node)) + node::QueryPlanNode* query = nullptr; + CHECK_STATUS(CreateQueryPlan(sub_query_node->query_, &query)) + plan_node = query; if (!sub_query_node->alias_table_name_.empty()) { *output = node_manager_->MakeRenamePlanNode(plan_node, sub_query_node->alias_table_name_); } else { diff --git a/hybridse/src/plan/planner.h b/hybridse/src/plan/planner.h index b484b3c594c..731663ab246 100644 --- a/hybridse/src/plan/planner.h +++ b/hybridse/src/plan/planner.h @@ -80,9 +80,9 @@ class Planner { // currently only apply to rows window bool ExpandCurrentHistoryWindow(std::vector *windows); base::Status CheckWindowFrame(const node::WindowDefNode *w_ptr); - base::Status CreateQueryPlan(const node::QueryNode *root, PlanNode **plan_tree); - base::Status CreateSelectQueryPlan(const node::SelectQueryNode *root, node::QueryPlanNode **plan_tree); - base::Status CreateUnionQueryPlan(const node::UnionQueryNode *root, PlanNode **plan_tree); + base::Status CreateQueryPlan(const node::QueryNode *root, node::QueryPlanNode **plan_tree); + base::Status 
CreateSelectQueryPlan(const node::SelectQueryNode *root, node::PlanNode **plan_tree); + base::Status CreateSetOperationPlan(const node::SetOperationNode *root, node::SetOperationPlanNode **plan_tree); base::Status CreateCreateTablePlan(const node::SqlNode *root, node::PlanNode **output); base::Status CreateTableReferencePlanNode(const node::TableRefNode *root, node::PlanNode **output); base::Status CreateCmdPlan(const SqlNode *root, node::PlanNode **output); diff --git a/hybridse/src/planv2/ast_node_converter.cc b/hybridse/src/planv2/ast_node_converter.cc index 2592c19fb99..5d9eb939113 100644 --- a/hybridse/src/planv2/ast_node_converter.cc +++ b/hybridse/src/planv2/ast_node_converter.cc @@ -21,8 +21,11 @@ #include "absl/base/attributes.h" #include "absl/container/flat_hash_map.h" +#include "absl/strings/ascii.h" #include "absl/strings/match.h" +#include "absl/types/span.h" #include "base/fe_status.h" +#include "udf/udf.h" #include "zetasql/parser/ast_node_kind.h" namespace hybridse { @@ -52,6 +55,8 @@ static base::Status convertAlterAction(const zetasql::ASTAlterAction* action, no node::AlterActionBase** out); static base::Status ConvertAlterTableStmt(const zetasql::ASTAlterTableStatement* stmt, node::NodeManager* nm, node::SqlNode** out); +static base::Status ConvertSetOperation(const zetasql::ASTSetOperation* stmt, node::NodeManager* nm, + node::SetOperationNode** out); /// Used to convert zetasql ASTExpression Node into our ExprNode base::Status ConvertExprNode(const zetasql::ASTExpression* ast_expression, node::NodeManager* node_manager, @@ -339,7 +344,7 @@ base::Status ConvertExprNode(const zetasql::ASTExpression* ast_expression, node: "Un-support Modifiers for function call") std::string function_name = ""; CHECK_STATUS(AstPathExpressionToString(function_call->function(), &function_name)) - boost::to_lower(function_name); + absl::AsciiStrToLower(&function_name); // Convert function call TYPE(value) to cast expression CAST(value as TYPE) node::DataType 
data_type; base::Status status = node::StringToDataType(function_name, &data_type); @@ -1329,35 +1334,19 @@ base::Status ConvertQueryExpr(const zetasql::ASTQueryExpression* query_expressio return base::Status::OK(); } case zetasql::AST_SET_OPERATION: { - const auto set_op = query_expression->GetAsOrNull(); - CHECK_TRUE(set_op != nullptr, common::kSqlAstError, "not an ASTSetOperation"); - switch (set_op->op_type()) { - case zetasql::ASTSetOperation::OperationType::UNION: { - CHECK_TRUE(set_op->inputs().size() >= 2, common::kSqlAstError, - "Union Set Operation have inputs size less than 2"); - bool is_distinct = set_op->distinct(); - node::QueryNode* left = nullptr; - CHECK_STATUS(ConvertQueryExpr(set_op->inputs().at(0), node_manager, &left)); - - for (size_t i = 1; i < set_op->inputs().size(); ++i) { - auto input = set_op->inputs().at(i); - node::QueryNode* expr_node = nullptr; - // TODO(aceforeverd): support set operation - CHECK_STATUS(ConvertQueryExpr(input, node_manager, &expr_node)); - left = node_manager->MakeUnionQueryNode(left, expr_node, !is_distinct); - } - - *output = left; - return base::Status::OK(); - } - default: { - return base::Status(common::kSqlAstError, - absl::StrCat("Un-support set operation: ", set_op->GetSQLForOperation())); - } - } + node::SetOperationNode* set = nullptr; + CHECK_STATUS( + ConvertGuard(query_expression, node_manager, &set, ConvertSetOperation)); + *output = set; + return base::Status::OK(); + } + case zetasql::AST_QUERY: { + node::QueryNode* query = nullptr; + CHECK_STATUS(ConvertGuard(query_expression, node_manager, &query, ConvertQueryNode)); + *output = query; + return base::Status::OK(); } default: { - // NOTE: code basically won't reach here unless inner error return base::Status(common::kSqlAstError, absl::StrCat("can not create query plan node with invalid query type ", query_expression->GetNodeKindString())); @@ -2140,7 +2129,7 @@ base::Status ConvertAstOptionsListToMap(const zetasql::ASTOptionsList* options, for 
(auto entry : options->options_entries()) { std::string key = entry->name()->GetAsString(); if (to_lower) { - boost::to_lower(key); + absl::AsciiStrToLower(&key); } auto entry_value = entry->value(); node::ExprNode* value = nullptr; @@ -2391,5 +2380,31 @@ base::Status ConvertAlterTableStmt(const zetasql::ASTAlterTableStatement* ast_no return base::Status::OK(); } +base::Status ConvertSetOperation(const zetasql::ASTSetOperation* set_op, node::NodeManager* node_manager, + node::SetOperationNode** out) { + switch (set_op->op_type()) { + case zetasql::ASTSetOperation::OperationType::UNION: { + CHECK_TRUE(set_op->inputs().size() >= 2, common::kSqlAstError, + "Union Set Operation have inputs size less than 2"); + + auto list = node_manager->MakeList(); + for (auto n : set_op->inputs()) { + node::QueryNode* expr_node = nullptr; + CHECK_STATUS(ConvertQueryExpr(n, node_manager, &expr_node)); + list->data_.push_back(expr_node); + } + + auto span = absl::MakeSpan(list->data_); + *out = node_manager->MakeNode(node::SetOperationType::UNION, span, + set_op->distinct()); + return base::Status::OK(); + } + default: { + return base::Status(common::kSqlAstError, + absl::StrCat("Un-support set operation: ", set_op->GetSQLForOperation())); + } + } +} + } // namespace plan } // namespace hybridse diff --git a/hybridse/src/planv2/ast_node_converter.h b/hybridse/src/planv2/ast_node_converter.h index 5893439cfe0..e85c6cf8487 100644 --- a/hybridse/src/planv2/ast_node_converter.h +++ b/hybridse/src/planv2/ast_node_converter.h @@ -20,7 +20,6 @@ #include #include "node/node_manager.h" -#include "udf/udf.h" #include "zetasql/parser/parser.h" namespace hybridse { diff --git a/hybridse/src/planv2/ast_node_converter_test.cc b/hybridse/src/planv2/ast_node_converter_test.cc index 20daab24019..51447011f78 100644 --- a/hybridse/src/planv2/ast_node_converter_test.cc +++ b/hybridse/src/planv2/ast_node_converter_test.cc @@ -1204,6 +1204,8 @@ INSTANTIATE_TEST_SUITE_P(ASTHavingStatementTest, 
ASTNodeConverterTest, testing::ValuesIn(sqlcase::InitCases("cases/plan/having_query.yaml", FILTERS))); INSTANTIATE_TEST_SUITE_P(ASTHWindowQueryTest, ASTNodeConverterTest, testing::ValuesIn(sqlcase::InitCases("cases/plan/window_query.yaml", FILTERS))); +INSTANTIATE_TEST_SUITE_P(ASTUnionQueryTest, ASTNodeConverterTest, + testing::ValuesIn(sqlcase::InitCases("cases/plan/union_query.yaml", FILTERS))); } // namespace plan } // namespace hybridse diff --git a/hybridse/src/planv2/planner_v2_test.cc b/hybridse/src/planv2/planner_v2_test.cc index 4eb07bbb611..b5012bcce11 100644 --- a/hybridse/src/planv2/planner_v2_test.cc +++ b/hybridse/src/planv2/planner_v2_test.cc @@ -72,8 +72,8 @@ INSTANTIATE_TEST_SUITE_P(SqlOrderParse, PlannerV2Test, INSTANTIATE_TEST_SUITE_P(SqlJoinParse, PlannerV2Test, testing::ValuesIn(sqlcase::InitCases("cases/plan/join_query.yaml", FILTERS))); -// INSTANTIATE_TEST_SUITE_P(SqlUnionParse, PlannerV2Test, -// testing::ValuesIn(sqlcase::InitCases("cases/plan/union_query.yaml", FILTERS))); +INSTANTIATE_TEST_SUITE_P(SqlUnionParse, PlannerV2Test, + testing::ValuesIn(sqlcase::InitCases("cases/plan/union_query.yaml", FILTERS))); INSTANTIATE_TEST_SUITE_P(SqlSubQueryParse, PlannerV2Test, testing::ValuesIn(sqlcase::InitCases("cases/plan/sub_query.yaml", FILTERS))); @@ -99,9 +99,15 @@ TEST_P(PlannerV2Test, PlannerSucessTest) { node::PlanNodeList plan_trees; EXPECT_EQ(param.expect().success_, PlanAPI::CreatePlanTreeFromScript(sqlstr, plan_trees, manager_, status)) << status; - if (!param.expect().plan_tree_str_.empty()) { - // HACK: weak implementation, but usually it works - EXPECT_EQ(param.expect().plan_tree_str_, plan_trees.at(0)->GetTreeString()); + if (param.expect().success_) { + if (!param.expect().plan_tree_str_.empty()) { + // HACK: weak implementation, but usually it works + EXPECT_EQ(param.expect().plan_tree_str_, plan_trees.at(0)->GetTreeString()); + } + } else { + if (!param.expect().msg_.empty()) { + 
EXPECT_EQ(absl::StripAsciiWhitespace(param.expect().msg_), status.msg); + } } } TEST_P(PlannerV2Test, PlannerClusterOnlineServingOptTest) { @@ -1868,8 +1874,6 @@ INSTANTIATE_TEST_SUITE_P(SqlErrorQuery, PlannerV2ErrorTest, testing::ValuesIn(sqlcase::InitCases("cases/plan/error_query.yaml", FILTERS))); INSTANTIATE_TEST_SUITE_P(SqlUnsupporQuery, PlannerV2ErrorTest, testing::ValuesIn(sqlcase::InitCases("cases/plan/error_unsupport_sql.yaml", FILTERS))); -INSTANTIATE_TEST_SUITE_P(SqlErrorRequestQuery, PlannerV2ErrorTest, - testing::ValuesIn(sqlcase::InitCases("cases/plan/error_request_query.yaml", FILTERS))); TEST_P(PlannerV2ErrorTest, RequestModePlanErrorTest) { auto &sql_case = GetParam(); diff --git a/hybridse/src/testing/engine_test_base.cc b/hybridse/src/testing/engine_test_base.cc index 4992b6b5018..7d02528b5ce 100644 --- a/hybridse/src/testing/engine_test_base.cc +++ b/hybridse/src/testing/engine_test_base.cc @@ -556,6 +556,8 @@ INSTANTIATE_TEST_SUITE_P(EngineBatchWhereGroupQuery, EngineTest, testing::ValuesIn(sqlcase::InitCases("cases/query/where_group_query.yaml"))); INSTANTIATE_TEST_SUITE_P(WithClause, EngineTest, testing::ValuesIn(sqlcase::InitCases("cases/query/with.yaml"))); +INSTANTIATE_TEST_SUITE_P(UnionQuery, EngineTest, + testing::ValuesIn(sqlcase::InitCases("cases/query/union_query.yml"))); INSTANTIATE_TEST_SUITE_P(EngineTestWindowRowQuery, EngineTest, testing::ValuesIn(sqlcase::InitCases("cases/function/window/test_window_row.yaml"))); diff --git a/hybridse/src/vm/catalog_wrapper.cc b/hybridse/src/vm/catalog_wrapper.cc index fbdd337e869..2bbf9dac1fe 100644 --- a/hybridse/src/vm/catalog_wrapper.cc +++ b/hybridse/src/vm/catalog_wrapper.cc @@ -397,5 +397,145 @@ void LazyLeftJoinIterator::onNewLeftRow() { value_ = res.first; matches_right_ |= res.second; } +void UnionIterator::Next() { + auto top = keys_.top(); + keys_.pop(); + inputs_.at(top.second)->Next(); + + if (inputs_.at(top.second)->Valid()) { + keys_.emplace(inputs_.at(top.second)->GetKey(), 
top.second); + } +} +const uint64_t& UnionIterator::GetKey() const { + if (Valid()) { + auto& top = keys_.top(); + return inputs_.at(top.second)->GetKey(); + } + + return INVALID_KEY; +} +const Row& UnionIterator::GetValue() { + if (Valid()) { + auto& top = keys_.top(); + return inputs_.at(top.second)->GetValue(); + } + + return INVALID_ROW; +} +void UnionIterator::Seek(const uint64_t& key) { + for (auto& n : inputs_) { + n->Seek(key); + } + rebuild_keys(); +} +void UnionIterator::SeekToFirst() { + for (auto& n : inputs_) { + n->SeekToFirst(); + } + rebuild_keys(); +} +void UnionIterator::rebuild_keys() { + keys_ = {}; + for (size_t i = 0; i < inputs_.size(); ++i) { + if (inputs_[i]->Valid()) { + keys_.emplace(inputs_[i]->GetKey(), i); + } + } +} +RowIterator* SetOperationHandler::GetRawIterator() { + switch (op_type_) { + case node::SetOperationType::UNION: { + std::vector > iters; + for (auto tb : inputs_) { + iters.emplace_back(tb->GetIterator()); + } + return new UnionIterator(absl::MakeSpan(iters), distinct_); + } + default: + return nullptr; + } +} +RowIterator* UnionWindowIterator::GetRawValue() { + std::vector> iters; + if (Valid()) { + auto& idxs = keys_.begin()->second; + for (auto i : idxs) { + iters.push_back(inputs_.at(i)->GetValue()); + } + } + + return new UnionIterator(absl::MakeSpan(iters), distinct_); +} +void UnionWindowIterator::Seek(const std::string& key) { + for (auto& i : inputs_) { + i->Seek(key); + } + rebuild_keys(); +} +void UnionWindowIterator::SeekToFirst() { + for (auto& i : inputs_) { + i->SeekToFirst(); + } + rebuild_keys(); +} +void UnionWindowIterator::Next() { + auto idxs = keys_.begin()->second; + keys_.erase(keys_.begin()); + for (auto i : idxs) { + inputs_.at(i)->Next(); + if (inputs_.at(i)->Valid()) { + keys_[inputs_.at(i)->GetKey()].push_back(i); + } + } +} +const codec::Row UnionWindowIterator::GetKey() { + if (Valid()) { + return keys_.begin()->first; + } + + return INVALID_ROW; +} +void UnionWindowIterator::rebuild_keys() 
{ + keys_.clear(); + for (size_t i = 0; i < inputs_.size(); i++) { + if (inputs_[i]->Valid()) { + keys_[inputs_[i]->GetKey()].push_back(i); + } + } +} +RowIterator* SetOperationPartitionHandler::GetRawIterator() { + switch (op_type_) { + case node::SetOperationType::UNION: { + std::vector> iters; + for (auto tb : inputs_) { + iters.emplace_back(tb->GetIterator()); + } + return new UnionIterator(absl::MakeSpan(iters), distinct_); + } + default: + return nullptr; + } +} +std::shared_ptr SetOperationPartitionHandler::GetSegment(const std::string& key) { + std::vector> segs; + for (auto n : inputs_) { + segs.push_back(n->GetSegment(key)); + } + + return std::shared_ptr(new SetOperationHandler(op_type_, segs, distinct_)); +} +std::unique_ptr SetOperationPartitionHandler::GetWindowIterator() { + // NOTE: window iterator may out-of-order, use 'GetSegment' if ordering is mandatory + if (op_type_ != node::SetOperationType::UNION) { + return {}; + } + + std::vector> iters; + for (auto n : inputs_) { + iters.push_back(n->GetWindowIterator()); + } + + return std::unique_ptr(new UnionWindowIterator(absl::MakeSpan(iters), distinct_)); +} } // namespace vm } // namespace hybridse diff --git a/hybridse/src/vm/catalog_wrapper.h b/hybridse/src/vm/catalog_wrapper.h index bfd1265aa82..077849acee1 100644 --- a/hybridse/src/vm/catalog_wrapper.h +++ b/hybridse/src/vm/catalog_wrapper.h @@ -21,6 +21,9 @@ #include #include #include +#include +#include +#include #include "absl/base/attributes.h" #include "codec/row_iterator.h" @@ -30,6 +33,9 @@ namespace hybridse { namespace vm { +static constexpr uint64_t INVALID_KEY = 0; +static const Row INVALID_ROW = Row(); + class IteratorProjectWrapper : public RowIterator { public: IteratorProjectWrapper(std::unique_ptr&& iter, const Row& parameter, const ProjectFun* fun) @@ -962,6 +968,148 @@ class ConcatPartitionHandler final : public PartitionHandler { size_t right_slices_; }; +class UnionIterator final : public codec::RowIterator { + public: + 
UnionIterator(absl::Span> inputs, bool distinct) : distinct_(distinct) { + size_t i = 0; + for (auto& n : inputs) { + if (n) { + n->SeekToFirst(); + if (n->Valid()) { + keys_.emplace(n->GetKey(), i++); + inputs_.push_back(std::move(n)); + } + } + } + } + ~UnionIterator() override {} + + bool Valid() const override { return !keys_.empty(); } + void Next() override; + const uint64_t& GetKey() const override; + const Row& GetValue() override; + + bool IsSeekable() const override { return true; }; + + void Seek(const uint64_t& key) override; + + void SeekToFirst() override; + + private: + using E = + std::pair().GetKey())>>, + decltype(std::vector().size())>; + struct PairLess { + constexpr bool operator() (const E& lhs, const E& rhs) const { + // larger key(larger index value if key equals) at top + // top key is the last/latest + if (lhs.first == rhs.first) { + return lhs.second < rhs.second; + } + return lhs.first < rhs.first; + } + }; + using MaxHeap = std::priority_queue, PairLess>; + + void rebuild_keys(); + + std::vector> inputs_; + bool distinct_ = false; // NOLINT + + MaxHeap keys_; +}; + +class SetOperationHandler final : public TableHandler { + public: + SetOperationHandler(node::SetOperationType type, absl::Span const> inputs, + bool distinct) + : op_type_(type), inputs_(inputs.begin(), inputs.end()), distinct_(distinct) {} + SetOperationHandler(node::SetOperationType type, absl::Span const> inputs, + bool distinct) + : op_type_(type), inputs_(inputs.begin(), inputs.end()), distinct_(distinct) {} + ~SetOperationHandler() override {} + + RowIterator* GetRawIterator() override; + + // unimplemented + const Types& GetTypes() override { return inputs_[0]->GetTypes(); } + const IndexHint& GetIndex() override { return inputs_[0]->GetIndex(); } + const Schema* GetSchema() override { return inputs_[0]->GetSchema(); } + const std::string& GetName() override { return inputs_[0]->GetName(); } + const std::string& GetDatabase() override { return 
inputs_[0]->GetDatabase(); } + + protected: + node::SetOperationType op_type_; + std::vector> inputs_; + bool distinct_ = false; +}; + +class UnionWindowIterator final : public codec::WindowIterator { + // NOTE: iterator ordering may out-of-order, same keys from different input iterator may output in two iteration. + // Because the input iterator may out-of-order itself. + public: + UnionWindowIterator(absl::Span> inputs, bool distinct) + : distinct_(distinct) { + size_t i = 0; + for (auto& n : inputs) { + if (n) { + n->SeekToFirst(); + if (n->Valid()) { + keys_[n->GetKey()].push_back(i++); + inputs_.push_back(std::move(n)); + } + } + } + } + ~UnionWindowIterator() override {} + + bool Valid() override { + return !keys_.empty(); + } + + RowIterator* GetRawValue() override; + + void Seek(const std::string& key) override; + + void SeekToFirst() override; + + void Next() override; + + const codec::Row GetKey() override; + + private: + void rebuild_keys(); + std::vector> inputs_; + + // smaller key comes first + std::map, std::less> keys_; + bool distinct_; +}; + +class SetOperationPartitionHandler final : public PartitionHandler { + public: + SetOperationPartitionHandler(node::SetOperationType type, + absl::Span const> inputs, bool distinct) + : op_type_(type), inputs_(inputs.begin(), inputs.end()), distinct_(distinct) {} + ~SetOperationPartitionHandler() override {} + + RowIterator* GetRawIterator() override; + + std::shared_ptr GetSegment(const std::string& key) override; + + std::unique_ptr GetWindowIterator() override; + + const Types& GetTypes() override { return inputs_[0]->GetTypes(); } + const IndexHint& GetIndex() override { return inputs_[0]->GetIndex(); } + const Schema* GetSchema() override { return inputs_[0]->GetSchema(); } + const std::string& GetName() override { return inputs_[0]->GetName(); } + const std::string& GetDatabase() override { return inputs_[0]->GetDatabase(); } + + private: + node::SetOperationType op_type_; + std::vector> inputs_; + bool 
distinct_ = false; +}; } // namespace vm } // namespace hybridse diff --git a/hybridse/src/vm/internal/node_helper.cc b/hybridse/src/vm/internal/node_helper.cc index 46b3e0dfa8f..02b3ef24ca4 100644 --- a/hybridse/src/vm/internal/node_helper.cc +++ b/hybridse/src/vm/internal/node_helper.cc @@ -52,8 +52,10 @@ absl::StatusOr ExtractRequestNode(PhysicalOpNode* in) { // generally it is of type Partition, but can be Table as well e.g window (t1 instance_not_in_window) return nullptr; } + case vm::kPhysicalOpSetOperation: { + return ExtractRequestNode(in->GetProducer(0)); + } case vm::kPhysicalOpJoin: - case vm::kPhysicalOpUnion: case vm::kPhysicalOpPostRequestUnion: case vm::kPhysicalOpRequestUnion: case vm::kPhysicalOpRequestAggUnion: diff --git a/hybridse/src/vm/mem_catalog.cc b/hybridse/src/vm/mem_catalog.cc index f4f5897f10f..7244dda61af 100644 --- a/hybridse/src/vm/mem_catalog.cc +++ b/hybridse/src/vm/mem_catalog.cc @@ -20,6 +20,23 @@ namespace hybridse { namespace vm { + +std::shared_ptr TableHandler::Cast(std::shared_ptr in) { + switch (in->GetHandlerType()) { + case kRowHandler: { + auto left_table = std::shared_ptr(new MemTableHandler()); + left_table->AddRow(std::dynamic_pointer_cast(in)->GetValue()); + return left_table; + } + default: + return std::dynamic_pointer_cast(in); + } + return nullptr; +} +std::shared_ptr PartitionHandler::Cast(std::shared_ptr in) { + return std::dynamic_pointer_cast(in); +} + MemTimeTableIterator::MemTimeTableIterator(const MemTimeTable* table, const vm::Schema* schema) : table_(table), diff --git a/hybridse/src/vm/physical_op.cc b/hybridse/src/vm/physical_op.cc index aea329e65d5..4deeb7f91f5 100644 --- a/hybridse/src/vm/physical_op.cc +++ b/hybridse/src/vm/physical_op.cc @@ -44,7 +44,7 @@ static absl::flat_hash_map CreatePhysicalOpTy {kPhysicalOpDistinct, "DISTINCT"}, {kPhysicalOpWindow, "WINDOW"}, {kPhysicalOpJoin, "JOIN"}, - {kPhysicalOpUnion, "UNION"}, + {kPhysicalOpSetOperation, "SET_OPERATION"}, {kPhysicalOpPostRequestUnion, 
"POST_REQUEST_UNION"}, {kPhysicalOpRequestUnion, "REQUEST_UNION"}, {kPhysicalOpRequestAggUnion, "REQUEST_AGG_UNION"}, @@ -102,23 +102,23 @@ void PhysicalOpNode::Print(std::ostream& output, const std::string& tab) const { output << tab << PhysicalOpTypeName(type_); } -bool PhysicalOpNode::IsSameSchema(const vm::Schema& schema, const vm::Schema& exp_schema) const { - if (schema.size() != exp_schema.size()) { +bool PhysicalOpNode::IsSameSchema(const codec::Schema* schema, const codec::Schema* exp_schema) { + if (schema->size() != exp_schema->size()) { LOG(WARNING) << "Schemas size aren't consistent: " - << "expect size " << exp_schema.size() << ", real size " << schema.size(); + << "expect size " << exp_schema->size() << ", real size " << schema->size(); return false; } - for (int i = 0; i < schema.size(); i++) { - if (schema.Get(i).name() != exp_schema.Get(i).name()) { + for (int i = 0; i < schema->size(); i++) { + if (schema->Get(i).name() != exp_schema->Get(i).name()) { LOG(WARNING) << "Schemas aren't consistent:\n" - << exp_schema.Get(i).DebugString() << "vs:\n" - << schema.Get(i).DebugString(); + << exp_schema->Get(i).DebugString() << "vs:\n" + << schema->Get(i).DebugString(); return false; } - if (schema.Get(i).type() != exp_schema.Get(i).type()) { + if (schema->Get(i).type() != exp_schema->Get(i).type()) { LOG(WARNING) << "Schemas aren't consistent:\n" - << exp_schema.Get(i).DebugString() << "vs:\n" - << schema.Get(i).DebugString(); + << exp_schema->Get(i).DebugString() << "vs:\n" + << schema->Get(i).DebugString(); return false; } } @@ -138,8 +138,14 @@ base::Status PhysicalOpNode::SchemaStartWith(const vm::Schema& lhs, const vm::Sc void PhysicalOpNode::Print() const { this->Print(std::cout, " "); } -void PhysicalOpNode::PrintChildren(std::ostream& output, const std::string& tab) const {} -void PhysicalOpNode::UpdateProducer(int i, PhysicalOpNode* producer) { producers_[i] = producer; } +void PhysicalOpNode::PrintChildren(std::ostream& output, const 
std::string& tab) const { + for (size_t i = 0; i < producers_.size(); ++i) { + producers_[i]->Print(output, tab + INDENT); + if (i + i < producers_.size()) { + output << "\n"; + } + } +} void PhysicalUnaryNode::PrintChildren(std::ostream& output, const std::string& tab) const { if (producers_.empty() || nullptr == producers_[0]) { LOG(WARNING) << "empty producers"; @@ -524,7 +530,9 @@ PhysicalSimpleProjectNode* PhysicalSimpleProjectNode::CastFrom(PhysicalOpNode* n return dynamic_cast(node); } -PhysicalUnionNode* PhysicalUnionNode::CastFrom(PhysicalOpNode* node) { return dynamic_cast(node); } +PhysicalSetOperationNode* PhysicalSetOperationNode::CastFrom(PhysicalOpNode* node) { + return dynamic_cast(node); +} PhysicalPostRequestUnionNode* PhysicalPostRequestUnionNode::CastFrom(PhysicalOpNode* node) { return dynamic_cast(node); @@ -861,7 +869,7 @@ bool PhysicalWindowAggrerationNode::AddWindowUnion(PhysicalOpNode* node) { return false; } } else { - if (!IsSameSchema(*node->GetOutputSchema(), *producers_[0]->GetOutputSchema())) { + if (!IsSameSchema(node->GetOutputSchema(), producers_[0]->GetOutputSchema())) { LOG(WARNING) << "Union Table and window input schema aren't consistent"; return false; } @@ -1215,7 +1223,7 @@ std::string PhysicalOpNode::SchemaToString(const std::string& tab) const { std::vector PhysicalOpNode::GetDependents() const { return GetProducers(); } -Status PhysicalUnionNode::InitSchema(PhysicalPlanContext* ctx) { +Status PhysicalSetOperationNode::InitSchema(PhysicalPlanContext* ctx) { CHECK_TRUE(!producers_.empty(), common::kPlanError, "Empty union"); schemas_ctx_.Clear(); schemas_ctx_.SetDefaultDBName(ctx->db()); @@ -1223,16 +1231,16 @@ Status PhysicalUnionNode::InitSchema(PhysicalPlanContext* ctx) { return Status::OK(); } -Status PhysicalUnionNode::WithNewChildren(node::NodeManager* nm, const std::vector& children, - PhysicalOpNode** out) { - CHECK_TRUE(children.size() == 2, common::kPlanError); - *out = nm->RegisterNode(new 
PhysicalUnionNode(children[0], children[1], is_all_)); +Status PhysicalSetOperationNode::WithNewChildren(node::NodeManager* nm, const std::vector& children, + PhysicalOpNode** out) { + absl::Span sp = absl::MakeSpan(children); + *out = nm->RegisterNode(new PhysicalSetOperationNode(op_type_, sp, distinct_)); return Status::OK(); } -void PhysicalUnionNode::Print(std::ostream& output, const std::string& tab) const { +void PhysicalSetOperationNode::Print(std::ostream& output, const std::string& tab) const { PhysicalOpNode::Print(output, tab); - output << "\n"; + output << "(" << node::SetOperatorName(op_type_, distinct_) << ")\n"; PrintChildren(output, tab); } @@ -1643,5 +1651,81 @@ Status BuildColumnReplacement(const node::ExprNode* expr, const SchemasContext* return Status::OK(); } +absl::StatusOr PhysicalOpNode::TraceColID(size_t col_id) const { + size_t sc_idx; + size_t col_idx; + auto s = schemas_ctx()->ResolveColumnIndexByID(col_id, &sc_idx, &col_idx); + if (!s.isOK()) { + return absl::NotFoundError(s.msg); + } + + auto source = schemas_ctx()->GetSchemaSource(sc_idx); + auto path_idx = source->GetSourceChildIdx(col_idx); + int child_col_id = source->GetSourceColumnID(col_idx); + + return ColProducerTraceInfo{{path_idx, child_col_id}}; +} +absl::StatusOr PhysicalOpNode::TraceColID(absl::string_view col_name) const { + size_t sc_idx; + size_t col_idx; + auto s = schemas_ctx()->ResolveColumnIndexByName("", "", std::string(col_name), &sc_idx, &col_idx); + if (!s.isOK()) { + return absl::NotFoundError(s.msg); + } + + auto source = schemas_ctx()->GetSchemaSource(sc_idx); + auto path_idx = source->GetSourceChildIdx(col_idx); + int child_col_id = source->GetSourceColumnID(col_idx); + + return ColProducerTraceInfo{{path_idx, child_col_id}}; +} +absl::StatusOr PhysicalOpNode::TraceLastDescendants(size_t col_id) const { + ColLastDescendantTraceInfo trace_info; + auto info = TraceColID(col_id); + if (!info.ok()) { + return info.status(); + } + for (auto entry : info.value()) 
{ + if (entry.first < 0) { + trace_info.emplace_back(this, col_id); + continue; + } + + auto res = GetProducer(entry.first)->TraceLastDescendants(entry.second); + if (!res.ok()) { + return res.status(); + } + + for (auto& e : res.value()) { + trace_info.emplace_back(e.first, e.second); + } + } + + return trace_info; +} +absl::StatusOr PhysicalSetOperationNode::TraceColID(size_t col_id) const { + std::string col_name; + auto s = schemas_ctx()->ResolveColumnNameByID(col_id, &col_name); + if (!s.isOK()) { + return absl::NotFoundError(s.msg); + } + + return TraceColID(col_name); +} +absl::StatusOr PhysicalSetOperationNode::TraceColID(absl::string_view col_name) const { + ColProducerTraceInfo vec; + + // every producer node in set operation is able to backtrace columns in current node context + for (int i = 0; i < GetProducerCnt(); ++i) { + size_t child_col_id = 0; + auto s = GetProducer(i)->schemas_ctx()->ResolveColumnID("", "", std::string(col_name), &child_col_id); + if (!s.isOK()) { + return absl::NotFoundError(s.msg); + } + vec.emplace_back(i, child_col_id); + } + + return vec; +} } // namespace vm } // namespace hybridse diff --git a/hybridse/src/vm/runner.cc b/hybridse/src/vm/runner.cc index eb284e6e945..e44ec7b7d4b 100644 --- a/hybridse/src/vm/runner.cc +++ b/hybridse/src/vm/runner.cc @@ -40,6 +40,57 @@ namespace vm { #define MAX_DEBUG_LINES_CNT 20 #define MAX_DEBUG_COLUMN_MAX 20 +static absl::flat_hash_map CreateRunnerTypeToNamesMap() { + absl::flat_hash_map map = { + {kRunnerData, "DATA"}, + {kRunnerRequest, "REQUEST"}, + {kRunnerGroup, "GROUP"}, + {kRunnerGroupAndSort, "GROUP_AND_SORT"}, + {kRunnerFilter, "FILTER"}, + {kRunnerConstProject, "CONST_PROJECT"}, + {kRunnerTableProject, "TABLE_PROJECT"}, + {kRunnerRowProject, "ROW_PROJECT"}, + {kRunnerSimpleProject, "SIMPLE_PROJECT"}, + {kRunnerSelectSlice, "SELECT_SLICE"}, + {kRunnerGroupAgg, "GROUP_AGG_PROJECT"}, + {kRunnerAgg, "AGG_PROJECT"}, + {kRunnerReduce, "REDUCE_PROJECT"}, + {kRunnerWindowAgg, 
"WINDOW_AGG_PROJECT"}, + {kRunnerRequestUnion, "REQUEST_UNION"}, + {kRunnerRequestAggUnion, "REQUEST_AGG_UNION"}, + {kRunnerPostRequestUnion, "POST_REQUEST_UNION"}, + {kRunnerIndexSeek, "INDEX_SEEK"}, + {kRunnerJoin, "JOIN"}, + {kRunnerConcat, "CONCAT"}, + {kRunnerRequestJoin, "REQUEST_JOIN"}, + {kRunnerLimit, "LIMIT"}, + {kRunnerRequestRunProxy, "REQUEST_RUN_PROXY"}, + {kRunnerBatchRequestRunProxy, "BATCH_REQUEST_RUN_PROXY"}, + {kRunnerOrder, "ORDER"}, + {kRunnerSetOperation, "SET_OPERATION"}, + {kRunnerUnknow, "UNKNOWN_RUNNER"}, + }; + for (auto kind = 0; kind < RunnerType::kRunnerUnknow; ++kind) { + DCHECK(map.find(static_cast(kind)) != map.end()) + << "name of " << kind << " not exist"; + } + return map; +} + +static const auto& GetRunnerTypeToNamesMap() { + static const auto &map = *new auto(CreateRunnerTypeToNamesMap()); + return map; +} + +std::string RunnerTypeName(RunnerType type) { + auto& map = GetRunnerTypeToNamesMap(); + auto it = map.find(type); + if (it != map.end()) { + return std::string(it->second); + } + return "kUnknow"; +} + bool Runner::GetColumnBool(const int8_t* buf, const RowView* row_view, int idx, type::Type type) { bool key = false; @@ -2557,5 +2608,28 @@ int32_t IteratorStatus::FindFirstIteratorWithMaximizeKey(const std::vector SetOperationRunner::Run(RunnerContext& ctx, + const std::vector>& inputs) { + bool opt = true; + for (auto& n : inputs) { + if (n->GetHandlerType() != kPartitionHandler) { + opt = false; + break; + } + } + if (opt) { + std::vector> in; + for (auto n : inputs) { + in.emplace_back(PartitionHandler::Cast(n)); + } + return std::shared_ptr(new SetOperationPartitionHandler(op_type_, in, distinct_)); + } + + std::vector> in; + for (auto n : inputs) { + in.emplace_back(TableHandler::Cast(n)); + } + return std::shared_ptr(new SetOperationHandler(op_type_, in, distinct_)); +} } // namespace vm } // namespace hybridse diff --git a/hybridse/src/vm/runner.h b/hybridse/src/vm/runner.h index b40130db812..3a2ea0416d7 100644 --- 
a/hybridse/src/vm/runner.h +++ b/hybridse/src/vm/runner.h @@ -74,62 +74,11 @@ enum RunnerType { kRunnerRequestJoin, kRunnerBatchRequestRunProxy, kRunnerLimit, + kRunnerSetOperation, kRunnerUnknow, }; -inline const std::string RunnerTypeName(const RunnerType& type) { - switch (type) { - case kRunnerData: - return "DATA"; - case kRunnerRequest: - return "REQUEST"; - case kRunnerGroup: - return "GROUP"; - case kRunnerGroupAndSort: - return "GROUP_AND_SORT"; - case kRunnerFilter: - return "FILTER"; - case kRunnerConstProject: - return "CONST_PROJECT"; - case kRunnerTableProject: - return "TABLE_PROJECT"; - case kRunnerRowProject: - return "ROW_PROJECT"; - case kRunnerSimpleProject: - return "SIMPLE_PROJECT"; - case kRunnerSelectSlice: - return "SELECT_SLICE"; - case kRunnerGroupAgg: - return "GROUP_AGG_PROJECT"; - case kRunnerAgg: - return "AGG_PROJECT"; - case kRunnerReduce: - return "REDUCE_PROJECT"; - case kRunnerWindowAgg: - return "WINDOW_AGG_PROJECT"; - case kRunnerRequestUnion: - return "REQUEST_UNION"; - case kRunnerRequestAggUnion: - return "REQUEST_AGG_UNION"; - case kRunnerPostRequestUnion: - return "POST_REQUEST_UNION"; - case kRunnerIndexSeek: - return "INDEX_SEEK"; - case kRunnerJoin: - return "JOIN"; - case kRunnerConcat: - return "CONCAT"; - case kRunnerRequestJoin: - return "REQUEST_JOIN"; - case kRunnerLimit: - return "LIMIT"; - case kRunnerRequestRunProxy: - return "REQUEST_RUN_PROXY"; - case kRunnerBatchRequestRunProxy: - return "BATCH_REQUEST_RUN_PROXY"; - default: - return "UNKNOW"; - } -} + +std::string RunnerTypeName(RunnerType type); class Runner : public node::NodeBase { public: @@ -835,6 +784,22 @@ class ProxyRequestRunner : public Runner { Runner* index_input_; }; +class SetOperationRunner : public Runner { + public: + SetOperationRunner(const int32_t id, const SchemasContext* schema, node::SetOperationType type, bool distinct) + : Runner(id, kRunnerSetOperation, schema), op_type_(type), distinct_(distinct) { + is_lazy_ = true; + } + 
~SetOperationRunner() {} + + std::shared_ptr Run(RunnerContext& ctx, // NOLINT + const std::vector>& inputs) override; // NOLINT + + private: + node::SetOperationType op_type_; + bool distinct_ = false; +}; + } // namespace vm } // namespace hybridse #endif // HYBRIDSE_SRC_VM_RUNNER_H_ diff --git a/hybridse/src/vm/runner_builder.cc b/hybridse/src/vm/runner_builder.cc index 5d595ba9785..8a94114ba2f 100644 --- a/hybridse/src/vm/runner_builder.cc +++ b/hybridse/src/vm/runner_builder.cc @@ -332,7 +332,7 @@ ClusterTask RunnerBuilder::Build(PhysicalOpNode* node, Status& status) { return RegisterTask(node, BinaryInherit(left_task, right_task, runner, op->join().index_key(), kLeftBias)); } else { - // optimize happens before, in left node + // optimize happens before, in right node auto right_route_info = right_task.GetRouteInfo(); runner->AddProducer(left_task.GetRoot()); runner->AddProducer(right_task.GetRoot()); @@ -521,6 +521,31 @@ ClusterTask RunnerBuilder::Build(PhysicalOpNode* node, Status& status) { CreateRunner(id_++, node->schemas_ctx(), union_op->request_ts()); return RegisterTask(node, BinaryInherit(left_task, right_task, runner, Key(), kRightBias)); } + case kPhysicalOpSetOperation: { + auto set = dynamic_cast(node); + auto set_runner = + CreateRunner(id_++, node->schemas_ctx(), set->op_type_, set->distinct_); + std::vector tasks; + for (auto n : node->GetProducers()) { + auto task = Build(n, status); + if (!status.isOK()) { + LOG(WARNING) << status; + return fail; + } + set_runner->AddProducer(task.GetRoot()); + + tasks.push_back(task); + } + if (support_cluster_optimized_) { + // first cluster task + for (auto & task : tasks) { + if (task.IsClusterTask()) { + return RegisterTask(node, ClusterTask(set_runner, {}, task.GetRouteInfo())); + } + } + } + return RegisterTask(node, ClusterTask(set_runner)); + } default: { status.code = common::kExecutionPlanError; status.msg = absl::StrCat("Non-support node ", PhysicalOpTypeName(node->GetOpType()), diff --git 
a/hybridse/src/vm/schemas_context.cc b/hybridse/src/vm/schemas_context.cc index 7d794bc8f92..42e8ad2b832 100644 --- a/hybridse/src/vm/schemas_context.cc +++ b/hybridse/src/vm/schemas_context.cc @@ -740,10 +740,12 @@ void SchemasContext::BuildTrivial( std::string SchemasContext::DebugString() const { std::stringstream ss; - ss << absl::StrCat("{", root_db_name_, ",", root_relation_name_, ",", default_db_name_, ", ", - absl::StrJoin(schema_sources_, ",", [](std::string* out, const SchemaSource* source) { - absl::StrAppend(out, source->DebugString()); - })); + ss << absl::StrCat( + "{db=", root_db_name_, ", table=", root_relation_name_, ", default_db=", default_db_name_, ", sources={", + absl::StrJoin( + schema_sources_, ",", + [](std::string* out, const SchemaSource* source) { absl::StrAppend(out, source->DebugString()); }), + "}"); ss << ", id_map={" << absl::StrJoin(column_id_map_, ",", [](std::string* out, decltype(column_id_map_)::const_reference e) { absl::StrAppend(out, e.first, "=(", e.second.first, ",", e.second.second, ")"); diff --git a/hybridse/src/vm/transform.cc b/hybridse/src/vm/transform.cc index dc67a30c9a8..505522f076c 100644 --- a/hybridse/src/vm/transform.cc +++ b/hybridse/src/vm/transform.cc @@ -181,6 +181,12 @@ Status BatchModeTransformer::TransformPlanOp(const node::PlanNode* node, Physica TransformWithClauseEntry(dynamic_cast(node), &op)); break; } + case ::hybridse::node::kPlanTypeSetOperation: { + PhysicalSetOperationNode* set = nullptr; + CHECK_STATUS(TransformSetOperation(dynamic_cast(node), &set)); + op = set; + break; + } default: { FAIL_STATUS(kPlanError, "Fail to transform physical plan: " @@ -330,6 +336,33 @@ Status BatchModeTransformer::InitFnInfo(PhysicalOpNode* node, return Status::OK(); } +Status BatchModeTransformer::TransformSetOperation(const node::SetOperationPlanNode* node, + PhysicalSetOperationNode** out) { + CHECK_TRUE(node != nullptr && out != nullptr, kPlanError, "Input node or output node is null"); + + 
CHECK_TRUE(!node->distinct(), common::kPhysicalPlanError, "un-implemented: UNION DISTINCT"); + + std::vector inputs; + const SchemasContext* expect_sc = nullptr; + for (auto n : node->inputs()) { + PhysicalOpNode* query_out = nullptr; + CHECK_STATUS(TransformQueryPlan(n, &query_out)); + + if (expect_sc == nullptr) { + expect_sc = query_out->schemas_ctx(); + } else { + CHECK_TRUE(PhysicalOpNode::IsSameSchema(expect_sc->GetOutputSchema(), query_out->GetOutputSchema()), + common::kPlanError, "union sources have different schema: ", expect_sc->ReadableString(), " vs ", + query_out->schemas_ctx()->ReadableString()); + } + inputs.push_back(query_out); + } + PhysicalSetOperationNode* set = nullptr; + CHECK_STATUS(CreateOp(&set, node->op_type(), inputs, node->distinct())); + *out = set; + return Status::OK(); +} + Status BatchModeTransformer::TransformLimitOp(const node::LimitPlanNode* node, PhysicalOpNode** output) { CHECK_TRUE(node != nullptr && output != nullptr, kPlanError, @@ -1667,6 +1700,10 @@ Status BatchModeTransformer::ValidatePartitionDataProvider(PhysicalOpNode* in) { for (auto& window_union : n->window_unions().window_unions_) { CHECK_STATUS(ValidateWindowIndexOptimization(window_union.second, window_union.first)); } + } else if (kPhysicalOpSetOperation == in->GetOpType()) { + for (auto n : in->GetProducers()) { + CHECK_STATUS(ValidatePartitionDataProvider(n)); + } } else { CHECK_TRUE(kPhysicalOpDataProvider == in->GetOpType() && kProviderTypeTable != dynamic_cast(in)->provider_type_, @@ -1927,10 +1964,6 @@ Status BatchModeTransformer::TransformPhysicalPlan(const ::hybridse::node::PlanN *output = nullptr; break; } - case ::hybridse::node::kPlanTypeUnion: { - FAIL_STATUS(kPlanError, "Non-support UNION OP"); - break; - } case node::kPlanTypeCreate: { CHECK_STATUS(TransformCreateTableOp(dynamic_cast(node), output), "Fail to transform create table op"); diff --git a/hybridse/src/vm/transform.h b/hybridse/src/vm/transform.h index 45c4d9660e7..acd86be587f 100644 --- 
a/hybridse/src/vm/transform.h +++ b/hybridse/src/vm/transform.h @@ -156,6 +156,8 @@ class BatchModeTransformer { protected: Status TransformPlanOp(const ::hybridse::node::PlanNode* node, ::hybridse::vm::PhysicalOpNode** ouput); + Status TransformSetOperation(const node::SetOperationPlanNode* node, PhysicalSetOperationNode** out); + virtual Status TransformLimitOp(const node::LimitPlanNode* node, PhysicalOpNode** output); virtual Status TransformProjectPlanOp(const node::ProjectPlanNode*, PhysicalOpNode**); diff --git a/src/sdk/sql_sdk_test.h b/src/sdk/sql_sdk_test.h index 5a020d144cb..4945686b4c9 100644 --- a/src/sdk/sql_sdk_test.h +++ b/src/sdk/sql_sdk_test.h @@ -140,6 +140,8 @@ INSTANTIATE_TEST_SUITE_P(SQLSDKTestWhere, SQLSDKQueryTest, testing::ValuesIn(SQLSDKQueryTest::InitCases("cases/function/select/test_where.yaml"))); INSTANTIATE_TEST_SUITE_P(WithClause, SQLSDKQueryTest, testing::ValuesIn(SQLSDKQueryTest::InitCases("cases/query/with.yaml"))); +INSTANTIATE_TEST_SUITE_P(UnionQuery, SQLSDKQueryTest, + testing::ValuesIn(SQLSDKQueryTest::InitCases("cases/query/union_query.yml"))); // Test Multiple Databases INSTANTIATE_TEST_SUITE_P(