From dc4694e682ef3d34068231068a9270aafe4f49b7 Mon Sep 17 00:00:00 2001 From: Piotr Dulikowski Date: Mon, 13 Mar 2023 15:31:12 +0100 Subject: [PATCH] iterator: introduce poll_next_page MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit refactors the part responsible for acquiring the next page by the LegacyRowIterator to a different function. This change is a preparation necessary to support the new deserialization interface - there will be a method that can return deserialized type that borrows from the current iterator, and - for "lifetimes reasons" - acquiring the next page must be put into a separate method. Co-authored-by: Wojciech Przytuła --- scylla/src/transport/iterator.rs | 57 ++++++++++++++++++++------------ 1 file changed, 36 insertions(+), 21 deletions(-) diff --git a/scylla/src/transport/iterator.rs b/scylla/src/transport/iterator.rs index 6db7f6666..c3a81c08c 100644 --- a/scylla/src/transport/iterator.rs +++ b/scylla/src/transport/iterator.rs @@ -89,41 +89,56 @@ impl LegacyRowIterator { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { - let mut s = self.as_mut(); - - if s.is_current_page_exhausted() { - let received_page = ready_some_ok!(Pin::new(&mut s.page_receiver).poll_recv(cx)); - let rows = match received_page - .rows - // As RowIteratorWorker manages paging itself, the paging state response - // returned to the user is always NoMorePages. It used to be so before - // the deserialization refactor, too. - .into_legacy_rows(PagingStateResponse::NoMorePages) - { - Ok(rows) => rows, - Err(err) => return Poll::Ready(Some(Err(err.into()))), - }; - s.current_page = rows; - s.current_row_idx = 0; - - if let Some(tracing_id) = received_page.tracing_id { - s.tracing_ids.push(tracing_id); - } + if self.as_ref().is_current_page_exhausted() { + ready_some_ok!(self.as_mut().poll_next_page(cx)); } + let mut s = self.as_mut(); + let idx = s.current_row_idx; if idx < s.current_page.rows.len() { let row = mem::take(&mut s.current_page.rows[idx]); s.current_row_idx += 1; return Poll::Ready(Some(Ok(row))); } - // We probably got a zero-sized page // Yield, but tell that we are ready cx.waker().wake_by_ref(); Poll::Pending } + /// Makes an attempt to acquire the next page (which may be empty). + /// + /// On success, returns Some(Ok()). + /// On failure, returns Some(Err()). + /// If there are no more pages, returns None. + fn poll_next_page<'r>( + mut self: Pin<&'r mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + let mut s = self.as_mut(); + + let received_page = ready_some_ok!(Pin::new(&mut s.page_receiver).poll_recv(cx)); + let rows = match received_page + .rows + // As RowIteratorWorker manages paging itself, the paging state response + // returned to the user is always NoMorePages. It used to be so before + // the deserialization refactor, too. + .into_legacy_rows(PagingStateResponse::NoMorePages) + { + Ok(rows) => rows, + Err(err) => return Poll::Ready(Some(Err(err.into()))), + }; + s.current_page = rows; + s.current_row_idx = 0; + + if let Some(tracing_id) = received_page.tracing_id { + s.tracing_ids.push(tracing_id); + } + + Poll::Ready(Some(Ok(()))) + } + /// Converts this iterator into an iterator over rows parsed as given type pub fn into_typed(self) -> LegacyTypedRowIterator { LegacyTypedRowIterator {