diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml
index 50cd3cc7f45d..7867c588c822 100644
--- a/ci/workflows/main-cron.yml
+++ b/ci/workflows/main-cron.yml
@@ -78,9 +78,9 @@ steps:
     key: "slow-e2e-test-release"
     command: "ci/scripts/slow-e2e-test.sh -p ci-release -m ci-3streaming-2serving-3fe"
     if: |
-      !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
-      || build.pull_request.labels includes "ci/run-slow-e2e-tests"
-      || build.env("CI_STEPS") =~ /(^|,)slow-e2e-tests?(,|$$)/
+      !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
+      || build.pull_request.labels includes "ci/run-slow-e2e-tests"
+      || build.env("CI_STEPS") =~ /(^|,)slow-e2e-tests?(,|$$)/
     depends_on:
       - "build"
       - "build-other"
@@ -478,6 +478,28 @@ steps:
     timeout_in_minutes: 25
     retry: *auto-retry
 
+  - label: "S3_v2 source new file check on AWS (json)"
+    key: "s3-v2-source-new-file-check-aws"
+    command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_source_v2_new_file.py"
+    if: |
+      !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
+      || build.pull_request.labels includes "ci/run-s3-source-tests"
+      || build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/
+    depends_on: build
+    plugins:
+      - seek-oss/aws-sm#v2.3.1:
+          env:
+            S3_SOURCE_TEST_CONF: ci_s3_source_test_aws
+      - docker-compose#v5.1.0:
+          run: rw-build-env
+          config: ci/docker-compose.yml
+          mount-buildkite-agent: true
+          environment:
+            - S3_SOURCE_TEST_CONF
+      - ./ci/plugins/upload-failure-logs
+    timeout_in_minutes: 25
+    retry: *auto-retry
+
   - label: "S3_v2 source check on parquet file"
     key: "s3-v2-source-check-parquet-file"
     command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_parquet_source.py"
diff --git a/e2e_test/s3/fs_source_v2.py b/e2e_test/s3/fs_source_v2.py
index 760b8d07a09a..6706d4b6d4a9 100644
--- a/e2e_test/s3/fs_source_v2.py
+++ b/e2e_test/s3/fs_source_v2.py
@@ -43,7 +43,7 @@ def format_csv(data, with_header):
         csv_files.append(ostream.getvalue())
     return csv_files
 
-def do_test(config, file_num, item_num_per_file, prefix, fmt):
+def do_test(config, file_num, item_num_per_file, prefix, fmt, need_drop_table=True):
     conn = psycopg2.connect(
         host="localhost",
         port="4566",
@@ -106,10 +106,16 @@ def _assert_eq(field, got, expect):
 
     print('Test pass')
 
-    cur.execute(f'drop table {_table()}')
+    if need_drop_table:
+        cur.execute(f'drop table {_table()}')
     cur.close()
     conn.close()
 
+FORMATTER = {
+    'json': format_json,
+    'csv_with_header': partial(format_csv, with_header=True),
+    'csv_without_header': partial(format_csv, with_header=False),
+}
 
 if __name__ == "__main__":
     FILE_NUM = 4001
@@ -117,11 +123,6 @@ def _assert_eq(field, got, expect):
     data = gen_data(FILE_NUM, ITEM_NUM_PER_FILE)
     fmt = sys.argv[1]
 
-    FORMATTER = {
-        'json': format_json,
-        'csv_with_header': partial(format_csv, with_header=True),
-        'csv_without_header': partial(format_csv, with_header=False),
-    }
     assert fmt in FORMATTER, f"Unsupported format: {fmt}"
     formatted_files = FORMATTER[fmt](data)
 
diff --git a/e2e_test/s3/fs_source_v2_new_file.py b/e2e_test/s3/fs_source_v2_new_file.py
new file mode 100644
index 000000000000..a7ae53f3a37d
--- /dev/null
+++ b/e2e_test/s3/fs_source_v2_new_file.py
@@ -0,0 +1,86 @@
+from fs_source_v2 import gen_data, FORMATTER, do_test
+import json
+import os
+import random
+import psycopg2
+import time
+from minio import Minio
+
+
+def upload_to_s3_bucket(config, minio_client, run_id, files, start_bias):
+    _local = lambda idx, start_bias: f"data_{idx + start_bias}.{fmt}"
start_bias: f"data_{idx + start_bias}.{fmt}" + _s3 = lambda idx, start_bias: f"{run_id}_data_{idx + start_bias}.{fmt}" + for idx, file_str in enumerate(files): + with open(_local(idx, start_bias), "w") as f: + f.write(file_str) + os.fsync(f.fileno()) + + minio_client.fput_object( + config["S3_BUCKET"], _s3(idx, start_bias), _local(idx, start_bias) + ) + + +def check_for_new_files(file_num, item_num_per_file, fmt): + conn = psycopg2.connect(host="localhost", port="4566", user="root", database="dev") + + # Open a cursor to execute SQL statements + cur = conn.cursor() + + def _table(): + return f"s3_test_{fmt}" + + total_rows = file_num * item_num_per_file + + MAX_RETRIES = 40 + for retry_no in range(MAX_RETRIES): + cur.execute(f"select count(*) from {_table()}") + result = cur.fetchone() + if result[0] == total_rows: + return True + print( + f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 10s" + ) + time.sleep(10) + return False + + +if __name__ == "__main__": + FILE_NUM = 101 + ITEM_NUM_PER_FILE = 2 + data = gen_data(FILE_NUM, ITEM_NUM_PER_FILE) + fmt = "json" + + split_idx = 51 + data_batch1 = data[:split_idx] + data_batch2 = data[split_idx:] + + config = json.loads(os.environ["S3_SOURCE_TEST_CONF"]) + client = Minio( + config["S3_ENDPOINT"], + access_key=config["S3_ACCESS_KEY"], + secret_key=config["S3_SECRET_KEY"], + secure=True, + ) + run_id = str(random.randint(1000, 9999)) + print(f"S3 Source New File Test: run ID: {run_id} to bucket {config['S3_BUCKET']}") + + formatted_batch1 = FORMATTER[fmt](data_batch1) + upload_to_s3_bucket(config, client, run_id, formatted_batch1, 0) + + do_test( + config, len(data_batch1), ITEM_NUM_PER_FILE, run_id, fmt, need_drop_table=False + ) + + formatted_batch2 = FORMATTER[fmt](data_batch2) + upload_to_s3_bucket(config, client, run_id, formatted_batch2, split_idx) + + success_flag = check_for_new_files(FILE_NUM, ITEM_NUM_PER_FILE, fmt) + if success_flag: + print("Test(add new file) pass") + else: + print("Test(add new file) fail") + + _s3 = lambda idx, start_bias: f"{run_id}_data_{idx + start_bias}.{fmt}" + # clean up s3 files + for idx, _ in enumerate(data): + client.remove_object(config["S3_BUCKET"], _s3(idx, 0)) diff --git a/src/stream/src/executor/source/list_executor.rs b/src/stream/src/executor/source/list_executor.rs index 9317cfbc9aa4..25b32c0a0e4b 100644 --- a/src/stream/src/executor/source/list_executor.rs +++ b/src/stream/src/executor/source/list_executor.rs @@ -146,36 +146,45 @@ impl FsListExecutor { yield Message::Barrier(barrier); - while let Some(msg) = stream.next().await { - match msg { - Err(e) => { - tracing::warn!(error = %e.as_report(), "encountered an error, recovering"); - // todo: rebuild stream here - } - Ok(msg) => match msg { - // Barrier arrives. - Either::Left(msg) => match &msg { - Message::Barrier(barrier) => { - if let Some(mutation) = barrier.mutation.as_deref() { - match mutation { - Mutation::Pause => stream.pause_stream(), - Mutation::Resume => stream.resume_stream(), - _ => (), + loop { + // a list file stream never ends, keep list to find if there is any new file. + while let Some(msg) = stream.next().await { + match msg { + Err(e) => { + tracing::warn!(error = %e.as_report(), "encountered an error, recovering"); + stream + .replace_data_stream(self.build_chunked_paginate_stream(&source_desc)?); + } + Ok(msg) => match msg { + // Barrier arrives. 
+                        Either::Left(msg) => match &msg {
+                            Message::Barrier(barrier) => {
+                                if let Some(mutation) = barrier.mutation.as_deref() {
+                                    match mutation {
+                                        Mutation::Pause => stream.pause_stream(),
+                                        Mutation::Resume => stream.resume_stream(),
+                                        _ => (),
+                                    }
                                 }
-                            }
 
-                            // Propagate the barrier.
-                            yield msg;
+                                // Propagate the barrier.
+                                yield msg;
+                            }
+                            // Only barrier can be received.
+                            _ => unreachable!(),
+                        },
+                        // Chunked FsPage arrives.
+                        Either::Right(chunk) => {
+                            yield Message::Chunk(chunk);
                         }
-                        // Only barrier can be received.
-                        _ => unreachable!(),
                     },
-                    // Chunked FsPage arrives.
-                    Either::Right(chunk) => {
-                        yield Message::Chunk(chunk);
-                    }
-                },
+                }
             }
+
+            stream.replace_data_stream(
+                self.build_chunked_paginate_stream(&source_desc)
+                    .map_err(StreamExecutorError::from)?,
+            );
         }
     }
 }
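
A minimal sketch of driving the new check locally, for reference. It only uses what the diff above already shows: fs_source_v2_new_file.py reads its S3 settings from the S3_SOURCE_TEST_CONF environment variable (keys S3_ENDPOINT, S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET) and connects to a RisingWave frontend on localhost:4566. The endpoint, credentials, and bucket below are placeholders; in CI the real value is injected from the ci_s3_source_test_aws secret by the seek-oss/aws-sm plugin, and the cluster is brought up by ci/scripts/s3-source-test.sh.

# Hypothetical local run of the new test; all config values below are placeholders.
import json
import os
import subprocess

conf = {
    "S3_ENDPOINT": "s3.us-east-1.amazonaws.com",  # placeholder; must be TLS-capable, since the test uses secure=True
    "S3_ACCESS_KEY": "<access-key>",              # placeholder credential
    "S3_SECRET_KEY": "<secret-key>",              # placeholder credential
    "S3_BUCKET": "<bucket-name>",                 # placeholder bucket
}

# The test script reads its configuration from this environment variable.
env = {**os.environ, "S3_SOURCE_TEST_CONF": json.dumps(conf)}

# Assumes a RisingWave frontend is already listening on localhost:4566,
# which is the address fs_source_v2_new_file.py connects to via psycopg2.
subprocess.run(["python3", "e2e_test/s3/fs_source_v2_new_file.py"], env=env, check=True)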