Skip to content

Commit

Permalink
fix: correctly handle hidden columns for SourceBackfill
Browse files Browse the repository at this point in the history
fix #19575

Signed-off-by: xxchan <[email protected]>

fix test

Signed-off-by: xxchan <[email protected]>

add tpch test

Signed-off-by: xxchan <[email protected]>

fix

Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Nov 26, 2024
1 parent 184900a commit d30ea7d
Show file tree
Hide file tree
Showing 23 changed files with 6,235 additions and 91 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ repos:
rev: v2.3.0
hooks:
- id: end-of-file-fixer
exclude: 'src/frontend/planner_test/tests/testdata/.*'
- id: trailing-whitespace
- repo: https://github.com/crate-ci/typos
rev: v1.23.1
Expand Down
48 changes: 48 additions & 0 deletions e2e_test/nexmark/create_sources_kafka.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
control substitution on

statement ok
CREATE SOURCE person (
"id" BIGINT,
"name" VARCHAR,
"email_address" VARCHAR,
"credit_card" VARCHAR,
"city" VARCHAR,
"state" VARCHAR,
"date_time" TIMESTAMP,
"extra" VARCHAR,
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'nexmark-person'
) FORMAT PLAIN ENCODE JSON;


statement ok
CREATE SOURCE auction (
"id" BIGINT,
"item_name" VARCHAR,
"description" VARCHAR,
"initial_bid" BIGINT,
"reserve" BIGINT,
"date_time" TIMESTAMP,
"expires" TIMESTAMP,
"seller" BIGINT,
"category" BIGINT,
"extra" VARCHAR,
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'nexmark-auction'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE bid (
"auction" BIGINT,
"bidder" BIGINT,
"price" BIGINT,
"channel" VARCHAR,
"url" VARCHAR,
"date_time" TIMESTAMP,
"extra" VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'nexmark-bid'
) FORMAT PLAIN ENCODE JSON;
8 changes: 8 additions & 0 deletions e2e_test/nexmark/drop_sources_kafka.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
statement ok
DROP SOURCE person CASCADE;

statement ok
DROP SOURCE auction CASCADE;

statement ok
DROP SOURCE bid CASCADE;
99 changes: 99 additions & 0 deletions e2e_test/nexmark/produce_kafka.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
control substitution on

system ok
rpk topic delete -r nexmark-* || true

system ok
rpk topic create nexmark-auction -p 4 &&
rpk topic create nexmark-bid -p 4 &&
rpk topic create nexmark-person -p 4

include ./create_tables.slt.part

include ./insert_auction.slt.part
include ./insert_bid.slt.part
include ./insert_person.slt.part

statement ok
flush;

statement ok
create sink nexmark_auction FROM auction
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'nexmark-auction'
) FORMAT PLAIN ENCODE JSON (
force_append_only='true'
);

statement ok
create sink nexmark_bid FROM bid
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'nexmark-bid'
) FORMAT PLAIN ENCODE JSON (
force_append_only='true'
);

statement ok
create sink nexmark_person FROM person
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'nexmark-person'
) FORMAT PLAIN ENCODE JSON (
force_append_only='true'
);

sleep 5s

statement ok
DROP SINK nexmark_auction;

statement ok
DROP SINK nexmark_bid;

statement ok
DROP SINK nexmark_person;

# statement ok
# create sink nexmark_events AS
# WITH event_person AS (
# SELECT
# 0 AS event_type,
# row(id, name, email_address, credit_card, city, state, date_time, extra)::struct<"id" BIGINT, "name" VARCHAR, "email_address" VARCHAR, "credit_card" VARCHAR, "city" VARCHAR, "state" VARCHAR, "date_time" TIMESTAMP, "extra" VARCHAR> AS person,
# NULL::struct<"id" BIGINT, "item_name" VARCHAR, "description" VARCHAR, "initial_bid" BIGINT, "reserve" BIGINT, "date_time" TIMESTAMP, "expires" TIMESTAMP, "seller" BIGINT, "category" BIGINT, "extra" VARCHAR> AS auction,
# NULL::struct<"auction" BIGINT, "bidder" BIGINT, "price" BIGINT, "channel" VARCHAR, "url" VARCHAR, "date_time" TIMESTAMP, "extra" VARCHAR> AS bid
# FROM person
# ),
# event_auction AS (
# SELECT
# 1 AS event_type,
# NULL::struct<"id" BIGINT, "name" VARCHAR, "email_address" VARCHAR, "credit_card" VARCHAR, "city" VARCHAR, "state" VARCHAR, "date_time" TIMESTAMP, "extra" VARCHAR> AS person,
# row(id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra)::struct<"id" BIGINT, "item_name" VARCHAR, "description" VARCHAR, "initial_bid" BIGINT, "reserve" BIGINT, "date_time" TIMESTAMP, "expires" TIMESTAMP, "seller" BIGINT, "category" BIGINT, "extra" VARCHAR> AS auction,
# NULL::struct<"auction" BIGINT, "bidder" BIGINT, "price" BIGINT, "channel" VARCHAR, "url" VARCHAR, "date_time" TIMESTAMP, "extra" VARCHAR> AS bid
# FROM auction
# ),
# event_bid AS (
# SELECT
# 2 AS event_type,
# NULL::struct<"id" BIGINT, "name" VARCHAR, "email_address" VARCHAR, "credit_card" VARCHAR, "city" VARCHAR, "state" VARCHAR, "date_time" TIMESTAMP, "extra" VARCHAR> AS person,
# NULL::struct<"id" BIGINT, "item_name" VARCHAR, "description" VARCHAR, "initial_bid" BIGINT, "reserve" BIGINT, "date_time" TIMESTAMP, "expires" TIMESTAMP, "seller" BIGINT, "category" BIGINT, "extra" VARCHAR> AS auction,
# row(auction, bidder, price, channel, url, date_time, extra)::struct<"auction" BIGINT, "bidder" BIGINT, "price" BIGINT, "channel" VARCHAR, "url" VARCHAR, "date_time" TIMESTAMP, "extra" VARCHAR> AS bid
# FROM bid
# )
# SELECT * FROM event_person
# UNION ALL
# SELECT * FROM event_auction
# UNION ALL
# SELECT * FROM event_bid
# WITH (
# ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
# topic = 'nexmark-events'
# ) FORMAT PLAIN ENCODE JSON (
# force_append_only='true'
# );

