diff --git a/scylla/src/transport/iterator.rs b/scylla/src/transport/iterator.rs index 160819f6c..a46de9d42 100644 --- a/scylla/src/transport/iterator.rs +++ b/scylla/src/transport/iterator.rs @@ -634,7 +634,7 @@ impl QueryPager { /// Type-checks the iterator against given type. /// - /// This is automatically called upon transforming [QueryPager] into [TypedRowLendingStream]. + /// This is automatically called upon transforming [QueryPager] into [TypedRowStream]. /// Can be used with `next()` for manual deserialization. See `next()` for an example. #[inline] pub fn type_check<'frame, 'metadata, RowT: DeserializeRow<'frame, 'metadata>>( @@ -643,21 +643,6 @@ impl QueryPager { RowT::type_check(self.column_specs().inner()) } - /// Casts the iterator to a given row type, enabling Stream'ed operations - /// on rows, which deserialize them on-the-fly to that given type. - /// It allows deserializing borrowed types, but hence cannot implement [Stream] - /// (because [Stream] is not lending). - /// Begins with performing type check. - #[inline] - pub fn rows_lending_stream<'frame, 'metadata, RowT: DeserializeRow<'frame, 'metadata>>( - self, - ) -> Result, TypeCheckError> - where - 'frame: 'metadata, - { - TypedRowLendingStream::::new(self) - } - /// Casts the iterator to a given row type, enabling [Stream]'ed operations /// on rows, which deserialize them on-the-fly to that given type. /// It only allows deserializing owned types, because [Stream] is not lending. @@ -666,9 +651,7 @@ impl QueryPager { pub fn rows_stream DeserializeRow<'frame, 'metadata>>( self, ) -> Result, TypeCheckError> { - TypedRowLendingStream::::new(self).map(|typed_row_lending_stream| TypedRowStream { - typed_row_lending_stream, - }) + TypedRowStream::::new(self) } /// Converts this iterator into an iterator over rows parsed as given type, @@ -982,35 +965,20 @@ impl QueryPager { } } -/// Returned by [QueryPager::rows_lending_stream]. +/// Returned by [QueryPager::rows_stream]. /// -/// Does not implement [Stream], but permits deserialization of borrowed types. +/// Implements [Stream], but only permits deserialization of owned types. /// To use [Stream] API (only accessible for owned types), use [QueryPager::rows_stream]. -pub struct TypedRowLendingStream { +pub struct TypedRowStream { raw_row_lending_stream: QueryPager, _phantom: std::marker::PhantomData, } -impl Unpin for TypedRowLendingStream {} - -impl TypedRowLendingStream { - /// If tracing was enabled, returns tracing ids of all finished page queries. - #[inline] - pub fn tracing_ids(&self) -> &[Uuid] { - self.raw_row_lending_stream.tracing_ids() - } - - /// Returns specification of row columns - #[inline] - pub fn column_specs(&self) -> ColumnSpecs { - self.raw_row_lending_stream.column_specs() - } -} +impl Unpin for TypedRowStream {} -impl<'frame, 'metadata, RowT> TypedRowLendingStream +impl TypedRowStream where - 'frame: 'metadata, - RowT: DeserializeRow<'frame, 'metadata>, + RowT: for<'frame, 'metadata> DeserializeRow<'frame, 'metadata>, { fn new(raw_stream: QueryPager) -> Result { raw_stream.type_check::()?; @@ -1020,52 +988,19 @@ where _phantom: Default::default(), }) } - - /// Stream-like next() implementation for TypedRowLendingStream. - /// - /// It also works with borrowed types! For example, &str is supported. - /// However, this is not a Stream. To create a Stream, use `into_stream()`. - #[inline] - pub async fn next(&'frame mut self) -> Option> { - self.raw_row_lending_stream.next().await.map(|res| { - res.and_then(|column_iterator| { - ::deserialize(column_iterator) - .map_err(|err| RowsParseError::from(err).into()) - }) - }) - } - - /// Stream-like try_next() implementation for TypedRowLendingStream. - /// - /// It also works with borrowed types! For example, &str is supported. - /// However, this is not a Stream. To create a Stream, use `into_stream()`. - #[inline] - pub async fn try_next(&'frame mut self) -> Result, QueryError> { - self.next().await.transpose() - } -} - -/// Returned by [QueryPager::rows_stream]. -/// -/// Implements [Stream], but only permits deserialization of owned types. -/// To use [Stream] API (only accessible for owned types), use [QueryPager::rows_stream]. -pub struct TypedRowStream { - typed_row_lending_stream: TypedRowLendingStream, } -impl Unpin for TypedRowStream {} - impl TypedRowStream { /// If tracing was enabled, returns tracing ids of all finished page queries. #[inline] pub fn tracing_ids(&self) -> &[Uuid] { - self.typed_row_lending_stream.tracing_ids() + self.raw_row_lending_stream.tracing_ids() } /// Returns specification of row columns #[inline] pub fn column_specs(&self) -> ColumnSpecs { - self.typed_row_lending_stream.column_specs() + self.raw_row_lending_stream.column_specs() } } @@ -1079,9 +1014,15 @@ where type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut s = self.as_mut(); + let next_fut = async { + self.raw_row_lending_stream.next().await.map(|res| { + res.and_then(|column_iterator| { + ::deserialize(column_iterator) + .map_err(|err| RowsParseError::from(err).into()) + }) + }) + }; - let next_fut = s.typed_row_lending_stream.next(); futures::pin_mut!(next_fut); let value = ready_some_ok!(next_fut.poll(cx)); Poll::Ready(Some(Ok(value)))