From e02d2097e439bbd8f30bde09ca490e2ba638cf18 Mon Sep 17 00:00:00 2001
From: Kexiang Wang
Date: Wed, 3 Jan 2024 17:19:57 -0500
Subject: [PATCH] add test

---
 ci/scripts/notify.py           |   3 +-
 ci/workflows/main-cron.yml     |  16 ++++
 e2e_test/s3/posix_fs_source.py | 138 +++++++++++++++++++++++++++++++++
 e2e_test/s3/run_csv.py         |   1 -
 4 files changed, 156 insertions(+), 2 deletions(-)
 create mode 100644 e2e_test/s3/posix_fs_source.py

diff --git a/ci/scripts/notify.py b/ci/scripts/notify.py
index 818dfce72143a..5266998b0045f 100755
--- a/ci/scripts/notify.py
+++ b/ci/scripts/notify.py
@@ -19,7 +19,8 @@
     "e2e-java-binding-tests": ["yiming"],
     "e2e-clickhouse-sink-tests": ["bohan"],
     "e2e-pulsar-sink-tests": ["renjie"],
-    "s3-source-test-for-opendal-fs-engine": ["congyi"],
+    "s3-source-test-for-opendal-fs-engine": ["congyi", "kexiang"],
+    "s3-source-tests": ["congyi", "kexiang"],
     "pulsar-source-tests": ["renjie"],
     "connector-node-integration-test": ["siyuan"],
 }
diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml
index 75f58eadf2492..653578e4688e2 100644
--- a/ci/workflows/main-cron.yml
+++ b/ci/workflows/main-cron.yml
@@ -489,6 +489,22 @@ steps:
     timeout_in_minutes: 25
     retry: *auto-retry
 
+  - label: "PosixFs source on OpenDAL fs engine (csv parser)"
+    command: "ci/scripts/s3-source-test.sh -p ci-release -s 'posix_fs_source.py csv_without_header'"
+    if: |
+      !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
+      || build.pull_request.labels includes "ci/run-s3-source-tests"
+      || build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/
+    depends_on: build
+    plugins:
+      - docker-compose#v4.9.0:
+          run: rw-build-env
+          config: ci/docker-compose.yml
+          mount-buildkite-agent: true
+      - ./ci/plugins/upload-failure-logs
+    timeout_in_minutes: 25
+    retry: *auto-retry
+
   - label: "S3 source on OpenDAL fs engine"
     key: "s3-source-test-for-opendal-fs-engine"
     command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s run"
diff --git a/e2e_test/s3/posix_fs_source.py b/e2e_test/s3/posix_fs_source.py
new file mode 100644
index 0000000000000..a4142b1e0d959
--- /dev/null
+++ b/e2e_test/s3/posix_fs_source.py
@@ -0,0 +1,138 @@
+import os
+import sys
+import csv
+import random
+import psycopg2
+import opendal
+
+from time import sleep
+from io import StringIO
+from functools import partial
+
+def gen_data(file_num, item_num_per_file):
+    assert item_num_per_file % 2 == 0, \
+        f'item_num_per_file should be even to ensure sum(mark) == 0: {item_num_per_file}'
+    return [
+        [{
+            'id': file_id * item_num_per_file + item_id,
+            'name': f'{file_id}_{item_id}',
+            'sex': item_id % 2,
+            'mark': (-1) ** (item_id % 2),
+        } for item_id in range(item_num_per_file)]
+        for file_id in range(file_num)
+    ]
+
+def format_csv(data, with_header):
+    csv_files = []
+
+    for file_data in data:
+        ostream = StringIO()
+        writer = csv.DictWriter(ostream, fieldnames=file_data[0].keys())
+        if with_header:
+            writer.writeheader()
+        for item_data in file_data:
+            writer.writerow(item_data)
+        csv_files.append(ostream.getvalue())
+    return csv_files
+
+
+def do_test(file_num, item_num_per_file, prefix, fmt):
+    conn = psycopg2.connect(
+        host="localhost",
+        port="4566",
+        user="root",
+        database="dev"
+    )
+
+    # Open a cursor to execute SQL statements
+    cur = conn.cursor()
+
+    def _table():
+        return f'posix_fs_test_{fmt}'
+
+    def _encode():
+        return f"CSV (delimiter = ',', without_header = {str('without' in fmt).lower()})"
+
+    # Execute a SELECT statement
+    cur.execute(f'''CREATE TABLE {_table()}(
+        id int,
+        name TEXT,
+        sex int,
+        mark int,
+    ) WITH (
+        connector = 'posix_fs',
+        match_pattern = '{prefix}*.{fmt}',
+        posix_fs.root = '/tmp',
+    ) FORMAT PLAIN ENCODE {_encode()};''')
+
+    total_rows = file_num * item_num_per_file
+    MAX_RETRIES = 40
+    for retry_no in range(MAX_RETRIES):
+        cur.execute(f'select count(*) from {_table()}')
+        result = cur.fetchone()
+        if result[0] == total_rows:
+            break
+        print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 30s")
+        sleep(30)
+
+    stmt = f'select count(*), sum(id), sum(sex), sum(mark) from {_table()}'
+    print(f'Execute {stmt}')
+    cur.execute(stmt)
+    result = cur.fetchone()
+
+    print('Got:', result)
+
+    def _assert_eq(field, got, expect):
+        assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.'
+
+    _assert_eq('count(*)', result[0], total_rows)
+    _assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2)
+    _assert_eq('sum(sex)', result[2], total_rows / 2)
+    _assert_eq('sum(mark)', result[3], 0)
+
+    print('Test pass')
+
+    cur.execute(f'drop table {_table()}')
+    cur.close()
+    conn.close()
+
+
+if __name__ == "__main__":
+    FILE_NUM = 4001
+    ITEM_NUM_PER_FILE = 2
+    data = gen_data(FILE_NUM, ITEM_NUM_PER_FILE)
+
+    fmt = sys.argv[1]
+    FORMATTER = {
+        'csv_with_header': partial(format_csv, with_header=True),
+        'csv_without_header': partial(format_csv, with_header=False),
+    }
+    assert fmt in FORMATTER, f"Unsupported format: {fmt}"
+    formatted_files = FORMATTER[fmt](data)
+
+    run_id = str(random.randint(1000, 9999))
+    _local = lambda idx: f'data_{idx}.{fmt}'
+    _posix = lambda idx: f"{run_id}_data_{idx}.{fmt}"
+    # put local files
+    op = opendal.Operator("fs", root="/tmp")
+
+    print("write file to /tmp")
+    for idx, file_str in enumerate(formatted_files):
+        with open(_local(idx), "w") as f:
+            f.write(file_str)
+            os.fsync(f.fileno())
+        file_name = _posix(idx)
+        print(f"Wrote {file_name} to /tmp")
+        file_bytes = file_str.encode('utf-8')
+        op.write(file_name, file_bytes)
+
+    # do test
+    print("do test")
+    do_test(FILE_NUM, ITEM_NUM_PER_FILE, run_id, fmt)
+
+    # clean up local files
+    print("clean up local files in /tmp")
+    for idx, _ in enumerate(formatted_files):
+        file_name = _posix(idx)
+        print(f"Removed {file_name} from /tmp")
+        op.delete(file_name)
diff --git a/e2e_test/s3/run_csv.py b/e2e_test/s3/run_csv.py
index b721e3c796066..a6c0dc37bc4ca 100644
--- a/e2e_test/s3/run_csv.py
+++ b/e2e_test/s3/run_csv.py
@@ -1,5 +1,4 @@
 import os
-import string
 import json
 import string
 from time import sleep