Fix scalability limitations of current implementation #46

Open
andygrove opened this issue Nov 16, 2024 · 3 comments · May be fixed by #48

Comments

@andygrove
Member

Query execution works by building up a tree of futures to execute each partition in each query stage.

The root node of each query stage is a RayShuffleWriterExec. It works by executing its child plan and fetching all of the results into memory (this is already not scalable because there could be millions or billions of rows). It then concatenates all of the batches into one large batch, which is returned. This large batch is then stored in Ray's object store and will be fetched by the next query stage.
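For illustration, the pattern looks roughly like this (a Python sketch of the behavior described above, not the actual operator code):

```python
import ray
import pyarrow as pa

def shuffle_write_partition(child_batches):
    """Sketch of the current approach: buffer everything, concatenate, one big put."""
    batches = list(child_batches)                             # all rows held in memory at once
    table = pa.Table.from_batches(batches).combine_chunks()   # one large contiguous batch
    return ray.put(table)                                     # stored whole in the object store

if __name__ == "__main__":
    ray.init()
    sample = [pa.record_batch([pa.array(range(1_000))], names=["x"]) for _ in range(10)]
    ref = shuffle_write_partition(sample)
    print(ray.get(ref).num_rows)
```

Both the full materialization and the concatenation scale with the size of the partition, which is what breaks down at millions or billions of rows.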

The original disk-based shuffle mechanism (that was removed in #19) did not suffer from any of these issues because query results were streamed to disk in the writer and then streamed back out in the reader. However, this approach assumes that all workers have access to the same local file system.
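The general write/read pattern, sketched here in Python with Arrow IPC streams (illustrative only; the removed implementation was not this code), looks like:

```python
import pyarrow as pa

def write_shuffle_partition(path, batches):
    # Writer side: append batches to the shuffle file as they are produced,
    # so memory use is bounded by roughly one batch at a time.
    writer = None
    with pa.OSFile(path, "wb") as sink:
        for batch in batches:
            if writer is None:
                writer = pa.ipc.new_stream(sink, batch.schema)
            writer.write_batch(batch)
        if writer is not None:
            writer.close()

def read_shuffle_partition(path):
    # Reader side: stream batches back out without materializing the partition.
    with pa.OSFile(path, "rb") as source:
        for batch in pa.ipc.open_stream(source):
            yield batch
```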

@andygrove
Member Author

One approach would be to bring back the original shuffle code and then add a mechanism for reading data from another node by implementing a gRPC-based service (such as the Arrow Flight protocol) in each worker.
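A minimal sketch of what a per-worker Flight endpoint might look like (hypothetical: the ticket format, port, and the stand-in data source are all assumptions, not project code):

```python
import pyarrow as pa
import pyarrow.flight as flight

class ShuffleFlightServer(flight.FlightServerBase):
    """Hypothetical per-worker service exposing local shuffle output over Arrow Flight."""

    def do_get(self, context, ticket):
        # Assumed ticket format "<stage_id>,<partition_id>"; real code would map this
        # to the shuffle file the writer produced on this node and return a reader over it.
        stage_id, partition_id = ticket.ticket.decode().split(",")
        table = pa.table({"stage": [int(stage_id)], "partition": [int(partition_id)]})
        return flight.RecordBatchStream(table)

if __name__ == "__main__":
    server = ShuffleFlightServer(location="grpc://0.0.0.0:50051")
    print("serving shuffle partitions on port", server.port)
    server.serve()

# A reader in the next stage (possibly on another node) would pull the stream with:
#   client = flight.connect("grpc://<writer-host>:50051")
#   for chunk in client.do_get(flight.Ticket(b"1,0")):
#       handle(chunk.data)   # chunk.data is a pyarrow.RecordBatch
```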

@robtandy

robtandy commented Dec 27, 2024

Would it be possible to use a Ray generator for the shuffle writer and reader execution plans?

Rather than serializing all of the data to disk in the writer and reading it back in the reader, we yield record batches from the task. Ray then stores those in the in-memory object store (spilling to disk if required).

Then we can pipeline the execution of the writers and readers so that a stage need not wait for the stages it depends on to complete before it starts processing.

Would this work?

Willing to work on this if you think it's a valid approach.
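A minimal sketch of what the writer could look like with Ray's dynamic generators (illustrative only; `execute_partition` is a stand-in for running the child plan, and with `num_returns="dynamic"` the per-batch refs only become available once the writer task finishes, so Ray's newer streaming generators would be needed for full pipelining):

```python
import ray
import pyarrow as pa

def execute_partition(partition_id: int):
    # Stand-in for running the shuffle writer's child plan; a real implementation
    # would stream record batches out of the query engine here.
    for i in range(10):
        yield pa.record_batch([pa.array(range(i * 100, (i + 1) * 100))], names=["x"])

@ray.remote(num_returns="dynamic")
def shuffle_write(partition_id: int):
    # Each yielded batch becomes its own object in Ray's object store
    # (which can spill to disk), instead of one giant concatenated batch.
    for batch in execute_partition(partition_id):
        yield batch

if __name__ == "__main__":
    ray.init()
    gen_ref = shuffle_write.remote(0)    # ObjectRef to an ObjectRefGenerator
    for batch_ref in ray.get(gen_ref):   # one ObjectRef per yielded batch
        batch = ray.get(batch_ref)       # readers can fetch one batch at a time
        print(batch.num_rows)
```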

@robtandy

robtandy commented Dec 27, 2024

It would be nice if Ray facilitated streaming over the network between Actors, in addition to the exchange of messages via the object store. If it's only to exchange streams of batches, would Arrow Flight be necessary, or could something simpler be used, assuming the Actors exist only ephemerally?

EDIT: Is the serialization of Arrow to/from the object store zero-copy? If so, that might be good enough to start. That wasn't clear to me when reading the Ray docs, which mention zero-copy for NumPy data structures.

EDIT 2: It seems that the pickle protocol used (protocol 5) has out-of-band support for zero-copy serialization of Arrow data: https://peps.python.org/pep-0574/
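A quick way to check this locally (a probe script, not project code; it just reports whether pyarrow hands its buffers to pickle out of band under protocol 5):

```python
import pickle
import pyarrow as pa

# A batch with a reasonably large column so the effect is visible.
batch = pa.record_batch([pa.array(range(1_000_000), type=pa.int64())], names=["x"])

out_of_band = []
payload = pickle.dumps(batch, protocol=5, buffer_callback=out_of_band.append)

# If pyarrow supports PEP 574 out-of-band pickling for this object, the in-band
# payload stays small and the column data shows up in `out_of_band` instead.
print("in-band bytes:", len(payload))
print("out-of-band buffers:", len(out_of_band),
      "total bytes:", sum(buf.raw().nbytes for buf in out_of_band))

restored = pickle.loads(payload, buffers=out_of_band)
assert restored.equals(batch)
```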
