diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs
index 1a7fce16dc..188984393c 100644
--- a/scylla/src/transport/connection.rs
+++ b/scylla/src/transport/connection.rs
@@ -47,7 +47,7 @@ use std::{
 };
 
 use super::errors::{ProtocolError, UseKeyspaceProtocolError};
-use super::iterator::LegacyRowIterator;
+use super::iterator::{LegacyRowIterator, QueryPager};
 use super::locator::tablets::{RawTablet, TabletParsingError};
 use super::query_result::QueryResult;
 use super::session::AddressTranslator;
@@ -1188,13 +1188,9 @@ impl Connection {
             .determine_consistency(self.config.default_consistency);
         let serial_consistency = query.config.serial_consistency.flatten();
 
-        LegacyRowIterator::new_for_connection_query_iter(
-            query,
-            self,
-            consistency,
-            serial_consistency,
-        )
-        .await
+        QueryPager::new_for_connection_query_iter(query, self, consistency, serial_consistency)
+            .await
+            .map(QueryPager::into_legacy)
     }
 
     /// Executes a prepared statements and fetches its results over multiple pages, using
@@ -1209,7 +1205,7 @@
             .determine_consistency(self.config.default_consistency);
         let serial_consistency = prepared_statement.config.serial_consistency.flatten();
 
-        LegacyRowIterator::new_for_connection_execute_iter(
+        QueryPager::new_for_connection_execute_iter(
             prepared_statement,
             values,
             self,
@@ -1217,6 +1213,7 @@
             serial_consistency,
         )
         .await
+        .map(QueryPager::into_legacy)
     }
 
     #[allow(dead_code)]
diff --git a/scylla/src/transport/iterator.rs b/scylla/src/transport/iterator.rs
index ea79306c32..701b184c9c 100644
--- a/scylla/src/transport/iterator.rs
+++ b/scylla/src/transport/iterator.rs
@@ -1,7 +1,6 @@
 //! Iterators over rows returned by paged queries
 
 use std::future::Future;
-use std::mem;
 use std::net::SocketAddr;
 use std::ops::ControlFlow;
 use std::pin::Pin;
@@ -9,21 +8,24 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use futures::Stream;
-use scylla_cql::frame::request::query::PagingStateResponse;
+use scylla_cql::frame::frame_errors::RowsParseError;
 use scylla_cql::frame::response::result::RawMetadataAndRawRows;
 use scylla_cql::frame::response::NonErrorResponse;
+use scylla_cql::types::deserialize::result::RawRowLendingIterator;
+use scylla_cql::types::deserialize::row::{ColumnIterator, DeserializeRow};
 use scylla_cql::types::serialize::row::SerializedValues;
 use std::result::Result;
 use thiserror::Error;
 use tokio::sync::mpsc;
 
 use super::execution_profile::ExecutionProfileInner;
+use super::query_result::ColumnSpecs;
 use super::session::RequestSpan;
 use crate::cql_to_rust::{FromRow, FromRowError};
 use crate::frame::response::{
     result,
-    result::{ColumnSpec, Row, Rows},
+    result::{ColumnSpec, Row},
 };
 use crate::history::{self, HistoryListener};
 use crate::statement::{prepared_statement::PreparedStatement, query::Query};
@@ -535,46 +537,70 @@ where
     }
 }
 
-/// Iterator over rows returned by paged queries\
-/// Allows to easily access rows without worrying about handling multiple pages
-pub struct LegacyRowIterator {
-    current_row_idx: usize,
-    current_page: Rows,
+/// An intermediate object that allows to construct an iterator over a query
+/// that is asynchronously paged in the background.
+///
+/// TODO: implement and describe the new API
+///
+/// A pre-0.15.0 interface is also available, although deprecated:
+/// `into_legacy()` method converts QueryPager to LegacyRowIterator,
+/// enabling Stream'ed operation on rows being eagerly deserialized
+/// to the middle-man [Row] type. This is inefficient, especially if
+/// [Row] is not the intended target type.
+pub struct QueryPager {
+    current_page: RawRowLendingIterator,
     page_receiver: mpsc::Receiver<Result<ReceivedPage, QueryError>>,
     tracing_ids: Vec<Uuid>,
 }
 
-/// Fetching pages is asynchronous so `RowIterator` does not implement the `Iterator` trait.\
-/// Instead it uses the asynchronous `Stream` trait
-impl Stream for LegacyRowIterator {
-    type Item = Result<Row, QueryError>;
+// QueryPager is not an iterator or a stream! However, it implements
+// a `next()` method that returns a [ColumnIterator], which can be used
+// to manually deserialize a row.
+// The `ColumnIterator` borrows from the `QueryPager`, and the [futures::Stream] trait
+// does not allow for such a pattern. Lending streams are not a thing yet.
+impl QueryPager {
+    /// Returns the next item (`ColumnIterator`) from the stream.
+    ///
+    /// This can be used with `type_check()` for manual deserialization - see example below.
+    ///
+    /// This is not a part of the `Stream` interface because the returned iterator
+    /// borrows from self.
+    ///
+    /// This is cancel-safe.
+    async fn next(&mut self) -> Option<Result<ColumnIterator, QueryError>> {
+        let res = std::future::poll_fn(|cx| Pin::new(&mut *self).poll_fill_page(cx)).await;
+        match res {
+            Some(Ok(())) => {}
+            Some(Err(err)) => return Some(Err(err)),
+            None => return None,
+        }
 
-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        self.poll_next_internal(cx)
+        // We are guaranteed here to have a non-empty page, so unwrap
+        Some(
+            self.current_page
+                .next()
+                .unwrap()
+                .map_err(|e| RowsParseError::from(e).into()),
+        )
     }
-}
 
-impl LegacyRowIterator {
-    fn poll_next_internal(
-        mut self: Pin<&mut Self>,
+    /// Tries to acquire a non-empty page, if current page is exhausted.
+    fn poll_fill_page<'r>(
+        mut self: Pin<&'r mut Self>,
         cx: &mut Context<'_>,
-    ) -> Poll<Option<Result<Row, QueryError>>> {
-        if self.as_ref().is_current_page_exhausted() {
-            ready_some_ok!(self.as_mut().poll_next_page(cx));
+    ) -> Poll<Option<Result<(), QueryError>>> {
+        if !self.is_current_page_exhausted() {
+            return Poll::Ready(Some(Ok(())));
         }
-
-        let mut s = self.as_mut();
-
-        let idx = s.current_row_idx;
-        if idx < s.current_page.rows.len() {
-            let row = mem::take(&mut s.current_page.rows[idx]);
-            s.current_row_idx += 1;
-            return Poll::Ready(Some(Ok(row)));
+        ready_some_ok!(self.as_mut().poll_next_page(cx));
+        if self.is_current_page_exhausted() {
+            // We most likely got a zero-sized page.
+            // Try again later.
+            cx.waker().wake_by_ref();
+            Poll::Pending
+        } else {
+            Poll::Ready(Some(Ok(())))
         }
-        // We probably got a zero-sized page
-        // Yield, but tell that we are ready
-        cx.waker().wake_by_ref();
-        Poll::Pending
     }
 
     /// Makes an attempt to acquire the next page (which may be empty).
@@ -589,18 +615,8 @@
         let mut s = self.as_mut();
 
         let received_page = ready_some_ok!(Pin::new(&mut s.page_receiver).poll_recv(cx));
-        let rows = match received_page
-            .rows
-            // As RowIteratorWorker manages paging itself, the paging state response
-            // returned to the user is always NoMorePages. It used to be so before
-            // the deserialization refactor, too.
-            .into_legacy_rows(PagingStateResponse::NoMorePages)
-        {
-            Ok(rows) => rows,
-            Err(err) => return Poll::Ready(Some(Err(err.into()))),
-        };
-        s.current_page = rows;
-        s.current_row_idx = 0;
+        let raw_rows_with_deserialized_metadata = received_page.rows.deserialize_metadata()?;
+        s.current_page = RawRowLendingIterator::new(raw_rows_with_deserialized_metadata);
 
         if let Some(tracing_id) = received_page.tracing_id {
             s.tracing_ids.push(tracing_id);
@@ -609,12 +625,13 @@
         Poll::Ready(Some(Ok(())))
     }
 
-    /// Converts this iterator into an iterator over rows parsed as given type
-    pub fn into_typed<RowT: FromRow>(self) -> LegacyTypedRowIterator<RowT> {
-        LegacyTypedRowIterator {
-            row_iterator: self,
-            phantom_data: Default::default(),
-        }
+    /// Converts this iterator into an iterator over rows parsed as given type,
+    /// using the legacy deserialization framework.
+    /// This is inefficient, because all rows are being eagerly deserialized
+    /// to a middle-man [Row] type.
+    #[inline]
+    pub fn into_legacy(self) -> LegacyRowIterator {
+        LegacyRowIterator { raw_iterator: self }
     }
 
     pub(crate) async fn new_for_query(
@@ -888,16 +905,13 @@
         // to the channel (the PageSendAttemptedProof helps enforce this)
         // - That future is polled in a tokio::task which isn't going to be
        // cancelled
-        let pages_received = receiver.recv().await.unwrap()?;
-        let rows = pages_received
-            .rows
-            .into_legacy_rows(PagingStateResponse::NoMorePages)?;
+        let page_received = receiver.recv().await.unwrap()?;
+        let raw_rows_with_deserialized_metadata = page_received.rows.deserialize_metadata()?;
 
         Ok(Self {
-            current_row_idx: 0,
-            current_page: rows,
+            current_page: RawRowLendingIterator::new(raw_rows_with_deserialized_metadata),
             page_receiver: receiver,
-            tracing_ids: if let Some(tracing_id) = pages_received.tracing_id {
+            tracing_ids: if let Some(tracing_id) = page_received.tracing_id {
                 vec![tracing_id]
             } else {
                 Vec::new()
@@ -906,17 +920,63 @@
     }
 
     /// If tracing was enabled returns tracing ids of all finished page queries
-    pub fn get_tracing_ids(&self) -> &[Uuid] {
+    #[inline]
+    pub fn tracing_ids(&self) -> &[Uuid] {
         &self.tracing_ids
     }
 
     /// Returns specification of row columns
-    pub fn get_column_specs(&self) -> &[ColumnSpec<'_>] {
-        self.current_page.metadata.inner().col_specs()
+    #[inline]
+    pub fn column_specs(&self) -> ColumnSpecs<'_> {
+        ColumnSpecs::new(self.current_page.metadata().col_specs())
     }
 
     fn is_current_page_exhausted(&self) -> bool {
-        self.current_row_idx >= self.current_page.rows.len()
+        self.current_page.rows_remaining() == 0
+    }
+}
+
+/// Iterator over rows returned by paged queries.
+///
+/// Allows to easily access rows without worrying about handling multiple pages.
+pub struct LegacyRowIterator {
+    raw_iterator: QueryPager,
+}
+
+impl Stream for LegacyRowIterator {
+    type Item = Result<Row, QueryError>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let mut s = self.as_mut();
+
+        let next_fut = s.raw_iterator.next();
+        futures::pin_mut!(next_fut);
+
+        let next_column_iter = ready_some_ok!(next_fut.poll(cx));
+
+        let next_ready_row =
+            Row::deserialize(next_column_iter).map_err(|e| RowsParseError::from(e).into());
+
+        Poll::Ready(Some(next_ready_row))
+    }
+}
+
+impl LegacyRowIterator {
+    /// If tracing was enabled returns tracing ids of all finished page queries
+    pub fn get_tracing_ids(&self) -> &[Uuid] {
+        self.raw_iterator.tracing_ids()
+    }
+
+    /// Returns specification of row columns
+    pub fn get_column_specs(&self) -> &[ColumnSpec<'_>] {
+        self.raw_iterator.column_specs().inner()
+    }
+
+    pub fn into_typed<RowT: FromRow>(self) -> LegacyTypedRowIterator<RowT> {
+        LegacyTypedRowIterator {
+            row_iterator: self,
+            _phantom_data: Default::default(),
+        }
     }
 }
 
@@ -925,16 +985,18 @@
 /// Returned by `RowIterator::into_typed`
 pub struct LegacyTypedRowIterator<RowT> {
     row_iterator: LegacyRowIterator,
-    phantom_data: std::marker::PhantomData<RowT>,
+    _phantom_data: std::marker::PhantomData<RowT>,
 }
 
 impl<RowT> LegacyTypedRowIterator<RowT> {
     /// If tracing was enabled returns tracing ids of all finished page queries
+    #[inline]
     pub fn get_tracing_ids(&self) -> &[Uuid] {
         self.row_iterator.get_tracing_ids()
     }
 
     /// Returns specification of row columns
+    #[inline]
     pub fn get_column_specs(&self) -> &[ColumnSpec<'_>] {
         self.row_iterator.get_column_specs()
     }
diff --git a/scylla/src/transport/session.rs b/scylla/src/transport/session.rs
index 717ff19fee..be491aa242 100644
--- a/scylla/src/transport/session.rs
+++ b/scylla/src/transport/session.rs
@@ -4,6 +4,7 @@
 use crate::batch::batch_values;
 #[cfg(feature = "cloud")]
 use crate::cloud::CloudConfig;
+use crate::LegacyQueryResult;
 
 use crate::history;
 use crate::history::HistoryListener;
@@ -42,7 +43,8 @@ use super::connection::QueryResponse;
 use super::connection::SslConfig;
 use super::errors::TracingProtocolError;
 use super::execution_profile::{ExecutionProfile, ExecutionProfileHandle, ExecutionProfileInner};
-use super::legacy_query_result::{LegacyQueryResult, MaybeFirstRowTypedError};
+use super::iterator::QueryPager;
+use super::legacy_query_result::MaybeFirstRowTypedError;
 #[cfg(feature = "cloud")]
 use super::node::CloudEndpoint;
 use super::node::{InternalKnownNode, KnownNode};
@@ -885,20 +887,21 @@
             .access();
 
         if values.is_empty() {
-            LegacyRowIterator::new_for_query(
+            QueryPager::new_for_query(
                 query,
                 execution_profile,
                 self.cluster.get_data(),
                 self.metrics.clone(),
             )
             .await
+            .map(QueryPager::into_legacy)
         } else {
-            // Making LegacyRowIterator::new_for_query work with values is too hard (if even possible)
+            // Making QueryPager::new_for_query work with values is too hard (if even possible)
             // so instead of sending one prepare to a specific connection on each iterator query,
             // we fully prepare a statement beforehand.
             let prepared = self.prepare(query).await?;
             let values = prepared.serialize_values(&values)?;
-            LegacyRowIterator::new_for_prepared_statement(PreparedIteratorConfig {
+            QueryPager::new_for_prepared_statement(PreparedIteratorConfig {
                 prepared,
                 values,
                 execution_profile,
@@ -906,6 +909,7 @@ impl Session {
                 metrics: self.metrics.clone(),
             })
             .await
+            .map(QueryPager::into_legacy)
         }
     }
 
@@ -1288,7 +1292,7 @@
             .unwrap_or_else(|| self.get_default_execution_profile_handle())
             .access();
 
-        LegacyRowIterator::new_for_prepared_statement(PreparedIteratorConfig {
+        QueryPager::new_for_prepared_statement(PreparedIteratorConfig {
             prepared,
             values: serialized_values,
             execution_profile,
@@ -1296,6 +1300,7 @@
             metrics: self.metrics.clone(),
         })
         .await
+        .map(QueryPager::into_legacy)
     }
 
     /// Perform a batch request.\
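For illustration only (not part of the patch): a minimal sketch of how calling code keeps working after this change, assuming an already-connected `Session`. `Session::query_iter` is now built internally as `QueryPager::new_for_query(..).map(QueryPager::into_legacy)`, so the value handed back is still a `LegacyRowIterator` driving the pre-0.15.0 `Stream`-based API. The helper name `print_keyspaces` is made up for the example; the query targets the real `system_schema.keyspaces` table.

```rust
use futures::StreamExt; // LegacyRowIterator and its typed wrapper implement futures::Stream
use scylla::Session;

async fn print_keyspaces(session: &Session) -> Result<(), Box<dyn std::error::Error>> {
    // Internally: QueryPager::new_for_query(..).await.map(QueryPager::into_legacy),
    // but user code still sees the legacy, eagerly-deserializing Row stream.
    let mut rows = session
        .query_iter("SELECT keyspace_name FROM system_schema.keyspaces", &[])
        .await?
        .into_typed::<(String,)>();

    // Pages are fetched in the background; the stream yields one row at a time.
    while let Some(next_row) = rows.next().await {
        let (keyspace,) = next_row?;
        println!("{keyspace}");
    }
    Ok(())
}
```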