# statement ok
# DROP SINK nexmark_events;

include ./drop_tables.slt.part
9 changes: 9 additions & 0 deletions e2e_test/source_inline/kafka/nexmark.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
include ../../nexmark/produce_kafka.slt.part
include ../../nexmark/create_sources_kafka.slt.part

control substitution off

include ../../streaming/nexmark/create_views.slt.part
include ../../streaming/nexmark/test_mv_result.slt.part

include ../../nexmark/drop_sources_kafka.slt.part
30 changes: 30 additions & 0 deletions e2e_test/source_inline/kafka/tpch.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
include ../../tpch/produce_kafka.slt.part
include ../../tpch/create_sources_kafka.slt.part

control substitution off

include ../../streaming/tpch/create_views.slt.part
include ../../streaming/tpch/q1.slt.part
include ../../streaming/tpch/q2.slt.part
include ../../streaming/tpch/q3.slt.part
include ../../streaming/tpch/q4.slt.part
include ../../streaming/tpch/q5.slt.part
include ../../streaming/tpch/q6.slt.part
include ../../streaming/tpch/q7.slt.part
include ../../streaming/tpch/q8.slt.part
include ../../streaming/tpch/q9.slt.part
include ../../streaming/tpch/q10.slt.part
include ../../streaming/tpch/q11.slt.part
include ../../streaming/tpch/q12.slt.part
include ../../streaming/tpch/q13.slt.part
include ../../streaming/tpch/q14.slt.part
include ../../streaming/tpch/q15.slt.part
include ../../streaming/tpch/q16.slt.part
include ../../streaming/tpch/q17.slt.part
include ../../streaming/tpch/q18.slt.part
include ../../streaming/tpch/q19.slt.part
include ../../streaming/tpch/q20.slt.part
include ../../streaming/tpch/q21.slt.part
include ../../streaming/tpch/q22.slt.part

include ../../tpch/drop_sources_kafka.slt.part
118 changes: 118 additions & 0 deletions e2e_test/tpch/create_sources_kafka.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
control substitution on

statement ok
CREATE SOURCE supplier (
s_suppkey INTEGER,
s_name VARCHAR,
s_address VARCHAR,
s_nationkey INTEGER,
s_phone VARCHAR,
s_acctbal NUMERIC,
s_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-supplier'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE part (
p_partkey INTEGER,
p_name VARCHAR,
p_mfgr VARCHAR,
p_brand VARCHAR,
p_type VARCHAR,
p_size INTEGER,
p_container VARCHAR,
p_retailprice NUMERIC,
p_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-part'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE partsupp (
ps_partkey INTEGER,
ps_suppkey INTEGER,
ps_availqty INTEGER,
ps_supplycost NUMERIC,
ps_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-partsupp'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE customer (
c_custkey INTEGER,
c_name VARCHAR,
c_address VARCHAR,
c_nationkey INTEGER,
c_phone VARCHAR,
c_acctbal NUMERIC,
c_mktsegment VARCHAR,
c_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-customer'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE orders (
o_orderkey BIGINT,
o_custkey INTEGER,
o_orderstatus VARCHAR,
o_totalprice NUMERIC,
o_orderdate DATE,
o_orderpriority VARCHAR,
o_clerk VARCHAR,
o_shippriority INTEGER,
o_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-orders'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE lineitem (
l_orderkey BIGINT,
l_partkey INTEGER,
l_suppkey INTEGER,
l_linenumber INTEGER,
l_quantity NUMERIC,
l_extendedprice NUMERIC,
l_discount NUMERIC,
l_tax NUMERIC,
l_returnflag VARCHAR,
l_linestatus VARCHAR,
l_shipdate DATE,
l_commitdate DATE,
l_receiptdate DATE,
l_shipinstruct VARCHAR,
l_shipmode VARCHAR,
l_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-lineitem'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE nation (
n_nationkey INTEGER,
n_name VARCHAR,
n_regionkey INTEGER,
n_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-nation'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE region (
r_regionkey INTEGER,
r_name VARCHAR,
r_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-region'
) FORMAT PLAIN ENCODE JSON;
23 changes: 23 additions & 0 deletions e2e_test/tpch/drop_sources_kafka.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
statement ok
DROP SOURCE supplier CASCADE;

statement ok
DROP SOURCE region CASCADE;

statement ok
DROP SOURCE nation CASCADE;

statement ok
DROP SOURCE lineitem CASCADE;

statement ok
DROP SOURCE orders CASCADE;

statement ok
DROP SOURCE customer CASCADE;

statement ok
DROP SOURCE partsupp CASCADE;

statement ok
DROP SOURCE part CASCADE;
Loading

0 comments on commit d30ea7d

Please sign in to comment.