Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: add pinot-sink integration test into ci workflow #14257

Merged
merged 6 commits into from
Dec 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ci/scripts/gen-integration-test-yaml.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
'doris-sink': ['json'],
'starrocks-sink': ['json'],
'deltalake-sink': ['json'],
'pinot-sink': ['json'],
'client-library': ['none'],

}

def gen_pipeline_steps():
Expand Down
1 change: 1 addition & 0 deletions ci/scripts/notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
"doris-sink": ["xinhao"],
"starrocks-sink": ["xinhao"],
"deltalake-sink": ["xinhao"],
"pinot-sink": ["yiming"],
"client-library": ["tao"],
}

Expand Down
3 changes: 1 addition & 2 deletions integration_tests/pinot-sink/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ dev=> CREATE SINK orders_sink FROM orders WITH (
connector = 'kafka',
properties.bootstrap.server = 'kafka:9092',
topic = 'orders.upsert.log',
type = 'upsert',
primary_key = 'id'
);
) FORMAT UPSERT ENCODE JSON;
CREATE_SINK
```
4. Create a pinot table named `orders` that ingests data from the kafka topic
Expand Down
11 changes: 0 additions & 11 deletions integration_tests/pinot-sink/create-table.sql

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,5 @@ create SINK orders_sink FROM orders WITH (
connector = 'kafka',
properties.bootstrap.server = 'kafka:9092',
topic = 'orders.upsert.log',
type = 'upsert',
primary_key = 'id'
);
) FORMAT UPSERT ENCODE JSON;
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
-- Seed table and data for the pinot-sink integration test.
-- `orders` is the RisingWave source table; rows inserted here flow through
-- the kafka upsert sink (`orders_sink`) into the Pinot `orders` table.
CREATE TABLE IF NOT EXISTS orders
(
id INT PRIMARY KEY,
user_id BIGINT,
product_id BIGINT,
status VARCHAR,
quantity INT,
total FLOAT,
-- created_at / updated_at are epoch milliseconds stored as BIGINT.
created_at BIGINT,
updated_at BIGINT
);

insert into orders values (1, 10, 100, 'INIT', 1, 1.0, 1685421033000, 1685421033000);
insert into orders values (2, 10, 100, 'INIT', 1, 1.0, 1685421033000, 1685421033000);
insert into orders values (3, 10, 100, 'INIT', 1, 1.0, 1685421033000, 1685421033000);
-- NOTE(review): this row repeats primary key id = 3 — presumably intended to
-- be id = 4. Confirm whether the duplicate is deliberate (e.g. to exercise
-- upsert/overwrite semantics on the primary key).
insert into orders values (3, 10, 100, 'INIT', 1, 1.0, 1685421033000, 1685421033000);
22 changes: 12 additions & 10 deletions integration_tests/pinot-sink/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ services:
file: ../../docker/docker-compose.yml
service: prometheus-0
kafka:
image: confluentinc/cp-kafka:7.1.0
platform: linux/amd64
image: confluentinc/cp-kafka:latest
hostname: kafka
container_name: kafka
ports:
Expand All @@ -40,8 +39,7 @@ services:
[ zookeeper ]
healthcheck: { test: nc -z localhost 9092, interval: 1s, start_period: 120s }
pinot-controller:
image: apachepinot/pinot:0.12.0
platform: linux/amd64
image: apachepinot/pinot:latest
command: "StartController -zkAddress zookeeper:2181"
container_name: "pinot-controller"
volumes:
Expand All @@ -52,8 +50,7 @@ services:
depends_on:
- zookeeper
pinot-broker:
image: apachepinot/pinot:0.12.0
platform: linux/amd64
image: apachepinot/pinot:latest
command: "StartBroker -zkAddress zookeeper:2181"
restart: unless-stopped
container_name: "pinot-broker"
Expand All @@ -62,23 +59,28 @@ services:
depends_on:
- pinot-controller
pinot-server:
image: apachepinot/pinot:0.12.0
platform: linux/amd64
image: apachepinot/pinot:latest
container_name: "pinot-server"
command: "StartServer -zkAddress zookeeper:2181"
restart: unless-stopped
depends_on:
- pinot-broker
zookeeper:
image: confluentinc/cp-zookeeper:7.1.0
platform: linux/amd64
image: confluentinc/cp-zookeeper:latest
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
postgres:
image: postgres:latest
command: tail -f /dev/null
volumes:
- "./update.sql:/update.sql"
restart: on-failure
container_name: postgres
volumes:
risingwave-standalone:
external: false
Expand Down
12 changes: 12 additions & 0 deletions integration_tests/pinot-sink/prepare.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/bin/bash
# Prepare the pinot-sink integration test environment:
# create the kafka topic the RisingWave sink writes to, then register
# the Pinot table that ingests that topic.

set -euo pipefail

# setup kafka: create the upsert topic consumed by Pinot
docker compose exec kafka \
    kafka-topics --create --topic orders.upsert.log --bootstrap-server localhost:9092

# setup pinot: add the realtime `orders` table from the mounted config.
# No -it flags: this script runs non-interactively in CI, where allocating
# a TTY (-t) fails with "the input device is not a TTY".
docker exec pinot-controller /opt/pinot/bin/pinot-admin.sh AddTable \
    -tableConfigFile /config/orders_table.json \
    -schemaFile /config/orders_schema.json -exec
31 changes: 31 additions & 0 deletions integration_tests/pinot-sink/sink_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""Verify that data sunk from RisingWave through Kafka has landed in Pinot.

Queries the Pinot broker's SQL REST endpoint to check that each expected
relation contains rows, then applies an update through RisingWave and
checks that the change propagated into Pinot.
"""
import json
import subprocess
import sys

PINOT_SQL_URL = "http://localhost:8099/query/sql"

# Relations the sink is expected to have populated in Pinot.
relations = ["orders"]


def _pinot_query(sql):
    """Run *sql* on the Pinot broker and return the first cell of the result.

    The payload is built with json.dumps so quoting/escaping is handled
    correctly (a hand-interpolated f-string would break on embedded quotes).
    """
    payload = json.dumps({"sql": sql})
    out = subprocess.check_output([
        "docker", "compose", "exec", "pinot-broker",
        "curl", "-H", "Content-Type: application/json",
        "-X", "POST", "-d", payload, PINOT_SQL_URL,
    ])
    return json.loads(out.decode("utf-8"))["resultTable"]["rows"][0][0]


failed_cases = []
for rel in relations:
    sql = f"SELECT COUNT(*) as count FROM {rel}"
    print(f"Running SQL: {sql} on Pinot")
    rows = _pinot_query(sql)
    print(f"{rows} rows in {rel}")
    if rows < 1:
        failed_cases.append(rel)

# Update a row through RisingWave; the change should flow through the kafka
# upsert topic into Pinot. check=True makes a psql failure fail the test
# instead of being silently ignored.
subprocess.run(
    ["docker", "compose", "exec", "postgres", "bash", "-c",
     "psql -h risingwave-standalone -p 4566 -d dev -U root -f update.sql"],
    check=True,
)

status = _pinot_query("SELECT status FROM orders WHERE id = 1")
if status != "PROCESSING":
    failed_cases.append(f"expected PROCESSING, get {status}")
Review comment on lines +19 to +27:

Reviewer: Is the case for updating the value included in the integration test?

Author: Yes.
# Fail the CI job if any of the checks above recorded a failure.
# Truthiness check is the idiomatic form of `len(failed_cases) != 0`.
if failed_cases:
    print(f"Data check failed for case {failed_cases}")
    sys.exit(1)
3 changes: 2 additions & 1 deletion integration_tests/pinot-sink/update.sql
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
update orders set status = 'PROCESSING' where id = 1;
update orders set status = 'PROCESSING' where id = 1;
FLUSH;
Loading