Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: add tpch/nexmark tests for shared kafka source #19589

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions e2e_test/nexmark/create_sources_kafka.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
control substitution on

statement ok
CREATE SOURCE person (
"id" BIGINT,
"name" VARCHAR,
"email_address" VARCHAR,
"credit_card" VARCHAR,
"city" VARCHAR,
"state" VARCHAR,
"date_time" TIMESTAMP,
"extra" VARCHAR,
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'nexmark-person'
) FORMAT PLAIN ENCODE JSON;


statement ok
CREATE SOURCE auction (
"id" BIGINT,
"item_name" VARCHAR,
"description" VARCHAR,
"initial_bid" BIGINT,
"reserve" BIGINT,
"date_time" TIMESTAMP,
"expires" TIMESTAMP,
"seller" BIGINT,
"category" BIGINT,
"extra" VARCHAR,
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'nexmark-auction'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE bid (
"auction" BIGINT,
"bidder" BIGINT,
"price" BIGINT,
"channel" VARCHAR,
"url" VARCHAR,
"date_time" TIMESTAMP,
"extra" VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'nexmark-bid'
) FORMAT PLAIN ENCODE JSON;
8 changes: 8 additions & 0 deletions e2e_test/nexmark/drop_sources_kafka.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
statement ok
DROP SOURCE person CASCADE;

statement ok
DROP SOURCE auction CASCADE;

statement ok
DROP SOURCE bid CASCADE;
58 changes: 58 additions & 0 deletions e2e_test/nexmark/produce_kafka.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
control substitution on

system ok
rpk topic delete -r nexmark-* || true

system ok
rpk topic create nexmark-auction -p 4 &&
rpk topic create nexmark-bid -p 4 &&
rpk topic create nexmark-person -p 4

include ./create_tables.slt.part

include ./insert_auction.slt.part
include ./insert_bid.slt.part
include ./insert_person.slt.part

statement ok
flush;

statement ok
create sink nexmark_auction FROM auction
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'nexmark-auction'
) FORMAT PLAIN ENCODE JSON (
force_append_only='true'
);

statement ok
create sink nexmark_bid FROM bid
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'nexmark-bid'
) FORMAT PLAIN ENCODE JSON (
force_append_only='true'
);

statement ok
create sink nexmark_person FROM person
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'nexmark-person'
) FORMAT PLAIN ENCODE JSON (
force_append_only='true'
);

sleep 5s

statement ok
DROP SINK nexmark_auction;

statement ok
DROP SINK nexmark_bid;

statement ok
DROP SINK nexmark_person;

include ./drop_tables.slt.part
9 changes: 9 additions & 0 deletions e2e_test/source_inline/kafka/nexmark.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
include ../../nexmark/produce_kafka.slt.part
include ../../nexmark/create_sources_kafka.slt.part

control substitution off

include ../../streaming/nexmark/create_views.slt.part
include ../../streaming/nexmark/test_mv_result.slt.part

include ../../nexmark/drop_sources_kafka.slt.part
30 changes: 30 additions & 0 deletions e2e_test/source_inline/kafka/tpch.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
include ../../tpch/produce_kafka.slt.part
include ../../tpch/create_sources_kafka.slt.part

control substitution off

include ../../streaming/tpch/create_views.slt.part
include ../../streaming/tpch/q1.slt.part
include ../../streaming/tpch/q2.slt.part
include ../../streaming/tpch/q3.slt.part
include ../../streaming/tpch/q4.slt.part
include ../../streaming/tpch/q5.slt.part
include ../../streaming/tpch/q6.slt.part
include ../../streaming/tpch/q7.slt.part
include ../../streaming/tpch/q8.slt.part
include ../../streaming/tpch/q9.slt.part
include ../../streaming/tpch/q10.slt.part
include ../../streaming/tpch/q11.slt.part
include ../../streaming/tpch/q12.slt.part
include ../../streaming/tpch/q13.slt.part
include ../../streaming/tpch/q14.slt.part
include ../../streaming/tpch/q15.slt.part
include ../../streaming/tpch/q16.slt.part
include ../../streaming/tpch/q17.slt.part
include ../../streaming/tpch/q18.slt.part
include ../../streaming/tpch/q19.slt.part
include ../../streaming/tpch/q20.slt.part
include ../../streaming/tpch/q21.slt.part
include ../../streaming/tpch/q22.slt.part

include ../../tpch/drop_sources_kafka.slt.part
118 changes: 118 additions & 0 deletions e2e_test/tpch/create_sources_kafka.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
control substitution on

statement ok
CREATE SOURCE supplier (
s_suppkey INTEGER,
s_name VARCHAR,
s_address VARCHAR,
s_nationkey INTEGER,
s_phone VARCHAR,
s_acctbal NUMERIC,
s_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-supplier'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE part (
p_partkey INTEGER,
p_name VARCHAR,
p_mfgr VARCHAR,
p_brand VARCHAR,
p_type VARCHAR,
p_size INTEGER,
p_container VARCHAR,
p_retailprice NUMERIC,
p_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-part'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE partsupp (
ps_partkey INTEGER,
ps_suppkey INTEGER,
ps_availqty INTEGER,
ps_supplycost NUMERIC,
ps_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-partsupp'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE customer (
c_custkey INTEGER,
c_name VARCHAR,
c_address VARCHAR,
c_nationkey INTEGER,
c_phone VARCHAR,
c_acctbal NUMERIC,
c_mktsegment VARCHAR,
c_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-customer'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE orders (
o_orderkey BIGINT,
o_custkey INTEGER,
o_orderstatus VARCHAR,
o_totalprice NUMERIC,
o_orderdate DATE,
o_orderpriority VARCHAR,
o_clerk VARCHAR,
o_shippriority INTEGER,
o_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-orders'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE lineitem (
l_orderkey BIGINT,
l_partkey INTEGER,
l_suppkey INTEGER,
l_linenumber INTEGER,
l_quantity NUMERIC,
l_extendedprice NUMERIC,
l_discount NUMERIC,
l_tax NUMERIC,
l_returnflag VARCHAR,
l_linestatus VARCHAR,
l_shipdate DATE,
l_commitdate DATE,
l_receiptdate DATE,
l_shipinstruct VARCHAR,
l_shipmode VARCHAR,
l_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-lineitem'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE nation (
n_nationkey INTEGER,
n_name VARCHAR,
n_regionkey INTEGER,
n_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-nation'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE region (
r_regionkey INTEGER,
r_name VARCHAR,
r_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-region'
) FORMAT PLAIN ENCODE JSON;
23 changes: 23 additions & 0 deletions e2e_test/tpch/drop_sources_kafka.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
statement ok
DROP SOURCE supplier CASCADE;

statement ok
DROP SOURCE region CASCADE;

statement ok
DROP SOURCE nation CASCADE;

statement ok
DROP SOURCE lineitem CASCADE;

statement ok
DROP SOURCE orders CASCADE;

statement ok
DROP SOURCE customer CASCADE;

statement ok
DROP SOURCE partsupp CASCADE;

statement ok
DROP SOURCE part CASCADE;
Loading