diff --git a/ci/scripts/gen-integration-test-yaml.py b/ci/scripts/gen-integration-test-yaml.py index 5822d8e2f4946..f2cf770be8ece 100644 --- a/ci/scripts/gen-integration-test-yaml.py +++ b/ci/scripts/gen-integration-test-yaml.py @@ -36,7 +36,9 @@ 'doris-sink': ['json'], 'starrocks-sink': ['json'], 'deltalake-sink': ['json'], + 'pinot-sink': ['json'], 'client-library': ['none'], + } def gen_pipeline_steps(): diff --git a/ci/scripts/notify.py b/ci/scripts/notify.py index dc5d062a1efe8..818dfce72143a 100755 --- a/ci/scripts/notify.py +++ b/ci/scripts/notify.py @@ -59,6 +59,7 @@ "doris-sink": ["xinhao"], "starrocks-sink": ["xinhao"], "deltalake-sink": ["xinhao"], + "pinot-sink": ["yiming"], "client-library": ["tao"], } diff --git a/integration_tests/pinot-sink/README.md b/integration_tests/pinot-sink/README.md index bc1a38091f2a6..edf72fda94f2b 100644 --- a/integration_tests/pinot-sink/README.md +++ b/integration_tests/pinot-sink/README.md @@ -32,9 +32,8 @@ dev=> CREATE SINK orders_sink FROM orders WITH ( connector = 'kafka', properties.bootstrap.server = 'kafka:9092', topic = 'orders.upsert.log', - type = 'upsert', primary_key = 'id' -); +) FORMAT UPSERT ENCODE JSON; CREATE_SINK ``` 4. Create a pinot table named `orders` that ingests data from the kafka topic diff --git a/integration_tests/pinot-sink/create-table.sql b/integration_tests/pinot-sink/create-table.sql deleted file mode 100644 index 2d3bcaa000b6b..0000000000000 --- a/integration_tests/pinot-sink/create-table.sql +++ /dev/null @@ -1,11 +0,0 @@ -CREATE TABLE IF NOT EXISTS orders -( - id INT PRIMARY KEY, - user_id BIGINT, - product_id BIGINT, - status VARCHAR, - quantity INT, - total FLOAT, - created_at BIGINT, - updated_at BIGINT -); \ No newline at end of file diff --git a/integration_tests/pinot-sink/create-sink.sql b/integration_tests/pinot-sink/create_sink.sql similarity index 85% rename from integration_tests/pinot-sink/create-sink.sql rename to integration_tests/pinot-sink/create_sink.sql index 966a1114724a9..0b3e964d25f98 100644 --- a/integration_tests/pinot-sink/create-sink.sql +++ b/integration_tests/pinot-sink/create_sink.sql @@ -2,6 +2,5 @@ create SINK orders_sink FROM orders WITH ( connector = 'kafka', properties.bootstrap.server = 'kafka:9092', topic = 'orders.upsert.log', - type = 'upsert', primary_key = 'id' -); \ No newline at end of file +) FORMAT UPSERT ENCODE JSON; diff --git a/integration_tests/pinot-sink/insert.sql b/integration_tests/pinot-sink/create_source.sql similarity index 50% rename from integration_tests/pinot-sink/insert.sql rename to integration_tests/pinot-sink/create_source.sql index 3627cef256d48..c8050754cca6e 100644 --- a/integration_tests/pinot-sink/insert.sql +++ b/integration_tests/pinot-sink/create_source.sql @@ -1,3 +1,15 @@ +CREATE TABLE IF NOT EXISTS orders +( + id INT PRIMARY KEY, + user_id BIGINT, + product_id BIGINT, + status VARCHAR, + quantity INT, + total FLOAT, + created_at BIGINT, + updated_at BIGINT +); + insert into orders values (1, 10, 100, 'INIT', 1, 1.0, 1685421033000, 1685421033000); insert into orders values (2, 10, 100, 'INIT', 1, 1.0, 1685421033000, 1685421033000); -insert into orders values (3, 10, 100, 'INIT', 1, 1.0, 1685421033000, 1685421033000); \ No newline at end of file +insert into orders values (3, 10, 100, 'INIT', 1, 1.0, 1685421033000, 1685421033000); diff --git a/integration_tests/pinot-sink/docker-compose.yml b/integration_tests/pinot-sink/docker-compose.yml index 3faf769de9e67..222c1d7e39735 100644 --- a/integration_tests/pinot-sink/docker-compose.yml +++ b/integration_tests/pinot-sink/docker-compose.yml @@ -22,8 +22,7 @@ services: file: ../../docker/docker-compose.yml service: prometheus-0 kafka: - image: confluentinc/cp-kafka:7.1.0 - platform: linux/amd64 + image: confluentinc/cp-kafka:latest hostname: kafka container_name: kafka ports: @@ -40,8 +39,7 @@ services: [ zookeeper ] healthcheck: { test: nc -z localhost 9092, interval: 1s, start_period: 120s } pinot-controller: - image: apachepinot/pinot:0.12.0 - platform: linux/amd64 + image: apachepinot/pinot:latest command: "StartController -zkAddress zookeeper:2181" container_name: "pinot-controller" volumes: @@ -52,8 +50,7 @@ services: depends_on: - zookeeper pinot-broker: - image: apachepinot/pinot:0.12.0 - platform: linux/amd64 + image: apachepinot/pinot:latest command: "StartBroker -zkAddress zookeeper:2181" restart: unless-stopped container_name: "pinot-broker" @@ -62,16 +59,14 @@ services: depends_on: - pinot-controller pinot-server: - image: apachepinot/pinot:0.12.0 - platform: linux/amd64 + image: apachepinot/pinot:latest container_name: "pinot-server" command: "StartServer -zkAddress zookeeper:2181" restart: unless-stopped depends_on: - pinot-broker zookeeper: - image: confluentinc/cp-zookeeper:7.1.0 - platform: linux/amd64 + image: confluentinc/cp-zookeeper:latest hostname: zookeeper container_name: zookeeper ports: @@ -79,6 +74,13 @@ services: environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 + postgres: + image: postgres:latest + command: tail -f /dev/null + volumes: + - "./update.sql:/update.sql" + restart: on-failure + container_name: postgres volumes: risingwave-standalone: external: false diff --git a/integration_tests/pinot-sink/prepare.sh b/integration_tests/pinot-sink/prepare.sh new file mode 100755 index 0000000000000..6e4ec06611985 --- /dev/null +++ b/integration_tests/pinot-sink/prepare.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +set -euo pipefail + +# setup kafka +docker compose exec kafka \ +kafka-topics --create --topic orders.upsert.log --bootstrap-server localhost:9092 + +# setup pinot +docker exec -it pinot-controller /opt/pinot/bin/pinot-admin.sh AddTable \ +-tableConfigFile /config/orders_table.json \ +-schemaFile /config/orders_schema.json -exec diff --git a/integration_tests/pinot-sink/sink_check.py b/integration_tests/pinot-sink/sink_check.py new file mode 100644 index 0000000000000..9ee4243b1754a --- /dev/null +++ b/integration_tests/pinot-sink/sink_check.py @@ -0,0 +1,31 @@ +import subprocess +import sys +import json + +relations = ["orders"] + +failed_cases = [] +for rel in relations: + sql = f'SELECT COUNT(*) as count FROM {rel}' + print(f"Running SQL: {sql} on Pinot") + command = f'{{"sql":"{sql}"}}' + rows = subprocess.check_output(["docker", "compose", "exec", "pinot-broker", "curl", "-H", "Content-Type: application/json", "-X", "POST", "-d", command, "http://localhost:8099/query/sql"]) + rows = json.loads(rows.decode('utf-8'))['resultTable']['rows'][0][0] + print(rows) + print(f"{rows} rows in {rel}") + if rows < 1: + failed_cases.append(rel) + +# update data +subprocess.run(["docker", "compose", "exec", "postgres", "bash", "-c", "psql -h risingwave-standalone -p 4566 -d dev -U root -f update.sql"]) + +sql = f'SELECT status FROM orders WHERE id = 1' +command = f'{{"sql":"{sql}"}}' +output = subprocess.check_output(["docker", "compose", "exec", "pinot-broker", "curl", "-H", "Content-Type: application/json", "-X", "POST", "-d", command, "http://localhost:8099/query/sql"]) +output = json.loads(output.decode('utf-8'))['resultTable']['rows'][0][0] +if output != "PROCESSING": + failed_cases.append(f"expected PROCESSING, get {output}") + +if len(failed_cases) != 0: + print(f"Data check failed for case {failed_cases}") + sys.exit(1) diff --git a/integration_tests/pinot-sink/update.sql b/integration_tests/pinot-sink/update.sql index e01810edf4f94..b8e7d64ffe019 100644 --- a/integration_tests/pinot-sink/update.sql +++ b/integration_tests/pinot-sink/update.sql @@ -1 +1,2 @@ -update orders set status = 'PROCESSING' where id = 1; \ No newline at end of file +update orders set status = 'PROCESSING' where id = 1; +FLUSH;