Skip to content

Commit

Permalink
iterator: introduce poll_next_page
Browse files Browse the repository at this point in the history
This commit refactors the part responsible for acquiring the next page
by the LegacyRowIterator to a different function. This change is a
preparation necessary to support the new deserialization interface -
there will be a method that can return deserialized type that borrows
from the current iterator, and - for "lifetimes reasons" - acquiring the
next page must be put into a separate method.

Co-authored-by: Wojciech Przytuła <[email protected]>
  • Loading branch information
piodul and wprzytula committed Oct 16, 2024
1 parent 7692b8a commit dc4694e
Showing 1 changed file with 36 additions and 21 deletions.
57 changes: 36 additions & 21 deletions scylla/src/transport/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,41 +89,56 @@ impl LegacyRowIterator {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Row, QueryError>>> {
let mut s = self.as_mut();

if s.is_current_page_exhausted() {
let received_page = ready_some_ok!(Pin::new(&mut s.page_receiver).poll_recv(cx));
let rows = match received_page
.rows
// As RowIteratorWorker manages paging itself, the paging state response
// returned to the user is always NoMorePages. It used to be so before
// the deserialization refactor, too.
.into_legacy_rows(PagingStateResponse::NoMorePages)
{
Ok(rows) => rows,
Err(err) => return Poll::Ready(Some(Err(err.into()))),
};
s.current_page = rows;
s.current_row_idx = 0;

if let Some(tracing_id) = received_page.tracing_id {
s.tracing_ids.push(tracing_id);
}
if self.as_ref().is_current_page_exhausted() {
ready_some_ok!(self.as_mut().poll_next_page(cx));
}

let mut s = self.as_mut();

let idx = s.current_row_idx;
if idx < s.current_page.rows.len() {
let row = mem::take(&mut s.current_page.rows[idx]);
s.current_row_idx += 1;
return Poll::Ready(Some(Ok(row)));
}

// We probably got a zero-sized page
// Yield, but tell that we are ready
cx.waker().wake_by_ref();
Poll::Pending
}

/// Makes an attempt to acquire the next page (which may be empty).
///
/// On success, returns Some(Ok()).
/// On failure, returns Some(Err()).
/// If there are no more pages, returns None.
fn poll_next_page<'r>(
mut self: Pin<&'r mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<(), QueryError>>> {
let mut s = self.as_mut();

let received_page = ready_some_ok!(Pin::new(&mut s.page_receiver).poll_recv(cx));
let rows = match received_page
.rows
// As RowIteratorWorker manages paging itself, the paging state response
// returned to the user is always NoMorePages. It used to be so before
// the deserialization refactor, too.
.into_legacy_rows(PagingStateResponse::NoMorePages)
{
Ok(rows) => rows,
Err(err) => return Poll::Ready(Some(Err(err.into()))),
};
s.current_page = rows;
s.current_row_idx = 0;

if let Some(tracing_id) = received_page.tracing_id {
s.tracing_ids.push(tracing_id);
}

Poll::Ready(Some(Ok(())))
}

/// Converts this iterator into an iterator over rows parsed as given type
pub fn into_typed<RowT: FromRow>(self) -> LegacyTypedRowIterator<RowT> {
LegacyTypedRowIterator {
Expand Down

0 comments on commit dc4694e

Please sign in to comment.