refactor(test): reorganize file connector CI tests #19230

Merged (3 commits) on Nov 4, 2024
ci/workflows/main-cron.yml (89 changes: 3 additions & 86 deletions)
@@ -466,29 +466,7 @@ steps:

- label: "S3 source check on AWS (json parser)"
key: "s3-v2-source-check-aws-json-parser"
command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_source_v2.py -t json"
if: |
!(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-s3-source-tests"
|| build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/
depends_on: build
plugins:
- seek-oss/aws-sm#v2.3.1:
env:
S3_SOURCE_TEST_CONF: ci_s3_source_test_aws
- docker-compose#v5.1.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
environment:
- S3_SOURCE_TEST_CONF
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 25
retry: *auto-retry

- label: "S3 source new file check on AWS (json)"
key: "s3-v2-source-new-file-check-aws"
command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_source_v2_new_file.py"
command: "ci/scripts/s3-source-test.sh -p ci-release -s file_source.py -t json"
if: |
!(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-s3-source-tests"
@@ -510,51 +488,7 @@ steps:

- label: "S3 sink on parquet and json file"
key: "s3-sink-parquet-and-json-encode"
command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_sink.py"
if: |
!(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-s3-source-tests"
|| build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/
depends_on: build
plugins:
- seek-oss/aws-sm#v2.3.1:
env:
S3_SOURCE_TEST_CONF: ci_s3_source_test_aws
- docker-compose#v5.1.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
environment:
- S3_SOURCE_TEST_CONF
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 25
retry: *auto-retry

- label: "S3 sink batching test"
key: "s3-sink-batching-test"
command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_sink_batch.py"
if: |
!(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-s3-source-tests"
|| build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/
depends_on: build
plugins:
- seek-oss/aws-sm#v2.3.1:
env:
S3_SOURCE_TEST_CONF: ci_s3_source_test_aws
- docker-compose#v5.1.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
environment:
- S3_SOURCE_TEST_CONF
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 25
retry: *auto-retry

- label: "S3 source batch read on AWS (json parser)"
key: "s3-v2-source-batch-read-check-aws-json-parser"
command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_source_batch.py -t json"
command: "ci/scripts/s3-source-test.sh -p ci-release -s file_sink.py"
if: |
!(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-s3-source-tests"
@@ -576,7 +510,7 @@ steps:

- label: "S3 source check on AWS (csv parser)"
key: "s3-v2-source-check-aws-csv-parser"
command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_source_v2.py -t csv_without_header"
command: "ci/scripts/s3-source-test.sh -p ci-release -s file_source.py -t csv_without_header"
if: |
!(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-s3-source-tests"
@@ -596,23 +530,6 @@
timeout_in_minutes: 25
retry: *auto-retry

- label: "PosixFs source on OpenDAL fs engine (csv parser)"
key: "s3-source-test-for-opendal-fs-engine-csv-parser"
command: "ci/scripts/s3-source-test.sh -p ci-release -s posix_fs_source.py -t csv_without_header"
if: |
!(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-s3-source-tests"
|| build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/
depends_on: build
plugins:
- docker-compose#v5.1.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 25
retry: *auto-retry

- label: "pulsar source check"
key: "pulsar-source-tests"
command: "ci/scripts/pulsar-source-test.sh -p ci-release"
e2e_test/s3/fs_sink.py → e2e_test/s3/file_sink.py (126 changes: 125 additions & 1 deletion)
@@ -10,6 +10,8 @@
from time import sleep
from minio import Minio
from random import uniform
from time import sleep
import time

def gen_data(file_num, item_num_per_file):
    assert item_num_per_file % 2 == 0, \
@@ -266,9 +268,129 @@ def _assert_eq(field, got, expect):
    cur.execute(f'drop table test_parquet_sink_table')
    cur.execute(f'drop sink test_file_sink_json')
    cur.execute(f'drop table test_json_sink_table')
    cur.execute(f'drop table s3_test_parquet')
    cur.close()
    conn.close()

def test_file_sink_batching():
    conn = psycopg2.connect(
        host="localhost",
        port="4566",
        user="root",
        database="dev"
    )

    # Open a cursor to execute SQL statements
    cur = conn.cursor()

    # Create the upstream table
    cur.execute(f'''CREATE TABLE t (v1 int, v2 int);''')

    print('test file sink batching...\n')
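    # The sink below writes parquet files under s3.path; per the assertions in
    # this test, a new file is rolled over either when rollover_seconds elapses
    # or when max_row_count rows have accumulated.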
    cur.execute(f'''CREATE sink test_file_sink_batching as select
        v1, v2 from t WITH (
        connector = 's3',
        s3.region_name = 'custom',
        s3.bucket_name = 'hummock001',
        s3.credentials.access = 'hummockadmin',
        s3.credentials.secret = 'hummockadmin',
        s3.endpoint_url = 'http://hummock001.127.0.0.1:9301',
        s3.path = 'test_file_sink_batching/',
        s3.file_type = 'parquet',
        type = 'append-only',
        rollover_seconds = 5,
        max_row_count = 5,
        force_append_only='true'
    ) FORMAT PLAIN ENCODE PARQUET(force_append_only='true');''')
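    # The next statement creates a companion source table that re-reads the
    # parquet files written by the sink (match_pattern points at the sink's
    # s3.path), so count(*) on it only grows once a file has been flushed.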

    cur.execute(f'''CREATE TABLE test_file_sink_batching_table(
        v1 int,
        v2 int,
    ) WITH (
        connector = 's3',
        match_pattern = 'test_file_sink_batching/*.parquet',
        refresh.interval.sec = 1,
        s3.region_name = 'custom',
        s3.bucket_name = 'hummock001',
        s3.credentials.access = 'hummockadmin',
        s3.credentials.secret = 'hummockadmin',
        s3.endpoint_url = 'http://hummock001.127.0.0.1:9301',
    ) FORMAT PLAIN ENCODE PARQUET;''')

    cur.execute(f'''ALTER SINK test_file_sink_batching SET PARALLELISM = 2;''')

    cur.execute(f'''INSERT INTO t VALUES (10, 10);''')

    cur.execute(f'select count(*) from test_file_sink_batching_table')
    # no items should be selected yet: rollover_seconds has not elapsed
    result = cur.fetchone()

    def _assert_eq(field, got, expect):
        assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.'
    def _assert_greater(field, got, expect):
        assert got > expect, f'{field} assertion failed: got {got}, expect {expect}.'

    _assert_eq('count(*)', result[0], 0)
    print('the rollover_seconds has not reached, count(*) = 0')

    time.sleep(11)

    cur.execute(f'select count(*) from test_file_sink_batching_table')
    result = cur.fetchone()
    _assert_eq('count(*)', result[0], 1)
    print('the rollover_seconds has reached, count(*) = ', result[0])

    cur.execute(f'''
    INSERT INTO t VALUES (20, 20);
    INSERT INTO t VALUES (30, 30);
    INSERT INTO t VALUES (40, 40);
    INSERT INTO t VALUES (50, 10);
    ''')

    cur.execute(f'select count(*) from test_file_sink_batching_table')
    # count(*) = 1
    result = cur.fetchone()
    _assert_eq('count(*)', result[0], 1)
    print('the max row count has not reached, count(*) = ', result[0])

    cur.execute(f'''
    INSERT INTO t VALUES (60, 20);
    INSERT INTO t VALUES (70, 30);
    INSERT INTO t VALUES (80, 10);
    INSERT INTO t VALUES (90, 20);
    INSERT INTO t VALUES (100, 30);
    INSERT INTO t VALUES (100, 10);
    ''')

    time.sleep(10)

    cur.execute(f'select count(*) from test_file_sink_batching_table')
    result = cur.fetchone()
    _assert_greater('count(*)', result[0], 1)
    print('the rollover_seconds has reached, count(*) = ', result[0])

    cur.execute(f'drop sink test_file_sink_batching;')
    cur.execute(f'drop table t;')
    cur.execute(f'drop table test_file_sink_batching_table;')
    cur.close()
    conn.close()

    # delete objects
    client = Minio(
        "127.0.0.1:9301",
        "hummockadmin",
        "hummockadmin",
        secure=False,
    )
    objects = client.list_objects("hummock001", prefix="test_file_sink_batching/", recursive=True)

    for obj in objects:
        client.remove_object("hummock001", obj.object_name)
        print(f"Deleted: {obj.object_name}")



if __name__ == "__main__":
@@ -307,7 +429,9 @@ def _assert_eq(field, got, expect):

    do_sink(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id)

    # clean up s3 files
    for idx, _ in enumerate(data):
        client.remove_object("hummock001", _s3(idx))

    # test file sink batching
    test_file_sink_batching()
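
A more robust variant of the fixed time.sleep() waits in test_file_sink_batching would poll the check table until the expected count appears. The sketch below is illustrative only and not part of this PR; the helper name wait_for_count and its timeout/interval defaults are assumptions.

import time

def wait_for_count(cur, query, expect_at_least, timeout_s=30, interval_s=1):
    # Poll a `select count(*) ...` query until it returns at least
    # `expect_at_least`, or fail after `timeout_s` seconds.
    count = -1
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        cur.execute(query)
        count = cur.fetchone()[0]
        if count >= expect_at_least:
            return count
        time.sleep(interval_s)
    raise AssertionError(f'timed out: expected count >= {expect_at_least}, last observed {count}')

With such a helper, the time.sleep(11) followed by a single fetch could be replaced by wait_for_count(cur, 'select count(*) from test_file_sink_batching_table', 1).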