Skip to content

Commit

Permalink
test: add tpch/nexmark tests for shared kafka source
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Nov 27, 2024
1 parent e61510f commit 17ccb4f
Show file tree
Hide file tree
Showing 14 changed files with 6,036 additions and 8 deletions.
48 changes: 48 additions & 0 deletions e2e_test/nexmark/create_sources_kafka.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
control substitution on

statement ok
CREATE SOURCE person (
"id" BIGINT,
"name" VARCHAR,
"email_address" VARCHAR,
"credit_card" VARCHAR,
"city" VARCHAR,
"state" VARCHAR,
"date_time" TIMESTAMP,
"extra" VARCHAR,
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'nexmark-person'
) FORMAT PLAIN ENCODE JSON;


statement ok
CREATE SOURCE auction (
"id" BIGINT,
"item_name" VARCHAR,
"description" VARCHAR,
"initial_bid" BIGINT,
"reserve" BIGINT,
"date_time" TIMESTAMP,
"expires" TIMESTAMP,
"seller" BIGINT,
"category" BIGINT,
"extra" VARCHAR,
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'nexmark-auction'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE bid (
"auction" BIGINT,
"bidder" BIGINT,
"price" BIGINT,
"channel" VARCHAR,
"url" VARCHAR,
"date_time" TIMESTAMP,
"extra" VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'nexmark-bid'
) FORMAT PLAIN ENCODE JSON;
8 changes: 8 additions & 0 deletions e2e_test/nexmark/drop_sources_kafka.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
statement ok
DROP SOURCE person CASCADE;

statement ok
DROP SOURCE auction CASCADE;

statement ok
DROP SOURCE bid CASCADE;
58 changes: 58 additions & 0 deletions e2e_test/nexmark/produce_kafka.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
control substitution on

system ok
rpk topic delete -r nexmark-* || true

system ok
rpk topic create nexmark-auction -p 4 &&
rpk topic create nexmark-bid -p 4 &&
rpk topic create nexmark-person -p 4

include ./create_tables.slt.part

include ./insert_auction.slt.part
include ./insert_bid.slt.part
include ./insert_person.slt.part

statement ok
flush;

statement ok
create sink nexmark_auction FROM auction
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'nexmark-auction'
) FORMAT PLAIN ENCODE JSON (
force_append_only='true'
);

statement ok
create sink nexmark_bid FROM bid
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'nexmark-bid'
) FORMAT PLAIN ENCODE JSON (
force_append_only='true'
);

statement ok
create sink nexmark_person FROM person
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'nexmark-person'
) FORMAT PLAIN ENCODE JSON (
force_append_only='true'
);

sleep 5s

statement ok
DROP SINK nexmark_auction;

statement ok
DROP SINK nexmark_bid;

statement ok
DROP SINK nexmark_person;

include ./drop_tables.slt.part
9 changes: 9 additions & 0 deletions e2e_test/source_inline/kafka/nexmark.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
include ../../nexmark/produce_kafka.slt.part
include ../../nexmark/create_sources_kafka.slt.part

control substitution off

include ../../streaming/nexmark/create_views.slt.part
include ../../streaming/nexmark/test_mv_result.slt.part

include ../../nexmark/drop_sources_kafka.slt.part
30 changes: 30 additions & 0 deletions e2e_test/source_inline/kafka/tpch.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
include ../../tpch/produce_kafka.slt.part
include ../../tpch/create_sources_kafka.slt.part

control substitution off

include ../../streaming/tpch/create_views.slt.part
include ../../streaming/tpch/q1.slt.part
include ../../streaming/tpch/q2.slt.part
include ../../streaming/tpch/q3.slt.part
include ../../streaming/tpch/q4.slt.part
include ../../streaming/tpch/q5.slt.part
include ../../streaming/tpch/q6.slt.part
include ../../streaming/tpch/q7.slt.part
include ../../streaming/tpch/q8.slt.part
include ../../streaming/tpch/q9.slt.part
include ../../streaming/tpch/q10.slt.part
include ../../streaming/tpch/q11.slt.part
include ../../streaming/tpch/q12.slt.part
include ../../streaming/tpch/q13.slt.part
include ../../streaming/tpch/q14.slt.part
include ../../streaming/tpch/q15.slt.part
include ../../streaming/tpch/q16.slt.part
include ../../streaming/tpch/q17.slt.part
include ../../streaming/tpch/q18.slt.part
include ../../streaming/tpch/q19.slt.part
include ../../streaming/tpch/q20.slt.part
include ../../streaming/tpch/q21.slt.part
include ../../streaming/tpch/q22.slt.part

include ../../tpch/drop_sources_kafka.slt.part
118 changes: 118 additions & 0 deletions e2e_test/tpch/create_sources_kafka.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
control substitution on

statement ok
CREATE SOURCE supplier (
s_suppkey INTEGER,
s_name VARCHAR,
s_address VARCHAR,
s_nationkey INTEGER,
s_phone VARCHAR,
s_acctbal NUMERIC,
s_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-supplier'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE part (
p_partkey INTEGER,
p_name VARCHAR,
p_mfgr VARCHAR,
p_brand VARCHAR,
p_type VARCHAR,
p_size INTEGER,
p_container VARCHAR,
p_retailprice NUMERIC,
p_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-part'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE partsupp (
ps_partkey INTEGER,
ps_suppkey INTEGER,
ps_availqty INTEGER,
ps_supplycost NUMERIC,
ps_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-partsupp'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE customer (
c_custkey INTEGER,
c_name VARCHAR,
c_address VARCHAR,
c_nationkey INTEGER,
c_phone VARCHAR,
c_acctbal NUMERIC,
c_mktsegment VARCHAR,
c_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-customer'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE orders (
o_orderkey BIGINT,
o_custkey INTEGER,
o_orderstatus VARCHAR,
o_totalprice NUMERIC,
o_orderdate DATE,
o_orderpriority VARCHAR,
o_clerk VARCHAR,
o_shippriority INTEGER,
o_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-orders'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE lineitem (
l_orderkey BIGINT,
l_partkey INTEGER,
l_suppkey INTEGER,
l_linenumber INTEGER,
l_quantity NUMERIC,
l_extendedprice NUMERIC,
l_discount NUMERIC,
l_tax NUMERIC,
l_returnflag VARCHAR,
l_linestatus VARCHAR,
l_shipdate DATE,
l_commitdate DATE,
l_receiptdate DATE,
l_shipinstruct VARCHAR,
l_shipmode VARCHAR,
l_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-lineitem'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE nation (
n_nationkey INTEGER,
n_name VARCHAR,
n_regionkey INTEGER,
n_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-nation'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE region (
r_regionkey INTEGER,
r_name VARCHAR,
r_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-region'
) FORMAT PLAIN ENCODE JSON;
23 changes: 23 additions & 0 deletions e2e_test/tpch/drop_sources_kafka.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
statement ok
DROP SOURCE supplier CASCADE;

statement ok
DROP SOURCE region CASCADE;

statement ok
DROP SOURCE nation CASCADE;

statement ok
DROP SOURCE lineitem CASCADE;

statement ok
DROP SOURCE orders CASCADE;

statement ok
DROP SOURCE customer CASCADE;

statement ok
DROP SOURCE partsupp CASCADE;

statement ok
DROP SOURCE part CASCADE;
Loading

0 comments on commit 17ccb4f

Please sign in to comment.