fix: s3_v2 connector cannot read incremental files (#17702)
Signed-off-by: tabVersion <[email protected]>
tabVersion committed Jul 17, 2024
1 parent 9b09726 commit d2d881b
Showing 4 changed files with 175 additions and 35 deletions.
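In short: the v2 fs source's FsListExecutor drained its file-listing stream once and then stopped consuming it, so objects uploaded to the bucket after the source had started were never discovered (the old code even carries a "// todo: rebuild stream here"). The fix wraps stream consumption in an outer loop and rebuilds the chunked paginate stream whenever the listing ends or errors out, so the bucket is re-listed continuously. A new e2e test, e2e_test/s3/fs_source_v2_new_file.py, uploads a second batch of files while the source table is already live and polls until the extra rows arrive, and main-cron gains two steps: the new-file check and a parquet source check. A Python sketch of the re-list loop follows the list_executor.rs hunk below.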
50 changes: 47 additions & 3 deletions ci/workflows/main-cron.yml
@@ -78,9 +78,9 @@ steps:
key: "slow-e2e-test-release"
command: "ci/scripts/slow-e2e-test.sh -p ci-release -m ci-3streaming-2serving-3fe"
if: |
!(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-slow-e2e-tests"
|| build.env("CI_STEPS") =~ /(^|,)slow-e2e-tests?(,|$$)/
!(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-slow-e2e-tests"
|| build.env("CI_STEPS") =~ /(^|,)slow-e2e-tests?(,|$$)/
depends_on:
- "build"
- "build-other"
@@ -478,6 +478,50 @@ steps:
timeout_in_minutes: 25
retry: *auto-retry

- label: "S3_v2 source new file check on AWS (json)"
key: "s3-v2-source-new-file-check-aws"
command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_source_v2_new_file.py"
if: |
!(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-s3-source-tests"
|| build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/
depends_on: build
plugins:
- seek-oss/aws-sm#v2.3.1:
env:
S3_SOURCE_TEST_CONF: ci_s3_source_test_aws
- docker-compose#v5.1.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
environment:
- S3_SOURCE_TEST_CONF
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 25
retry: *auto-retry

- label: "S3_v2 source check on parquet file"
key: "s3-v2-source-check-parquet-file"
command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_parquet_source.py"
if: |
!(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-s3-source-tests"
|| build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/
depends_on: build
plugins:
- seek-oss/aws-sm#v2.3.1:
env:
S3_SOURCE_TEST_CONF: ci_s3_source_test_aws
- docker-compose#v5.1.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
environment:
- S3_SOURCE_TEST_CONF
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 25
retry: *auto-retry
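Both new steps are gated the same way as the other main-cron steps: they run on the full cron run (no selection label and no CI_STEPS filter), when the PR carries the "ci/run-s3-source-tests" label, or when CI_STEPS names the S3 source tests. The doubled "$$" is Buildkite's escape for a literal "$" in pipeline YAML, so the effective pattern is (^|,)s3-source-tests?(,|$). A quick sanity check of which CI_STEPS values trigger the steps (plain Python, outside Buildkite):

import re

# Effective regex once Buildkite's "$$" escape is resolved to "$".
pattern = re.compile(r"(^|,)s3-source-tests?(,|$)")

for ci_steps in [
    "s3-source-test",            # matches (singular form)
    "e2e-test,s3-source-tests",  # matches (entry in a comma-separated list)
    "s3-source-testing",         # no match: "test(s)" is not followed by "," or end of string
]:
    print(f"{ci_steps!r}: {bool(pattern.search(ci_steps))}")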

- label: "S3_v2 source batch read on AWS (json parser)"
key: "s3-v2-source-batch-read-check-aws-json-parser"
command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_source_batch.py -t json"
15 changes: 8 additions & 7 deletions e2e_test/s3/fs_source_v2.py
@@ -42,7 +42,7 @@ def format_csv(data, with_header):
csv_files.append(ostream.getvalue())
return csv_files

def do_test(config, file_num, item_num_per_file, prefix, fmt):
def do_test(config, file_num, item_num_per_file, prefix, fmt, need_drop_table=True):
conn = psycopg2.connect(
host="localhost",
port="4566",
@@ -105,22 +105,23 @@ def _assert_eq(field, got, expect):

print('Test pass')

cur.execute(f'drop table {_table()}')
if need_drop_table:
cur.execute(f'drop table {_table()}')
cur.close()
conn.close()

FORMATTER = {
'json': format_json,
'csv_with_header': partial(format_csv, with_header=True),
'csv_without_header': partial(format_csv, with_header=False),
}

if __name__ == "__main__":
FILE_NUM = 4001
ITEM_NUM_PER_FILE = 2
data = gen_data(FILE_NUM, ITEM_NUM_PER_FILE)

fmt = sys.argv[1]
FORMATTER = {
'json': format_json,
'csv_with_header': partial(format_csv, with_header=True),
'csv_without_header': partial(format_csv, with_header=False),
}
assert fmt in FORMATTER, f"Unsupported format: {fmt}"
formatted_files = FORMATTER[fmt](data)

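The two tweaks to the existing harness are what make it reusable here: do_test gains a need_drop_table flag so the new-file test can keep the table alive between the first and second batch of uploads, and the FORMATTER map moves to module level so it can be imported; both are pulled in by fs_source_v2_new_file.py below.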
86 changes: 86 additions & 0 deletions e2e_test/s3/fs_source_v2_new_file.py
@@ -0,0 +1,86 @@
from fs_source_v2 import gen_data, FORMATTER, do_test
import json
import os
import random
import psycopg2
import time
from minio import Minio


def upload_to_s3_bucket(config, minio_client, run_id, files, start_bias):
_local = lambda idx, start_bias: f"data_{idx + start_bias}.{fmt}"
_s3 = lambda idx, start_bias: f"{run_id}_data_{idx + start_bias}.{fmt}"
for idx, file_str in enumerate(files):
with open(_local(idx, start_bias), "w") as f:
f.write(file_str)
os.fsync(f.fileno())

minio_client.fput_object(
config["S3_BUCKET"], _s3(idx, start_bias), _local(idx, start_bias)
)


def check_for_new_files(file_num, item_num_per_file, fmt):
conn = psycopg2.connect(host="localhost", port="4566", user="root", database="dev")

# Open a cursor to execute SQL statements
cur = conn.cursor()

def _table():
return f"s3_test_{fmt}"

total_rows = file_num * item_num_per_file

MAX_RETRIES = 40
for retry_no in range(MAX_RETRIES):
cur.execute(f"select count(*) from {_table()}")
result = cur.fetchone()
if result[0] == total_rows:
return True
print(
f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 10s"
)
time.sleep(10)
return False


if __name__ == "__main__":
FILE_NUM = 101
ITEM_NUM_PER_FILE = 2
data = gen_data(FILE_NUM, ITEM_NUM_PER_FILE)
fmt = "json"

split_idx = 51
data_batch1 = data[:split_idx]
data_batch2 = data[split_idx:]

config = json.loads(os.environ["S3_SOURCE_TEST_CONF"])
client = Minio(
config["S3_ENDPOINT"],
access_key=config["S3_ACCESS_KEY"],
secret_key=config["S3_SECRET_KEY"],
secure=True,
)
run_id = str(random.randint(1000, 9999))
print(f"S3 Source New File Test: run ID: {run_id} to bucket {config['S3_BUCKET']}")

formatted_batch1 = FORMATTER[fmt](data_batch1)
upload_to_s3_bucket(config, client, run_id, formatted_batch1, 0)

do_test(
config, len(data_batch1), ITEM_NUM_PER_FILE, run_id, fmt, need_drop_table=False
)

formatted_batch2 = FORMATTER[fmt](data_batch2)
upload_to_s3_bucket(config, client, run_id, formatted_batch2, split_idx)

success_flag = check_for_new_files(FILE_NUM, ITEM_NUM_PER_FILE, fmt)
if success_flag:
print("Test(add new file) pass")
else:
print("Test(add new file) fail")

_s3 = lambda idx, start_bias: f"{run_id}_data_{idx + start_bias}.{fmt}"
# clean up s3 files
for idx, _ in enumerate(data):
client.remove_object(config["S3_BUCKET"], _s3(idx, 0))
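The test takes its connection details from the S3_SOURCE_TEST_CONF environment variable, which CI fills from the ci_s3_source_test_aws secret via the aws-sm plugin. To run it outside CI you would have to supply the same JSON yourself; the script only reads the four keys shown below, the values here are placeholders rather than the CI secret's contents, and a RisingWave frontend is assumed on localhost:4566 as in the docker-compose environment:

import json
import os
import subprocess

# Placeholder config: only these four keys are read by fs_source_v2_new_file.py.
conf = {
    "S3_ENDPOINT": "s3.us-east-1.amazonaws.com",
    "S3_ACCESS_KEY": "<access-key>",
    "S3_SECRET_KEY": "<secret-key>",
    "S3_BUCKET": "<bucket-name>",
}

env = {**os.environ, "S3_SOURCE_TEST_CONF": json.dumps(conf)}
# Assumes a running RisingWave instance with the frontend on localhost:4566.
subprocess.run(["python3", "e2e_test/s3/fs_source_v2_new_file.py"], env=env, check=True)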
59 changes: 34 additions & 25 deletions src/stream/src/executor/source/list_executor.rs
@@ -146,36 +146,45 @@ impl<S: StateStore> FsListExecutor<S> {

yield Message::Barrier(barrier);

while let Some(msg) = stream.next().await {
match msg {
Err(e) => {
tracing::warn!(error = %e.as_report(), "encountered an error, recovering");
// todo: rebuild stream here
}
Ok(msg) => match msg {
// Barrier arrives.
Either::Left(msg) => match &msg {
Message::Barrier(barrier) => {
if let Some(mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::Pause => stream.pause_stream(),
Mutation::Resume => stream.resume_stream(),
_ => (),
loop {
// A list-file stream never ends: keep re-listing to discover any new files.
while let Some(msg) = stream.next().await {
match msg {
Err(e) => {
tracing::warn!(error = %e.as_report(), "encountered an error, recovering");
stream
.replace_data_stream(self.build_chunked_paginate_stream(&source_desc)?);
}
Ok(msg) => match msg {
// Barrier arrives.
Either::Left(msg) => match &msg {
Message::Barrier(barrier) => {
if let Some(mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::Pause => stream.pause_stream(),
Mutation::Resume => stream.resume_stream(),
_ => (),
}
}
}

// Propagate the barrier.
yield msg;
// Propagate the barrier.
yield msg;
}
// Only barrier can be received.
_ => unreachable!(),
},
// Chunked FsPage arrives.
Either::Right(chunk) => {
yield Message::Chunk(chunk);
}
// Only barrier can be received.
_ => unreachable!(),
},
// Chunked FsPage arrives.
Either::Right(chunk) => {
yield Message::Chunk(chunk);
}
},
}
}

stream.replace_data_stream(
self.build_chunked_paginate_stream(&source_desc)
.map_err(StreamExecutorError::from)?,
);
}
}
}
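The core of the fix is the outer loop: a single S3 LIST pass is finite, so when the paginate stream is exhausted (or fails) the executor now rebuilds it with replace_data_stream and lists the bucket again instead of going quiet. A minimal Python/asyncio sketch of that pattern (purely illustrative; the poll_interval sleep is an assumption for the demo, while the real executor is paced by barriers rather than a sleep):

import asyncio
from typing import AsyncIterator, Callable

async def relist_forever(
    build_stream: Callable[[], AsyncIterator[str]],
    handle: Callable[[str], None],
    poll_interval: float = 1.0,
) -> None:
    # Drain one finite listing pass, then rebuild the stream and list again,
    # also rebuilding after an error, so later uploads are eventually seen.
    while True:
        stream = build_stream()
        try:
            async for key in stream:
                handle(key)
        except Exception as err:
            print(f"listing failed, recovering: {err}")
        await asyncio.sleep(poll_interval)

async def _demo() -> None:
    passes = iter([["a.json"], ["a.json", "b.json"]])

    async def fake_list() -> AsyncIterator[str]:
        # Each call simulates one fresh LIST of the bucket; b.json only
        # appears on the second pass.
        for key in next(passes, []):
            yield key

    try:
        await asyncio.wait_for(relist_forever(fake_list, print, 0.01), timeout=0.1)
    except asyncio.TimeoutError:
        pass  # the loop is endless by design; the demo just times out

if __name__ == "__main__":
    asyncio.run(_demo())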
