diff --git a/ci/scripts/gen-integration-test-yaml.py b/ci/scripts/gen-integration-test-yaml.py index 61fd7721acc3f..3d1dec7f66914 100644 --- a/ci/scripts/gen-integration-test-yaml.py +++ b/ci/scripts/gen-integration-test-yaml.py @@ -44,6 +44,7 @@ 'presto-trino': ['json'], 'client-library': ['none'], 'kafka-cdc': ['json'], + 'pubsub': ['json'], } def gen_pipeline_steps(): diff --git a/integration_tests/prometheus/data_check b/integration_tests/prometheus/data_check index 6a39c46f26f9c..609e974d6c230 100644 --- a/integration_tests/prometheus/data_check +++ b/integration_tests/prometheus/data_check @@ -1 +1 @@ -prometheus,metric_avg_30s \ No newline at end of file +prometheus,metric_avg_30s diff --git a/integration_tests/pubsub/create_sink.sql b/integration_tests/pubsub/create_sink.sql new file mode 100644 index 0000000000000..82016040c3cbf --- /dev/null +++ b/integration_tests/pubsub/create_sink.sql @@ -0,0 +1,14 @@ +CREATE SINK pubsub_sink +FROM + personnel +WITH +( + connector = 'google_pubsub', + pubsub.endpoint = 'pubsub-emulator:8900', + pubsub.emulator_host = 'pubsub-emulator:8900', + pubsub.project_id = 'demo', + pubsub.topic = 'test', +) FORMAT PLAIN ENCODE JSON ( + force_append_only='true', +); + diff --git a/integration_tests/pubsub/create_source.sql b/integration_tests/pubsub/create_source.sql new file mode 100644 index 0000000000000..5856abfabbf47 --- /dev/null +++ b/integration_tests/pubsub/create_source.sql @@ -0,0 +1 @@ +CREATE TABLE IF NOT EXISTS personnel (id integer, name varchar); diff --git a/integration_tests/pubsub/data_check b/integration_tests/pubsub/data_check new file mode 100644 index 0000000000000..0528e08f7601a --- /dev/null +++ b/integration_tests/pubsub/data_check @@ -0,0 +1 @@ +personnel diff --git a/integration_tests/pubsub/docker-compose.yml b/integration_tests/pubsub/docker-compose.yml new file mode 100644 index 0000000000000..e04def6dd88cd --- /dev/null +++ b/integration_tests/pubsub/docker-compose.yml @@ -0,0 +1,47 @@ +--- +version: "3" +services: + pubsub-emulator: + image: google/cloud-sdk:latest + command: > + gcloud beta emulators pubsub start --project=demo --host-port=0.0.0.0:8900 + ports: + - "8900:8900" + risingwave-standalone: + extends: + file: ../../docker/docker-compose.yml + service: risingwave-standalone + postgres-0: + extends: + file: ../../docker/docker-compose.yml + service: postgres-0 + grafana-0: + extends: + file: ../../docker/docker-compose.yml + service: grafana-0 + minio-0: + extends: + file: ../../docker/docker-compose.yml + service: minio-0 + prometheus-0: + extends: + file: ../../docker/docker-compose.yml + service: prometheus-0 + message_queue: + extends: + file: ../../docker/docker-compose.yml + service: message_queue +volumes: + risingwave-standalone: + external: false + postgres-0: + external: false + grafana-0: + external: false + minio-0: + external: false + prometheus-0: + external: false + message_queue: + external: false +name: risingwave-compose diff --git a/integration_tests/pubsub/prepare.sh b/integration_tests/pubsub/prepare.sh new file mode 100644 index 0000000000000..eabf514e151ca --- /dev/null +++ b/integration_tests/pubsub/prepare.sh @@ -0,0 +1,9 @@ +export PUBSUB_EMULATOR_HOST=localhost:8900 + +curl -X PUT http://localhost:8900/v1/projects/demo/topics/test +curl -X PUT http://localhost:8900/v1/projects/demo/subscriptions/sub \ + -H 'content-type: application/json' \ + --data '{"topic":"projects/demo/topics/test"}' + +curl -X GET http://localhost:8900/v1/projects/demo/topics +curl -X GET http://localhost:8900/v1/projects/demo/subscriptions diff --git a/integration_tests/pubsub/query.sql b/integration_tests/pubsub/query.sql new file mode 100644 index 0000000000000..124ce9133634e --- /dev/null +++ b/integration_tests/pubsub/query.sql @@ -0,0 +1,13 @@ +INSERT INTO + personnel +VALUES + (1, 'Alice'), + (2, 'Bob'), + (3, 'Tom'), + (4, 'Jerry'), + (5, 'Araminta'), + (6, 'Clover'), + (7, 'Posey'), + (8, 'Waverly'); + +FLUSH; diff --git a/integration_tests/pubsub/sink_check.py b/integration_tests/pubsub/sink_check.py new file mode 100644 index 0000000000000..3957d99817542 --- /dev/null +++ b/integration_tests/pubsub/sink_check.py @@ -0,0 +1,26 @@ +import sys +import subprocess +import json + +curl = """ +PUBSUB_EMULATOR_HOST=localhost:8900 curl -s -X POST 'http://localhost:8900/v1/projects/demo/subscriptions/sub:pull' \ + -H 'content-type: application/json' \ + --data '{"maxMessages":10}' +""" + +output = subprocess.Popen( + ["bash", "-c", curl,], + stdout=subprocess.PIPE, +) +msgs = json.loads(output.stdout.read()) +print(msgs) + +msg_count = len(msgs["receivedMessages"]) +print("msg count", msg_count) + +output.stdout.close() +output.wait() + +if msg_count != 8: + print("Data check failed") + sys.exit(1)