Skip to content

Commit

Permalink
refactor: move pubsub test to inline style (#16683)
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan authored May 13, 2024
1 parent 5ea0b67 commit 02d5caa
Show file tree
Hide file tree
Showing 11 changed files with 68 additions and 97 deletions.
10 changes: 0 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
[workspace]
members = [
"scripts/source/prepare_ci_pubsub",
"src/batch",
"src/bench",
"src/cmd",
Expand Down
2 changes: 1 addition & 1 deletion ci/scripts/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export CARGO_INCREMENTAL=0
export CARGO_MAKE_PRINT_TIME_SUMMARY=true
export MINIO_DOWNLOAD_BIN=https://rw-ci-deps-dist.s3.amazonaws.com/minio
export MCLI_DOWNLOAD_BIN=https://rw-ci-deps-dist.s3.amazonaws.com/mc
export GCLOUD_DOWNLOAD_TGZ=https://rw-ci-deps-dist.s3.amazonaws.com/google-cloud-cli-406.0.0-linux-x86_64.tar.gz
export GCLOUD_DOWNLOAD_TGZ=https://rw-ci-deps-dist.s3.amazonaws.com/google-cloud-cli-475.0.0-linux-x86_64.tar.gz
export NEXTEST_HIDE_PROGRESS_BAR=true
unset LANG
if [ -n "${BUILDKITE_COMMIT:-}" ]; then
Expand Down
3 changes: 1 addition & 2 deletions ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,8 @@ risedev ci-kill

echo "--- e2e, ci-kafka-plus-pubsub, kafka and pubsub source"
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
risedev ci-start ci-pubsub-kafka
risedev ci-start ci-kafka
./scripts/source/prepare_ci_kafka.sh
cargo run --bin prepare_ci_pubsub
risedev slt './e2e_test/source/basic/*.slt'
risedev slt './e2e_test/source/basic/old_row_format_syntax/*.slt'
risedev slt './e2e_test/source/basic/alter/kafka.slt'
Expand Down
55 changes: 40 additions & 15 deletions scripts/source/prepare_ci_pubsub/src/main.rs → ...test/source_inline/pubsub/prepare-data.rs
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
use std::fs::File;
use std::io::prelude::*;
use std::io::BufReader;
#!/usr/bin/env -S cargo -Zscript
```cargo
[dependencies]
anyhow = "1"
google-cloud-googleapis = { version = "0.12", features = ["pubsub"] }
google-cloud-pubsub = "0.24"
tokio = { version = "0.2", package = "madsim-tokio", features = [
"rt",
"rt-multi-thread",
"sync",
"macros",
"time",
"signal",
"fs",
] }
```

use google_cloud_googleapis::pubsub::v1::PubsubMessage;
use google_cloud_pubsub::client::Client;
Expand Down Expand Up @@ -38,21 +51,11 @@ async fn main() -> anyhow::Result<()> {
.await?;
}

let path = std::env::current_exe()?
.parent()
.and_then(|p| p.parent())
.and_then(|p| p.parent())
.unwrap()
.join("scripts/source/test_data/pubsub_1_test_topic.1");

let file = File::open(path)?;
let file = BufReader::new(file);

let publisher = topic.new_publisher(Default::default());
for line in file.lines().map_while(Result::ok) {
for line in DATA.lines() {
let a = publisher
.publish(PubsubMessage {
data: line.clone().into_bytes(),
data: line.to_string().into_bytes(),
..Default::default()
})
.await;
Expand All @@ -62,3 +65,25 @@ async fn main() -> anyhow::Result<()> {

Ok(())
}

const DATA: &str = r#"{"v1":1,"v2":"name0"}
{"v1":2,"v2":"name0"}
{"v1":6,"v2":"name3"}
{"v1":0,"v2":"name5"}
{"v1":5,"v2":"name8"}
{"v1":6,"v2":"name4"}
{"v1":8,"v2":"name9"}
{"v1":9,"v2":"name2"}
{"v1":4,"v2":"name6"}
{"v1":5,"v2":"name3"}
{"v1":8,"v2":"name8"}
{"v1":9,"v2":"name2"}
{"v1":2,"v2":"name3"}
{"v1":4,"v2":"name7"}
{"v1":7,"v2":"name0"}
{"v1":0,"v2":"name9"}
{"v1":3,"v2":"name2"}
{"v1":7,"v2":"name5"}
{"v1":1,"v2":"name7"}
{"v1":3,"v2":"name9"}
"#;
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
control substitution on

system ok
e2e_test/source_inline/pubsub/prepare-data.rs

# fail with invalid emulator_host
statement error
statement error failed to lookup address information
CREATE TABLE s1 (v1 int, v2 varchar) WITH (
connector = 'google_pubsub',
pubsub.subscription = 'test-subscription-1',
pubsub.emulator_host = 'invalid_host:5981'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE TABLE s1 (v1 int, v2 varchar) WITH (
connector = 'google_pubsub',
${RISEDEV_PUBSUB_WITH_OPTIONS_COMMON},
pubsub.subscription = 'test-subscription-1',
pubsub.emulator_host = 'localhost:5980'
) FORMAT PLAIN ENCODE JSON;

statement ok
Expand All @@ -18,29 +23,25 @@ SELECT * FROM s1;
statement ok
DROP TABLE s1;

# fail with invalid subscription
statement error
statement error subscription test-subscription-not-exist does not exist
CREATE TABLE s2 (v1 int, v2 varchar) WITH (
connector = 'google_pubsub',
pubsub.subscription = 'test-subscription-not-2',
pubsub.emulator_host = 'localhost:5980'
${RISEDEV_PUBSUB_WITH_OPTIONS_COMMON},
pubsub.subscription = 'test-subscription-not-exist',
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE TABLE s2 (v1 int, v2 varchar) WITH (
connector = 'google_pubsub',
${RISEDEV_PUBSUB_WITH_OPTIONS_COMMON},
pubsub.subscription = 'test-subscription-2',
pubsub.emulator_host = 'localhost:5980'
) FORMAT PLAIN ENCODE JSON;

# fail if both start_offset and start_snapshot are provided
statement error
statement error specify at most one of start_offset or start_snapshot
CREATE TABLE s3 (v1 int, v2 varchar) WITH (
connector = 'google_pubsub',
${RISEDEV_PUBSUB_WITH_OPTIONS_COMMON},
pubsub.subscription = 'test-subscription-3',
pubsub.emulator_host = 'localhost:5980',
pubsub.start_offset = "121212",
pubsub.start_snapshot = "snapshot-that-doesnt-exist"
pubsub.start_offset.nanos = '121212',
pubsub.start_snapshot = 'snapshot-that-doesnt-exist'
) FORMAT PLAIN ENCODE JSON;

# wait for source
Expand Down
4 changes: 1 addition & 3 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,7 @@ profile:
health-check-port: 6788
- use: compactor

ci-pubsub-kafka:
ci-kafka:
config-path: src/config/ci.toml
steps:
- use: minio
Expand All @@ -805,8 +805,6 @@ profile:
enable-tiered-cache: true
- use: frontend
- use: compactor
- use: pubsub
persist-data: true
- use: kafka
user-managed: true
address: message_queue
Expand Down
28 changes: 0 additions & 28 deletions scripts/source/prepare_ci_pubsub/Cargo.toml

This file was deleted.

20 changes: 0 additions & 20 deletions scripts/source/test_data/pubsub_1_test_topic.1

This file was deleted.

6 changes: 4 additions & 2 deletions src/risedevtool/gcloud-pubsub.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ extend = "common.toml"

[env]
GCLOUD_DOWNLOAD_PATH = "${PREFIX_TMP}/gcloud.tgz"
GCLOUD_DOWNLOAD_TGZ = { value = "https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-cli-406.0.0-linux-x86_64.tar.gz", condition = { env_not_set = [ "GCLOUD_DOWNLOAD_TGZ" ] } }
GCLOUD_DOWNLOAD_TGZ = { value = "https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-cli-475.0.0-linux-x86_64.tar.gz", condition = { env_not_set = [
"GCLOUD_DOWNLOAD_TGZ",
] } }
GCLOUD_SDK_DIR = "google-cloud-sdk"

[tasks.download-pubsub]
private = true
category = "RiseDev - Components"
dependencies = ["prepare"]
condition = { env_set = [ "ENABLE_PUBSUB" ] }
condition = { env_set = ["ENABLE_PUBSUB"] }
description = "Download and enable Google Pubsub Emulator"
script = '''
#!/usr/bin/env bash
Expand Down
5 changes: 5 additions & 0 deletions src/risedevtool/src/risedev_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ pub fn generate_risedev_env(services: &Vec<ServiceConfig>) -> String {
// It's expected to create another dedicated user for the source.
writeln!(env, r#"RISEDEV_MYSQL_WITH_OPTIONS_COMMON="connector='mysql-cdc',hostname='{host}',port='{port}'""#,).unwrap();
}
ServiceConfig::Pubsub(c) => {
let address = &c.address;
let port = &c.port;
writeln!(env, r#"RISEDEV_PUBSUB_WITH_OPTIONS_COMMON="connector='google_pubsub',pubsub.emulator_host='{address}:{port}'""#,).unwrap();
}
_ => {}
}
}
Expand Down

0 comments on commit 02d5caa

Please sign in to comment.