Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: add elasticsearch-sink integration test to the integration test workflow #13612

Merged
merged 17 commits into from
Nov 23, 2023
5 changes: 5 additions & 0 deletions ci/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ steps:
- "cockroach-sink"
- "kafka-cdc-sink"
- "cassandra-and-scylladb-sink"
- "elasticsearch-sink"
format:
- "json"
- "protobuf"
Expand Down Expand Up @@ -164,3 +165,7 @@ steps:
testcase: "cassandra-and-scylladb-sink"
format: "protobuf"
skip: true
- with:
testcase: "elasticsearch-sink"
format: "protobuf"
skip: true
21 changes: 5 additions & 16 deletions integration_tests/elasticsearch-sink/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,21 @@

In this demo, we want to showcase how RisingWave is able to sink data to ElasticSearch.

1. Set the compose profile accordingly:
Demo with elasticsearch 7:
```
export COMPOSE_PROFILES=es7
```

Demo with elasticsearch 8
```
export COMPOSE_PROFILES=es8
```

2. Launch the cluster:
1. Launch the cluster:

```sh
docker-compose up -d
```

The cluster contains a RisingWave cluster and its necessary dependencies, a datagen service that generates the data, and a single-node Elasticsearch instance serving as the sink.

3. Execute the SQL queries in sequence:
2. Execute the SQL queries in sequence:

- create_source.sql
- create_mv.sql
- create_es[7/8]_sink.sql
- create_sink.sql

4. Check the contents in ES:
3. Check the contents in ES:

