Skip to content

Commit

Permalink
test: add redis-sink test into the integration test workflow (#13661)
Browse files Browse the repository at this point in the history
  • Loading branch information
xuefengze authored Nov 27, 2023
1 parent b02ac4c commit 5cac7a8
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 9 deletions.
5 changes: 5 additions & 0 deletions ci/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ steps:
- "kafka-cdc-sink"
- "cassandra-and-scylladb-sink"
- "elasticsearch-sink"
- "redis-sink"
format:
- "json"
- "protobuf"
Expand Down Expand Up @@ -169,3 +170,7 @@ steps:
testcase: "elasticsearch-sink"
format: "protobuf"
skip: true
- with:
testcase: "redis-sink"
format: "protobuf"
skip: true
6 changes: 1 addition & 5 deletions integration_tests/redis-sink/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,7 @@ The cluster contains a RisingWave cluster and its necessary dependencies, a data
3. Execute a simple query:

```sh
docker compose exec redis redis-ctl keys *
docker compose exec redis redis-cli keys '*'

```
We also can use 'get' to query value

```sql
select user_id, count(*) from default.demo_test group by user_id
```
6 changes: 3 additions & 3 deletions integration_tests/redis-sink/create_sink.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ FROM
bhv_mv WITH (
primary_key = 'user_id',
connector = 'redis',
redis.url= 'redis://127.0.0.1:6379/',
redis.url= 'redis://redis:6379/',
)FORMAT PLAIN ENCODE JSON(force_append_only='true');

CREATE SINK bhv_redis_sink_2
FROM
bhv_mv WITH (
primary_key = 'user_id',
connector = 'redis',
redis.url= 'redis://127.0.0.1:6379/',
)FORMAT PLAIN ENCODE TEMPLATE(force_append_only='true', key_format = 'UserID:{user_id}', value_format = 'TargetID:{target_id},EventTimestamp{event_timestamp}');
redis.url= 'redis://redis:6379/',
)FORMAT PLAIN ENCODE TEMPLATE(force_append_only='true', key_format = 'UserID:{user_id}', value_format = 'TargetID:{target_id},EventTimestamp{event_timestamp}');
2 changes: 2 additions & 0 deletions integration_tests/redis-sink/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ services:
image: 'redis:latest'
expose:
- 6379
ports:
- 6379:6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
Expand Down
1 change: 1 addition & 0 deletions integration_tests/redis-sink/sink_check
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
user_id,UserID
2 changes: 1 addition & 1 deletion integration_tests/scripts/check_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def run_psql(sql):

demo = sys.argv[1]
upstream = sys.argv[2] # mysql, postgres, etc. see scripts/integration_tests.sh
if demo in ['docker', 'iceberg-sink','clickhouse-sink', 'iceberg-cdc', 'kafka-cdc-sink', 'cassandra-and-scylladb-sink', 'elasticsearch-sink']:
if demo in ['docker', 'iceberg-sink','clickhouse-sink', 'iceberg-cdc', 'kafka-cdc-sink', 'cassandra-and-scylladb-sink', 'elasticsearch-sink', 'redis-sink']:
print('Skip for running test for `%s`' % demo)
sys.exit(0)

Expand Down
37 changes: 37 additions & 0 deletions integration_tests/scripts/run_demos.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,41 @@ def run_elasticsearch_sink_demo():
if len(failed_cases) != 0:
raise Exception("Data check failed for case {}".format(failed_cases))

def run_redis_demo():
demo = "redis-sink"
file_dir = dirname(abspath(__file__))
project_dir = dirname(file_dir)
demo_dir = os.path.join(project_dir, demo)
print("Running demo: {}".format(demo))

subprocess.run(["docker", "compose", "up", "-d", "--build"], cwd=demo_dir, check=True)
sleep(40)

sql_files = ['create_source.sql', 'create_mv.sql', 'create_sink.sql']
for fname in sql_files:
sql_file = os.path.join(demo_dir, fname)
print("executing sql: ", open(sql_file).read())
run_sql_file(sql_file, demo_dir)

sleep(40)
sink_check_file = os.path.join(demo_dir, 'sink_check')
with open(sink_check_file) as f:
relations = f.read().strip().split(",")
failed_cases = []
for rel in relations:
query = "*{}*".format(rel)
print("Running query: scan on Redis".format(query))
output = subprocess.Popen(["docker", "compose", "exec", "redis", "redis-cli", "--scan", "--pattern", query], cwd=demo_dir, stdout=subprocess.PIPE)
rows = subprocess.check_output(["wc", "-l"], cwd=demo_dir, stdin=output.stdout)
output.stdout.close()
output.wait()
rows = int(rows.decode('utf8').strip())
print("{} keys in '*{}*'".format(rows, rel))
if rows < 1:
failed_cases.append(rel)
if len(failed_cases) != 0:
raise Exception("Data check failed for case {}".format(failed_cases))

arg_parser = argparse.ArgumentParser(description='Run the demo')
arg_parser.add_argument('--format',
metavar='format',
Expand Down Expand Up @@ -301,5 +336,7 @@ def run_elasticsearch_sink_demo():
run_cassandra_and_scylladb_sink_demo()
elif args.case == "elasticsearch-sink":
run_elasticsearch_sink_demo()
elif args.case == "redis-sink":
run_redis_demo()
else:
run_demo(args.case, args.format)

0 comments on commit 5cac7a8

Please sign in to comment.