Skip to content

Commit

Permalink
refactor(test): move some tests from source_legacy to source_inline
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Oct 15, 2024
1 parent 4cf3a48 commit 194940b
Show file tree
Hide file tree
Showing 24 changed files with 63 additions and 78 deletions.
4 changes: 2 additions & 2 deletions ci/scripts/e2e-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ risedev ci-kill
echo "--- e2e, ci-1cn-1fe, nexmark endless"
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
risedev ci-start ci-1cn-1fe
sqllogictest -p 4566 -d dev './e2e_test/source_legacy/nexmark_endless_mvs/*.slt'
sqllogictest -p 4566 -d dev './e2e_test/source_legacy/nexmark_endless_sinks/*.slt'
sqllogictest -p 4566 -d dev './e2e_test/sink/nexmark_endless_mvs/*.slt'
sqllogictest -p 4566 -d dev './e2e_test/sink/nexmark_endless_sinks/*.slt'

echo "--- Kill cluster"
risedev ci-kill
3 changes: 0 additions & 3 deletions ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,6 @@ export MYSQL_HOST=mysql MYSQL_TCP_PORT=3306 MYSQL_PWD=123456
export SQLCMDSERVER=sqlserver-server SQLCMDUSER=SA SQLCMDPASSWORD="SomeTestOnly@SA" SQLCMDDBNAME=mydb SQLCMDPORT=1433
risedev slt './e2e_test/source_legacy/cdc_inline/**/*.slt'

echo "--- opendal source test"
risedev slt './e2e_test/source_legacy/opendal/**/*.slt'

echo "--- mysql & postgres cdc validate test"
risedev slt './e2e_test/source_legacy/cdc/cdc.validate.mysql.slt'
risedev slt './e2e_test/source_legacy/cdc/cdc.validate.postgres.slt'
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ CREATE TABLE diamonds (
) WITH (
connector = 'posix_fs',
match_pattern = 'data*.csv',
posix_fs.root = 'e2e_test/source_legacy/opendal/data',
posix_fs.root = 'e2e_test/source_inline/fs/data',
) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ',');

sleep 10s
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,9 +1,24 @@
control substitution on

system ok
rpk topic delete json_timestamptz_handling_mode || true

system ok
rpk topic create json_timestamptz_handling_mode -p 1

system ok
cat <<EOF | rpk topic produce json_timestamptz_handling_mode -f "%v\n"
{"case":"0 number small","payload":{"after":{"case":"0 number small","at":100},"op":"r"}}
{"case":"1 number recent","payload":{"after":{"case":"1 number recent","at":1712800800123456},"op":"r"}}
{"case":"2 string utc","payload":{"after":{"case":"2 string utc","at":"2024-04-11T02:00:00.654321Z"},"op":"r"}}
{"case":"3 string naive","payload":{"after":{"case":"3 string naive","at":"2024-04-11 02:00:00.234321"},"op":"r"}}
EOF

statement error unrecognized
create table t (
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'mili');

Expand All @@ -13,8 +28,7 @@ create table plain_guess (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mod = 'mili');

Expand All @@ -23,8 +37,7 @@ create table plain_milli (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'milli');

Expand All @@ -33,8 +46,7 @@ create table plain_micro (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'micro');

Expand All @@ -43,8 +55,7 @@ create table plain_utc (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'utc_string');

Expand All @@ -53,17 +64,15 @@ create table plain_naive (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'utc_without_suffix');

statement ok
create table debezium_milli (
"case" varchar, at timestamptz, primary key("case"))
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format debezium encode json (timestamptz.handling.mode = 'milli');

Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
control substitution on

system ok
rpk topic delete test_temporary_kafka_batch || true

system ok
rpk topic create test_temporary_kafka_batch -p 1

system ok
cat <<EOF | rpk topic produce test_temporary_kafka_batch -f "%v\n"
{"v1": 1, "v2": "1"}
{"v1": 2, "v2": "22"}
{"v1": 3, "v2": "333"}
{"v1": 4, "v2": "4444"}
EOF

statement ok
create temporary source s1 (v1 int, v2 varchar) with (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_temporary_kafka_batch',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
control substitution on

system ok
rpk topic delete test_ttl_table_with_con || true

system ok
rpk topic create test_ttl_table_with_con -p 1

system ok
cat <<EOF | rpk topic produce test_ttl_table_with_con -f "%v\n"
{"v1": 1, "v2": "1"}
{"v1": 2, "v2": "22"}
{"v1": 3, "v2": "333"}
{"v1": 4, "v2": "4444"}
EOF

statement ok
create table t (v1 int, v2 varchar) APPEND ONLY with (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_ttl_table_with_con',
scan.startup.mode = 'earliest',
retention_seconds = 5
) FORMAT PLAIN ENCODE JSON;
Expand Down

This file was deleted.

This file was deleted.

1 change: 0 additions & 1 deletion e2e_test/source_legacy/basic/scripts/test_data/weiling.1

This file was deleted.

2 changes: 1 addition & 1 deletion src/stream/src/executor/source/source_backfill_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ impl BackfillStage {
vis
}

/// Updates backfill states and returns whether the row from upstream `SourceExecutor` is visible.
/// Updates backfill states and returns whether the row backfilled from external system is visible.
fn handle_backfill_row(&mut self, split_id: &str, offset: &str) -> bool {
let state = self.states.get_mut(split_id).unwrap();
match state {
Expand Down

0 comments on commit 194940b

Please sign in to comment.