```sh
# Check the document counts
Expand All @@ -38,4 +27,4 @@ curl -XGET -u elastic:risingwave "http://localhost:9200/test/_search" -H 'Conten

# Get the first 10 documents sort by user_id
curl -XGET -u elastic:risingwave "http://localhost:9200/test/_search?size=10" -H 'Content-Type: application/json' -d'{"query":{"match_all":{}}, "sort": ["user_id"]}' | jq
```
```
9 changes: 0 additions & 9 deletions integration_tests/elasticsearch-sink/create_es7_sink.sql

This file was deleted.

9 changes: 0 additions & 9 deletions integration_tests/elasticsearch-sink/create_es8_sink.sql

This file was deleted.

6 changes: 3 additions & 3 deletions integration_tests/elasticsearch-sink/create_mv.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
CREATE MATERIALIZED VIEW bhv_mv AS
SELECT
user_id,
target_id,
event_timestamp
target_id
-- event_timestamp
FROM
user_behaviors;
user_behaviors;
19 changes: 19 additions & 0 deletions integration_tests/elasticsearch-sink/create_sink.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
CREATE SINK bhv_es7_sink
FROM
bhv_mv WITH (
connector = 'elasticsearch',
index = 'test',
url = 'http://elasticsearch7:9200',
username = 'elastic',
password = 'risingwave'
);

CREATE SINK bhv_es8_sink
FROM
bhv_mv WITH (
connector = 'elasticsearch',
index = 'test',
url = 'http://elasticsearch8:9200',
username = 'elastic',
password = 'risingwave'
);
14 changes: 9 additions & 5 deletions integration_tests/elasticsearch-sink/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,24 @@ services:
- xpack.security.enabled=true
- discovery.type=single-node
- ELASTIC_PASSWORD=risingwave
deploy:
resources:
limits:
memory: 1G
ports:
- 9200:9200
profiles:
- es7
elasticsearch8:
image: docker.elastic.co/elasticsearch/elasticsearch:8.10.0
environment:
- xpack.security.enabled=true
- discovery.type=single-node
- ELASTIC_PASSWORD=risingwave
deploy:
resources:
limits:
memory: 1G
ports:
- 9200:9200
profiles:
- es8
- 9201:9200
compactor-0:
extends:
file: ../../docker/docker-compose.yml
Expand Down
1 change: 1 addition & 0 deletions integration_tests/elasticsearch-sink/sink_check
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
test
2 changes: 1 addition & 1 deletion integration_tests/scripts/check_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def run_psql(sql):

demo = sys.argv[1]
upstream = sys.argv[2] # mysql, postgres, etc. see scripts/integration_tests.sh
if demo in ['docker', 'iceberg-sink','clickhouse-sink', 'iceberg-cdc', 'kafka-cdc-sink', 'cassandra-and-scylladb-sink']:
if demo in ['docker', 'iceberg-sink','clickhouse-sink', 'iceberg-cdc', 'kafka-cdc-sink', 'cassandra-and-scylladb-sink', 'elasticsearch-sink']:
print('Skip for running test for `%s`' % demo)
sys.exit(0)

Expand Down
42 changes: 42 additions & 0 deletions integration_tests/scripts/run_demos.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import subprocess
from time import sleep
import argparse
import json


def run_sql_file(f: str, dir: str):
Expand Down Expand Up @@ -231,6 +232,45 @@ def run_cassandra_and_scylladb_sink_demo():
if len(failed_cases) != 0:
raise Exception("Data check failed for case {}".format(failed_cases))

def run_elasticsearch_sink_demo():
    """Run the elasticsearch-sink integration demo end to end.

    Brings up the demo's docker compose cluster, executes the
    source/mv/sink SQL files against RisingWave, waits for ingestion,
    then verifies that both the ES7 and ES8 sinks received at least one
    document for every relation listed in the demo's ``sink_check`` file.

    Raises:
        subprocess.CalledProcessError: if docker compose or curl fails.
        Exception: if any relation has no documents in either ES version.
    """
    demo = "elasticsearch-sink"
    file_dir = dirname(abspath(__file__))
    project_dir = dirname(file_dir)
    demo_dir = os.path.join(project_dir, demo)
    print("Running demo: {}".format(demo))

    subprocess.run(["docker", "compose", "up", "-d", "--build"], cwd=demo_dir, check=True)
    # Give RisingWave and both Elasticsearch nodes time to come up.
    sleep(60)

    sql_files = ['create_source.sql', 'create_mv.sql', 'create_sink.sql']
    for fname in sql_files:
        sql_file = os.path.join(demo_dir, fname)
        # Use a context manager so the file handle is closed deterministically
        # (the original `open(...).read()` leaked the handle).
        with open(sql_file) as sql_f:
            print("executing sql: ", sql_f.read())
        run_sql_file(sql_file, demo_dir)

    print("sink created. Wait for half min time for ingestion")

    # wait for half min ingestion
    sleep(30)

    versions = ['7', '8']
    sink_check_file = os.path.join(demo_dir, 'sink_check')
    with open(sink_check_file) as f:
        # sink_check holds a comma-separated list of index/relation names.
        relations = f.read().strip().split(",")
    failed_cases = []
    for rel in relations:
        query = 'curl -XGET -u elastic:risingwave "http://localhost:9200/{}/_count" -H "Content-Type: application/json"'.format(rel)
        for v in versions:
            es = 'elasticsearch{}'.format(v)
            print("Running Query: {} on {}".format(query, es))
            # Run curl inside the ES container; localhost:9200 there is the
            # node itself regardless of the host-side port mapping.
            counts = subprocess.check_output(["docker", "compose", "exec", es, "bash", "-c", query], cwd=demo_dir)
            counts = json.loads(counts)['count']
            print("{} counts in {}_{}".format(counts, es, rel))
            if counts < 1:
                failed_cases.append(es + '_' + rel)
    if len(failed_cases) != 0:
        raise Exception("Data check failed for case {}".format(failed_cases))

arg_parser = argparse.ArgumentParser(description='Run the demo')
arg_parser.add_argument('--format',
metavar='format',
Expand Down Expand Up @@ -259,5 +299,7 @@ def run_cassandra_and_scylladb_sink_demo():
run_kafka_cdc_demo()
elif args.case == "cassandra-and-scylladb-sink":
run_cassandra_and_scylladb_sink_demo()
elif args.case == "elasticsearch-sink":
run_elasticsearch_sink_demo()
else:
run_demo(args.case, args.format)
Loading