Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang committed Jan 3, 2024
1 parent 8f4b503 commit e02d209
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 2 deletions.
3 changes: 2 additions & 1 deletion ci/scripts/notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
"e2e-java-binding-tests": ["yiming"],
"e2e-clickhouse-sink-tests": ["bohan"],
"e2e-pulsar-sink-tests": ["renjie"],
"s3-source-test-for-opendal-fs-engine": ["congyi"],
"s3-source-test-for-opendal-fs-engine": ["congyi", "kexiang"],
"s3-source-tests": ["congyi", "kexiang"],
"pulsar-source-tests": ["renjie"],
"connector-node-integration-test": ["siyuan"],
}
Expand Down
16 changes: 16 additions & 0 deletions ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,22 @@ steps:
timeout_in_minutes: 25
retry: *auto-retry

- label: "PosixFs source on OpenDAL fs engine (csv parser)"
command: "ci/scripts/s3-source-test.sh -p ci-release -s 'posix_fs_source.py csv_without_header'"
if: |
!(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-s3-source-tests"
|| build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/
depends_on: build
plugins:
- docker-compose#v4.9.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 25
retry: *auto-retry

- label: "S3 source on OpenDAL fs engine"
key: "s3-source-test-for-opendal-fs-engine"
command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s run"
Expand Down
138 changes: 138 additions & 0 deletions e2e_test/s3/posix_fs_source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import os
import sys
import csv
import random
import psycopg2
import opendal

from time import sleep
from io import StringIO
from functools import partial

def gen_data(file_num, item_num_per_file):
assert item_num_per_file % 2 == 0, \
f'item_num_per_file should be even to ensure sum(mark) == 0: {item_num_per_file}'
return [
[{
'id': file_id * item_num_per_file + item_id,
'name': f'{file_id}_{item_id}',
'sex': item_id % 2,
'mark': (-1) ** (item_id % 2),
} for item_id in range(item_num_per_file)]
for file_id in range(file_num)
]

def format_csv(data, with_header):
csv_files = []

for file_data in data:
ostream = StringIO()
writer = csv.DictWriter(ostream, fieldnames=file_data[0].keys())
if with_header:
writer.writeheader()
for item_data in file_data:
writer.writerow(item_data)
csv_files.append(ostream.getvalue())
return csv_files


def do_test(file_num, item_num_per_file, prefix, fmt):
conn = psycopg2.connect(
host="localhost",
port="4566",
user="root",
database="dev"
)

# Open a cursor to execute SQL statements
cur = conn.cursor()

def _table():
return f'posix_fs_test_{fmt}'

def _encode():
return f"CSV (delimiter = ',', without_header = {str('without' in fmt).lower()})"

# Execute a SELECT statement
cur.execute(f'''CREATE TABLE {_table()}(
id int,
name TEXT,
sex int,
mark int,
) WITH (
connector = 'posix_fs',
match_pattern = '{prefix}*.{fmt}',
posix_fs.root = '/tmp',
) FORMAT PLAIN ENCODE {_encode()};''')

total_rows = file_num * item_num_per_file
MAX_RETRIES = 40
for retry_no in range(MAX_RETRIES):
cur.execute(f'select count(*) from {_table()}')
result = cur.fetchone()
if result[0] == total_rows:
break
print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 30s")
sleep(30)

stmt = f'select count(*), sum(id), sum(sex), sum(mark) from {_table()}'
print(f'Execute {stmt}')
cur.execute(stmt)
result = cur.fetchone()

print('Got:', result)

def _assert_eq(field, got, expect):
assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.'

_assert_eq('count(*)', result[0], total_rows)
_assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2)
_assert_eq('sum(sex)', result[2], total_rows / 2)
_assert_eq('sum(mark)', result[3], 0)

print('Test pass')

cur.execute(f'drop table {_table()}')
cur.close()
conn.close()


if __name__ == "__main__":
FILE_NUM = 4001
ITEM_NUM_PER_FILE = 2
data = gen_data(FILE_NUM, ITEM_NUM_PER_FILE)

fmt = sys.argv[1]
FORMATTER = {
'csv_with_header': partial(format_csv, with_header=True),
'csv_without_header': partial(format_csv, with_header=False),
}
assert fmt in FORMATTER, f"Unsupported format: {fmt}"
formatted_files = FORMATTER[fmt](data)

run_id = str(random.randint(1000, 9999))
_local = lambda idx: f'data_{idx}.{fmt}'
_posix = lambda idx: f"{run_id}_data_{idx}.{fmt}"
# put local files
op = opendal.Operator("fs", root="/tmp")

print("write file to /tmp")
for idx, file_str in enumerate(formatted_files):
with open(_local(idx), "w") as f:
f.write(file_str)
os.fsync(f.fileno())
file_name = _posix(idx)
print(f"Wrote {file_name} to /tmp")
file_bytes = file_str.encode('utf-8')
op.write(file_name, file_bytes)

# do test
print("do test")
do_test(FILE_NUM, ITEM_NUM_PER_FILE, run_id, fmt)

# clean up local files
print("clean up local files in /tmp")
for idx, _ in enumerate(formatted_files):
file_name = _posix(idx)
print(f"Removed {file_name} from /tmp")
op.delete(file_name)
1 change: 0 additions & 1 deletion e2e_test/s3/run_csv.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import os
import string
import json
import string
from time import sleep
Expand Down

0 comments on commit e02d209

Please sign in to comment.