-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix scalability limitations of current implementation #46
Comments
One approach would be to bring back the original shuffle code and then add a mechanism for reading data from another node by implementing a gRPC based service (such as Arrow Flight protocol) in each worker. |
Would it be possible to use a ray generator for the shuffle writer and reader execution plans? Rather than serializing all of the data to disk in the writer, and reading it in the reader, we yield record batches in the Task. Then ray sends those in the in memory object store (spilling if required). Then we can pipeline the execution of the writer and readers so that stages need not wait for completion of dependent stages to start processing. Would this work? Willing to work on this if you think its a valid approach. |
It would be nice if ray facilitated streaming via network between Actors in addition to exchange of messages via the object store. If its only to exchange streams of batches would Arrow Flight be necessary or could something simpler be used, assuming the Actors exist only ephemerally. EDIT: Is the serialization of Arrow to/from the object store zero copy? If so, then that might be good enough to start. That wasn't clear to me when reading the ray docs which mention zero copy for numpy data structures. EDIT 2: It seems that the pickle protocol used has out of band support for zero copy serialization for arrow: https://peps.python.org/pep-0574/ |
Query execution works by building up a tree of futures to execute each partition in each query stage.
The root node of each query stage is a
RayShuffleWriterExec
. It works by executing its child plan and fetching all of the results into memory (this is already not scalable because there could be millions or billions of rows). It then concatenates all of the batches into one large batch, which is returned. This large batch is then stored in Ray's object store and will be fetched by the next query stage.The original disk-based shuffle mechanism (that was removed in #19) did not suffer from any of these issues because query results were streamed to disk in the writer and then streamed back out in the reader. However, this approach assumes that all workers have access to the same local file system.
The text was updated successfully, but these errors were encountered: