Multi-file and Parquet-aware prefetching from remote storage #16657
Conversation
Adds a new benchmark for Parquet read performance using a `LocalCUDACluster`. The user can pass in `--key` and `--secret` options to specify S3 credentials. E.g.

```
$ python ./local_read_parquet.py --devs 0,1,2,3,4,5,6,7 --filesystem fsspec --type gpu --file-count 48 --aggregate-files

Parquet read benchmark
--------------------------------------------------------------------------------
Path                | s3://dask-cudf-parquet-testing/dedup_parquet
Columns             | None
Backend             | cudf
Filesystem          | fsspec
Blocksize           | 244.14 MiB
Aggregate files     | True
Row count           | 372066
Size on disk        | 1.03 GiB
Number of workers   | 8
================================================================================
Wall clock          | Throughput
--------------------------------------------------------------------------------
36.75 s             | 28.78 MiB/s
21.29 s             | 49.67 MiB/s
17.91 s             | 59.05 MiB/s
================================================================================
Throughput          | 41.77 MiB/s +/- 7.81 MiB/s
Bandwidth           | 0 B/s +/- 0 B/s
Wall clock          | 25.32 s +/- 8.20 s
================================================================================
...
```

**Notes**:

- S3 performance generally scales with the number of workers (multiplied by the number of threads per worker).
- The example shown above was not executed from an EC2 instance.
- The example shown above *should* perform better after rapidsai/cudf#16657.
- Using `--filesystem arrow` together with `--type gpu` performs well, but depends on rapidsai/cudf#16684.

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Mads R. B. Kristensen (https://github.com/madsbk)
  - Peter Andreas Entschev (https://github.com/pentschev)

URL: #1371
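For completeness, a hypothetical invocation that supplies the credential flags mentioned above (the key and secret values here are placeholders, not real options output by the script):

```
$ python ./local_read_parquet.py --devs 0,1,2,3 \
    --filesystem fsspec --type gpu --file-count 48 --aggregate-files \
    --key <AWS_ACCESS_KEY_ID> --secret <AWS_SECRET_ACCESS_KEY>
```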
A few small questions
Some small suggestions but overall this LGTM now.
python/cudf/cudf/utils/ioutils.py
"all": _get_remote_bytes_all, | ||
}[method] | ||
except KeyError: | ||
raise NotImplementedError( |
nit: Since this is an internal function I wouldn't bother with exception handling. The only callers should be internal, so if we provide an invalid method we can be responsible for tracking down the problem when the KeyError is observed. Alternatively, convert the method to an enum.
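A minimal sketch of the enum alternative being suggested; the enum, the stub helpers, and the dispatch function here are illustrative stand-ins, not the actual cudf internals:

```python
from enum import Enum


class PrefetchMethod(Enum):
    PARQUET = "parquet"
    ALL = "all"


def _get_remote_bytes_parquet(*args, **kwargs):
    ...  # stand-in for the real helper


def _get_remote_bytes_all(*args, **kwargs):
    ...  # stand-in for the real helper


def _get_prefetcher(method: PrefetchMethod):
    # With an enum, an invalid user value fails earlier, at
    # PrefetchMethod("foo"), which already raises a clear ValueError;
    # the lookup below can then never hit a KeyError.
    return {
        PrefetchMethod.PARQUET: _get_remote_bytes_parquet,
        PrefetchMethod.ALL: _get_remote_bytes_all,
    }[method]
```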
The user can technically pass in `prefetch_options={"method": "foo"}`, and it's probably best to return a clear error message. (Though `ValueError` seems better than `NotImplementedError` in this case.)
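A rough sketch of the user-facing behavior under discussion; the `prefetch_options` keyword and the `"all"` method name come from this thread, while the `"parquet"` method name is assumed from the PR title:

```python
import cudf

# Prefetch remote bytes up front; {"method": "all"} fetches each file in
# full, while a Parquet-aware method would fetch only the byte ranges the
# reader will actually touch.
df = cudf.read_parquet(
    "s3://bucket/data.parquet",
    prefetch_options={"method": "all"},
)

# Per the discussion, an unrecognized method should fail loudly with a
# clear error (ValueError rather than NotImplementedError):
cudf.read_parquet(
    "s3://bucket/data.parquet",
    prefetch_options={"method": "foo"},
)
```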
```python
# Count how many chunks belong to each unique path, then use a cumulative
# offset to slice the flat `chunks` list back into per-file groups,
# concatenating each group's byte strings into a single buffer.
unique_count = dict(zip(*np.unique(paths, return_counts=True)))
offset = np.cumsum([0] + [unique_count[p] for p in remote_paths])
buffers = [
    functools.reduce(operator.add, chunks[offset[i] : offset[i + 1]])
```
nit (non-blocking): I thought `reduce(add, foo)` is just `sum(foo)`, what am I missing?
Yeah, this had me a bit confused as well. It turns out that `operator.add` will effectively join byte strings, but `sum` requires the intermediate values to be numeric:

```python
import operator

assert operator.add(b"asdf", b"jkl;") == b'asdfjkl;'  # Assertion passes
assert sum([b"asdf", b"jkl;"]) == b'asdfjkl;'
# TypeError: unsupported operand type(s) for +: 'int' and 'bytes'
```
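As an aside (not part of the change): if this concatenation ever became a hot spot, `bytes.join` would give the same result in linear time, since repeated `+` re-copies the intermediate buffers on every step:

```python
import functools
import operator

chunks = [b"asdf", b"jkl;"]

# Same result as the reduce-based concatenation, without quadratic copying:
assert b"".join(chunks) == functools.reduce(operator.add, chunks) == b"asdfjkl;"
```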
To the best of my understanding, looks good
/merge
Multi-file and Parquet-aware prefetching from remote storage (rapidsai#16657)

Follow up to rapidsai#16613
Supersedes rapidsai#16166

Improves remote-IO read performance when multiple files are read at once. Also enables partial IO for remote Parquet files (previously removed in `24.10` by rapidsai#16589).

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Vyas Ramasubramani (https://github.com/vyasr)
  - Lawrence Mitchell (https://github.com/wence-)

URL: rapidsai#16657
Description
Follow up to #16613
Supersedes #16166
Improves remote-IO read performance when multiple files are read at once. Also enables partial IO for remote Parquet files (previously removed in `24.10` by #16589).