Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cherry-pick: shared source bug fixes (#19578, #19564) to 2.1 #19583

Merged
merged 4 commits into from
Nov 28, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ repos:
rev: v2.3.0
hooks:
- id: end-of-file-fixer
exclude: 'src/frontend/planner_test/tests/testdata/.*'
- id: trailing-whitespace
- repo: https://github.com/crate-ci/typos
rev: v1.23.1
2 changes: 1 addition & 1 deletion ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
@@ -36,7 +36,7 @@ python3 -m pip install --break-system-packages requests protobuf fastavro conflu
apt-get -y install jq

echo "--- e2e, inline test"
RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info,risingwave_meta=info" \
risedev ci-start ci-inline-source-test
risedev slt './e2e_test/source_inline/**/*.slt' -j16
risedev slt './e2e_test/source_inline/**/*.slt.serial'
48 changes: 48 additions & 0 deletions e2e_test/nexmark/create_sources_kafka.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
control substitution on

statement ok
CREATE SOURCE person (
"id" BIGINT,
"name" VARCHAR,
"email_address" VARCHAR,
"credit_card" VARCHAR,
"city" VARCHAR,
"state" VARCHAR,
"date_time" TIMESTAMP,
"extra" VARCHAR,
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'nexmark-person'
) FORMAT PLAIN ENCODE JSON;


statement ok
CREATE SOURCE auction (
"id" BIGINT,
"item_name" VARCHAR,
"description" VARCHAR,
"initial_bid" BIGINT,
"reserve" BIGINT,
"date_time" TIMESTAMP,
"expires" TIMESTAMP,
"seller" BIGINT,
"category" BIGINT,
"extra" VARCHAR,
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'nexmark-auction'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE bid (
"auction" BIGINT,
"bidder" BIGINT,
"price" BIGINT,
"channel" VARCHAR,
"url" VARCHAR,
"date_time" TIMESTAMP,
"extra" VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'nexmark-bid'
) FORMAT PLAIN ENCODE JSON;
8 changes: 8 additions & 0 deletions e2e_test/nexmark/drop_sources_kafka.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
statement ok
DROP SOURCE person CASCADE;

statement ok
DROP SOURCE auction CASCADE;

statement ok
DROP SOURCE bid CASCADE;
58 changes: 58 additions & 0 deletions e2e_test/nexmark/produce_kafka.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
control substitution on

system ok
rpk topic delete -r nexmark-* || true

system ok
rpk topic create nexmark-auction -p 4 &&
rpk topic create nexmark-bid -p 4 &&
rpk topic create nexmark-person -p 4

include ./create_tables.slt.part

include ./insert_auction.slt.part
include ./insert_bid.slt.part
include ./insert_person.slt.part

statement ok
flush;

statement ok
create sink nexmark_auction FROM auction
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'nexmark-auction'
) FORMAT PLAIN ENCODE JSON (
force_append_only='true'
);

statement ok
create sink nexmark_bid FROM bid
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'nexmark-bid'
) FORMAT PLAIN ENCODE JSON (
force_append_only='true'
);

statement ok
create sink nexmark_person FROM person
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'nexmark-person'
) FORMAT PLAIN ENCODE JSON (
force_append_only='true'
);

sleep 5s

statement ok
DROP SINK nexmark_auction;

statement ok
DROP SINK nexmark_bid;

statement ok
DROP SINK nexmark_person;

include ./drop_tables.slt.part
73 changes: 73 additions & 0 deletions e2e_test/source_inline/kafka/issue_19563.slt.serial
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
control substitution on

# Note: rw_fragments is not isolated by schema so we make the test serial.

system ok
rpk topic create test-topic-19563 -p 6

statement ok
CREATE SOURCE kafkasource (
v1 timestamp with time zone
)
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test-topic-19563',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON (
timestamptz.handling.mode = 'utc_without_suffix'
);

