From 5cac7a825efffe4d82e894ae6f7327561d700bee Mon Sep 17 00:00:00 2001 From: xfz <73645462+xuefengze@users.noreply.github.com> Date: Mon, 27 Nov 2023 18:14:26 +0800 Subject: [PATCH] test: add `redis-sink` test into the integration test workflow (#13661) --- ci/workflows/integration-tests.yml | 5 +++ integration_tests/redis-sink/README.md | 6 +-- integration_tests/redis-sink/create_sink.sql | 6 +-- .../redis-sink/docker-compose.yml | 2 + integration_tests/redis-sink/sink_check | 1 + integration_tests/scripts/check_data.py | 2 +- integration_tests/scripts/run_demos.py | 37 +++++++++++++++++++ 7 files changed, 50 insertions(+), 9 deletions(-) create mode 100644 integration_tests/redis-sink/sink_check diff --git a/ci/workflows/integration-tests.yml b/ci/workflows/integration-tests.yml index 19c5af18dc863..10636a6714146 100644 --- a/ci/workflows/integration-tests.yml +++ b/ci/workflows/integration-tests.yml @@ -117,6 +117,7 @@ steps: - "kafka-cdc-sink" - "cassandra-and-scylladb-sink" - "elasticsearch-sink" + - "redis-sink" format: - "json" - "protobuf" @@ -169,3 +170,7 @@ steps: testcase: "elasticsearch-sink" format: "protobuf" skip: true + - with: + testcase: "redis-sink" + format: "protobuf" + skip: true diff --git a/integration_tests/redis-sink/README.md b/integration_tests/redis-sink/README.md index f2e5e64aec795..62e6131aaab57 100644 --- a/integration_tests/redis-sink/README.md +++ b/integration_tests/redis-sink/README.md @@ -20,11 +20,7 @@ The cluster contains a RisingWave cluster and its necessary dependencies, a data 3. Execute a simple query: ```sh -docker compose exec redis redis-ctl keys * +docker compose exec redis redis-cli keys '*' ``` We also can use 'get' to query value - -```sql -select user_id, count(*) from default.demo_test group by user_id -``` diff --git a/integration_tests/redis-sink/create_sink.sql b/integration_tests/redis-sink/create_sink.sql index 2ba9ba67feb39..3e85c87035073 100644 --- a/integration_tests/redis-sink/create_sink.sql +++ b/integration_tests/redis-sink/create_sink.sql @@ -3,7 +3,7 @@ FROM bhv_mv WITH ( primary_key = 'user_id', connector = 'redis', - redis.url= 'redis://127.0.0.1:6379/', + redis.url= 'redis://redis:6379/', )FORMAT PLAIN ENCODE JSON(force_append_only='true'); CREATE SINK bhv_redis_sink_2 @@ -11,5 +11,5 @@ FROM bhv_mv WITH ( primary_key = 'user_id', connector = 'redis', - redis.url= 'redis://127.0.0.1:6379/', -)FORMAT PLAIN ENCODE TEMPLATE(force_append_only='true', key_format = 'UserID:{user_id}', value_format = 'TargetID:{target_id},EventTimestamp{event_timestamp}'); \ No newline at end of file + redis.url= 'redis://redis:6379/', +)FORMAT PLAIN ENCODE TEMPLATE(force_append_only='true', key_format = 'UserID:{user_id}', value_format = 'TargetID:{target_id},EventTimestamp{event_timestamp}'); diff --git a/integration_tests/redis-sink/docker-compose.yml b/integration_tests/redis-sink/docker-compose.yml index a850f9b35c431..38ab16344d52a 100644 --- a/integration_tests/redis-sink/docker-compose.yml +++ b/integration_tests/redis-sink/docker-compose.yml @@ -5,6 +5,8 @@ services: image: 'redis:latest' expose: - 6379 + ports: + - 6379:6379 healthcheck: test: ["CMD", "redis-cli", "ping"] interval: 5s diff --git a/integration_tests/redis-sink/sink_check b/integration_tests/redis-sink/sink_check new file mode 100644 index 0000000000000..550b803e6e0e6 --- /dev/null +++ b/integration_tests/redis-sink/sink_check @@ -0,0 +1 @@ +user_id,UserID diff --git a/integration_tests/scripts/check_data.py b/integration_tests/scripts/check_data.py index a4a35a7e03414..02da5db5f4f54 100644 --- a/integration_tests/scripts/check_data.py +++ b/integration_tests/scripts/check_data.py @@ -54,7 +54,7 @@ def run_psql(sql): demo = sys.argv[1] upstream = sys.argv[2] # mysql, postgres, etc. see scripts/integration_tests.sh -if demo in ['docker', 'iceberg-sink','clickhouse-sink', 'iceberg-cdc', 'kafka-cdc-sink', 'cassandra-and-scylladb-sink', 'elasticsearch-sink']: +if demo in ['docker', 'iceberg-sink','clickhouse-sink', 'iceberg-cdc', 'kafka-cdc-sink', 'cassandra-and-scylladb-sink', 'elasticsearch-sink', 'redis-sink']: print('Skip for running test for `%s`' % demo) sys.exit(0) diff --git a/integration_tests/scripts/run_demos.py b/integration_tests/scripts/run_demos.py index b2726e7fb5174..28d298df5ccde 100644 --- a/integration_tests/scripts/run_demos.py +++ b/integration_tests/scripts/run_demos.py @@ -271,6 +271,41 @@ def run_elasticsearch_sink_demo(): if len(failed_cases) != 0: raise Exception("Data check failed for case {}".format(failed_cases)) +def run_redis_demo(): + demo = "redis-sink" + file_dir = dirname(abspath(__file__)) + project_dir = dirname(file_dir) + demo_dir = os.path.join(project_dir, demo) + print("Running demo: {}".format(demo)) + + subprocess.run(["docker", "compose", "up", "-d", "--build"], cwd=demo_dir, check=True) + sleep(40) + + sql_files = ['create_source.sql', 'create_mv.sql', 'create_sink.sql'] + for fname in sql_files: + sql_file = os.path.join(demo_dir, fname) + print("executing sql: ", open(sql_file).read()) + run_sql_file(sql_file, demo_dir) + + sleep(40) + sink_check_file = os.path.join(demo_dir, 'sink_check') + with open(sink_check_file) as f: + relations = f.read().strip().split(",") + failed_cases = [] + for rel in relations: + query = "*{}*".format(rel) + print("Running query: scan on Redis".format(query)) + output = subprocess.Popen(["docker", "compose", "exec", "redis", "redis-cli", "--scan", "--pattern", query], cwd=demo_dir, stdout=subprocess.PIPE) + rows = subprocess.check_output(["wc", "-l"], cwd=demo_dir, stdin=output.stdout) + output.stdout.close() + output.wait() + rows = int(rows.decode('utf8').strip()) + print("{} keys in '*{}*'".format(rows, rel)) + if rows < 1: + failed_cases.append(rel) + if len(failed_cases) != 0: + raise Exception("Data check failed for case {}".format(failed_cases)) + arg_parser = argparse.ArgumentParser(description='Run the demo') arg_parser.add_argument('--format', metavar='format', @@ -301,5 +336,7 @@ def run_elasticsearch_sink_demo(): run_cassandra_and_scylladb_sink_demo() elif args.case == "elasticsearch-sink": run_elasticsearch_sink_demo() +elif args.case == "redis-sink": + run_redis_demo() else: run_demo(args.case, args.format)