From a6e58df07a71db9d0d30d0b029f2d8e6071d6145 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 26 Nov 2024 21:17:54 +0800 Subject: [PATCH] add tpch test Signed-off-by: xxchan --- .pre-commit-config.yaml | 1 + ...fka.part => create_sources_kafka.slt.part} | 0 e2e_test/nexmark/drop_sources_kafka.slt.part | 8 + e2e_test/nexmark/produce_kafka.slt.part | 6 +- e2e_test/source_inline/kafka/nexmark.slt | 5 +- e2e_test/source_inline/kafka/tpch.slt | 30 + e2e_test/tpch/create_sources_kafka.slt.part | 118 ++ e2e_test/tpch/drop_sources_kafka.slt.part | 23 + e2e_test/tpch/produce_kafka.slt.part | 131 ++ .../tests/testdata/input/tpch_kafka.yaml | 902 ++++++++++ .../tests/testdata/output/tpch_kafka.yaml | 1513 +++++++++++++++++ 11 files changed, 2732 insertions(+), 5 deletions(-) rename e2e_test/nexmark/{create_sources_kafka.part => create_sources_kafka.slt.part} (100%) create mode 100644 e2e_test/nexmark/drop_sources_kafka.slt.part create mode 100644 e2e_test/source_inline/kafka/tpch.slt create mode 100644 e2e_test/tpch/create_sources_kafka.slt.part create mode 100644 e2e_test/tpch/drop_sources_kafka.slt.part create mode 100644 e2e_test/tpch/produce_kafka.slt.part create mode 100644 src/frontend/planner_test/tests/testdata/input/tpch_kafka.yaml create mode 100644 src/frontend/planner_test/tests/testdata/output/tpch_kafka.yaml diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index cb54c1606356e..cfb56d052daf4 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -7,6 +7,7 @@ repos: hooks: - id: end-of-file-fixer - id: trailing-whitespace + exclude: 'src/frontend/planner_test/tests/testdata/.*' - repo: https://github.com/crate-ci/typos rev: v1.23.1 hooks: diff --git a/e2e_test/nexmark/create_sources_kafka.part b/e2e_test/nexmark/create_sources_kafka.slt.part similarity index 100% rename from e2e_test/nexmark/create_sources_kafka.part rename to e2e_test/nexmark/create_sources_kafka.slt.part diff --git a/e2e_test/nexmark/drop_sources_kafka.slt.part b/e2e_test/nexmark/drop_sources_kafka.slt.part new file mode 100644 index 0000000000000..998b1b07c12d1 --- /dev/null +++ b/e2e_test/nexmark/drop_sources_kafka.slt.part @@ -0,0 +1,8 @@ +statement ok +DROP SOURCE person CASCADE; + +statement ok +DROP SOURCE auction CASCADE; + +statement ok +DROP SOURCE bid CASCADE; diff --git a/e2e_test/nexmark/produce_kafka.slt.part b/e2e_test/nexmark/produce_kafka.slt.part index 676607d7137f3..719511311b836 100644 --- a/e2e_test/nexmark/produce_kafka.slt.part +++ b/e2e_test/nexmark/produce_kafka.slt.part @@ -1,10 +1,12 @@ control substitution on system ok -rpk topic delete nexmark-events || true +rpk topic delete -r nexmark-* || true system ok -rpk topic create nexmark-events -p 4 +rpk topic create nexmark-auction -p 4 && +rpk topic create nexmark-bid -p 4 && +rpk topic create nexmark-person -p 4 include ./create_tables.slt.part diff --git a/e2e_test/source_inline/kafka/nexmark.slt b/e2e_test/source_inline/kafka/nexmark.slt index 233e3b357bc91..9460a0190b534 100644 --- a/e2e_test/source_inline/kafka/nexmark.slt +++ b/e2e_test/source_inline/kafka/nexmark.slt @@ -1,10 +1,9 @@ include ../../nexmark/produce_kafka.slt.part -include ../../nexmark/create_sources_kafka.part +include ../../nexmark/create_sources_kafka.slt.part control substitution off include ../../streaming/nexmark/create_views.slt.part include ../../streaming/nexmark/test_mv_result.slt.part -statement ok -drop source nexmark cascade; +include ../../nexmark/drop_sources_kafka.slt.part diff --git a/e2e_test/source_inline/kafka/tpch.slt b/e2e_test/source_inline/kafka/tpch.slt new file mode 100644 index 0000000000000..85fed5034af4d --- /dev/null +++ b/e2e_test/source_inline/kafka/tpch.slt @@ -0,0 +1,30 @@ +include ../../tpch/produce_kafka.slt.part +include ../../tpch/create_sources_kafka.slt.part + +control substitution off + +include ../../streaming/tpch/create_views.slt.part +include ../../streaming/tpch/q1.slt.part +include ../../streaming/tpch/q2.slt.part +include ../../streaming/tpch/q3.slt.part +include ../../streaming/tpch/q4.slt.part +include ../../streaming/tpch/q5.slt.part +include ../../streaming/tpch/q6.slt.part +include ../../streaming/tpch/q7.slt.part +include ../../streaming/tpch/q8.slt.part +include ../../streaming/tpch/q9.slt.part +include ../../streaming/tpch/q10.slt.part +include ../../streaming/tpch/q11.slt.part +include ../../streaming/tpch/q12.slt.part +include ../../streaming/tpch/q13.slt.part +include ../../streaming/tpch/q14.slt.part +include ../../streaming/tpch/q15.slt.part +include ../../streaming/tpch/q16.slt.part +include ../../streaming/tpch/q17.slt.part +include ../../streaming/tpch/q18.slt.part +include ../../streaming/tpch/q19.slt.part +include ../../streaming/tpch/q20.slt.part +include ../../streaming/tpch/q21.slt.part +include ../../streaming/tpch/q22.slt.part + +include ../../tpch/drop_sources_kafka.slt.part diff --git a/e2e_test/tpch/create_sources_kafka.slt.part b/e2e_test/tpch/create_sources_kafka.slt.part new file mode 100644 index 0000000000000..9dcc50e7d1f0e --- /dev/null +++ b/e2e_test/tpch/create_sources_kafka.slt.part @@ -0,0 +1,118 @@ +control substitution on + +statement ok +CREATE SOURCE supplier ( + s_suppkey INTEGER, + s_name VARCHAR, + s_address VARCHAR, + s_nationkey INTEGER, + s_phone VARCHAR, + s_acctbal NUMERIC, + s_comment VARCHAR +) WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'tpch-supplier' +) FORMAT PLAIN ENCODE JSON; + +statement ok +CREATE SOURCE part ( + p_partkey INTEGER, + p_name VARCHAR, + p_mfgr VARCHAR, + p_brand VARCHAR, + p_type VARCHAR, + p_size INTEGER, + p_container VARCHAR, + p_retailprice NUMERIC, + p_comment VARCHAR +) WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'tpch-part' +) FORMAT PLAIN ENCODE JSON; + +statement ok +CREATE SOURCE partsupp ( + ps_partkey INTEGER, + ps_suppkey INTEGER, + ps_availqty INTEGER, + ps_supplycost NUMERIC, + ps_comment VARCHAR +) WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'tpch-partsupp' +) FORMAT PLAIN ENCODE JSON; + +statement ok +CREATE SOURCE customer ( + c_custkey INTEGER, + c_name VARCHAR, + c_address VARCHAR, + c_nationkey INTEGER, + c_phone VARCHAR, + c_acctbal NUMERIC, + c_mktsegment VARCHAR, + c_comment VARCHAR +) WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'tpch-customer' +) FORMAT PLAIN ENCODE JSON; + +statement ok +CREATE SOURCE orders ( + o_orderkey BIGINT, + o_custkey INTEGER, + o_orderstatus VARCHAR, + o_totalprice NUMERIC, + o_orderdate DATE, + o_orderpriority VARCHAR, + o_clerk VARCHAR, + o_shippriority INTEGER, + o_comment VARCHAR +) WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'tpch-orders' +) FORMAT PLAIN ENCODE JSON; + +statement ok +CREATE SOURCE lineitem ( + l_orderkey BIGINT, + l_partkey INTEGER, + l_suppkey INTEGER, + l_linenumber INTEGER, + l_quantity NUMERIC, + l_extendedprice NUMERIC, + l_discount NUMERIC, + l_tax NUMERIC, + l_returnflag VARCHAR, + l_linestatus VARCHAR, + l_shipdate DATE, + l_commitdate DATE, + l_receiptdate DATE, + l_shipinstruct VARCHAR, + l_shipmode VARCHAR, + l_comment VARCHAR +) WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'tpch-lineitem' +) FORMAT PLAIN ENCODE JSON; + +statement ok +CREATE SOURCE nation ( + n_nationkey INTEGER, + n_name VARCHAR, + n_regionkey INTEGER, + n_comment VARCHAR +) WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'tpch-nation' +) FORMAT PLAIN ENCODE JSON; + +statement ok +CREATE SOURCE region ( + r_regionkey INTEGER, + r_name VARCHAR, + r_comment VARCHAR +) WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'tpch-region' +) FORMAT PLAIN ENCODE JSON; diff --git a/e2e_test/tpch/drop_sources_kafka.slt.part b/e2e_test/tpch/drop_sources_kafka.slt.part new file mode 100644 index 0000000000000..2c239fcbb422f --- /dev/null +++ b/e2e_test/tpch/drop_sources_kafka.slt.part @@ -0,0 +1,23 @@ +statement ok +DROP SOURCE supplier CASCADE; + +statement ok +DROP SOURCE region CASCADE; + +statement ok +DROP SOURCE nation CASCADE; + +statement ok +DROP SOURCE lineitem CASCADE; + +statement ok +DROP SOURCE orders CASCADE; + +statement ok +DROP SOURCE customer CASCADE; + +statement ok +DROP SOURCE partsupp CASCADE; + +statement ok +DROP SOURCE part CASCADE; diff --git a/e2e_test/tpch/produce_kafka.slt.part b/e2e_test/tpch/produce_kafka.slt.part new file mode 100644 index 0000000000000..07b476a3a9f30 --- /dev/null +++ b/e2e_test/tpch/produce_kafka.slt.part @@ -0,0 +1,131 @@ +control substitution on + +system ok +rpk topic delete -r tpch-* || true + +system ok +rpk topic create tpch-supplier -p 4 && +rpk topic create tpch-part -p 4 && +rpk topic create tpch-partsupp -p 4 && +rpk topic create tpch-customer -p 4 && +rpk topic create tpch-orders -p 4 && +rpk topic create tpch-lineitem -p 4 && +rpk topic create tpch-nation -p 4 && +rpk topic create tpch-region -p 4 + +include ./create_tables.slt.part + +include ./insert_supplier.slt.part +include ./insert_part.slt.part +include ./insert_partsupp.slt.part +include ./insert_customer.slt.part +include ./insert_orders.slt.part +include ./insert_lineitem.slt.part +include ./insert_nation.slt.part +include ./insert_region.slt.part + +statement ok +flush; + +statement ok +create sink kafka_supplier FROM supplier +WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'tpch-supplier' +) FORMAT PLAIN ENCODE JSON ( + force_append_only='true' +); + +statement ok +create sink kafka_part FROM part +WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'tpch-part' +) FORMAT PLAIN ENCODE JSON ( + force_append_only='true' +); + +statement ok +create sink kafka_partsupp FROM partsupp +WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'tpch-partsupp' +) FORMAT PLAIN ENCODE JSON ( + force_append_only='true' +); + +statement ok +create sink kafka_customer FROM customer +WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'tpch-customer' +) FORMAT PLAIN ENCODE JSON ( + force_append_only='true' +); + +# note: In source, Date format is days_since_unix_epoch. In sink, it's num_days_from_ce. +# https://github.com/risingwavelabs/risingwave/issues/16467 + +statement ok +create sink kafka_orders AS select * except(o_orderdate), o_orderdate::varchar as o_orderdate FROM orders +WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'tpch-orders' +) FORMAT PLAIN ENCODE JSON ( + force_append_only='true' +); + +statement ok +create sink kafka_lineitem AS select * except(l_shipdate, l_commitdate, l_receiptdate), l_shipdate::varchar as l_shipdate, l_commitdate::varchar as l_commitdate, l_receiptdate::varchar as l_receiptdate FROM lineitem +WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'tpch-lineitem' +) FORMAT PLAIN ENCODE JSON ( + force_append_only='true' +); + +statement ok +create sink kafka_nation FROM nation +WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'tpch-nation' +) FORMAT PLAIN ENCODE JSON ( + force_append_only='true' +); + +statement ok +create sink kafka_region FROM region +WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'tpch-region' +) FORMAT PLAIN ENCODE JSON ( + force_append_only='true' +); + +sleep 5s + +statement ok +DROP SINK kafka_supplier; + +statement ok +DROP SINK kafka_part; + +statement ok +DROP SINK kafka_partsupp; + +statement ok +DROP SINK kafka_customer; + +statement ok +DROP SINK kafka_orders; + +statement ok +DROP SINK kafka_lineitem; + +statement ok +DROP SINK kafka_nation; + +statement ok +DROP SINK kafka_region; + +include ./drop_tables.slt.part diff --git a/src/frontend/planner_test/tests/testdata/input/tpch_kafka.yaml b/src/frontend/planner_test/tests/testdata/input/tpch_kafka.yaml new file mode 100644 index 0000000000000..6372549b17573 --- /dev/null +++ b/src/frontend/planner_test/tests/testdata/input/tpch_kafka.yaml @@ -0,0 +1,902 @@ +- id: create_tables + sql: | + CREATE SOURCE supplier ( + s_suppkey INTEGER, + s_name VARCHAR, + s_address VARCHAR, + s_nationkey INTEGER, + s_phone VARCHAR, + s_acctbal NUMERIC, + s_comment VARCHAR + ) WITH ( + connector = 'kafka', + topic = 'supplier', + properties.bootstrap.server = 'fake', + ) FORMAT PLAIN ENCODE JSON; + + CREATE SOURCE part ( + p_partkey INTEGER, + p_name VARCHAR, + p_mfgr VARCHAR, + p_brand VARCHAR, + p_type VARCHAR, + p_size INTEGER, + p_container VARCHAR, + p_retailprice NUMERIC, + p_comment VARCHAR + ) WITH ( + connector = 'kafka', + topic = 'part', + properties.bootstrap.server = 'fake', + ) FORMAT PLAIN ENCODE JSON; + + CREATE SOURCE partsupp ( + ps_partkey INTEGER, + ps_suppkey INTEGER, + ps_availqty INTEGER, + ps_supplycost NUMERIC, + ps_comment VARCHAR + ) WITH ( + connector = 'kafka', + topic = 'partsupp', + properties.bootstrap.server = 'fake', + ) FORMAT PLAIN ENCODE JSON; + + CREATE SOURCE customer ( + c_custkey INTEGER, + c_name VARCHAR, + c_address VARCHAR, + c_nationkey INTEGER, + c_phone VARCHAR, + c_acctbal NUMERIC, + c_mktsegment VARCHAR, + c_comment VARCHAR + ) WITH ( + connector = 'kafka', + topic = 'customer', + properties.bootstrap.server = 'fake', + ) FORMAT PLAIN ENCODE JSON; + + CREATE SOURCE orders ( + o_orderkey BIGINT, + o_custkey INTEGER, + o_orderstatus VARCHAR, + o_totalprice NUMERIC, + o_orderdate DATE, + o_orderpriority VARCHAR, + o_clerk VARCHAR, + o_shippriority INTEGER, + o_comment VARCHAR + ) WITH ( + connector = 'kafka', + topic = 'orders', + properties.bootstrap.server = 'fake', + ) FORMAT PLAIN ENCODE JSON; + + CREATE SOURCE lineitem ( + l_orderkey BIGINT, + l_partkey INTEGER, + l_suppkey INTEGER, + l_linenumber INTEGER, + l_quantity NUMERIC, + l_extendedprice NUMERIC, + l_discount NUMERIC, + l_tax NUMERIC, + l_returnflag VARCHAR, + l_linestatus VARCHAR, + l_shipdate DATE, + l_commitdate DATE, + l_receiptdate DATE, + l_shipinstruct VARCHAR, + l_shipmode VARCHAR, + l_comment VARCHAR + ) WITH ( + connector = 'kafka', + topic = 'lineitem', + properties.bootstrap.server = 'fake', + ) FORMAT PLAIN ENCODE JSON; + + CREATE SOURCE nation ( + n_nationkey INTEGER, + n_name VARCHAR, + n_regionkey INTEGER, + n_comment VARCHAR + ) WITH ( + connector = 'kafka', + topic = 'nation', + properties.bootstrap.server = 'fake', + ) FORMAT PLAIN ENCODE JSON; + + CREATE SOURCE region ( + r_regionkey INTEGER, + r_name VARCHAR, + r_comment VARCHAR + ) WITH ( + connector = 'kafka', + topic = 'region', + properties.bootstrap.server = 'fake', + ) FORMAT PLAIN ENCODE JSON; + expected_outputs: [] +- id: tpch_q1 + before: + - create_tables + sql: | + select + l_returnflag, + l_linestatus, + sum(l_quantity) as sum_qty, + sum(l_extendedprice) as sum_base_price, + sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, + sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, + avg(l_quantity) as avg_qty, + avg(l_extendedprice) as avg_price, + avg(l_discount) as avg_disc, + count(*) as count_order + from + lineitem + where + l_shipdate <= date '1998-12-01' - interval '71' day + group by + l_returnflag, + l_linestatus + order by + l_returnflag, + l_linestatus + LIMIT 1; + expected_outputs: + - stream_plan +- id: tpch_q2 + before: + - create_tables + sql: | + select + s_acctbal, + s_name, + n_name, + p_partkey, + p_mfgr, + s_address, + s_phone, + s_comment + from + part, + supplier, + partsupp, + nation, + region + where + p_partkey = ps_partkey + and s_suppkey = ps_suppkey + and p_size = 4 + and p_type like '%TIN' + and s_nationkey = n_nationkey + and n_regionkey = r_regionkey + and r_name = 'AFRICA' + and ps_supplycost = ( + select + min(ps_supplycost) + from + partsupp, + supplier, + nation, + region + where + p_partkey = ps_partkey + and s_suppkey = ps_suppkey + and s_nationkey = n_nationkey + and n_regionkey = r_regionkey + and r_name = 'AFRICA' + ) + order by + s_acctbal desc, + n_name, + s_name, + p_partkey + LIMIT 100; + expected_outputs: + - stream_plan +- id: tpch_q3 + before: + - create_tables + sql: | + select + l_orderkey, + sum(l_extendedprice * (1 - l_discount)) as revenue, + o_orderdate, + o_shippriority + from + customer, + orders, + lineitem + where + c_mktsegment = 'FURNITURE' + and c_custkey = o_custkey + and l_orderkey = o_orderkey + and o_orderdate < date '1995-03-29' + and l_shipdate > date '1995-03-29' + group by + l_orderkey, + o_orderdate, + o_shippriority + order by + revenue desc, + o_orderdate + LIMIT 10; + expected_outputs: + - stream_plan +- id: tpch_q4 + before: + - create_tables + sql: | + select + o_orderpriority, + count(*) as order_count + from + orders + where + o_orderdate >= date '1997-07-01' + and o_orderdate < date '1997-07-01' + interval '3' month + and exists ( + select + * + from + lineitem + where + l_orderkey = o_orderkey + and l_commitdate < l_receiptdate + ) + group by + o_orderpriority + order by + o_orderpriority + LIMIT 1; + expected_outputs: + - stream_plan +- id: tpch_q5 + before: + - create_tables + sql: | + select + n_name, + sum(l_extendedprice * (1 - l_discount)) as revenue + from + customer, + orders, + lineitem, + supplier, + nation, + region + where + c_custkey = o_custkey + and l_orderkey = o_orderkey + and l_suppkey = s_suppkey + and c_nationkey = s_nationkey + and s_nationkey = n_nationkey + and n_regionkey = r_regionkey + and r_name = 'MIDDLE EAST' + and o_orderdate >= date '1994-01-01' + and o_orderdate < date '1994-01-01' + interval '1' year + group by + n_name + order by + revenue desc + LIMIT 1; + expected_outputs: + - stream_plan +- id: tpch_q6 + before: + - create_tables + sql: | + select + sum(l_extendedprice * l_discount) as revenue + from + lineitem + where + l_shipdate >= date '1994-01-01' + and l_shipdate < date '1994-01-01' + interval '1' year + and l_discount between 0.08 - 0.01 and 0.08 + 0.01 + and l_quantity < 24; + expected_outputs: + - stream_plan +- id: tpch_q7 + before: + - create_tables + sql: | + select + supp_nation, + cust_nation, + l_year, + sum(volume) as revenue + from + ( + select + n1.n_name as supp_nation, + n2.n_name as cust_nation, + extract(year from l_shipdate) as l_year, + l_extendedprice * (1 - l_discount) as volume + from + supplier, + lineitem, + orders, + customer, + nation n1, + nation n2 + where + s_suppkey = l_suppkey + and o_orderkey = l_orderkey + and c_custkey = o_custkey + and s_nationkey = n1.n_nationkey + and c_nationkey = n2.n_nationkey + and ( + (n1.n_name = 'ROMANIA' and n2.n_name = 'IRAN') + or (n1.n_name = 'IRAN' and n2.n_name = 'ROMANIA') + ) + and l_shipdate between date '1983-01-01' and date '2000-12-31' + ) as shipping + group by + supp_nation, + cust_nation, + l_year + order by + supp_nation, + cust_nation, + l_year + LIMIT 1; + expected_outputs: + - stream_plan +- id: tpch_q8 + before: + - create_tables + sql: | + select + o_year, + sum(case + when nation = 'IRAN' then volume + else 0 + end) / sum(volume) as mkt_share + from + ( + select + extract(year from o_orderdate) as o_year, + l_extendedprice * (1 - l_discount) as volume, + n2.n_name as nation + from + part, + supplier, + lineitem, + orders, + customer, + nation n1, + nation n2, + region + where + p_partkey = l_partkey + and s_suppkey = l_suppkey + and l_orderkey = o_orderkey + and o_custkey = c_custkey + and c_nationkey = n1.n_nationkey + and n1.n_regionkey = r_regionkey + and r_name = 'ASIA' + and s_nationkey = n2.n_nationkey + and o_orderdate between date '1995-01-01' and date '1996-12-31' + and p_type = 'PROMO ANODIZED STEEL' + ) as all_nations + group by + o_year + order by + o_year + LIMIT 1; + expected_outputs: + - stream_plan +- id: tpch_q9 + before: + - create_tables + sql: | + select + nation, + o_year, + sum(amount) as sum_profit + from + ( + select + n_name as nation, + extract(year from o_orderdate) as o_year, + l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity as amount + from + part, + supplier, + lineitem, + partsupp, + orders, + nation + where + s_suppkey = l_suppkey + and ps_suppkey = l_suppkey + and ps_partkey = l_partkey + and p_partkey = l_partkey + and o_orderkey = l_orderkey + and s_nationkey = n_nationkey + and p_name like '%yellow%' + ) as profit + group by + nation, + o_year + order by + nation, + o_year desc + LIMIT 1; + expected_outputs: + - stream_plan +- id: tpch_q10 + before: + - create_tables + sql: | + select + c_custkey, + c_name, + sum(l_extendedprice * (1.00 - l_discount)) as revenue, + c_acctbal, + n_name, + c_address, + c_phone, + c_comment + from + customer, + orders, + lineitem, + nation + where + c_custkey = o_custkey + and l_orderkey = o_orderkey + and o_orderdate >= date '1994-01-01' + and o_orderdate < date '1994-01-01' + interval '3' month + and l_returnflag = 'R' + and c_nationkey = n_nationkey + group by + c_custkey, + c_name, + c_acctbal, + c_phone, + n_name, + c_address, + c_comment + order by + revenue desc + LIMIT 20; + expected_outputs: + - stream_plan +- id: tpch_q11 + before: + - create_tables + sql: | + select + ps_partkey, + sum(ps_supplycost * ps_availqty) as value + from + partsupp, + supplier, + nation + where + ps_suppkey = s_suppkey + and s_nationkey = n_nationkey + and n_name = 'ARGENTINA' + group by + ps_partkey + having + sum(ps_supplycost * ps_availqty) > ( + select + sum(ps_supplycost * ps_availqty) * 0.0001000000 + from + partsupp, + supplier, + nation + where + ps_suppkey = s_suppkey + and s_nationkey = n_nationkey + and n_name = 'ARGENTINA' + ) + order by + value desc + LIMIT 1; + expected_outputs: + - stream_plan +- id: tpch_q12 + before: + - create_tables + sql: | + select + l_shipmode, + sum(case + when o_orderpriority = '1-URGENT' + or o_orderpriority = '2-HIGH' + then 1 + else 0 + end) as high_line_count, + sum(case + when o_orderpriority <> '1-URGENT' + and o_orderpriority <> '2-HIGH' + then 1 + else 0 + end) as low_line_count + from + orders, + lineitem + where + o_orderkey = l_orderkey + and l_shipmode in ('FOB', 'SHIP') + and l_commitdate < l_receiptdate + and l_shipdate < l_commitdate + and l_receiptdate >= date '1994-01-01' + and l_receiptdate < date '1994-01-01' + interval '1' year + group by + l_shipmode + order by + l_shipmode + LIMIT 1; + expected_outputs: + - stream_plan +- id: tpch_q13 + before: + - create_tables + sql: | + select + c_count, + count(*) as custdist + from + ( + select + c_custkey, + count(o_orderkey) as c_count + from + customer left outer join orders on + c_custkey = o_custkey + and o_comment not like '%:1%:2%' + group by + c_custkey + ) as c_orders (c_custkey, c_count) + group by + c_count + order by + custdist desc, + c_count desc + LIMIT 1; + expected_outputs: + - stream_plan +- id: tpch_q14 + before: + - create_tables + sql: | + select + 100.00 * sum(case + when p_type like 'PROMO%' + then l_extendedprice * (1 - l_discount) + else 0 + end) / sum(l_extendedprice * (1 - l_discount)) as promo_revenue + from + lineitem, + part + where + l_partkey = p_partkey + and l_shipdate >= date '1995-09-01' + and l_shipdate < date '1995-09-01' + interval '1' month; + expected_outputs: + - stream_plan +- id: tpch_q15 + before: + - create_tables + sql: | + with revenue0 (supplier_no, total_revenue) as ( + select + l_suppkey, + sum(l_extendedprice * (1 - l_discount)) + from + lineitem + where + l_shipdate >= date '1993-01-01' + and l_shipdate < date '1993-01-01' + interval '3' month + group by + l_suppkey + ) + select + s_suppkey, + s_name, + s_address, + s_phone, + total_revenue + from + supplier, + revenue0 + where + s_suppkey = supplier_no + and total_revenue = ( + select + max(total_revenue) + from + revenue0 + ) + order by + s_suppkey + LIMIT 1; + expected_outputs: + - stream_plan +- id: tpch_q16 + before: + - create_tables + sql: | + select + p_brand, + p_type, + p_size, + count(distinct ps_suppkey) as supplier_cnt + from + partsupp, + part + where + p_partkey = ps_partkey + and p_brand <> 'Brand#45' + and p_type not like 'SMALL PLATED%' + and p_size in (19, 17, 16, 23, 10, 4, 38, 11) + and ps_suppkey not in ( + select + s_suppkey + from + supplier + where + s_comment like '%Customer%Complaints%' + ) + group by + p_brand, + p_type, + p_size + order by + supplier_cnt desc, + p_brand, + p_type, + p_size + LIMIT 1; + expected_outputs: + - stream_plan +- id: tpch_q17 + before: + - create_tables + sql: | + select + sum(l_extendedprice) / 7.0 as avg_yearly + from + lineitem, + part + where + p_partkey = l_partkey + and p_brand = 'Brand#13' + and p_container = 'JUMBO PKG' + and l_quantity < ( + select + 0.2 * avg(l_quantity) + from + lineitem + where + l_partkey = p_partkey + ); + expected_outputs: + - stream_plan +- id: tpch_q18 + before: + - create_tables + sql: | + select + c_name, + c_custkey, + o_orderkey, + o_orderdate, + o_totalprice, + sum(l_quantity) quantity + from + customer, + orders, + lineitem + where + o_orderkey in ( + select + l_orderkey + from + lineitem + group by + l_orderkey + having + sum(l_quantity) > 1 + ) + and c_custkey = o_custkey + and o_orderkey = l_orderkey + group by + c_name, + c_custkey, + o_orderkey, + o_orderdate, + o_totalprice + order by + o_totalprice desc, + o_orderdate + LIMIT 100; + expected_outputs: + - stream_plan +- id: tpch_q19 + before: + - create_tables + sql: | + select + sum(l_extendedprice* (1 - l_discount)) as revenue + from + lineitem, + part + where + ( + p_partkey = l_partkey + and p_brand = 'Brand#52' + and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG') + and l_quantity >= 1 and l_quantity <= 11 + and p_size between 1 and 5 + and l_shipmode in ('AIR', 'AIR REG') + and l_shipinstruct = 'DELIVER IN PERSON' + ) + or + ( + p_partkey = l_partkey + and p_brand = 'Brand#24' + and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK') + and l_quantity >= 30 and l_quantity <= 40 + and p_size between 1 and 10 + and l_shipmode in ('AIR', 'AIR REG') + and l_shipinstruct = 'DELIVER IN PERSON' + ) + or + ( + p_partkey = l_partkey + and p_brand = 'Brand#32' + and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG') + and l_quantity >= 10 and l_quantity <= 20 + and p_size between 1 and 15 + and l_shipmode in ('AIR', 'AIR REG') + and l_shipinstruct = 'DELIVER IN PERSON' + ); + expected_outputs: + - stream_plan +- id: tpch_q20 + before: + - create_tables + sql: | + select + s_name, + s_address + from + supplier, + nation + where + s_suppkey in ( + select + ps_suppkey + from + partsupp, + ( + select + l_partkey agg_partkey, + l_suppkey agg_suppkey, + 0.5 * sum(l_quantity) AS agg_quantity + from + lineitem + where + l_shipdate >= date '1994-01-01' + and l_shipdate < date '1994-01-01' + interval '1' year + group by + l_partkey, + l_suppkey + ) agg_lineitem + where + agg_partkey = ps_partkey + and agg_suppkey = ps_suppkey + and ps_partkey in ( + select + p_partkey + from + part + where + p_name like 'forest%' + ) + and ps_availqty > agg_quantity + ) + and s_nationkey = n_nationkey + and n_name = 'KENYA' + order by + s_name + LIMIT 1; + expected_outputs: + - stream_plan +- id: tpch_q21 + before: + - create_tables + sql: | + select + s_name, + count(*) as numwait + from + supplier, + lineitem l1, + orders, + nation + where + s_suppkey = l1.l_suppkey + and o_orderkey = l1.l_orderkey + and o_orderstatus = 'F' + and l1.l_receiptdate > l1.l_commitdate + and exists ( + select + * + from + lineitem l2 + where + l2.l_orderkey = l1.l_orderkey + and l2.l_suppkey <> l1.l_suppkey + ) + and not exists ( + select + * + from + lineitem l3 + where + l3.l_orderkey = l1.l_orderkey + and l3.l_suppkey <> l1.l_suppkey + and l3.l_receiptdate > l3.l_commitdate + ) + and s_nationkey = n_nationkey + and n_name = 'GERMANY' + group by + s_name + order by + numwait desc, + s_name + LIMIT 100; + expected_outputs: + - stream_plan +- id: tpch_q22 + before: + - create_tables + sql: | + select + cntrycode, + count(*) as numcust, + sum(c_acctbal) as totacctbal + from + ( + select + substring(c_phone from 1 for 2) as cntrycode, + c_acctbal + from + customer + where + substring(c_phone from 1 for 2) in + ('30', '24', '31', '38', '25', '34', '37') + and c_acctbal > ( + select + avg(c_acctbal) + from + customer + where + c_acctbal > 0.00::numeric + and substring(c_phone from 1 for 2) in + ('30', '24', '31', '38', '25', '34', '37') + ) + and not exists ( + select + * + from + orders + where + o_custkey = c_custkey + ) + ) as custsale + group by + cntrycode + order by + cntrycode + LIMIT 1; + expected_outputs: + - stream_plan diff --git a/src/frontend/planner_test/tests/testdata/output/tpch_kafka.yaml b/src/frontend/planner_test/tests/testdata/output/tpch_kafka.yaml new file mode 100644 index 0000000000000..b1f9c6d5dc88c --- /dev/null +++ b/src/frontend/planner_test/tests/testdata/output/tpch_kafka.yaml @@ -0,0 +1,1513 @@ +# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information. +- id: create_tables + sql: | + CREATE SOURCE supplier ( + s_suppkey INTEGER, + s_name VARCHAR, + s_address VARCHAR, + s_nationkey INTEGER, + s_phone VARCHAR, + s_acctbal NUMERIC, + s_comment VARCHAR + ) WITH ( + connector = 'kafka', + topic = 'supplier', + properties.bootstrap.server = 'fake', + ) FORMAT PLAIN ENCODE JSON; + + CREATE SOURCE part ( + p_partkey INTEGER, + p_name VARCHAR, + p_mfgr VARCHAR, + p_brand VARCHAR, + p_type VARCHAR, + p_size INTEGER, + p_container VARCHAR, + p_retailprice NUMERIC, + p_comment VARCHAR + ) WITH ( + connector = 'kafka', + topic = 'part', + properties.bootstrap.server = 'fake', + ) FORMAT PLAIN ENCODE JSON; + + CREATE SOURCE partsupp ( + ps_partkey INTEGER, + ps_suppkey INTEGER, + ps_availqty INTEGER, + ps_supplycost NUMERIC, + ps_comment VARCHAR + ) WITH ( + connector = 'kafka', + topic = 'partsupp', + properties.bootstrap.server = 'fake', + ) FORMAT PLAIN ENCODE JSON; + + CREATE SOURCE customer ( + c_custkey INTEGER, + c_name VARCHAR, + c_address VARCHAR, + c_nationkey INTEGER, + c_phone VARCHAR, + c_acctbal NUMERIC, + c_mktsegment VARCHAR, + c_comment VARCHAR + ) WITH ( + connector = 'kafka', + topic = 'customer', + properties.bootstrap.server = 'fake', + ) FORMAT PLAIN ENCODE JSON; + + CREATE SOURCE orders ( + o_orderkey BIGINT, + o_custkey INTEGER, + o_orderstatus VARCHAR, + o_totalprice NUMERIC, + o_orderdate DATE, + o_orderpriority VARCHAR, + o_clerk VARCHAR, + o_shippriority INTEGER, + o_comment VARCHAR + ) WITH ( + connector = 'kafka', + topic = 'orders', + properties.bootstrap.server = 'fake', + ) FORMAT PLAIN ENCODE JSON; + + CREATE SOURCE lineitem ( + l_orderkey BIGINT, + l_partkey INTEGER, + l_suppkey INTEGER, + l_linenumber INTEGER, + l_quantity NUMERIC, + l_extendedprice NUMERIC, + l_discount NUMERIC, + l_tax NUMERIC, + l_returnflag VARCHAR, + l_linestatus VARCHAR, + l_shipdate DATE, + l_commitdate DATE, + l_receiptdate DATE, + l_shipinstruct VARCHAR, + l_shipmode VARCHAR, + l_comment VARCHAR + ) WITH ( + connector = 'kafka', + topic = 'lineitem', + properties.bootstrap.server = 'fake', + ) FORMAT PLAIN ENCODE JSON; + + CREATE SOURCE nation ( + n_nationkey INTEGER, + n_name VARCHAR, + n_regionkey INTEGER, + n_comment VARCHAR + ) WITH ( + connector = 'kafka', + topic = 'nation', + properties.bootstrap.server = 'fake', + ) FORMAT PLAIN ENCODE JSON; + + CREATE SOURCE region ( + r_regionkey INTEGER, + r_name VARCHAR, + r_comment VARCHAR + ) WITH ( + connector = 'kafka', + topic = 'region', + properties.bootstrap.server = 'fake', + ) FORMAT PLAIN ENCODE JSON; +- id: tpch_q1 + before: + - create_tables + sql: | + select + l_returnflag, + l_linestatus, + sum(l_quantity) as sum_qty, + sum(l_extendedprice) as sum_base_price, + sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, + sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, + avg(l_quantity) as avg_qty, + avg(l_extendedprice) as avg_price, + avg(l_discount) as avg_disc, + count(*) as count_order + from + lineitem + where + l_shipdate <= date '1998-12-01' - interval '71' day + group by + l_returnflag, + l_linestatus + order by + l_returnflag, + l_linestatus + LIMIT 1; + stream_plan: |- + StreamMaterialize { columns: [l_returnflag, l_linestatus, sum_qty, sum_base_price, sum_disc_price, sum_charge, avg_qty, avg_price, avg_disc, count_order], stream_key: [], pk_columns: [l_returnflag, l_linestatus], pk_conflict: NoCheck } + └─StreamProject { exprs: [l_returnflag, l_linestatus, sum(l_quantity), sum(l_extendedprice), sum($expr1), sum($expr2), $expr3, $expr4, $expr5, count] } + └─StreamTopN { order: [l_returnflag ASC, l_linestatus ASC], limit: 1, offset: 0 } + └─StreamExchange { dist: Single } + └─StreamGroupTopN { order: [l_returnflag ASC, l_linestatus ASC], limit: 1, offset: 0, group_key: [$expr6] } + └─StreamProject { exprs: [l_returnflag, l_linestatus, sum(l_quantity), sum(l_extendedprice), sum($expr1), sum($expr2), (sum(l_quantity) / count(l_quantity)::Decimal) as $expr3, (sum(l_extendedprice) / count(l_extendedprice)::Decimal) as $expr4, (sum(l_discount) / count(l_discount)::Decimal) as $expr5, count, Vnode(l_returnflag, l_linestatus) as $expr6] } + └─StreamHashAgg [append_only] { group_key: [l_returnflag, l_linestatus], aggs: [sum(l_quantity), sum(l_extendedprice), sum($expr1), sum($expr2), count(l_quantity), count(l_extendedprice), sum(l_discount), count(l_discount), count] } + └─StreamExchange { dist: HashShard(l_returnflag, l_linestatus) } + └─StreamProject { exprs: [l_returnflag, l_linestatus, l_quantity, l_extendedprice, $expr1, ($expr1 * (1:Decimal + l_tax)) as $expr2, l_discount, _row_id] } + └─StreamProject { exprs: [l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id, (l_extendedprice * (1:Decimal - l_discount)) as $expr1] } + └─StreamFilter { predicate: (l_shipdate <= '1998-09-21 00:00:00':Timestamp) } + └─StreamRowIdGen { row_id_index: 19 } + └─StreamSourceScan { columns: [l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } +- id: tpch_q2 + before: + - create_tables + sql: | + select + s_acctbal, + s_name, + n_name, + p_partkey, + p_mfgr, + s_address, + s_phone, + s_comment + from + part, + supplier, + partsupp, + nation, + region + where + p_partkey = ps_partkey + and s_suppkey = ps_suppkey + and p_size = 4 + and p_type like '%TIN' + and s_nationkey = n_nationkey + and n_regionkey = r_regionkey + and r_name = 'AFRICA' + and ps_supplycost = ( + select + min(ps_supplycost) + from + partsupp, + supplier, + nation, + region + where + p_partkey = ps_partkey + and s_suppkey = ps_suppkey + and s_nationkey = n_nationkey + and n_regionkey = r_regionkey + and r_name = 'AFRICA' + ) + order by + s_acctbal desc, + n_name, + s_name, + p_partkey + LIMIT 100; + stream_plan: |- + StreamMaterialize { columns: [s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment, _row_id(hidden), min(ps_supplycost)(hidden), ps_partkey(hidden), _row_id#1(hidden), _row_id#2(hidden), r_regionkey(hidden), _row_id#3(hidden), n_nationkey(hidden), _row_id#4(hidden), s_suppkey(hidden)], stream_key: [_row_id, p_partkey, _row_id#1, _row_id#2, r_regionkey, _row_id#3, n_nationkey, _row_id#4, s_suppkey, min(ps_supplycost), ps_partkey], pk_columns: [s_acctbal, n_name, s_name, p_partkey, _row_id, _row_id#1, _row_id#2, r_regionkey, _row_id#3, n_nationkey, _row_id#4, s_suppkey, min(ps_supplycost), ps_partkey], pk_conflict: NoCheck } + └─StreamProject { exprs: [s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment, _row_id, min(ps_supplycost), ps_partkey, _row_id, _row_id, r_regionkey, _row_id, n_nationkey, _row_id, s_suppkey] } + └─StreamTopN { order: [s_acctbal DESC, n_name ASC, s_name ASC, p_partkey ASC], limit: 100, offset: 0 } + └─StreamExchange { dist: Single } + └─StreamGroupTopN { order: [s_acctbal DESC, n_name ASC, s_name ASC, p_partkey ASC], limit: 100, offset: 0, group_key: [_vnode] } + └─StreamProject { exprs: [s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment, _row_id, min(ps_supplycost), ps_partkey, _row_id, _row_id, r_regionkey, _row_id, n_nationkey, _row_id, s_suppkey, Vnode(ps_partkey, min(ps_supplycost)) as _vnode] } + └─StreamHashJoin { type: Inner, predicate: p_partkey = ps_partkey AND min(ps_supplycost) = ps_supplycost AND ps_partkey = ps_partkey, output: [s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment, _row_id, min(ps_supplycost), ps_partkey, _row_id, _row_id, r_regionkey, _row_id, n_nationkey, _row_id, s_suppkey] } + ├─StreamExchange { dist: HashShard(ps_partkey, min(ps_supplycost)) } + │ └─StreamHashJoin { type: Inner, predicate: p_partkey = ps_partkey, output: [p_partkey, p_mfgr, ps_partkey, min(ps_supplycost), _row_id] } + │ ├─StreamExchange { dist: HashShard(p_partkey) } + │ │ └─StreamFilter { predicate: (p_size = 4:Int32) AND Like(p_type, '%TIN':Varchar) } + │ │ └─StreamRowIdGen { row_id_index: 12 } + │ │ └─StreamSourceScan { columns: [p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ └─StreamProject { exprs: [ps_partkey, min(ps_supplycost)] } + │ └─StreamHashAgg [append_only] { group_key: [ps_partkey], aggs: [min(ps_supplycost), count] } + │ └─StreamExchange { dist: HashShard(ps_partkey) } + │ └─StreamHashJoin [append_only] { type: Inner, predicate: s_nationkey = n_nationkey, output: [ps_supplycost, ps_partkey, _row_id, _row_id, ps_suppkey, s_nationkey, _row_id, _row_id, r_regionkey] } + │ ├─StreamExchange { dist: HashShard(s_nationkey) } + │ │ └─StreamHashJoin [append_only] { type: Inner, predicate: ps_suppkey = s_suppkey, output: [ps_partkey, ps_supplycost, s_nationkey, _row_id, ps_suppkey, _row_id] } + │ │ ├─StreamExchange { dist: HashShard(ps_suppkey) } + │ │ │ └─StreamShare { id: 6 } + │ │ │ └─StreamProject { exprs: [ps_partkey, ps_suppkey, ps_supplycost, _row_id] } + │ │ │ └─StreamRowIdGen { row_id_index: 8 } + │ │ │ └─StreamSourceScan { columns: [ps_partkey, ps_suppkey, ps_availqty, ps_supplycost, ps_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ │ └─StreamExchange { dist: HashShard(s_suppkey) } + │ │ └─StreamShare { id: 10 } + │ │ └─StreamProject { exprs: [s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment, _row_id] } + │ │ └─StreamRowIdGen { row_id_index: 10 } + │ │ └─StreamSourceScan { columns: [s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ └─StreamExchange { dist: HashShard(n_nationkey) } + │ └─StreamHashJoin [append_only] { type: Inner, predicate: r_regionkey = n_regionkey, output: [n_nationkey, _row_id, r_regionkey, _row_id] } + │ ├─StreamExchange { dist: HashShard(r_regionkey) } + │ │ └─StreamFilter { predicate: (r_name = 'AFRICA':Varchar) } + │ │ └─StreamShare { id: 17 } + │ │ └─StreamProject { exprs: [r_regionkey, r_name, _row_id] } + │ │ └─StreamFilter { predicate: (r_name = 'AFRICA':Varchar) } + │ │ └─StreamRowIdGen { row_id_index: 6 } + │ │ └─StreamSourceScan { columns: [r_regionkey, r_name, r_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ └─StreamExchange { dist: HashShard(n_regionkey) } + │ └─StreamShare { id: 22 } + │ └─StreamProject { exprs: [n_nationkey, n_name, n_regionkey, _row_id] } + │ └─StreamRowIdGen { row_id_index: 7 } + │ └─StreamSourceScan { columns: [n_nationkey, n_name, n_regionkey, n_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + └─StreamExchange { dist: HashShard(ps_partkey, ps_supplycost) } + └─StreamHashJoin [append_only] { type: Inner, predicate: s_suppkey = ps_suppkey, output: [n_name, s_name, s_address, s_phone, s_acctbal, s_comment, ps_partkey, ps_supplycost, _row_id, _row_id, r_regionkey, _row_id, n_nationkey, s_suppkey, _row_id] } + ├─StreamExchange { dist: HashShard(s_suppkey) } + │ └─StreamHashJoin [append_only] { type: Inner, predicate: n_nationkey = s_nationkey, output: [n_name, s_suppkey, s_name, s_address, s_phone, s_acctbal, s_comment, _row_id, _row_id, r_regionkey, n_nationkey, _row_id] } + │ ├─StreamExchange { dist: HashShard(n_nationkey) } + │ │ └─StreamHashJoin [append_only] { type: Inner, predicate: r_regionkey = n_regionkey, output: [n_nationkey, n_name, _row_id, r_regionkey, _row_id] } + │ │ ├─StreamExchange { dist: HashShard(r_regionkey) } + │ │ │ └─StreamFilter { predicate: (r_name = 'AFRICA':Varchar) } + │ │ │ └─StreamShare { id: 17 } + │ │ │ └─StreamProject { exprs: [r_regionkey, r_name, _row_id] } + │ │ │ └─StreamFilter { predicate: (r_name = 'AFRICA':Varchar) } + │ │ │ └─StreamRowIdGen { row_id_index: 6 } + │ │ │ └─StreamSourceScan { columns: [r_regionkey, r_name, r_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ │ └─StreamExchange { dist: HashShard(n_regionkey) } + │ │ └─StreamShare { id: 22 } + │ │ └─StreamProject { exprs: [n_nationkey, n_name, n_regionkey, _row_id] } + │ │ └─StreamRowIdGen { row_id_index: 7 } + │ │ └─StreamSourceScan { columns: [n_nationkey, n_name, n_regionkey, n_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ └─StreamExchange { dist: HashShard(s_nationkey) } + │ └─StreamShare { id: 10 } + │ └─StreamProject { exprs: [s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment, _row_id] } + │ └─StreamRowIdGen { row_id_index: 10 } + │ └─StreamSourceScan { columns: [s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + └─StreamExchange { dist: HashShard(ps_suppkey) } + └─StreamFilter { predicate: IsNotNull(ps_partkey) } + └─StreamShare { id: 6 } + └─StreamProject { exprs: [ps_partkey, ps_suppkey, ps_supplycost, _row_id] } + └─StreamRowIdGen { row_id_index: 8 } + └─StreamSourceScan { columns: [ps_partkey, ps_suppkey, ps_availqty, ps_supplycost, ps_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } +- id: tpch_q3 + before: + - create_tables + sql: | + select + l_orderkey, + sum(l_extendedprice * (1 - l_discount)) as revenue, + o_orderdate, + o_shippriority + from + customer, + orders, + lineitem + where + c_mktsegment = 'FURNITURE' + and c_custkey = o_custkey + and l_orderkey = o_orderkey + and o_orderdate < date '1995-03-29' + and l_shipdate > date '1995-03-29' + group by + l_orderkey, + o_orderdate, + o_shippriority + order by + revenue desc, + o_orderdate + LIMIT 10; + stream_plan: |- + StreamMaterialize { columns: [l_orderkey, revenue, o_orderdate, o_shippriority], stream_key: [l_orderkey, o_orderdate, o_shippriority], pk_columns: [revenue, o_orderdate, l_orderkey, o_shippriority], pk_conflict: NoCheck } + └─StreamProject { exprs: [l_orderkey, sum($expr1), o_orderdate, o_shippriority] } + └─StreamTopN { order: [sum($expr1) DESC, o_orderdate ASC], limit: 10, offset: 0 } + └─StreamExchange { dist: Single } + └─StreamGroupTopN { order: [sum($expr1) DESC, o_orderdate ASC], limit: 10, offset: 0, group_key: [$expr2] } + └─StreamProject { exprs: [l_orderkey, sum($expr1), o_orderdate, o_shippriority, Vnode(l_orderkey, o_orderdate, o_shippriority) as $expr2] } + └─StreamHashAgg [append_only] { group_key: [l_orderkey, o_orderdate, o_shippriority], aggs: [sum($expr1), count] } + └─StreamExchange { dist: HashShard(l_orderkey, o_orderdate, o_shippriority) } + └─StreamProject { exprs: [l_orderkey, o_orderdate, o_shippriority, (l_extendedprice * (1:Decimal - l_discount)) as $expr1, _row_id, _row_id, c_custkey, _row_id, o_orderkey] } + └─StreamHashJoin [append_only] { type: Inner, predicate: o_orderkey = l_orderkey, output: [o_orderdate, o_shippriority, l_orderkey, l_extendedprice, l_discount, _row_id, _row_id, c_custkey, o_orderkey, _row_id] } + ├─StreamExchange { dist: HashShard(o_orderkey) } + │ └─StreamHashJoin [append_only] { type: Inner, predicate: c_custkey = o_custkey, output: [o_orderkey, o_orderdate, o_shippriority, _row_id, c_custkey, _row_id] } + │ ├─StreamExchange { dist: HashShard(c_custkey) } + │ │ └─StreamFilter { predicate: (c_mktsegment = 'FURNITURE':Varchar) } + │ │ └─StreamRowIdGen { row_id_index: 11 } + │ │ └─StreamSourceScan { columns: [c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ └─StreamExchange { dist: HashShard(o_custkey) } + │ └─StreamFilter { predicate: (o_orderdate < '1995-03-29':Date) } + │ └─StreamRowIdGen { row_id_index: 12 } + │ └─StreamSourceScan { columns: [o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + └─StreamExchange { dist: HashShard(l_orderkey) } + └─StreamFilter { predicate: (l_shipdate > '1995-03-29':Date) } + └─StreamRowIdGen { row_id_index: 19 } + └─StreamSourceScan { columns: [l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } +- id: tpch_q4 + before: + - create_tables + sql: | + select + o_orderpriority, + count(*) as order_count + from + orders + where + o_orderdate >= date '1997-07-01' + and o_orderdate < date '1997-07-01' + interval '3' month + and exists ( + select + * + from + lineitem + where + l_orderkey = o_orderkey + and l_commitdate < l_receiptdate + ) + group by + o_orderpriority + order by + o_orderpriority + LIMIT 1; + stream_plan: |- + StreamMaterialize { columns: [o_orderpriority, order_count], stream_key: [], pk_columns: [o_orderpriority], pk_conflict: NoCheck } + └─StreamProject { exprs: [o_orderpriority, count] } + └─StreamTopN { order: [o_orderpriority ASC], limit: 1, offset: 0 } + └─StreamExchange { dist: Single } + └─StreamGroupTopN { order: [o_orderpriority ASC], limit: 1, offset: 0, group_key: [_vnode] } + └─StreamProject { exprs: [o_orderpriority, count, Vnode(o_orderpriority) as _vnode] } + └─StreamHashAgg { group_key: [o_orderpriority], aggs: [count] } + └─StreamExchange { dist: HashShard(o_orderpriority) } + └─StreamHashJoin { type: LeftSemi, predicate: o_orderkey = l_orderkey, output: [o_orderpriority, _row_id, o_orderkey] } + ├─StreamExchange { dist: HashShard(o_orderkey) } + │ └─StreamFilter { predicate: (o_orderdate >= '1997-07-01':Date) AND (o_orderdate < '1997-10-01 00:00:00':Timestamp) } + │ └─StreamRowIdGen { row_id_index: 12 } + │ └─StreamSourceScan { columns: [o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + └─StreamExchange { dist: HashShard(l_orderkey) } + └─StreamProject { exprs: [l_orderkey, _row_id] } + └─StreamFilter { predicate: (l_commitdate < l_receiptdate) } + └─StreamRowIdGen { row_id_index: 19 } + └─StreamSourceScan { columns: [l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } +- id: tpch_q5 + before: + - create_tables + sql: | + select + n_name, + sum(l_extendedprice * (1 - l_discount)) as revenue + from + customer, + orders, + lineitem, + supplier, + nation, + region + where + c_custkey = o_custkey + and l_orderkey = o_orderkey + and l_suppkey = s_suppkey + and c_nationkey = s_nationkey + and s_nationkey = n_nationkey + and n_regionkey = r_regionkey + and r_name = 'MIDDLE EAST' + and o_orderdate >= date '1994-01-01' + and o_orderdate < date '1994-01-01' + interval '1' year + group by + n_name + order by + revenue desc + LIMIT 1; + stream_plan: |- + StreamMaterialize { columns: [n_name, revenue], stream_key: [], pk_columns: [revenue], pk_conflict: NoCheck } + └─StreamProject { exprs: [n_name, sum($expr1)] } + └─StreamTopN { order: [sum($expr1) DESC], limit: 1, offset: 0 } + └─StreamExchange { dist: Single } + └─StreamGroupTopN { order: [sum($expr1) DESC], limit: 1, offset: 0, group_key: [$expr2] } + └─StreamProject { exprs: [n_name, sum($expr1), Vnode(n_name) as $expr2] } + └─StreamHashAgg [append_only] { group_key: [n_name], aggs: [sum($expr1), count] } + └─StreamExchange { dist: HashShard(n_name) } + └─StreamProject { exprs: [n_name, (l_extendedprice * (1:Decimal - l_discount)) as $expr1, _row_id, _row_id, r_regionkey, _row_id, _row_id, o_custkey, _row_id, _row_id, l_suppkey, o_orderkey, n_nationkey] } + └─StreamHashJoin [append_only] { type: Inner, predicate: n_nationkey = s_nationkey AND n_nationkey = c_nationkey, output: [l_extendedprice, l_discount, n_name, _row_id, _row_id, r_regionkey, n_nationkey, _row_id, _row_id, o_custkey, _row_id, _row_id, l_suppkey, o_orderkey, c_nationkey] } + ├─StreamExchange { dist: HashShard(n_nationkey, n_nationkey) } + │ └─StreamHashJoin [append_only] { type: Inner, predicate: r_regionkey = n_regionkey, output: [n_nationkey, n_name, _row_id, r_regionkey, _row_id] } + │ ├─StreamExchange { dist: HashShard(r_regionkey) } + │ │ └─StreamFilter { predicate: (r_name = 'MIDDLE EAST':Varchar) } + │ │ └─StreamRowIdGen { row_id_index: 6 } + │ │ └─StreamSourceScan { columns: [r_regionkey, r_name, r_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ └─StreamExchange { dist: HashShard(n_regionkey) } + │ └─StreamFilter { predicate: IsNotNull(n_nationkey) } + │ └─StreamRowIdGen { row_id_index: 7 } + │ └─StreamSourceScan { columns: [n_nationkey, n_name, n_regionkey, n_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + └─StreamExchange { dist: HashShard(c_nationkey, s_nationkey) } + └─StreamHashJoin [append_only] { type: Inner, predicate: o_orderkey = l_orderkey AND c_nationkey = s_nationkey, output: [c_nationkey, l_extendedprice, l_discount, s_nationkey, _row_id, _row_id, o_custkey, o_orderkey, _row_id, _row_id, l_suppkey] } + ├─StreamExchange { dist: HashShard(o_orderkey, c_nationkey) } + │ └─StreamHashJoin [append_only] { type: Inner, predicate: o_custkey = c_custkey, output: [o_orderkey, c_nationkey, _row_id, o_custkey, _row_id] } + │ ├─StreamExchange { dist: HashShard(o_custkey) } + │ │ └─StreamFilter { predicate: (o_orderdate >= '1994-01-01':Date) AND (o_orderdate < '1995-01-01 00:00:00':Timestamp) } + │ │ └─StreamRowIdGen { row_id_index: 12 } + │ │ └─StreamSourceScan { columns: [o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ └─StreamExchange { dist: HashShard(c_custkey) } + │ └─StreamRowIdGen { row_id_index: 11 } + │ └─StreamSourceScan { columns: [c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + └─StreamExchange { dist: HashShard(l_orderkey, s_nationkey) } + └─StreamHashJoin [append_only] { type: Inner, predicate: l_suppkey = s_suppkey, output: [l_orderkey, l_extendedprice, l_discount, s_nationkey, _row_id, l_suppkey, _row_id] } + ├─StreamExchange { dist: HashShard(l_suppkey) } + │ └─StreamRowIdGen { row_id_index: 19 } + │ └─StreamSourceScan { columns: [l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + └─StreamExchange { dist: HashShard(s_suppkey) } + └─StreamRowIdGen { row_id_index: 10 } + └─StreamSourceScan { columns: [s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } +- id: tpch_q6 + before: + - create_tables + sql: | + select + sum(l_extendedprice * l_discount) as revenue + from + lineitem + where + l_shipdate >= date '1994-01-01' + and l_shipdate < date '1994-01-01' + interval '1' year + and l_discount between 0.08 - 0.01 and 0.08 + 0.01 + and l_quantity < 24; + stream_plan: |- + StreamMaterialize { columns: [revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck } + └─StreamProject { exprs: [sum(sum($expr1))] } + └─StreamSimpleAgg [append_only] { aggs: [sum(sum($expr1)), count] } + └─StreamExchange { dist: Single } + └─StreamStatelessSimpleAgg { aggs: [sum($expr1)] } + └─StreamProject { exprs: [(l_extendedprice * l_discount) as $expr1, _row_id] } + └─StreamFilter { predicate: (l_shipdate >= '1994-01-01':Date) AND (l_shipdate < '1995-01-01 00:00:00':Timestamp) AND (l_discount >= 0.07:Decimal) AND (l_discount <= 0.09:Decimal) AND (l_quantity < 24:Decimal) } + └─StreamRowIdGen { row_id_index: 19 } + └─StreamSourceScan { columns: [l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } +- id: tpch_q7 + before: + - create_tables + sql: | + select + supp_nation, + cust_nation, + l_year, + sum(volume) as revenue + from + ( + select + n1.n_name as supp_nation, + n2.n_name as cust_nation, + extract(year from l_shipdate) as l_year, + l_extendedprice * (1 - l_discount) as volume + from + supplier, + lineitem, + orders, + customer, + nation n1, + nation n2 + where + s_suppkey = l_suppkey + and o_orderkey = l_orderkey + and c_custkey = o_custkey + and s_nationkey = n1.n_nationkey + and c_nationkey = n2.n_nationkey + and ( + (n1.n_name = 'ROMANIA' and n2.n_name = 'IRAN') + or (n1.n_name = 'IRAN' and n2.n_name = 'ROMANIA') + ) + and l_shipdate between date '1983-01-01' and date '2000-12-31' + ) as shipping + group by + supp_nation, + cust_nation, + l_year + order by + supp_nation, + cust_nation, + l_year + LIMIT 1; + stream_plan: |- + StreamMaterialize { columns: [supp_nation, cust_nation, l_year, revenue], stream_key: [], pk_columns: [supp_nation, cust_nation, l_year], pk_conflict: NoCheck } + └─StreamProject { exprs: [n_name, n_name, $expr1, sum($expr2)] } + └─StreamTopN { order: [n_name ASC, n_name ASC, $expr1 ASC], limit: 1, offset: 0 } + └─StreamExchange { dist: Single } + └─StreamGroupTopN { order: [n_name ASC, n_name ASC, $expr1 ASC], limit: 1, offset: 0, group_key: [$expr3] } + └─StreamProject { exprs: [n_name, n_name, $expr1, sum($expr2), Vnode(n_name, n_name, $expr1) as $expr3] } + └─StreamHashAgg [append_only] { group_key: [n_name, n_name, $expr1], aggs: [sum($expr2), count] } + └─StreamExchange { dist: HashShard(n_name, n_name, $expr1) } + └─StreamProject { exprs: [n_name, n_name, Extract('YEAR':Varchar, l_shipdate) as $expr1, (l_extendedprice * (1:Decimal - l_discount)) as $expr2, _row_id, _row_id, n_nationkey, _row_id, s_suppkey, _row_id, _row_id, n_nationkey, _row_id, c_custkey, l_orderkey] } + └─StreamFilter { predicate: (((n_name = 'ROMANIA':Varchar) AND (n_name = 'IRAN':Varchar)) OR ((n_name = 'IRAN':Varchar) AND (n_name = 'ROMANIA':Varchar))) } + └─StreamHashJoin [append_only] { type: Inner, predicate: l_orderkey = o_orderkey, output: all } + ├─StreamExchange { dist: HashShard(l_orderkey) } + │ └─StreamHashJoin [append_only] { type: Inner, predicate: s_suppkey = l_suppkey, output: [n_name, l_orderkey, l_extendedprice, l_discount, l_shipdate, _row_id, _row_id, n_nationkey, s_suppkey, _row_id] } + │ ├─StreamExchange { dist: HashShard(s_suppkey) } + │ │ └─StreamHashJoin [append_only] { type: Inner, predicate: n_nationkey = s_nationkey, output: [n_name, s_suppkey, _row_id, n_nationkey, _row_id] } + │ │ ├─StreamExchange { dist: HashShard(n_nationkey) } + │ │ │ └─StreamShare { id: 3 } + │ │ │ └─StreamProject { exprs: [n_nationkey, n_name, _row_id] } + │ │ │ └─StreamRowIdGen { row_id_index: 7 } + │ │ │ └─StreamSourceScan { columns: [n_nationkey, n_name, n_regionkey, n_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ │ └─StreamExchange { dist: HashShard(s_nationkey) } + │ │ └─StreamRowIdGen { row_id_index: 10 } + │ │ └─StreamSourceScan { columns: [s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ └─StreamExchange { dist: HashShard(l_suppkey) } + │ └─StreamFilter { predicate: (l_shipdate >= '1983-01-01':Date) AND (l_shipdate <= '2000-12-31':Date) } + │ └─StreamRowIdGen { row_id_index: 19 } + │ └─StreamSourceScan { columns: [l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + └─StreamExchange { dist: HashShard(o_orderkey) } + └─StreamHashJoin [append_only] { type: Inner, predicate: c_custkey = o_custkey, output: [n_name, o_orderkey, _row_id, _row_id, n_nationkey, c_custkey, _row_id] } + ├─StreamExchange { dist: HashShard(c_custkey) } + │ └─StreamHashJoin [append_only] { type: Inner, predicate: n_nationkey = c_nationkey, output: [n_name, c_custkey, _row_id, n_nationkey, _row_id] } + │ ├─StreamExchange { dist: HashShard(n_nationkey) } + │ │ └─StreamShare { id: 3 } + │ │ └─StreamProject { exprs: [n_nationkey, n_name, _row_id] } + │ │ └─StreamRowIdGen { row_id_index: 7 } + │ │ └─StreamSourceScan { columns: [n_nationkey, n_name, n_regionkey, n_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ └─StreamExchange { dist: HashShard(c_nationkey) } + │ └─StreamRowIdGen { row_id_index: 11 } + │ └─StreamSourceScan { columns: [c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + └─StreamExchange { dist: HashShard(o_custkey) } + └─StreamRowIdGen { row_id_index: 12 } + └─StreamSourceScan { columns: [o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } +- id: tpch_q8 + before: + - create_tables + sql: | + select + o_year, + sum(case + when nation = 'IRAN' then volume + else 0 + end) / sum(volume) as mkt_share + from + ( + select + extract(year from o_orderdate) as o_year, + l_extendedprice * (1 - l_discount) as volume, + n2.n_name as nation + from + part, + supplier, + lineitem, + orders, + customer, + nation n1, + nation n2, + region + where + p_partkey = l_partkey + and s_suppkey = l_suppkey + and l_orderkey = o_orderkey + and o_custkey = c_custkey + and c_nationkey = n1.n_nationkey + and n1.n_regionkey = r_regionkey + and r_name = 'ASIA' + and s_nationkey = n2.n_nationkey + and o_orderdate between date '1995-01-01' and date '1996-12-31' + and p_type = 'PROMO ANODIZED STEEL' + ) as all_nations + group by + o_year + order by + o_year + LIMIT 1; + stream_plan: |- + StreamMaterialize { columns: [o_year, mkt_share], stream_key: [], pk_columns: [o_year], pk_conflict: NoCheck } + └─StreamProject { exprs: [$expr1, $expr4] } + └─StreamTopN { order: [$expr1 ASC], limit: 1, offset: 0 } + └─StreamExchange { dist: Single } + └─StreamGroupTopN { order: [$expr1 ASC], limit: 1, offset: 0, group_key: [$expr5] } + └─StreamProject { exprs: [$expr1, (sum($expr3) / sum($expr2)) as $expr4, Vnode($expr1) as $expr5] } + └─StreamHashAgg [append_only] { group_key: [$expr1], aggs: [sum($expr3), sum($expr2), count] } + └─StreamExchange { dist: HashShard($expr1) } + └─StreamProject { exprs: [$expr1, Case((n_name = 'IRAN':Varchar), $expr2, 0:Decimal) as $expr3, $expr2, _row_id, _row_id, r_regionkey, _row_id, n_nationkey, _row_id, _row_id, n_nationkey, _row_id, _row_id, p_partkey, s_suppkey, _row_id, l_orderkey, c_custkey] } + └─StreamProject { exprs: [Extract('YEAR':Varchar, o_orderdate) as $expr1, (l_extendedprice * (1:Decimal - l_discount)) as $expr2, n_name, _row_id, _row_id, r_regionkey, _row_id, n_nationkey, _row_id, _row_id, n_nationkey, _row_id, _row_id, p_partkey, s_suppkey, _row_id, l_orderkey, c_custkey] } + └─StreamHashJoin [append_only] { type: Inner, predicate: c_custkey = o_custkey, output: [l_extendedprice, l_discount, o_orderdate, n_name, _row_id, _row_id, r_regionkey, _row_id, n_nationkey, c_custkey, _row_id, _row_id, n_nationkey, _row_id, _row_id, p_partkey, s_suppkey, _row_id, l_orderkey] } + ├─StreamExchange { dist: HashShard(c_custkey) } + │ └─StreamHashJoin [append_only] { type: Inner, predicate: n_nationkey = c_nationkey, output: [c_custkey, _row_id, _row_id, r_regionkey, n_nationkey, _row_id] } + │ ├─StreamExchange { dist: HashShard(n_nationkey) } + │ │ └─StreamHashJoin [append_only] { type: Inner, predicate: r_regionkey = n_regionkey, output: [n_nationkey, _row_id, r_regionkey, _row_id] } + │ │ ├─StreamExchange { dist: HashShard(r_regionkey) } + │ │ │ └─StreamFilter { predicate: (r_name = 'ASIA':Varchar) } + │ │ │ └─StreamRowIdGen { row_id_index: 6 } + │ │ │ └─StreamSourceScan { columns: [r_regionkey, r_name, r_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ │ └─StreamExchange { dist: HashShard(n_regionkey) } + │ │ └─StreamShare { id: 6 } + │ │ └─StreamProject { exprs: [n_nationkey, n_name, n_regionkey, _row_id] } + │ │ └─StreamRowIdGen { row_id_index: 7 } + │ │ └─StreamSourceScan { columns: [n_nationkey, n_name, n_regionkey, n_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ └─StreamExchange { dist: HashShard(c_nationkey) } + │ └─StreamRowIdGen { row_id_index: 11 } + │ └─StreamSourceScan { columns: [c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + └─StreamExchange { dist: HashShard(o_custkey) } + └─StreamHashJoin [append_only] { type: Inner, predicate: l_orderkey = o_orderkey, output: [n_name, l_extendedprice, l_discount, o_custkey, o_orderdate, _row_id, _row_id, n_nationkey, _row_id, _row_id, p_partkey, s_suppkey, l_orderkey, _row_id] } + ├─StreamExchange { dist: HashShard(l_orderkey) } + │ └─StreamHashJoin [append_only] { type: Inner, predicate: s_suppkey = l_suppkey, output: [n_name, l_orderkey, l_extendedprice, l_discount, _row_id, _row_id, n_nationkey, s_suppkey, _row_id, _row_id, p_partkey] } + │ ├─StreamExchange { dist: HashShard(s_suppkey) } + │ │ └─StreamHashJoin [append_only] { type: Inner, predicate: n_nationkey = s_nationkey, output: [n_name, s_suppkey, _row_id, n_nationkey, _row_id] } + │ │ ├─StreamExchange { dist: HashShard(n_nationkey) } + │ │ │ └─StreamShare { id: 6 } + │ │ │ └─StreamProject { exprs: [n_nationkey, n_name, n_regionkey, _row_id] } + │ │ │ └─StreamRowIdGen { row_id_index: 7 } + │ │ │ └─StreamSourceScan { columns: [n_nationkey, n_name, n_regionkey, n_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ │ └─StreamExchange { dist: HashShard(s_nationkey) } + │ │ └─StreamRowIdGen { row_id_index: 10 } + │ │ └─StreamSourceScan { columns: [s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ └─StreamExchange { dist: HashShard(l_suppkey) } + │ └─StreamHashJoin [append_only] { type: Inner, predicate: p_partkey = l_partkey, output: [l_orderkey, l_suppkey, l_extendedprice, l_discount, _row_id, p_partkey, _row_id] } + │ ├─StreamExchange { dist: HashShard(p_partkey) } + │ │ └─StreamFilter { predicate: (p_type = 'PROMO ANODIZED STEEL':Varchar) } + │ │ └─StreamRowIdGen { row_id_index: 12 } + │ │ └─StreamSourceScan { columns: [p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ └─StreamExchange { dist: HashShard(l_partkey) } + │ └─StreamRowIdGen { row_id_index: 19 } + │ └─StreamSourceScan { columns: [l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + └─StreamExchange { dist: HashShard(o_orderkey) } + └─StreamFilter { predicate: (o_orderdate >= '1995-01-01':Date) AND (o_orderdate <= '1996-12-31':Date) } + └─StreamRowIdGen { row_id_index: 12 } + └─StreamSourceScan { columns: [o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } +- id: tpch_q9 + before: + - create_tables + sql: | + select + nation, + o_year, + sum(amount) as sum_profit + from + ( + select + n_name as nation, + extract(year from o_orderdate) as o_year, + l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity as amount + from + part, + supplier, + lineitem, + partsupp, + orders, + nation + where + s_suppkey = l_suppkey + and ps_suppkey = l_suppkey + and ps_partkey = l_partkey + and p_partkey = l_partkey + and o_orderkey = l_orderkey + and s_nationkey = n_nationkey + and p_name like '%yellow%' + ) as profit + group by + nation, + o_year + order by + nation, + o_year desc + LIMIT 1; + stream_plan: |- + StreamMaterialize { columns: [nation, o_year, sum_profit], stream_key: [], pk_columns: [nation, o_year], pk_conflict: NoCheck } + └─StreamProject { exprs: [n_name, $expr1, sum($expr2)] } + └─StreamTopN { order: [n_name ASC, $expr1 DESC], limit: 1, offset: 0 } + └─StreamExchange { dist: Single } + └─StreamGroupTopN { order: [n_name ASC, $expr1 DESC], limit: 1, offset: 0, group_key: [$expr3] } + └─StreamProject { exprs: [n_name, $expr1, sum($expr2), Vnode(n_name, $expr1) as $expr3] } + └─StreamHashAgg [append_only] { group_key: [n_name, $expr1], aggs: [sum($expr2), count] } + └─StreamExchange { dist: HashShard(n_name, $expr1) } + └─StreamProject { exprs: [n_name, Extract('YEAR':Varchar, o_orderdate) as $expr1, ((l_extendedprice * (1:Decimal - l_discount)) - (ps_supplycost * l_quantity)) as $expr2, _row_id, _row_id, p_partkey, _row_id, _row_id, n_nationkey, _row_id, _row_id, o_orderkey, ps_suppkey, ps_partkey] } + └─StreamHashJoin [append_only] { type: Inner, predicate: p_partkey = l_partkey AND ps_suppkey = l_suppkey AND ps_partkey = l_partkey AND ps_suppkey = s_suppkey, output: [l_quantity, l_extendedprice, l_discount, ps_supplycost, o_orderdate, n_name, _row_id, _row_id, p_partkey, ps_suppkey, ps_partkey, _row_id, _row_id, n_nationkey, _row_id, _row_id, o_orderkey, s_suppkey] } + ├─StreamExchange { dist: HashShard(ps_suppkey) } + │ └─StreamHashJoin [append_only] { type: Inner, predicate: p_partkey = ps_partkey, output: [p_partkey, ps_partkey, ps_suppkey, ps_supplycost, _row_id, _row_id] } + │ ├─StreamExchange { dist: HashShard(p_partkey) } + │ │ └─StreamFilter { predicate: Like(p_name, '%yellow%':Varchar) } + │ │ └─StreamRowIdGen { row_id_index: 12 } + │ │ └─StreamSourceScan { columns: [p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ └─StreamExchange { dist: HashShard(ps_partkey) } + │ └─StreamFilter { predicate: IsNotNull(ps_suppkey) } + │ └─StreamRowIdGen { row_id_index: 8 } + │ └─StreamSourceScan { columns: [ps_partkey, ps_suppkey, ps_availqty, ps_supplycost, ps_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + └─StreamHashJoin [append_only] { type: Inner, predicate: s_suppkey = l_suppkey, output: [n_name, s_suppkey, o_orderdate, l_partkey, l_suppkey, l_quantity, l_extendedprice, l_discount, _row_id, _row_id, n_nationkey, _row_id, _row_id, o_orderkey] } + ├─StreamExchange { dist: HashShard(s_suppkey) } + │ └─StreamHashJoin [append_only] { type: Inner, predicate: n_nationkey = s_nationkey, output: [n_name, s_suppkey, _row_id, n_nationkey, _row_id] } + │ ├─StreamExchange { dist: HashShard(n_nationkey) } + │ │ └─StreamRowIdGen { row_id_index: 7 } + │ │ └─StreamSourceScan { columns: [n_nationkey, n_name, n_regionkey, n_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ └─StreamExchange { dist: HashShard(s_nationkey) } + │ └─StreamRowIdGen { row_id_index: 10 } + │ └─StreamSourceScan { columns: [s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + └─StreamExchange { dist: HashShard(l_suppkey) } + └─StreamHashJoin [append_only] { type: Inner, predicate: o_orderkey = l_orderkey, output: [o_orderdate, l_partkey, l_suppkey, l_quantity, l_extendedprice, l_discount, _row_id, o_orderkey, _row_id] } + ├─StreamExchange { dist: HashShard(o_orderkey) } + │ └─StreamRowIdGen { row_id_index: 12 } + │ └─StreamSourceScan { columns: [o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + └─StreamExchange { dist: HashShard(l_orderkey) } + └─StreamFilter { predicate: IsNotNull(l_partkey) } + └─StreamRowIdGen { row_id_index: 19 } + └─StreamSourceScan { columns: [l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } +- id: tpch_q10 + before: + - create_tables + sql: | + select + c_custkey, + c_name, + sum(l_extendedprice * (1.00 - l_discount)) as revenue, + c_acctbal, + n_name, + c_address, + c_phone, + c_comment + from + customer, + orders, + lineitem, + nation + where + c_custkey = o_custkey + and l_orderkey = o_orderkey + and o_orderdate >= date '1994-01-01' + and o_orderdate < date '1994-01-01' + interval '3' month + and l_returnflag = 'R' + and c_nationkey = n_nationkey + group by + c_custkey, + c_name, + c_acctbal, + c_phone, + n_name, + c_address, + c_comment + order by + revenue desc + LIMIT 20; + stream_plan: |- + StreamMaterialize { columns: [c_custkey, c_name, revenue, c_acctbal, n_name, c_address, c_phone, c_comment], stream_key: [c_custkey, c_name, c_acctbal, c_phone, n_name, c_address, c_comment], pk_columns: [revenue, c_custkey, c_name, c_acctbal, c_phone, n_name, c_address, c_comment], pk_conflict: NoCheck } + └─StreamProject { exprs: [c_custkey, c_name, sum($expr1), c_acctbal, n_name, c_address, c_phone, c_comment] } + └─StreamTopN { order: [sum($expr1) DESC], limit: 20, offset: 0 } + └─StreamExchange { dist: Single } + └─StreamGroupTopN { order: [sum($expr1) DESC], limit: 20, offset: 0, group_key: [$expr2] } + └─StreamProject { exprs: [c_custkey, c_name, sum($expr1), c_acctbal, n_name, c_address, c_phone, c_comment, Vnode(c_custkey) as $expr2] } + └─StreamHashAgg [append_only] { group_key: [c_custkey, c_name, c_acctbal, c_phone, n_name, c_address, c_comment], aggs: [sum($expr1), count] } + └─StreamProject { exprs: [c_custkey, c_name, c_acctbal, c_phone, n_name, c_address, c_comment, (l_extendedprice * (1.00:Decimal - l_discount)) as $expr1, _row_id, _row_id, n_nationkey, _row_id, _row_id, l_orderkey] } + └─StreamHashJoin [append_only] { type: Inner, predicate: c_custkey = o_custkey, output: [c_custkey, c_name, c_address, c_phone, c_acctbal, c_comment, l_extendedprice, l_discount, n_name, _row_id, _row_id, n_nationkey, _row_id, _row_id, l_orderkey] } + ├─StreamExchange { dist: HashShard(c_custkey) } + │ └─StreamHashJoin [append_only] { type: Inner, predicate: n_nationkey = c_nationkey, output: [n_name, c_custkey, c_name, c_address, c_phone, c_acctbal, c_comment, _row_id, n_nationkey, _row_id] } + │ ├─StreamExchange { dist: HashShard(n_nationkey) } + │ │ └─StreamRowIdGen { row_id_index: 7 } + │ │ └─StreamSourceScan { columns: [n_nationkey, n_name, n_regionkey, n_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ └─StreamExchange { dist: HashShard(c_nationkey) } + │ └─StreamRowIdGen { row_id_index: 11 } + │ └─StreamSourceScan { columns: [c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + └─StreamExchange { dist: HashShard(o_custkey) } + └─StreamHashJoin [append_only] { type: Inner, predicate: l_orderkey = o_orderkey, output: [l_extendedprice, l_discount, o_custkey, _row_id, l_orderkey, _row_id] } + ├─StreamExchange { dist: HashShard(l_orderkey) } + │ └─StreamFilter { predicate: (l_returnflag = 'R':Varchar) } + │ └─StreamRowIdGen { row_id_index: 19 } + │ └─StreamSourceScan { columns: [l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + └─StreamExchange { dist: HashShard(o_orderkey) } + └─StreamFilter { predicate: (o_orderdate >= '1994-01-01':Date) AND (o_orderdate < '1994-04-01 00:00:00':Timestamp) } + └─StreamRowIdGen { row_id_index: 12 } + └─StreamSourceScan { columns: [o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } +- id: tpch_q11 + before: + - create_tables + sql: | + select + ps_partkey, + sum(ps_supplycost * ps_availqty) as value + from + partsupp, + supplier, + nation + where + ps_suppkey = s_suppkey + and s_nationkey = n_nationkey + and n_name = 'ARGENTINA' + group by + ps_partkey + having + sum(ps_supplycost * ps_availqty) > ( + select + sum(ps_supplycost * ps_availqty) * 0.0001000000 + from + partsupp, + supplier, + nation + where + ps_suppkey = s_suppkey + and s_nationkey = n_nationkey + and n_name = 'ARGENTINA' + ) + order by + value desc + LIMIT 1; + stream_plan: |- + StreamMaterialize { columns: [ps_partkey, value], stream_key: [], pk_columns: [value], pk_conflict: NoCheck } + └─StreamProject { exprs: [ps_partkey, sum($expr1)] } + └─StreamTopN { order: [sum($expr1) DESC], limit: 1, offset: 0 } + └─StreamExchange { dist: Single } + └─StreamGroupTopN { order: [sum($expr1) DESC], limit: 1, offset: 0, group_key: [_vnode] } + └─StreamProject { exprs: [ps_partkey, sum($expr1), Vnode(ps_partkey) as _vnode] } + └─StreamDynamicFilter { predicate: (sum($expr1) > $expr3), output: [ps_partkey, sum($expr1)] } + ├─StreamProject { exprs: [ps_partkey, sum($expr1)] } + │ └─StreamHashAgg [append_only] { group_key: [ps_partkey], aggs: [sum($expr1), count] } + │ └─StreamExchange { dist: HashShard(ps_partkey) } + │ └─StreamProject { exprs: [ps_partkey, (ps_supplycost * ps_availqty::Decimal) as $expr1, _row_id, _row_id, ps_suppkey, _row_id, s_nationkey] } + │ └─StreamShare { id: 11 } + │ └─StreamHashJoin [append_only] { type: Inner, predicate: s_nationkey = n_nationkey, output: [ps_partkey, ps_availqty, ps_supplycost, _row_id, _row_id, ps_suppkey, s_nationkey, _row_id] } + │ ├─StreamExchange { dist: HashShard(s_nationkey) } + │ │ └─StreamHashJoin [append_only] { type: Inner, predicate: ps_suppkey = s_suppkey, output: [ps_partkey, ps_availqty, ps_supplycost, s_nationkey, _row_id, ps_suppkey, _row_id] } + │ │ ├─StreamExchange { dist: HashShard(ps_suppkey) } + │ │ │ └─StreamRowIdGen { row_id_index: 8 } + │ │ │ └─StreamSourceScan { columns: [ps_partkey, ps_suppkey, ps_availqty, ps_supplycost, ps_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ │ └─StreamExchange { dist: HashShard(s_suppkey) } + │ │ └─StreamRowIdGen { row_id_index: 10 } + │ │ └─StreamSourceScan { columns: [s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ └─StreamExchange { dist: HashShard(n_nationkey) } + │ └─StreamFilter { predicate: (n_name = 'ARGENTINA':Varchar) } + │ └─StreamRowIdGen { row_id_index: 7 } + │ └─StreamSourceScan { columns: [n_nationkey, n_name, n_regionkey, n_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + └─StreamExchange { dist: Broadcast } + └─StreamProject { exprs: [(sum(sum($expr2)) * 0.0001000000:Decimal) as $expr3] } + └─StreamSimpleAgg [append_only] { aggs: [sum(sum($expr2)), count] } + └─StreamExchange { dist: Single } + └─StreamStatelessSimpleAgg { aggs: [sum($expr2)] } + └─StreamProject { exprs: [(ps_supplycost * ps_availqty::Decimal) as $expr2, _row_id, _row_id, ps_suppkey, _row_id, s_nationkey] } + └─StreamShare { id: 11 } + └─StreamHashJoin [append_only] { type: Inner, predicate: s_nationkey = n_nationkey, output: [ps_partkey, ps_availqty, ps_supplycost, _row_id, _row_id, ps_suppkey, s_nationkey, _row_id] } + ├─StreamExchange { dist: HashShard(s_nationkey) } + │ └─StreamHashJoin [append_only] { type: Inner, predicate: ps_suppkey = s_suppkey, output: [ps_partkey, ps_availqty, ps_supplycost, s_nationkey, _row_id, ps_suppkey, _row_id] } + │ ├─StreamExchange { dist: HashShard(ps_suppkey) } + │ │ └─StreamRowIdGen { row_id_index: 8 } + │ │ └─StreamSourceScan { columns: [ps_partkey, ps_suppkey, ps_availqty, ps_supplycost, ps_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ └─StreamExchange { dist: HashShard(s_suppkey) } + │ └─StreamRowIdGen { row_id_index: 10 } + │ └─StreamSourceScan { columns: [s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + └─StreamExchange { dist: HashShard(n_nationkey) } + └─StreamFilter { predicate: (n_name = 'ARGENTINA':Varchar) } + └─StreamRowIdGen { row_id_index: 7 } + └─StreamSourceScan { columns: [n_nationkey, n_name, n_regionkey, n_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } +- id: tpch_q12 + before: + - create_tables + sql: | + select + l_shipmode, + sum(case + when o_orderpriority = '1-URGENT' + or o_orderpriority = '2-HIGH' + then 1 + else 0 + end) as high_line_count, + sum(case + when o_orderpriority <> '1-URGENT' + and o_orderpriority <> '2-HIGH' + then 1 + else 0 + end) as low_line_count + from + orders, + lineitem + where + o_orderkey = l_orderkey + and l_shipmode in ('FOB', 'SHIP') + and l_commitdate < l_receiptdate + and l_shipdate < l_commitdate + and l_receiptdate >= date '1994-01-01' + and l_receiptdate < date '1994-01-01' + interval '1' year + group by + l_shipmode + order by + l_shipmode + LIMIT 1; + stream_plan: |- + StreamMaterialize { columns: [l_shipmode, high_line_count, low_line_count], stream_key: [], pk_columns: [l_shipmode], pk_conflict: NoCheck } + └─StreamProject { exprs: [l_shipmode, sum($expr1), sum($expr2)] } + └─StreamTopN { order: [l_shipmode ASC], limit: 1, offset: 0 } + └─StreamExchange { dist: Single } + └─StreamGroupTopN { order: [l_shipmode ASC], limit: 1, offset: 0, group_key: [$expr3] } + └─StreamProject { exprs: [l_shipmode, sum($expr1), sum($expr2), Vnode(l_shipmode) as $expr3] } + └─StreamHashAgg [append_only] { group_key: [l_shipmode], aggs: [sum($expr1), sum($expr2), count] } + └─StreamExchange { dist: HashShard(l_shipmode) } + └─StreamProject { exprs: [l_shipmode, Case(((o_orderpriority = '1-URGENT':Varchar) OR (o_orderpriority = '2-HIGH':Varchar)), 1:Int32, 0:Int32) as $expr1, Case(((o_orderpriority <> '1-URGENT':Varchar) AND (o_orderpriority <> '2-HIGH':Varchar)), 1:Int32, 0:Int32) as $expr2, _row_id, _row_id, o_orderkey] } + └─StreamHashJoin [append_only] { type: Inner, predicate: o_orderkey = l_orderkey, output: [o_orderpriority, l_shipmode, _row_id, o_orderkey, _row_id] } + ├─StreamExchange { dist: HashShard(o_orderkey) } + │ └─StreamRowIdGen { row_id_index: 12 } + │ └─StreamSourceScan { columns: [o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + └─StreamExchange { dist: HashShard(l_orderkey) } + └─StreamFilter { predicate: In(l_shipmode, 'FOB':Varchar, 'SHIP':Varchar) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1994-01-01':Date) AND (l_receiptdate < '1995-01-01 00:00:00':Timestamp) } + └─StreamRowIdGen { row_id_index: 19 } + └─StreamSourceScan { columns: [l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } +- id: tpch_q13 + before: + - create_tables + sql: | + select + c_count, + count(*) as custdist + from + ( + select + c_custkey, + count(o_orderkey) as c_count + from + customer left outer join orders on + c_custkey = o_custkey + and o_comment not like '%:1%:2%' + group by + c_custkey + ) as c_orders (c_custkey, c_count) + group by + c_count + order by + custdist desc, + c_count desc + LIMIT 1; + stream_plan: |- + StreamMaterialize { columns: [c_count, custdist], stream_key: [], pk_columns: [custdist, c_count], pk_conflict: NoCheck } + └─StreamProject { exprs: [count(o_orderkey), count] } + └─StreamTopN { order: [count DESC, count(o_orderkey) DESC], limit: 1, offset: 0 } + └─StreamExchange { dist: Single } + └─StreamGroupTopN { order: [count DESC, count(o_orderkey) DESC], limit: 1, offset: 0, group_key: [_vnode] } + └─StreamProject { exprs: [count(o_orderkey), count, Vnode(count(o_orderkey)) as _vnode] } + └─StreamHashAgg { group_key: [count(o_orderkey)], aggs: [count] } + └─StreamExchange { dist: HashShard(count(o_orderkey)) } + └─StreamProject { exprs: [c_custkey, count(o_orderkey)] } + └─StreamHashAgg { group_key: [c_custkey], aggs: [count(o_orderkey), count] } + └─StreamHashJoin { type: LeftOuter, predicate: c_custkey = o_custkey, output: [c_custkey, o_orderkey, _row_id, _row_id] } + ├─StreamExchange { dist: HashShard(c_custkey) } + │ └─StreamRowIdGen { row_id_index: 11 } + │ └─StreamSourceScan { columns: [c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + └─StreamExchange { dist: HashShard(o_custkey) } + └─StreamProject { exprs: [o_orderkey, o_custkey, _row_id] } + └─StreamFilter { predicate: Not(Like(o_comment, '%:1%:2%':Varchar)) } + └─StreamRowIdGen { row_id_index: 12 } + └─StreamSourceScan { columns: [o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } +- id: tpch_q14 + before: + - create_tables + sql: | + select + 100.00 * sum(case + when p_type like 'PROMO%' + then l_extendedprice * (1 - l_discount) + else 0 + end) / sum(l_extendedprice * (1 - l_discount)) as promo_revenue + from + lineitem, + part + where + l_partkey = p_partkey + and l_shipdate >= date '1995-09-01' + and l_shipdate < date '1995-09-01' + interval '1' month; + stream_plan: |- + StreamMaterialize { columns: [promo_revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck } + └─StreamProject { exprs: [((100.00:Decimal * sum(sum($expr1))) / sum(sum($expr2))) as $expr3] } + └─StreamSimpleAgg [append_only] { aggs: [sum(sum($expr1)), sum(sum($expr2)), count] } + └─StreamExchange { dist: Single } + └─StreamStatelessSimpleAgg { aggs: [sum($expr1), sum($expr2)] } + └─StreamProject { exprs: [Case(Like(p_type, 'PROMO%':Varchar), (l_extendedprice * (1:Decimal - l_discount)), 0:Decimal) as $expr1, (l_extendedprice * (1:Decimal - l_discount)) as $expr2, _row_id, _row_id, l_partkey] } + └─StreamHashJoin [append_only] { type: Inner, predicate: l_partkey = p_partkey, output: [l_extendedprice, l_discount, p_type, _row_id, l_partkey, _row_id] } + ├─StreamExchange { dist: HashShard(l_partkey) } + │ └─StreamFilter { predicate: (l_shipdate >= '1995-09-01':Date) AND (l_shipdate < '1995-10-01 00:00:00':Timestamp) } + │ └─StreamRowIdGen { row_id_index: 19 } + │ └─StreamSourceScan { columns: [l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + └─StreamExchange { dist: HashShard(p_partkey) } + └─StreamRowIdGen { row_id_index: 12 } + └─StreamSourceScan { columns: [p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } +- id: tpch_q15 + before: + - create_tables + sql: | + with revenue0 (supplier_no, total_revenue) as ( + select + l_suppkey, + sum(l_extendedprice * (1 - l_discount)) + from + lineitem + where + l_shipdate >= date '1993-01-01' + and l_shipdate < date '1993-01-01' + interval '3' month + group by + l_suppkey + ) + select + s_suppkey, + s_name, + s_address, + s_phone, + total_revenue + from + supplier, + revenue0 + where + s_suppkey = supplier_no + and total_revenue = ( + select + max(total_revenue) + from + revenue0 + ) + order by + s_suppkey + LIMIT 1; + stream_plan: |- + StreamMaterialize { columns: [s_suppkey, s_name, s_address, s_phone, total_revenue, _row_id(hidden)], stream_key: [], pk_columns: [s_suppkey], pk_conflict: NoCheck } + └─StreamProject { exprs: [s_suppkey, s_name, s_address, s_phone, sum($expr1), _row_id] } + └─StreamTopN { order: [s_suppkey ASC], limit: 1, offset: 0 } + └─StreamExchange { dist: Single } + └─StreamGroupTopN { order: [s_suppkey ASC], limit: 1, offset: 0, group_key: [_vnode] } + └─StreamProject { exprs: [s_suppkey, s_name, s_address, s_phone, sum($expr1), _row_id, Vnode(sum($expr1)) as _vnode] } + └─StreamHashJoin { type: Inner, predicate: sum($expr1) = max(max(sum($expr1))), output: [s_suppkey, s_name, s_address, s_phone, sum($expr1), _row_id] } + ├─StreamExchange { dist: HashShard(sum($expr1)) } + │ └─StreamHashJoin { type: Inner, predicate: s_suppkey = l_suppkey, output: [s_suppkey, s_name, s_address, s_phone, sum($expr1), _row_id, l_suppkey] } + │ ├─StreamExchange { dist: HashShard(s_suppkey) } + │ │ └─StreamRowIdGen { row_id_index: 10 } + │ │ └─StreamSourceScan { columns: [s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ └─StreamShare { id: 9 } + │ └─StreamProject { exprs: [l_suppkey, sum($expr1)] } + │ └─StreamHashAgg [append_only] { group_key: [l_suppkey], aggs: [sum($expr1), count] } + │ └─StreamExchange { dist: HashShard(l_suppkey) } + │ └─StreamProject { exprs: [l_suppkey, (l_extendedprice * (1:Decimal - l_discount)) as $expr1, _row_id] } + │ └─StreamFilter { predicate: (l_shipdate >= '1993-01-01':Date) AND (l_shipdate < '1993-04-01 00:00:00':Timestamp) } + │ └─StreamRowIdGen { row_id_index: 19 } + │ └─StreamSourceScan { columns: [l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + └─StreamExchange { dist: HashShard(max(max(sum($expr1)))) } + └─StreamProject { exprs: [max(max(sum($expr1)))] } + └─StreamSimpleAgg { aggs: [max(max(sum($expr1))), count] } + └─StreamExchange { dist: Single } + └─StreamHashAgg { group_key: [_vnode], aggs: [max(sum($expr1)), count] } + └─StreamProject { exprs: [l_suppkey, sum($expr1), Vnode(l_suppkey) as _vnode] } + └─StreamShare { id: 9 } + └─StreamProject { exprs: [l_suppkey, sum($expr1)] } + └─StreamHashAgg [append_only] { group_key: [l_suppkey], aggs: [sum($expr1), count] } + └─StreamExchange { dist: HashShard(l_suppkey) } + └─StreamProject { exprs: [l_suppkey, (l_extendedprice * (1:Decimal - l_discount)) as $expr1, _row_id] } + └─StreamFilter { predicate: (l_shipdate >= '1993-01-01':Date) AND (l_shipdate < '1993-04-01 00:00:00':Timestamp) } + └─StreamRowIdGen { row_id_index: 19 } + └─StreamSourceScan { columns: [l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } +- id: tpch_q16 + before: + - create_tables + sql: | + select + p_brand, + p_type, + p_size, + count(distinct ps_suppkey) as supplier_cnt + from + partsupp, + part + where + p_partkey = ps_partkey + and p_brand <> 'Brand#45' + and p_type not like 'SMALL PLATED%' + and p_size in (19, 17, 16, 23, 10, 4, 38, 11) + and ps_suppkey not in ( + select + s_suppkey + from + supplier + where + s_comment like '%Customer%Complaints%' + ) + group by + p_brand, + p_type, + p_size + order by + supplier_cnt desc, + p_brand, + p_type, + p_size + LIMIT 1; + stream_plan: |- + StreamMaterialize { columns: [p_brand, p_type, p_size, supplier_cnt], stream_key: [], pk_columns: [supplier_cnt, p_brand, p_type, p_size], pk_conflict: NoCheck } + └─StreamProject { exprs: [p_brand, p_type, p_size, count(distinct ps_suppkey)] } + └─StreamTopN { order: [count(distinct ps_suppkey) DESC, p_brand ASC, p_type ASC, p_size ASC], limit: 1, offset: 0 } + └─StreamExchange { dist: Single } + └─StreamGroupTopN { order: [count(distinct ps_suppkey) DESC, p_brand ASC, p_type ASC, p_size ASC], limit: 1, offset: 0, group_key: [$expr1] } + └─StreamProject { exprs: [p_brand, p_type, p_size, count(distinct ps_suppkey), Vnode(p_brand, p_type, p_size) as $expr1] } + └─StreamHashAgg { group_key: [p_brand, p_type, p_size], aggs: [count(distinct ps_suppkey), count] } + └─StreamExchange { dist: HashShard(p_brand, p_type, p_size) } + └─StreamHashJoin { type: LeftAnti, predicate: ps_suppkey = s_suppkey, output: [ps_suppkey, p_brand, p_type, p_size, _row_id, _row_id, ps_partkey] } + ├─StreamExchange { dist: HashShard(ps_suppkey) } + │ └─StreamHashJoin [append_only] { type: Inner, predicate: ps_partkey = p_partkey, output: [ps_suppkey, p_brand, p_type, p_size, _row_id, ps_partkey, _row_id] } + │ ├─StreamExchange { dist: HashShard(ps_partkey) } + │ │ └─StreamRowIdGen { row_id_index: 8 } + │ │ └─StreamSourceScan { columns: [ps_partkey, ps_suppkey, ps_availqty, ps_supplycost, ps_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ └─StreamExchange { dist: HashShard(p_partkey) } + │ └─StreamFilter { predicate: (p_brand <> 'Brand#45':Varchar) AND Not(Like(p_type, 'SMALL PLATED%':Varchar)) AND In(p_size, 19:Int32, 17:Int32, 16:Int32, 23:Int32, 10:Int32, 4:Int32, 38:Int32, 11:Int32) } + │ └─StreamRowIdGen { row_id_index: 12 } + │ └─StreamSourceScan { columns: [p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + └─StreamExchange { dist: HashShard(s_suppkey) } + └─StreamProject { exprs: [s_suppkey, _row_id] } + └─StreamFilter { predicate: Like(s_comment, '%Customer%Complaints%':Varchar) } + └─StreamRowIdGen { row_id_index: 10 } + └─StreamSourceScan { columns: [s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } +- id: tpch_q17 + before: + - create_tables + sql: | + select + sum(l_extendedprice) / 7.0 as avg_yearly + from + lineitem, + part + where + p_partkey = l_partkey + and p_brand = 'Brand#13' + and p_container = 'JUMBO PKG' + and l_quantity < ( + select + 0.2 * avg(l_quantity) + from + lineitem + where + l_partkey = p_partkey + ); + stream_plan: |- + StreamMaterialize { columns: [avg_yearly], stream_key: [], pk_columns: [], pk_conflict: NoCheck } + └─StreamProject { exprs: [(sum(sum(l_extendedprice)) / 7.0:Decimal) as $expr2] } + └─StreamSimpleAgg { aggs: [sum(sum(l_extendedprice)), count] } + └─StreamExchange { dist: Single } + └─StreamStatelessSimpleAgg { aggs: [sum(l_extendedprice)] } + └─StreamProject { exprs: [l_extendedprice, _row_id, _row_id, l_partkey, p_partkey, l_partkey] } + └─StreamFilter { predicate: (l_quantity < $expr1) } + └─StreamHashJoin { type: Inner, predicate: p_partkey = l_partkey, output: all } + ├─StreamExchange { dist: HashShard(p_partkey) } + │ └─StreamHashJoin [append_only] { type: Inner, predicate: l_partkey = p_partkey, output: [l_quantity, l_extendedprice, p_partkey, _row_id, l_partkey, _row_id] } + │ ├─StreamExchange { dist: HashShard(l_partkey) } + │ │ └─StreamShare { id: 3 } + │ │ └─StreamProject { exprs: [l_partkey, l_quantity, l_extendedprice, _row_id] } + │ │ └─StreamRowIdGen { row_id_index: 19 } + │ │ └─StreamSourceScan { columns: [l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ └─StreamExchange { dist: HashShard(p_partkey) } + │ └─StreamFilter { predicate: (p_brand = 'Brand#13':Varchar) AND (p_container = 'JUMBO PKG':Varchar) } + │ └─StreamRowIdGen { row_id_index: 12 } + │ └─StreamSourceScan { columns: [p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + └─StreamProject { exprs: [(0.2:Decimal * (sum(l_quantity) / count(l_quantity)::Decimal)) as $expr1, l_partkey] } + └─StreamHashAgg [append_only] { group_key: [l_partkey], aggs: [sum(l_quantity), count(l_quantity), count] } + └─StreamExchange { dist: HashShard(l_partkey) } + └─StreamShare { id: 3 } + └─StreamProject { exprs: [l_partkey, l_quantity, l_extendedprice, _row_id] } + └─StreamRowIdGen { row_id_index: 19 } + └─StreamSourceScan { columns: [l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } +- id: tpch_q18 + before: + - create_tables + sql: | + select + c_name, + c_custkey, + o_orderkey, + o_orderdate, + o_totalprice, + sum(l_quantity) quantity + from + customer, + orders, + lineitem + where + o_orderkey in ( + select + l_orderkey + from + lineitem + group by + l_orderkey + having + sum(l_quantity) > 1 + ) + and c_custkey = o_custkey + and o_orderkey = l_orderkey + group by + c_name, + c_custkey, + o_orderkey, + o_orderdate, + o_totalprice + order by + o_totalprice desc, + o_orderdate + LIMIT 100; + stream_plan: |- + StreamMaterialize { columns: [c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice, quantity], stream_key: [c_custkey, c_name, o_orderkey, o_totalprice, o_orderdate], pk_columns: [o_totalprice, o_orderdate, c_custkey, c_name, o_orderkey], pk_conflict: NoCheck } + └─StreamProject { exprs: [c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice, sum(l_quantity)] } + └─StreamTopN { order: [o_totalprice DESC, o_orderdate ASC], limit: 100, offset: 0 } + └─StreamExchange { dist: Single } + └─StreamGroupTopN { order: [o_totalprice DESC, o_orderdate ASC], limit: 100, offset: 0, group_key: [$expr1] } + └─StreamProject { exprs: [c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice, sum(l_quantity), Vnode(o_orderkey) as $expr1] } + └─StreamHashAgg { group_key: [c_custkey, c_name, o_orderkey, o_totalprice, o_orderdate], aggs: [sum(l_quantity), count] } + └─StreamHashJoin { type: LeftSemi, predicate: o_orderkey = l_orderkey, output: all } + ├─StreamHashJoin [append_only] { type: Inner, predicate: o_orderkey = l_orderkey, output: [c_custkey, c_name, o_orderkey, o_totalprice, o_orderdate, l_quantity, _row_id, _row_id, _row_id] } + │ ├─StreamExchange { dist: HashShard(o_orderkey) } + │ │ └─StreamHashJoin [append_only] { type: Inner, predicate: c_custkey = o_custkey, output: [c_custkey, c_name, o_orderkey, o_totalprice, o_orderdate, _row_id, _row_id] } + │ │ ├─StreamExchange { dist: HashShard(c_custkey) } + │ │ │ └─StreamRowIdGen { row_id_index: 11 } + │ │ │ └─StreamSourceScan { columns: [c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ │ └─StreamExchange { dist: HashShard(o_custkey) } + │ │ └─StreamRowIdGen { row_id_index: 12 } + │ │ └─StreamSourceScan { columns: [o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ └─StreamExchange { dist: HashShard(l_orderkey) } + │ └─StreamShare { id: 9 } + │ └─StreamProject { exprs: [l_orderkey, l_quantity, _row_id] } + │ └─StreamRowIdGen { row_id_index: 19 } + │ └─StreamSourceScan { columns: [l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + └─StreamProject { exprs: [l_orderkey] } + └─StreamFilter { predicate: (sum(l_quantity) > 1:Decimal) } + └─StreamProject { exprs: [l_orderkey, sum(l_quantity)] } + └─StreamHashAgg [append_only] { group_key: [l_orderkey], aggs: [sum(l_quantity), count] } + └─StreamExchange { dist: HashShard(l_orderkey) } + └─StreamShare { id: 9 } + └─StreamProject { exprs: [l_orderkey, l_quantity, _row_id] } + └─StreamRowIdGen { row_id_index: 19 } + └─StreamSourceScan { columns: [l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } +- id: tpch_q19 + before: + - create_tables + sql: | + select + sum(l_extendedprice* (1 - l_discount)) as revenue + from + lineitem, + part + where + ( + p_partkey = l_partkey + and p_brand = 'Brand#52' + and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG') + and l_quantity >= 1 and l_quantity <= 11 + and p_size between 1 and 5 + and l_shipmode in ('AIR', 'AIR REG') + and l_shipinstruct = 'DELIVER IN PERSON' + ) + or + ( + p_partkey = l_partkey + and p_brand = 'Brand#24' + and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK') + and l_quantity >= 30 and l_quantity <= 40 + and p_size between 1 and 10 + and l_shipmode in ('AIR', 'AIR REG') + and l_shipinstruct = 'DELIVER IN PERSON' + ) + or + ( + p_partkey = l_partkey + and p_brand = 'Brand#32' + and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG') + and l_quantity >= 10 and l_quantity <= 20 + and p_size between 1 and 15 + and l_shipmode in ('AIR', 'AIR REG') + and l_shipinstruct = 'DELIVER IN PERSON' + ); + stream_plan: |- + StreamMaterialize { columns: [revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck } + └─StreamProject { exprs: [sum(sum($expr1))] } + └─StreamSimpleAgg [append_only] { aggs: [sum(sum($expr1)), count] } + └─StreamExchange { dist: Single } + └─StreamStatelessSimpleAgg { aggs: [sum($expr1)] } + └─StreamProject { exprs: [(l_extendedprice * (1:Decimal - l_discount)) as $expr1, _row_id, _row_id, l_partkey] } + └─StreamFilter { predicate: ((((((p_brand = 'Brand#52':Varchar) AND In(p_container, 'SM CASE':Varchar, 'SM BOX':Varchar, 'SM PACK':Varchar, 'SM PKG':Varchar)) AND ((l_quantity >= 1:Decimal) AND (l_quantity <= 11:Decimal))) AND (p_size <= 5:Int32)) OR ((((p_brand = 'Brand#24':Varchar) AND In(p_container, 'MED BAG':Varchar, 'MED BOX':Varchar, 'MED PKG':Varchar, 'MED PACK':Varchar)) AND ((l_quantity >= 30:Decimal) AND (l_quantity <= 40:Decimal))) AND (p_size <= 10:Int32))) OR ((((p_brand = 'Brand#32':Varchar) AND In(p_container, 'LG CASE':Varchar, 'LG BOX':Varchar, 'LG PACK':Varchar, 'LG PKG':Varchar)) AND ((l_quantity >= 10:Decimal) AND (l_quantity <= 20:Decimal))) AND (p_size <= 15:Int32))) } + └─StreamHashJoin [append_only] { type: Inner, predicate: l_partkey = p_partkey, output: all } + ├─StreamExchange { dist: HashShard(l_partkey) } + │ └─StreamFilter { predicate: In(l_shipmode, 'AIR':Varchar, 'AIR REG':Varchar) AND (l_shipinstruct = 'DELIVER IN PERSON':Varchar) } + │ └─StreamRowIdGen { row_id_index: 19 } + │ └─StreamSourceScan { columns: [l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + └─StreamExchange { dist: HashShard(p_partkey) } + └─StreamFilter { predicate: (p_size >= 1:Int32) } + └─StreamRowIdGen { row_id_index: 12 } + └─StreamSourceScan { columns: [p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } +- id: tpch_q20 + before: + - create_tables + sql: "select\n\ts_name,\n\ts_address\nfrom\n\tsupplier,\n\tnation\nwhere\n\ts_suppkey in (\n\t\tselect\n\t\t\tps_suppkey\n\t\tfrom\n\t\t\tpartsupp,\n\t\t\t(\n\t\t\t\tselect\n\t\t\t\t\tl_partkey agg_partkey,\n\t\t\t\t\tl_suppkey agg_suppkey,\n\t\t\t\t\t0.5 * sum(l_quantity) AS agg_quantity\n\t\t\t\tfrom\n\t\t\t\t\tlineitem\n\t\t\t\twhere\n\t\t\t\t\tl_shipdate >= date '1994-01-01'\n\t\t\t\t\tand l_shipdate < date '1994-01-01' + interval '1' year\n\t\t\t\tgroup by\n\t\t\t\t\tl_partkey,\n\t\t\t\t\tl_suppkey\n\t\t\t) agg_lineitem\n\t\twhere\n\t\t\tagg_partkey = ps_partkey\n\t\t\tand agg_suppkey = ps_suppkey\n\t\t\tand ps_partkey in (\n\t\t\t\tselect\n\t\t\t\t\tp_partkey\n\t\t\t\tfrom\n\t\t\t\t\tpart\n\t\t\t\twhere\n\t\t\t\t\tp_name like 'forest%'\n\t\t\t)\n\t\t\tand ps_availqty > agg_quantity\n\t)\n\tand s_nationkey = n_nationkey\n\tand n_name = 'KENYA'\norder by\n\ts_name\nLIMIT 1;\n" + stream_plan: |- + StreamMaterialize { columns: [s_name, s_address, _row_id(hidden), _row_id#1(hidden), s_nationkey(hidden), s_suppkey(hidden)], stream_key: [], pk_columns: [s_name], pk_conflict: NoCheck } + └─StreamProject { exprs: [s_name, s_address, _row_id, _row_id, s_nationkey, s_suppkey] } + └─StreamTopN { order: [s_name ASC], limit: 1, offset: 0 } + └─StreamExchange { dist: Single } + └─StreamGroupTopN { order: [s_name ASC], limit: 1, offset: 0, group_key: [_vnode] } + └─StreamProject { exprs: [s_name, s_address, _row_id, _row_id, s_nationkey, s_suppkey, Vnode(s_suppkey) as _vnode] } + └─StreamHashJoin { type: LeftSemi, predicate: s_suppkey = ps_suppkey, output: [s_name, s_address, _row_id, _row_id, s_nationkey, s_suppkey] } + ├─StreamExchange { dist: HashShard(s_suppkey) } + │ └─StreamHashJoin [append_only] { type: Inner, predicate: s_nationkey = n_nationkey, output: [s_suppkey, s_name, s_address, _row_id, s_nationkey, _row_id] } + │ ├─StreamExchange { dist: HashShard(s_nationkey) } + │ │ └─StreamRowIdGen { row_id_index: 10 } + │ │ └─StreamSourceScan { columns: [s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ └─StreamExchange { dist: HashShard(n_nationkey) } + │ └─StreamFilter { predicate: (n_name = 'KENYA':Varchar) } + │ └─StreamRowIdGen { row_id_index: 7 } + │ └─StreamSourceScan { columns: [n_nationkey, n_name, n_regionkey, n_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + └─StreamExchange { dist: HashShard(ps_suppkey) } + └─StreamHashJoin { type: LeftSemi, predicate: ps_partkey = p_partkey, output: [ps_suppkey, _row_id, ps_partkey] } + ├─StreamExchange { dist: HashShard(ps_partkey) } + │ └─StreamProject { exprs: [ps_partkey, ps_suppkey, _row_id, l_partkey, l_suppkey] } + │ └─StreamFilter { predicate: ($expr1 > $expr2) } + │ └─StreamHashJoin { type: Inner, predicate: ps_partkey = l_partkey AND ps_suppkey = l_suppkey, output: all } + │ ├─StreamExchange { dist: HashShard(ps_partkey, ps_suppkey) } + │ │ └─StreamProject { exprs: [ps_partkey, ps_suppkey, ps_availqty::Decimal as $expr1, _row_id] } + │ │ └─StreamRowIdGen { row_id_index: 8 } + │ │ └─StreamSourceScan { columns: [ps_partkey, ps_suppkey, ps_availqty, ps_supplycost, ps_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ └─StreamProject { exprs: [l_partkey, l_suppkey, (0.5:Decimal * sum(l_quantity)) as $expr2] } + │ └─StreamHashAgg [append_only] { group_key: [l_partkey, l_suppkey], aggs: [sum(l_quantity), count] } + │ └─StreamExchange { dist: HashShard(l_partkey, l_suppkey) } + │ └─StreamFilter { predicate: (l_shipdate >= '1994-01-01':Date) AND (l_shipdate < '1995-01-01 00:00:00':Timestamp) } + │ └─StreamRowIdGen { row_id_index: 19 } + │ └─StreamSourceScan { columns: [l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + └─StreamExchange { dist: HashShard(p_partkey) } + └─StreamProject { exprs: [p_partkey, _row_id] } + └─StreamFilter { predicate: Like(p_name, 'forest%':Varchar) } + └─StreamRowIdGen { row_id_index: 12 } + └─StreamSourceScan { columns: [p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } +- id: tpch_q21 + before: + - create_tables + sql: | + select + s_name, + count(*) as numwait + from + supplier, + lineitem l1, + orders, + nation + where + s_suppkey = l1.l_suppkey + and o_orderkey = l1.l_orderkey + and o_orderstatus = 'F' + and l1.l_receiptdate > l1.l_commitdate + and exists ( + select + * + from + lineitem l2 + where + l2.l_orderkey = l1.l_orderkey + and l2.l_suppkey <> l1.l_suppkey + ) + and not exists ( + select + * + from + lineitem l3 + where + l3.l_orderkey = l1.l_orderkey + and l3.l_suppkey <> l1.l_suppkey + and l3.l_receiptdate > l3.l_commitdate + ) + and s_nationkey = n_nationkey + and n_name = 'GERMANY' + group by + s_name + order by + numwait desc, + s_name + LIMIT 100; + stream_plan: |- + StreamMaterialize { columns: [s_name, numwait], stream_key: [s_name], pk_columns: [numwait, s_name], pk_conflict: NoCheck } + └─StreamProject { exprs: [s_name, count] } + └─StreamTopN { order: [count DESC, s_name ASC], limit: 100, offset: 0 } + └─StreamExchange { dist: Single } + └─StreamGroupTopN { order: [count DESC, s_name ASC], limit: 100, offset: 0, group_key: [_vnode] } + └─StreamProject { exprs: [s_name, count, Vnode(s_name) as _vnode] } + └─StreamHashAgg { group_key: [s_name], aggs: [count] } + └─StreamExchange { dist: HashShard(s_name) } + └─StreamHashJoin { type: LeftAnti, predicate: l_orderkey = l_orderkey AND (l_suppkey <> l_suppkey), output: [s_name, _row_id, _row_id, n_nationkey, _row_id, _row_id, o_orderkey, s_suppkey, l_orderkey] } + ├─StreamHashJoin { type: LeftSemi, predicate: l_orderkey = l_orderkey AND (l_suppkey <> l_suppkey), output: [s_name, l_orderkey, l_suppkey, _row_id, _row_id, n_nationkey, _row_id, _row_id, o_orderkey, s_suppkey] } + │ ├─StreamExchange { dist: HashShard(l_orderkey) } + │ │ └─StreamHashJoin [append_only] { type: Inner, predicate: s_suppkey = l_suppkey, output: [s_name, l_orderkey, l_suppkey, _row_id, _row_id, n_nationkey, s_suppkey, _row_id, _row_id, o_orderkey] } + │ │ ├─StreamExchange { dist: HashShard(s_suppkey) } + │ │ │ └─StreamHashJoin [append_only] { type: Inner, predicate: n_nationkey = s_nationkey, output: [s_suppkey, s_name, _row_id, n_nationkey, _row_id] } + │ │ │ ├─StreamExchange { dist: HashShard(n_nationkey) } + │ │ │ │ └─StreamFilter { predicate: (n_name = 'GERMANY':Varchar) } + │ │ │ │ └─StreamRowIdGen { row_id_index: 7 } + │ │ │ │ └─StreamSourceScan { columns: [n_nationkey, n_name, n_regionkey, n_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ │ │ └─StreamExchange { dist: HashShard(s_nationkey) } + │ │ │ └─StreamRowIdGen { row_id_index: 10 } + │ │ │ └─StreamSourceScan { columns: [s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ │ └─StreamExchange { dist: HashShard(l_suppkey) } + │ │ └─StreamHashJoin [append_only] { type: Inner, predicate: o_orderkey = l_orderkey, output: [l_orderkey, l_suppkey, _row_id, o_orderkey, _row_id] } + │ │ ├─StreamExchange { dist: HashShard(o_orderkey) } + │ │ │ └─StreamFilter { predicate: (o_orderstatus = 'F':Varchar) } + │ │ │ └─StreamRowIdGen { row_id_index: 12 } + │ │ │ └─StreamSourceScan { columns: [o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ │ └─StreamExchange { dist: HashShard(l_orderkey) } + │ │ └─StreamFilter { predicate: (l_receiptdate > l_commitdate) } + │ │ └─StreamShare { id: 13 } + │ │ └─StreamProject { exprs: [l_orderkey, l_suppkey, l_commitdate, l_receiptdate, _row_id] } + │ │ └─StreamRowIdGen { row_id_index: 19 } + │ │ └─StreamSourceScan { columns: [l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ └─StreamExchange { dist: HashShard(l_orderkey) } + │ └─StreamProject { exprs: [l_orderkey, l_suppkey, _row_id] } + │ └─StreamShare { id: 13 } + │ └─StreamProject { exprs: [l_orderkey, l_suppkey, l_commitdate, l_receiptdate, _row_id] } + │ └─StreamRowIdGen { row_id_index: 19 } + │ └─StreamSourceScan { columns: [l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + └─StreamExchange { dist: HashShard(l_orderkey) } + └─StreamProject { exprs: [l_orderkey, l_suppkey, _row_id] } + └─StreamFilter { predicate: (l_receiptdate > l_commitdate) } + └─StreamShare { id: 13 } + └─StreamProject { exprs: [l_orderkey, l_suppkey, l_commitdate, l_receiptdate, _row_id] } + └─StreamRowIdGen { row_id_index: 19 } + └─StreamSourceScan { columns: [l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } +- id: tpch_q22 + before: + - create_tables + sql: | + select + cntrycode, + count(*) as numcust, + sum(c_acctbal) as totacctbal + from + ( + select + substring(c_phone from 1 for 2) as cntrycode, + c_acctbal + from + customer + where + substring(c_phone from 1 for 2) in + ('30', '24', '31', '38', '25', '34', '37') + and c_acctbal > ( + select + avg(c_acctbal) + from + customer + where + c_acctbal > 0.00::numeric + and substring(c_phone from 1 for 2) in + ('30', '24', '31', '38', '25', '34', '37') + ) + and not exists ( + select + * + from + orders + where + o_custkey = c_custkey + ) + ) as custsale + group by + cntrycode + order by + cntrycode + LIMIT 1; + stream_plan: |- + StreamMaterialize { columns: [cntrycode, numcust, totacctbal], stream_key: [], pk_columns: [cntrycode], pk_conflict: NoCheck } + └─StreamProject { exprs: [$expr2, count, sum(c_acctbal)] } + └─StreamTopN { order: [$expr2 ASC], limit: 1, offset: 0 } + └─StreamExchange { dist: Single } + └─StreamGroupTopN { order: [$expr2 ASC], limit: 1, offset: 0, group_key: [_vnode] } + └─StreamProject { exprs: [$expr2, count, sum(c_acctbal), Vnode($expr2) as _vnode] } + └─StreamHashAgg { group_key: [$expr2], aggs: [count, sum(c_acctbal)] } + └─StreamExchange { dist: HashShard($expr2) } + └─StreamProject { exprs: [Substr(c_phone, 1:Int32, 2:Int32) as $expr2, c_acctbal, _row_id, c_custkey] } + └─StreamDynamicFilter { predicate: (c_acctbal > $expr1), output: [c_phone, c_acctbal, _row_id, c_custkey] } + ├─StreamHashJoin { type: LeftAnti, predicate: c_custkey = o_custkey, output: [c_phone, c_acctbal, _row_id, c_custkey] } + │ ├─StreamExchange { dist: HashShard(c_custkey) } + │ │ └─StreamFilter { predicate: In(Substr(c_phone, 1:Int32, 2:Int32), '30':Varchar, '24':Varchar, '31':Varchar, '38':Varchar, '25':Varchar, '34':Varchar, '37':Varchar) } + │ │ └─StreamShare { id: 4 } + │ │ └─StreamProject { exprs: [c_custkey, c_phone, c_acctbal, _row_id] } + │ │ └─StreamFilter { predicate: In(Substr(c_phone, 1:Int32, 2:Int32), '30':Varchar, '24':Varchar, '31':Varchar, '38':Varchar, '25':Varchar, '34':Varchar, '37':Varchar) } + │ │ └─StreamRowIdGen { row_id_index: 11 } + │ │ └─StreamSourceScan { columns: [c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + │ └─StreamExchange { dist: HashShard(o_custkey) } + │ └─StreamProject { exprs: [o_custkey, _row_id] } + │ └─StreamRowIdGen { row_id_index: 12 } + │ └─StreamSourceScan { columns: [o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } + └─StreamExchange { dist: Broadcast } + └─StreamProject { exprs: [(sum(sum(c_acctbal)) / sum0(count(c_acctbal))::Decimal) as $expr1] } + └─StreamSimpleAgg [append_only] { aggs: [sum(sum(c_acctbal)), sum0(count(c_acctbal)), count] } + └─StreamExchange { dist: Single } + └─StreamStatelessSimpleAgg { aggs: [sum(c_acctbal), count(c_acctbal)] } + └─StreamFilter { predicate: (c_acctbal > 0.00:Decimal) AND In(Substr(c_phone, 1:Int32, 2:Int32), '30':Varchar, '24':Varchar, '31':Varchar, '38':Varchar, '25':Varchar, '34':Varchar, '37':Varchar) } + └─StreamShare { id: 4 } + └─StreamProject { exprs: [c_custkey, c_phone, c_acctbal, _row_id] } + └─StreamFilter { predicate: In(Substr(c_phone, 1:Int32, 2:Int32), '30':Varchar, '24':Varchar, '31':Varchar, '38':Varchar, '25':Varchar, '34':Varchar, '37':Varchar) } + └─StreamRowIdGen { row_id_index: 11 } + └─StreamSourceScan { columns: [c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }