Replace Connection::query_all with Connection::query_iter #645
Conversation
A common part of new_for_query and new_for_prepared_statement which handles spawning a task and returns a non-empty RowIterator is extracted to a common function. It will also help implement the query_iter method for Connection.
We have two places in the iterator worker code that attempt to send an empty page, and we will have one more in the upcoming commits, so let's abstract this pattern away to a method.
https://github.com/scylladb/scylla-rust-driver/actions/runs/4203056593/jobs/7291978661
Strange, sounds a little bit as if a node has crashed. It's hard to tell for sure because we don't print logs from the scylla docker containers at all. I'll try to improve the CI so that it does print the logs from the cluster. In the meantime, I'll just re-run and hope that the error was an unimportant fluke.
Force-pushed from 7c7c095 to 7f76625
Very good overall, but some old naming is still left.
Force-pushed from 7f76625 to 310f817
v2:
@cvybhu ping
Code looks ok, left a few nits.
Given that, I'm not a fan of this change. The streams are very fancy, but IMO they are much harder to read and reason about. Previously there were a few clearly defined steps where one thing happened at a time and I could easily follow it step after step. With streams it feels different - it's like first we prepare what to do and then do everything at once.
With normal code there are usually a few intermediate variables with clearly defined types, but here all the chained stream combinators make it impossible to be sure about the type of things at each step. The error handling is also much more complicated: all the errors must be propagated in the stream, and then all the try_something functions hopefully handle them, but I find it very hard to reason about. Then there are also the complications with .buffer_unordered(256).
IMO it would be better to keep using the query_all function. It could be implemented using query_iter, although I guess it doesn't make much sense.
I'm a big fan of KISS.
I might be complaining too much, maybe streams are just a Rust thing that I'm not used to yet, but my initial feeling is that they introduce new complexity that could easily be avoided.
In the PR description you mentioned that it will be hard to keep query_all after the deserialization refactor, but why is that? Will it be impossible to simply collect the rows to a Vec?
/// A massively simplified version of the RowIteratorWorker. It does not have
/// any complicated logic related to retries, it just fetches pages from
/// a single connection.
It would be nice to use a RetryPolicy for retries. I think it wouldn't be too hard to incorporate it.
I'm not sure whether it makes sense for queries that are limited to a single connection and only read data from a single node. For example: what should happen on RetryNextNode?
I assume that your comment is about the changes in … I'm not sure if I agree. The biggest advantage of streams here is that they reduce peak memory use. Instead of loading all results from a query into memory, we only need to keep 1-2 pages in memory at a given time. I'll admit that the benefit is not huge because we are constructing the metadata anyway, but it's not only about making the code look more "fancy".
We are already using iterators extensively in the codebase, so I thought that streams would be familiar. They have a slightly different API due to their asynchronicity, but the concept is the same. I'm not sure what you mean by the complications with …

I guess it will be possible to change query_all to just return a Vec of parsed rows, not a full QueryResponse. I just felt that solving the problem in this way would prevent us from utilizing the new interface to its full potential and would deny us the chance to test its ergonomics on our internal code. In general, the new deserialization interface will not return pre-parsed rows in the style of …
Force-pushed from 310f817 to d1affd5
Adds the RowIterator::new_for_connection_query_iter method which will be the main workhorse of the Connection::query_iter method. It uses a new iterator worker which is a much simpler version of the proper, multi-connection iterator worker.
The new Connection::query_iter method will serve as a replacement for the query_all method. Contrary to query_all, query_iter will not materialize all query results as a single QueryResult but will allow processing the results row by row.
In order to use Connection::query_iter it is necessary to pass self wrapped in an Arc - the iterator worker is spawned as a separate tokio task, so it needs to co-own the Connection.
Thanks to scylladb#628, it is now possible to provide an explicit path to a crate which provides the necessary items for the FromRow macro, so we can now use this macro from the `scylla` crate itself by pointing it to the `scylla-cql` crate instead.

The NodeInfoRow struct is introduced to represent rows fetched from system.local and system.peers. It looks nicer than using a 5-element tuple and will reduce some noise when performing refactors in the commits that follow.
An enum expresses intent a little more clearly than a bool.
Conversion from NodeInfoRow to Peer is done across two lambdas applied to the untranslated_rows iterator. Let's simplify it a bit and extract this logic to a new separate function create_peer_from_row. It is recommended to hide whitespace changes when reviewing the diff of this commit.
Previously, query_peers used query_all to fetch topology information and then processed it using iterators. Now, this method is changed to use query_iter and rows are processed on the fly using asynchronous streams. In order to preserve the behavior of system.peers and system.local being queried in parallel, stream::select is used which merges results of both streams as they become available. Previously, we just put results of one query after results of the other one. Because of this, the order of Peers in the final result might be slightly different on each fetch - however, this order was never specified and could actually change e.g. when the control connection changes, so I think this should be OK.
There are other functions in topology.rs that use query_all, but they are less complicated and changing them to use query_iter is relatively straightforward. Therefore, we do it in a single commit.
The execute_all method is a sibling of query_all that was never actually used. In the future we will consider switching to prepared statements for internal queries, but 1) this will be done via the not-yet-existing execute_iter and 2) we might do it by converting query_iter to use prepared statements. Therefore, there is no need to keep the dead code. This commit gets rid of execute_all and all of its occurrences in the connection_query_all_execute_all_test (now renamed to: connection_query_all_test).
The query_iter method is meant to serve a similar purpose to what query_all does currently - fetching data on the control connection for internal queries - so it makes sense to adapt the (only) existing test for the new method. The last assertion (4) had to be removed because the new method does not check that the page size is set by the caller.
Now that we got rid of all uses of query_all in the code, we can finally get rid of it and of all the other code that depended on it - most notably, QueryResult::merge_with_next_page_res, which won't be possible to translate to the upcoming iterator-based deserialization interface.
Force-pushed from d1affd5 to 012caa6
v3:
@cvybhu review ping
For internal queries that can return a large number of rows (topology info, schema) we currently use Connection::query_all to perform them. That method fetches the results of the query in multiple pages and then returns them as a single combined QueryResult. This PR replaces it with a new Connection::query_iter method which has semantics more similar to the iterator-based query methods from the public interface (Session::query_iter, Session::execute_iter).

The motivations for this change are as follows:

- QueryResult will be hard to translate to use the upcoming iterator-based deserialization interface (Switch deserialization to an iterator-based interface to avoid allocations #462). Although the old interface will still be available as a deprecated fallback, I would like to adapt those methods as a form of dogfooding test for the new interface.

Pre-review checklist

- I added appropriate Fixes: annotations to PR description.