diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index a337fdfbd..6b77b1b46 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -47,7 +47,7 @@ use std::{ }; use super::errors::{ProtocolError, UseKeyspaceProtocolError}; -use super::iterator::LegacyRowIterator; +use super::iterator::{LegacyRowIterator, RawIterator}; use super::locator::tablets::{RawTablet, TabletParsingError}; use super::query_result::QueryResult; use super::session::AddressTranslator; @@ -1188,13 +1188,9 @@ impl Connection { .determine_consistency(self.config.default_consistency); let serial_consistency = query.config.serial_consistency.flatten(); - LegacyRowIterator::new_for_connection_query_iter( - query, - self, - consistency, - serial_consistency, - ) - .await + RawIterator::new_for_connection_query_iter(query, self, consistency, serial_consistency) + .await + .map(RawIterator::into_legacy) } /// Executes a prepared statements and fetches its results over multiple pages, using @@ -1209,7 +1205,7 @@ impl Connection { .determine_consistency(self.config.default_consistency); let serial_consistency = prepared_statement.config.serial_consistency.flatten(); - LegacyRowIterator::new_for_connection_execute_iter( + RawIterator::new_for_connection_execute_iter( prepared_statement, values, self, @@ -1217,6 +1213,7 @@ impl Connection { serial_consistency, ) .await + .map(RawIterator::into_legacy) } #[allow(dead_code)] diff --git a/scylla/src/transport/iterator.rs b/scylla/src/transport/iterator.rs index 28d3c8dd3..07d5f94d8 100644 --- a/scylla/src/transport/iterator.rs +++ b/scylla/src/transport/iterator.rs @@ -1,7 +1,6 @@ //! Iterators over rows returned by paged queries use std::future::Future; -use std::mem; use std::net::SocketAddr; use std::ops::ControlFlow; use std::pin::Pin; @@ -9,21 +8,25 @@ use std::sync::Arc; use std::task::{Context, Poll}; use futures::Stream; -use scylla_cql::frame::request::query::PagingStateResponse; +use scylla_cql::frame::frame_errors::RowsParseError; use scylla_cql::frame::response::result::RawRows; use scylla_cql::frame::response::NonErrorResponse; +use scylla_cql::types::deserialize::result::RawRowsLendingIterator; +use scylla_cql::types::deserialize::row::{ColumnIterator, DeserializeRow}; +use scylla_cql::types::deserialize::TypeCheckError; use scylla_cql::types::serialize::row::SerializedValues; use std::result::Result; use thiserror::Error; use tokio::sync::mpsc; use super::execution_profile::ExecutionProfileInner; +use super::query_result::ColumnSpecs; use super::session::RequestSpan; use crate::cql_to_rust::{FromRow, FromRowError}; use crate::frame::response::{ result, - result::{ColumnSpec, Row, Rows}, + result::{ColumnSpec, Row}, }; use crate::history::{self, HistoryListener}; use crate::statement::{prepared_statement::PreparedStatement, query::Query}; @@ -535,46 +538,99 @@ where } } -/// Iterator over rows returned by paged queries\ -/// Allows to easily access rows without worrying about handling multiple pages -pub struct LegacyRowIterator { - current_row_idx: usize, - current_page: Rows, +/// An intermediate object that allows to construct an iterator over a query +/// that is asynchronously paged in the background. +/// +/// Before the results can be processed in a convenient way, the RawIterator +/// needs to be cast into a typed iterator. This is done by use of `into_typed()` method. +/// As the method is generic over the target type, the turbofish syntax +/// can come in handy there, e.g. `raw_iter.into_typed::<(i32, &str, Uuid)>()`. +/// +/// A pre-0.14.0 interface is also available, although deprecated: +/// `into_legacy()` method converts RawIterator to LegacyRowIterator, +/// enabling Stream'ed operation on rows being eagerly deserialized +/// to a middle-man Row type. This is inefficient, especially if +/// the Row type is not the intended target type. +pub struct RawIterator { + current_page: RawRowsLendingIterator, page_receiver: mpsc::Receiver>, tracing_ids: Vec, } -/// Fetching pages is asynchronous so `RowIterator` does not implement the `Iterator` trait.\ -/// Instead it uses the asynchronous `Stream` trait -impl Stream for LegacyRowIterator { - type Item = Result; +/// RawIterator is not an iterator or a stream! However, it implements +/// a `next()` method that returns a ColumnIterator<'r>, which can be used +/// to manually deserialize a row. +/// The ColumnIterator borrows from the RawIterator, and the futures::Stream trait +/// does not allow for such a pattern. Lending streams are not a thing yet. +impl RawIterator { + /// Returns the next item (ColumnIterator) from the stream. + + /// This can be used with `type_check() for manual deserialization - see example below. + /// + /// This is not a part of the Stream interface because the returned iterator + /// borrows from self. + /// + /// This is cancel-safe. + /// + /// # Example + /// + /// // FIXME: change `text` to `rust` when Session API is migrated to the new deserialization framework. + /// ```text + /// # use scylla::Session; + /// # use std::error::Error; + /// # async fn check_only_compiles(session: &Session) -> Result<(), Box> { + /// use futures::stream::StreamExt; + /// use scylla::deserialize::DeserializeRow; + /// + /// let mut raw_iter = session + /// .query_iter("SELECT a, b FROM ks.t", &[]) + /// .await?; + /// + /// // Remember to type check! Failure to call type_check() can result + /// // in panics upon deserialization. + /// raw_iter.type_check::<(i32, i32)>()?; + /// + /// // Now that we type-checked, we can manually deserialize from RawIterator. + /// while let Some(column_iterator) = raw_iter.next().await.transpose()? { + /// let (a, b) = <(i32, i32) as DeserializeRow>::deserialize(column_iterator)?; + /// println!("a, b: {}, {}", a, b); + /// } + /// # Ok(()) + /// # } + /// ``` + pub async fn next(&mut self) -> Option> { + let res = std::future::poll_fn(|cx| Pin::new(&mut *self).poll_fill_page(cx)).await; + match res { + Some(Ok(())) => {} + Some(Err(err)) => return Some(Err(err)), + None => return None, + } - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.poll_next_internal(cx) + // We are guaranteed here to have a non-empty page, so unwrap + Some( + self.current_page + .next() + .unwrap() + .map_err(|e| RowsParseError::from(e).into()), + ) } -} -impl LegacyRowIterator { - fn poll_next_internal( - mut self: Pin<&mut Self>, + /// Tries to acquire a non-empty page, if current page is exhausted. + fn poll_fill_page<'r>( + mut self: Pin<&'r mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { - if self.as_ref().is_current_page_exhausted() { - ready_some_ok!(self.as_mut().poll_next_page(cx)); + ) -> Poll>> { + if !self.is_current_page_exhausted() { + return Poll::Ready(Some(Ok(()))); } - - let mut s = self.as_mut(); - - let idx = s.current_row_idx; - if idx < s.current_page.rows.len() { - let row = mem::take(&mut s.current_page.rows[idx]); - s.current_row_idx += 1; - return Poll::Ready(Some(Ok(row))); + ready_some_ok!(self.as_mut().poll_next_page(cx)); + if self.is_current_page_exhausted() { + // Try again later + cx.waker().wake_by_ref(); + Poll::Pending + } else { + Poll::Ready(Some(Ok(()))) } - // We probably got a zero-sized page - // Yield, but tell that we are ready - cx.waker().wake_by_ref(); - Poll::Pending } /// Makes an attempt to acquire the next page (which may be empty). @@ -589,18 +645,9 @@ impl LegacyRowIterator { let mut s = self.as_mut(); let received_page = ready_some_ok!(Pin::new(&mut s.page_receiver).poll_recv(cx)); - let rows = match received_page - .rows - // As RowIteratorWorker manages paging itself, the paging state response - // returned to the user is always NoMorePages. It used to be so before - // the deserialization refactor, too. - .into_legacy_rows(PagingStateResponse::NoMorePages) - { - Ok(rows) => rows, - Err(err) => return Poll::Ready(Some(Err(err.into()))), - }; - s.current_page = rows; - s.current_row_idx = 0; + let raw_rows_with_deserialized_metadata = + received_page.rows.deserialize_owned_metadata()?; + s.current_page = RawRowsLendingIterator::new(raw_rows_with_deserialized_metadata); if let Some(tracing_id) = received_page.tracing_id { s.tracing_ids.push(tracing_id); @@ -609,12 +656,30 @@ impl LegacyRowIterator { Poll::Ready(Some(Ok(()))) } - /// Converts this iterator into an iterator over rows parsed as given type - pub fn into_typed(self) -> LegacyTypedRowIterator { - LegacyTypedRowIterator { - row_iterator: self, - phantom_data: Default::default(), - } + /// Type-checks the iterator against given type. + /// + /// This is automatically called upon transforming [RawIterator] into [TypedRowIterator]. + /// Can be used with `next()` for manual deserialization. See `next()` for an example. + #[inline] + pub fn type_check<'frame, RowT: DeserializeRow<'frame>>(&self) -> Result<(), TypeCheckError> { + RowT::type_check(self.column_specs().inner()) + } + + /// Casts the iterator to a given row type, enabling Stream'ed operations + /// on rows, which deserialize them in-fly to that given type. + /// Begins with performing type check. + #[inline] + pub fn into_typed<'frame, RowT: DeserializeRow<'frame>>( + self, + ) -> Result, TypeCheckError> { + TypedRowIterator::::new(self) + } + + /// Converts this iterator into an iterator over rows parsed as given type, + /// using the legacy deserialization framework. + #[inline] + pub fn into_legacy(self) -> LegacyRowIterator { + LegacyRowIterator { raw_iterator: self } } pub(crate) async fn new_for_query( @@ -888,16 +953,14 @@ impl LegacyRowIterator { // to the channel (the PageSendAttemptedProof helps enforce this) // - That future is polled in a tokio::task which isn't going to be // cancelled - let pages_received = receiver.recv().await.unwrap()?; - let rows = pages_received - .rows - .into_legacy_rows(PagingStateResponse::NoMorePages)?; + let page_received = receiver.recv().await.unwrap()?; + let raw_rows_with_deserialized_metadata = + page_received.rows.deserialize_owned_metadata()?; Ok(Self { - current_row_idx: 0, - current_page: rows, + current_page: RawRowsLendingIterator::new(raw_rows_with_deserialized_metadata), page_receiver: receiver, - tracing_ids: if let Some(tracing_id) = pages_received.tracing_id { + tracing_ids: if let Some(tracing_id) = page_received.tracing_id { vec![tracing_id] } else { Vec::new() @@ -906,17 +969,165 @@ impl LegacyRowIterator { } /// If tracing was enabled returns tracing ids of all finished page queries - pub fn get_tracing_ids(&self) -> &[Uuid] { + #[inline] + pub fn tracing_ids(&self) -> &[Uuid] { &self.tracing_ids } /// Returns specification of row columns - pub fn get_column_specs(&self) -> &[ColumnSpec] { - self.current_page.metadata.col_specs() + #[inline] + pub fn column_specs(&self) -> ColumnSpecs<'_> { + ColumnSpecs::new(self.current_page.metadata().col_specs()) } fn is_current_page_exhausted(&self) -> bool { - self.current_row_idx >= self.current_page.rows.len() + self.current_page.rows_remaining() == 0 + } +} + +/// Returned by [RawIterator::into_typed]. +/// +/// Does not implement [Stream], but permits deserialization of borrowed types. +/// To use [Stream] API (only accessible for owned types), use [TypedRowIterator::into_stream]. +pub struct TypedRowIterator { + raw_iterator: RawIterator, + _phantom: std::marker::PhantomData, +} + +impl Unpin for TypedRowIterator {} + +impl<'frame, RowT> TypedRowIterator +where + RowT: DeserializeRow<'frame>, +{ + fn new(raw_iterator: RawIterator) -> Result { + raw_iterator.type_check::()?; + + Ok(Self { + raw_iterator, + _phantom: Default::default(), + }) + } + + /// If tracing was enabled, returns tracing ids of all finished page queries. + #[inline] + pub fn tracing_ids(&self) -> &[Uuid] { + self.raw_iterator.tracing_ids() + } + + /// Returns specification of row columns + #[inline] + pub fn column_specs(&self) -> ColumnSpecs { + self.raw_iterator.column_specs() + } + + /// Stream-like next() implementation for TypedRowIterator. + /// + /// It also works with borrowed types! For example, &str is supported. + /// However, this is not a Stream. To create a Stream, use `into_stream()`. + #[inline] + pub async fn next(&'frame mut self) -> Option> { + self.raw_iterator.next().await.map(|res| { + res.and_then(|column_iterator| { + ::deserialize(column_iterator) + .map_err(|err| RowsParseError::from(err).into()) + }) + }) + } + + /// Stream-like try_next() implementation for TypedRowIterator. + /// + /// It also works with borrowed types! For example, &str is supported. + /// However, this is not a Stream. To create a Stream, use `into_stream()`. + #[inline] + pub async fn try_next(&'frame mut self) -> Result, QueryError> { + self.next().await.transpose() + } +} + +impl TypedRowIterator { + /// Transforms [TypedRowIterator] into [TypedRowStream]. + /// + /// If you deserialize to owned types only, use this method to unleash power of the `Stream` API. + /// This operation involves no runtime cost, but it limits the iterator to owned types only. + /// Therefore, if you want to work with borrowed types (e.g., to avoid heap allocations), + /// you can't use the `Stream` trait. + pub fn into_stream(self) -> TypedRowStream { + TypedRowStream { + typed_row_iterator: self, + } + } +} + +/// Returned by [TypedRowIterator::into_stream]. +/// +/// Implements [Stream], but only permits deserialization of owned types. +pub struct TypedRowStream { + typed_row_iterator: TypedRowIterator, +} + +impl Unpin for TypedRowStream {} + +/// Stream implementation for TypedRowStream. +/// +/// It only works with owned types! For example, &str is not supported. +impl Stream for TypedRowStream +where + RowT: for<'r> DeserializeRow<'r>, +{ + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut s = self.as_mut(); + + let next_fut = s.typed_row_iterator.next(); + futures::pin_mut!(next_fut); + let value = ready_some_ok!(next_fut.poll(cx)); + Poll::Ready(Some(Ok(value))) + } +} + +/// Iterator over rows returned by paged queries. +/// +/// Allows to easily access rows without worrying about handling multiple pages. +pub struct LegacyRowIterator { + raw_iterator: RawIterator, +} + +impl Stream for LegacyRowIterator { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut s = self.as_mut(); + + let next_fut = s.raw_iterator.next(); + futures::pin_mut!(next_fut); + + let next_column_iter = ready_some_ok!(next_fut.poll(cx)); + + let next_ready_row = + Row::deserialize(next_column_iter).map_err(|e| RowsParseError::from(e).into()); + + Poll::Ready(Some(next_ready_row)) + } +} + +impl LegacyRowIterator { + /// If tracing was enabled returns tracing ids of all finished page queries + pub fn get_tracing_ids(&self) -> &[Uuid] { + self.raw_iterator.tracing_ids() + } + + /// Returns specification of row columns + pub fn get_column_specs(&self) -> &[ColumnSpec<'_>] { + self.raw_iterator.column_specs().inner() + } + + pub fn into_typed(self) -> LegacyTypedRowIterator { + LegacyTypedRowIterator { + row_iterator: self, + _phantom_data: Default::default(), + } } } @@ -925,16 +1136,18 @@ impl LegacyRowIterator { /// Returned by `RowIterator::into_typed` pub struct LegacyTypedRowIterator { row_iterator: LegacyRowIterator, - phantom_data: std::marker::PhantomData, + _phantom_data: std::marker::PhantomData, } impl LegacyTypedRowIterator { /// If tracing was enabled returns tracing ids of all finished page queries + #[inline] pub fn get_tracing_ids(&self) -> &[Uuid] { self.row_iterator.get_tracing_ids() } /// Returns specification of row columns + #[inline] pub fn get_column_specs(&self) -> &[ColumnSpec] { self.row_iterator.get_column_specs() } diff --git a/scylla/src/transport/session.rs b/scylla/src/transport/session.rs index 0ec99ef41..4315909a1 100644 --- a/scylla/src/transport/session.rs +++ b/scylla/src/transport/session.rs @@ -4,6 +4,7 @@ use crate::batch::batch_values; #[cfg(feature = "cloud")] use crate::cloud::CloudConfig; +use crate::LegacyQueryResult; use crate::history; use crate::history::HistoryListener; @@ -42,7 +43,8 @@ use super::connection::QueryResponse; use super::connection::SslConfig; use super::errors::TracingProtocolError; use super::execution_profile::{ExecutionProfile, ExecutionProfileHandle, ExecutionProfileInner}; -use super::legacy_query_result::{LegacyQueryResult, MaybeFirstRowTypedError}; +use super::iterator::RawIterator; +use super::legacy_query_result::MaybeFirstRowTypedError; #[cfg(feature = "cloud")] use super::node::CloudEndpoint; use super::node::KnownNode; @@ -910,20 +912,21 @@ impl Session { .access(); if values.is_empty() { - LegacyRowIterator::new_for_query( + RawIterator::new_for_query( query, execution_profile, self.cluster.get_data(), self.metrics.clone(), ) .await + .map(RawIterator::into_legacy) } else { - // Making LegacyRowIterator::new_for_query work with values is too hard (if even possible) + // Making RawIterator::new_for_query work with values is too hard (if even possible) // so instead of sending one prepare to a specific connection on each iterator query, // we fully prepare a statement beforehand. let prepared = self.prepare(query).await?; let values = prepared.serialize_values(&values)?; - LegacyRowIterator::new_for_prepared_statement(PreparedIteratorConfig { + RawIterator::new_for_prepared_statement(PreparedIteratorConfig { prepared, values, execution_profile, @@ -931,6 +934,7 @@ impl Session { metrics: self.metrics.clone(), }) .await + .map(RawIterator::into_legacy) } } @@ -1313,7 +1317,7 @@ impl Session { .unwrap_or_else(|| self.get_default_execution_profile_handle()) .access(); - LegacyRowIterator::new_for_prepared_statement(PreparedIteratorConfig { + RawIterator::new_for_prepared_statement(PreparedIteratorConfig { prepared, values: serialized_values, execution_profile, @@ -1321,6 +1325,7 @@ impl Session { metrics: self.metrics.clone(), }) .await + .map(RawIterator::into_legacy) } /// Perform a batch request.\