# Note that StreamSourceScan is in the StreamDynamicFilter fragment, which has 3 upstream fragments.
query T
explain create materialized view mv1 as select v1 from kafkasource where v1 between now() and now() + interval '1 day' * 365 * 2000;
----
StreamMaterialize { columns: [v1, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
└─StreamDynamicFilter { predicate: (v1 <= $expr1), output: [v1, _row_id], cleaned_by_watermark: true }
├─StreamProject { exprs: [v1, _row_id], output_watermarks: [v1] }
│ └─StreamDynamicFilter { predicate: (v1 >= now), output_watermarks: [v1], output: [v1, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id], cleaned_by_watermark: true }
│ ├─StreamRowIdGen { row_id_index: 4 }
│ │ └─StreamSourceScan { columns: [v1, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
│ └─StreamExchange { dist: Broadcast }
│ └─StreamNow
└─StreamExchange { dist: Broadcast }
└─StreamProject { exprs: [AddWithTimeZone(now, '730000 days':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] }
└─StreamNow


# The following test is adapted from `temporal_filter.slt`.

# This statement should be correct for the next ~1000 years
# We cannot have a variable interval for now, so we use 2000 year's worth of days as the upper bound.
statement ok
create materialized view mv1 as select v1 from kafkasource where v1 between now() and now() + interval '1 day' * 365 * 2000;

query I
select array_length(upstream_fragment_ids) from rw_fragments where array_contains(flags, Array['SOURCE_SCAN']);
----
3

system ok
cat <<EOF | rpk topic produce test-topic-19563
{"v1": "3031-01-01 19:00:00"}
{"v1": "3031-01-01 20:00:00"}
{"v1": "3031-01-01 21:00:00"}
{"v1": "5031-01-01 21:00:00"}
{"v1": "0001-01-01 21:00:00"}
EOF

sleep 3s

# Below lower bound and above upper bound are not shown
query I
select * from mv1 order by v1;
----
3031-01-01 19:00:00+00:00
3031-01-01 20:00:00+00:00
3031-01-01 21:00:00+00:00


statement ok
DROP SOURCE kafkasource CASCADE;

system ok
rpk topic delete test-topic-18308
9 changes: 9 additions & 0 deletions e2e_test/source_inline/kafka/nexmark.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
include ../../nexmark/produce_kafka.slt.part
include ../../nexmark/create_sources_kafka.slt.part

control substitution off

include ../../streaming/nexmark/create_views.slt.part
include ../../streaming/nexmark/test_mv_result.slt.part

include ../../nexmark/drop_sources_kafka.slt.part
30 changes: 30 additions & 0 deletions e2e_test/source_inline/kafka/tpch.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
include ../../tpch/produce_kafka.slt.part
include ../../tpch/create_sources_kafka.slt.part

control substitution off

include ../../streaming/tpch/create_views.slt.part
include ../../streaming/tpch/q1.slt.part
include ../../streaming/tpch/q2.slt.part
include ../../streaming/tpch/q3.slt.part
include ../../streaming/tpch/q4.slt.part
include ../../streaming/tpch/q5.slt.part
include ../../streaming/tpch/q6.slt.part
include ../../streaming/tpch/q7.slt.part
include ../../streaming/tpch/q8.slt.part
include ../../streaming/tpch/q9.slt.part
include ../../streaming/tpch/q10.slt.part
include ../../streaming/tpch/q11.slt.part
include ../../streaming/tpch/q12.slt.part
include ../../streaming/tpch/q13.slt.part
include ../../streaming/tpch/q14.slt.part
include ../../streaming/tpch/q15.slt.part
include ../../streaming/tpch/q16.slt.part
include ../../streaming/tpch/q17.slt.part
include ../../streaming/tpch/q18.slt.part
include ../../streaming/tpch/q19.slt.part
include ../../streaming/tpch/q20.slt.part
include ../../streaming/tpch/q21.slt.part
include ../../streaming/tpch/q22.slt.part

include ../../tpch/drop_sources_kafka.slt.part
118 changes: 118 additions & 0 deletions e2e_test/tpch/create_sources_kafka.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
control substitution on

statement ok
CREATE SOURCE supplier (
s_suppkey INTEGER,
s_name VARCHAR,
s_address VARCHAR,
s_nationkey INTEGER,
s_phone VARCHAR,
s_acctbal NUMERIC,
s_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-supplier'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE part (
p_partkey INTEGER,
p_name VARCHAR,
p_mfgr VARCHAR,
p_brand VARCHAR,
p_type VARCHAR,
p_size INTEGER,
p_container VARCHAR,
p_retailprice NUMERIC,
p_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-part'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE partsupp (
ps_partkey INTEGER,
ps_suppkey INTEGER,
ps_availqty INTEGER,
ps_supplycost NUMERIC,
ps_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-partsupp'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE customer (
c_custkey INTEGER,
c_name VARCHAR,
c_address VARCHAR,
c_nationkey INTEGER,
c_phone VARCHAR,
c_acctbal NUMERIC,
c_mktsegment VARCHAR,
c_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-customer'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE orders (
o_orderkey BIGINT,
o_custkey INTEGER,
o_orderstatus VARCHAR,
o_totalprice NUMERIC,
o_orderdate DATE,
o_orderpriority VARCHAR,
o_clerk VARCHAR,
o_shippriority INTEGER,
o_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-orders'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE lineitem (
l_orderkey BIGINT,
l_partkey INTEGER,
l_suppkey INTEGER,
l_linenumber INTEGER,
l_quantity NUMERIC,
l_extendedprice NUMERIC,
l_discount NUMERIC,
l_tax NUMERIC,
l_returnflag VARCHAR,
l_linestatus VARCHAR,
l_shipdate DATE,
l_commitdate DATE,
l_receiptdate DATE,
l_shipinstruct VARCHAR,
l_shipmode VARCHAR,
l_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-lineitem'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE nation (
n_nationkey INTEGER,
n_name VARCHAR,
n_regionkey INTEGER,
n_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-nation'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE region (
r_regionkey INTEGER,
r_name VARCHAR,
r_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-region'
) FORMAT PLAIN ENCODE JSON;
23 changes: 23 additions & 0 deletions e2e_test/tpch/drop_sources_kafka.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
statement ok
DROP SOURCE supplier CASCADE;

statement ok
DROP SOURCE region CASCADE;

statement ok
DROP SOURCE nation CASCADE;

statement ok
DROP SOURCE lineitem CASCADE;

statement ok
DROP SOURCE orders CASCADE;

statement ok
DROP SOURCE customer CASCADE;

statement ok
DROP SOURCE partsupp CASCADE;

statement ok
DROP SOURCE part CASCADE;
Loading