feat(source): introduce gcs source and new s3 source via OpenDAL #13414
Merged
Changes from 36 commits
Commits
57 commits
eb7b035
save work
wcy-fdu 02f63ee
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
wcy-fdu 4fa7226
save work
wcy-fdu 2871d05
save work
wcy-fdu 7a5321d
save work
wcy-fdu 3023575
implement lister, reader wi[
wcy-fdu 3a7bdef
save work
wcy-fdu 6b5a435
save work
wcy-fdu 28e7286
implement opendal reader
wcy-fdu 0314c2a
minor
wcy-fdu bb62cc2
introduce OpenDALConnectorTypeMarker
wcy-fdu ee2f87a
can apply gcs and s3
wcy-fdu c435b10
make clippy happy
wcy-fdu 536bed4
bring new s3 source
wcy-fdu e52a599
minor
wcy-fdu 078f506
can use opendal s3 as source, todo: add filter for prefix
wcy-fdu 98de2c5
remove origin s3_v2, and use opendal as new s3_v2
wcy-fdu 743226f
add credential
wcy-fdu 4c774eb
fix list
wcy-fdu a303fc3
use stream read
wcy-fdu 64f5f24
add python test script
wcy-fdu a0dc1cd
minor
wcy-fdu f37a0af
ready for review
wcy-fdu 4ce0a99
format python file
wcy-fdu b8792b8
fmt
wcy-fdu e44558b
fmt
wcy-fdu fdcd292
fmt
wcy-fdu 0ded6e0
strange fmt
wcy-fdu 8d5dc2c
Merge branch 'main' into wcy/gcs_source
wcy-fdu b762319
add e2e test for gcs source in main cron
wcy-fdu 6d08617
Merge branch 'wcy/gcs_source' of https://github.com/risingwavelabs/ri…
wcy-fdu 1bf66be
install opendal in python
wcy-fdu 0e6fa98
install opendal in python
wcy-fdu 7682ce2
fix python scripts
wcy-fdu de9f1bf
fix python scripts
wcy-fdu a29a574
Merge branch 'main' into wcy/gcs_source
wcy-fdu f41cc59
resolve some comments
wcy-fdu bacf0af
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
wcy-fdu 52a95f1
gcs source supports csv, add assume role for s3
wcy-fdu 1a08aa7
resolve some comments
wcy-fdu 8453836
resolve some comments
wcy-fdu 5081ee7
refactor: refine OpendalSource types
stdrc b7c26da
resolve some comments
wcy-fdu 1bab9fc
minor
wcy-fdu a352b0c
remove Timestamp
wcy-fdu 7beee7c
minor
wcy-fdu e04e46f
add github label
wcy-fdu a4188dd
rebase main
wcy-fdu 40ac24b
use prefix list
wcy-fdu c05c22e
Fix "cargo-hakari"
wcy-fdu cdce5db
rerun
wcy-fdu ff1084c
Merge branch 'wcy/gcs_source' of https://github.com/risingwavelabs/ri…
wcy-fdu 3ecb61c
can merge
wcy-fdu b320bac
minor
wcy-fdu a09f116
resolve conflict
wcy-fdu 6e3b492
Fix "cargo-hakari"
wcy-fdu dd22a8d
Merge branch 'main' into wcy/gcs_source
wcy-fdu
@@ -0,0 +1,130 @@
```python
import os
import sys
import csv
import json
import random
import psycopg2
import opendal

from time import sleep
from io import StringIO
from functools import partial

def gen_data(file_num, item_num_per_file):
    assert item_num_per_file % 2 == 0, \
        f'item_num_per_file should be even to ensure sum(mark) == 0: {item_num_per_file}'
    return [
        [{
            'id': file_id * item_num_per_file + item_id,
            'name': f'{file_id}_{item_id}',
            'sex': item_id % 2,
            'mark': (-1) ** (item_id % 2),
        } for item_id in range(item_num_per_file)]
        for file_id in range(file_num)
    ]

def format_json(data):
    return [
        '\n'.join([json.dumps(item) for item in file])
        for file in data
    ]


def do_test(config, file_num, item_num_per_file, prefix, fmt, credential):
    conn = psycopg2.connect(
        host="localhost",
        port="4566",
        user="root",
        database="dev"
    )

    # Open a cursor to execute SQL statements
    cur = conn.cursor()

    def _table():
        return f'gcs_test_{fmt}'

    def _encode():
        return 'JSON'

    # Create a table backed by the GCS source
    cur.execute(f'''CREATE TABLE {_table()}(
        id int,
        name TEXT,
        sex int,
        mark int,
    ) WITH (
        connector = 'gcs',
        match_pattern = '{prefix}*.{fmt}',
        gcs.bucket_name = '{config['GCS_BUCKET']}',
        gcs.credentials = '{credential}',
    ) FORMAT PLAIN ENCODE {_encode()};''')

    total_rows = file_num * item_num_per_file
    MAX_RETRIES = 40
    for retry_no in range(MAX_RETRIES):
        cur.execute(f'select count(*) from {_table()}')
        result = cur.fetchone()
        if result[0] == total_rows:
            break
        print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 30s")
        sleep(30)

    stmt = f'select count(*), sum(id), sum(sex), sum(mark) from {_table()}'
    print(f'Execute {stmt}')
    cur.execute(stmt)
    result = cur.fetchone()

    print('Got:', result)

    def _assert_eq(field, got, expect):
        assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.'

    _assert_eq('count(*)', result[0], total_rows)
    _assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2)
    _assert_eq('sum(sex)', result[2], total_rows / 2)
    _assert_eq('sum(mark)', result[3], 0)

    print('Test pass')

    cur.execute(f'drop table {_table()}')
    cur.close()
    conn.close()


if __name__ == "__main__":
    FILE_NUM = 4001
    ITEM_NUM_PER_FILE = 2
    data = gen_data(FILE_NUM, ITEM_NUM_PER_FILE)

    fmt = sys.argv[1]
    FORMATTER = {
        'json': format_json,
    }
    assert fmt in FORMATTER, f"Unsupported format: {fmt}"
    formatted_files = FORMATTER[fmt](data)

    config = json.loads(os.environ["GCS_SOURCE_TEST_CONF"])
    run_id = str(random.randint(1000, 9999))
    _local = lambda idx: f'data_{idx}.{fmt}'
    _gcs = lambda idx: f"{run_id}_data_{idx}.{fmt}"
    credential_str = json.dumps(config["GOOGLE_APPLICATION_CREDENTIALS"])
    # put gcs files
    op = opendal.Operator("gcs", root="/", bucket=config["GCS_BUCKET"], credential=credential_str)

    print("upload file to gcs")
    for idx, file_str in enumerate(formatted_files):
        with open(_local(idx), "w") as f:
            f.write(file_str)
            os.fsync(f.fileno())
        file_bytes = file_str.encode('utf-8')
        op.write(_gcs(idx), file_bytes)

    # do test
    print("do test")
    do_test(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id, fmt, credential_str)

    # clean up gcs files
    print("clean up gcs files")
    for idx, _ in enumerate(formatted_files):
        op.delete(_gcs(idx))
```
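The assertions in `do_test` rely on closed-form totals for the generated rows: ids run from 0 to n-1, `sex` alternates between 0 and 1, and `mark` alternates between +1 and -1. The following is a minimal offline sketch that checks those totals against the generator without a database or bucket. `expected_aggregates` and `actual_aggregates` are hypothetical helper names that are not part of the PR, and `gen_data` is repeated only so the sketch runs on its own.

```python
def gen_data(file_num, item_num_per_file):
    """Same generator as in the test script, repeated so this sketch runs alone."""
    assert item_num_per_file % 2 == 0
    return [
        [{
            'id': file_id * item_num_per_file + item_id,
            'name': f'{file_id}_{item_id}',
            'sex': item_id % 2,
            'mark': (-1) ** (item_id % 2),
        } for item_id in range(item_num_per_file)]
        for file_id in range(file_num)
    ]


def expected_aggregates(file_num, item_num_per_file):
    """Hypothetical helper: the closed-form values the test asserts against."""
    total_rows = file_num * item_num_per_file
    return (
        total_rows,                          # count(*)
        (total_rows - 1) * total_rows // 2,  # sum(id) over 0..n-1
        total_rows // 2,                     # sum(sex): half of the rows have sex == 1
        0,                                   # sum(mark): +1 and -1 cancel out
    )


def actual_aggregates(data):
    """Hypothetical helper: the same aggregates computed from the generated rows."""
    rows = [item for file in data for item in file]
    return (
        len(rows),
        sum(r['id'] for r in rows),
        sum(r['sex'] for r in rows),
        sum(r['mark'] for r in rows),
    )


if __name__ == "__main__":
    data = gen_data(4001, 2)
    print(actual_aggregates(data) == expected_aggregates(4001, 2))
```

The sketch uses integer division (`//`) so the expected values stay integers; the script in the diff uses `/`, which yields floats that still compare equal to the integer results for these sizes.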
The test passed on a GCP machine but fails in our CI. I guess this is caused by a gateway/network issue during the credential check, so I suggest leaving only the s3 source test in CI.
@huangjw806 any solution for testing GCS?
gcloud auth activate-service-account
Maybe you need to authenticate gcloud with a service account key.
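As a rough sketch of that suggestion, a CI step could activate the service account before the test script runs. The helper names and the key file path below are hypothetical, and whether this resolves the CI failure is not confirmed in this thread. The only gcloud usage assumed is `gcloud auth activate-service-account --key-file=...`.

```python
import subprocess


def build_gcloud_auth_cmd(key_file):
    """Build the argv for `gcloud auth activate-service-account`.

    `key_file` is a path to a service account JSON key (hypothetical here).
    """
    return ["gcloud", "auth", "activate-service-account", f"--key-file={key_file}"]


def activate_service_account(key_file):
    """Run gcloud; raises subprocess.CalledProcessError if activation fails."""
    subprocess.run(build_gcloud_auth_cmd(key_file), check=True)
```

Calling `activate_service_account("/path/to/key.json")` requires the gcloud CLI on the CI machine; building the argv separately keeps the command easy to inspect without running it.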