Skip to content

Commit

Permalink
test: add pinot-sink integration test into ci workflow (#14257)
Browse files Browse the repository at this point in the history
  • Loading branch information
xuefengze authored and yezizp2012 committed Jan 2, 2024
1 parent 547395b commit 1a1b0e5
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 27 deletions.
2 changes: 2 additions & 0 deletions ci/scripts/gen-integration-test-yaml.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
'doris-sink': ['json'],
'starrocks-sink': ['json'],
'deltalake-sink': ['json'],
'pinot-sink': ['json'],
'client-library': ['none'],

}

def gen_pipeline_steps():
Expand Down
1 change: 1 addition & 0 deletions ci/scripts/notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
"doris-sink": ["xinhao"],
"starrocks-sink": ["xinhao"],
"deltalake-sink": ["xinhao"],
"pinot-sink": ["yiming"],
"client-library": ["tao"],
}

Expand Down
3 changes: 1 addition & 2 deletions integration_tests/pinot-sink/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ dev=> CREATE SINK orders_sink FROM orders WITH (
connector = 'kafka',
properties.bootstrap.server = 'kafka:9092',
topic = 'orders.upsert.log',
type = 'upsert',
primary_key = 'id'
);
) FORMAT UPSERT ENCODE JSON;
CREATE_SINK
```
4. Create a pinot table named `orders` that ingests data from the kafka topic
Expand Down
11 changes: 0 additions & 11 deletions integration_tests/pinot-sink/create-table.sql

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,5 @@ create SINK orders_sink FROM orders WITH (
connector = 'kafka',
properties.bootstrap.server = 'kafka:9092',
topic = 'orders.upsert.log',
type = 'upsert',
primary_key = 'id'
);
) FORMAT UPSERT ENCODE JSON;
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
CREATE TABLE IF NOT EXISTS orders
(
id INT PRIMARY KEY,
user_id BIGINT,
product_id BIGINT,
status VARCHAR,
quantity INT,
total FLOAT,
created_at BIGINT,
updated_at BIGINT
);

insert into orders values (1, 10, 100, 'INIT', 1, 1.0, 1685421033000, 1685421033000);
insert into orders values (2, 10, 100, 'INIT', 1, 1.0, 1685421033000, 1685421033000);
insert into orders values (3, 10, 100, 'INIT', 1, 1.0, 1685421033000, 1685421033000);
insert into orders values (3, 10, 100, 'INIT', 1, 1.0, 1685421033000, 1685421033000);
22 changes: 12 additions & 10 deletions integration_tests/pinot-sink/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ services:
file: ../../docker/docker-compose.yml
service: prometheus-0
kafka:
image: confluentinc/cp-kafka:7.1.0
platform: linux/amd64
image: confluentinc/cp-kafka:latest
hostname: kafka
container_name: kafka
ports:
Expand All @@ -40,8 +39,7 @@ services:
[ zookeeper ]
healthcheck: { test: nc -z localhost 9092, interval: 1s, start_period: 120s }
pinot-controller:
image: apachepinot/pinot:0.12.0
platform: linux/amd64
image: apachepinot/pinot:latest
command: "StartController -zkAddress zookeeper:2181"
container_name: "pinot-controller"
volumes:
Expand All @@ -52,8 +50,7 @@ services:
depends_on:
- zookeeper
pinot-broker:
image: apachepinot/pinot:0.12.0
platform: linux/amd64
image: apachepinot/pinot:latest
command: "StartBroker -zkAddress zookeeper:2181"
restart: unless-stopped
container_name: "pinot-broker"
Expand All @@ -62,23 +59,28 @@ services:
depends_on:
- pinot-controller
pinot-server:
image: apachepinot/pinot:0.12.0
platform: linux/amd64
image: apachepinot/pinot:latest
container_name: "pinot-server"
command: "StartServer -zkAddress zookeeper:2181"
restart: unless-stopped
depends_on:
- pinot-broker
zookeeper:
image: confluentinc/cp-zookeeper:7.1.0
platform: linux/amd64
image: confluentinc/cp-zookeeper:latest
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
postgres:
image: postgres:latest
command: tail -f /dev/null
volumes:
- "./update.sql:/update.sql"
restart: on-failure
container_name: postgres
volumes:
risingwave-standalone:
external: false
Expand Down
12 changes: 12 additions & 0 deletions integration_tests/pinot-sink/prepare.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/bin/bash

set -euo pipefail

# setup kafka
docker compose exec kafka \
kafka-topics --create --topic orders.upsert.log --bootstrap-server localhost:9092

# setup pinot
docker exec -it pinot-controller /opt/pinot/bin/pinot-admin.sh AddTable \
-tableConfigFile /config/orders_table.json \
-schemaFile /config/orders_schema.json -exec
31 changes: 31 additions & 0 deletions integration_tests/pinot-sink/sink_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import subprocess
import sys
import json

relations = ["orders"]

failed_cases = []
for rel in relations:
sql = f'SELECT COUNT(*) as count FROM {rel}'
print(f"Running SQL: {sql} on Pinot")
command = f'{{"sql":"{sql}"}}'
rows = subprocess.check_output(["docker", "compose", "exec", "pinot-broker", "curl", "-H", "Content-Type: application/json", "-X", "POST", "-d", command, "http://localhost:8099/query/sql"])
rows = json.loads(rows.decode('utf-8'))['resultTable']['rows'][0][0]
print(rows)
print(f"{rows} rows in {rel}")
if rows < 1:
failed_cases.append(rel)

# update data
subprocess.run(["docker", "compose", "exec", "postgres", "bash", "-c", "psql -h risingwave-standalone -p 4566 -d dev -U root -f update.sql"])

sql = f'SELECT status FROM orders WHERE id = 1'
command = f'{{"sql":"{sql}"}}'
output = subprocess.check_output(["docker", "compose", "exec", "pinot-broker", "curl", "-H", "Content-Type: application/json", "-X", "POST", "-d", command, "http://localhost:8099/query/sql"])
output = json.loads(output.decode('utf-8'))['resultTable']['rows'][0][0]
if output != "PROCESSING":
failed_cases.append(f"expected PROCESSING, get {output}")

if len(failed_cases) != 0:
print(f"Data check failed for case {failed_cases}")
sys.exit(1)
3 changes: 2 additions & 1 deletion integration_tests/pinot-sink/update.sql
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
update orders set status = 'PROCESSING' where id = 1;
update orders set status = 'PROCESSING' where id = 1;
FLUSH;

0 comments on commit 1a1b0e5

Please sign in to comment.