Skip to content

Commit

Permalink
Merge pull request #1122 from wprzytula/delete-lending-stream
Browse files Browse the repository at this point in the history
iterator: delete `TypedRowLendingStream` as unusable
  • Loading branch information
wprzytula authored Nov 12, 2024
2 parents 814de55 + f1dc5b6 commit 27ce6b5
Showing 1 changed file with 18 additions and 77 deletions.
95 changes: 18 additions & 77 deletions scylla/src/transport/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ impl QueryPager {

/// Type-checks the iterator against given type.
///
/// This is automatically called upon transforming [QueryPager] into [TypedRowLendingStream].
/// This is automatically called upon transforming [QueryPager] into [TypedRowStream].
/// Can be used with `next()` for manual deserialization. See `next()` for an example.
#[inline]
pub fn type_check<'frame, 'metadata, RowT: DeserializeRow<'frame, 'metadata>>(
Expand All @@ -643,21 +643,6 @@ impl QueryPager {
RowT::type_check(self.column_specs().inner())
}

/// Casts the iterator to a given row type, enabling Stream'ed operations
/// on rows, which deserialize them on-the-fly to that given type.
/// It allows deserializing borrowed types, but hence cannot implement [Stream]
/// (because [Stream] is not lending).
/// Begins with performing type check.
#[inline]
pub fn rows_lending_stream<'frame, 'metadata, RowT: DeserializeRow<'frame, 'metadata>>(
self,
) -> Result<TypedRowLendingStream<RowT>, TypeCheckError>
where
'frame: 'metadata,
{
TypedRowLendingStream::<RowT>::new(self)
}

/// Casts the iterator to a given row type, enabling [Stream]'ed operations
/// on rows, which deserialize them on-the-fly to that given type.
/// It only allows deserializing owned types, because [Stream] is not lending.
Expand All @@ -666,9 +651,7 @@ impl QueryPager {
pub fn rows_stream<RowT: 'static + for<'frame, 'metadata> DeserializeRow<'frame, 'metadata>>(
self,
) -> Result<TypedRowStream<RowT>, TypeCheckError> {
TypedRowLendingStream::<RowT>::new(self).map(|typed_row_lending_stream| TypedRowStream {
typed_row_lending_stream,
})
TypedRowStream::<RowT>::new(self)
}

/// Converts this iterator into an iterator over rows parsed as given type,
Expand Down Expand Up @@ -982,35 +965,20 @@ impl QueryPager {
}
}

/// Returned by [QueryPager::rows_lending_stream].
/// Returned by [QueryPager::rows_stream].
///
/// Does not implement [Stream], but permits deserialization of borrowed types.
/// Implements [Stream], but only permits deserialization of owned types.
/// To use [Stream] API (only accessible for owned types), use [QueryPager::rows_stream].
pub struct TypedRowLendingStream<RowT> {
pub struct TypedRowStream<RowT: 'static> {
raw_row_lending_stream: QueryPager,
_phantom: std::marker::PhantomData<RowT>,
}

impl<RowT> Unpin for TypedRowLendingStream<RowT> {}

impl<RowT> TypedRowLendingStream<RowT> {
/// If tracing was enabled, returns tracing ids of all finished page queries.
#[inline]
pub fn tracing_ids(&self) -> &[Uuid] {
self.raw_row_lending_stream.tracing_ids()
}

/// Returns specification of row columns
#[inline]
pub fn column_specs(&self) -> ColumnSpecs {
self.raw_row_lending_stream.column_specs()
}
}
impl<RowT> Unpin for TypedRowStream<RowT> {}

impl<'frame, 'metadata, RowT> TypedRowLendingStream<RowT>
impl<RowT> TypedRowStream<RowT>
where
'frame: 'metadata,
RowT: DeserializeRow<'frame, 'metadata>,
RowT: for<'frame, 'metadata> DeserializeRow<'frame, 'metadata>,
{
fn new(raw_stream: QueryPager) -> Result<Self, TypeCheckError> {
raw_stream.type_check::<RowT>()?;
Expand All @@ -1020,52 +988,19 @@ where
_phantom: Default::default(),
})
}

/// Stream-like next() implementation for TypedRowLendingStream.
///
/// It also works with borrowed types! For example, &str is supported.
/// However, this is not a Stream. To create a Stream, use `into_stream()`.
#[inline]
pub async fn next(&'frame mut self) -> Option<Result<RowT, QueryError>> {
self.raw_row_lending_stream.next().await.map(|res| {
res.and_then(|column_iterator| {
<RowT as DeserializeRow>::deserialize(column_iterator)
.map_err(|err| RowsParseError::from(err).into())
})
})
}

/// Stream-like try_next() implementation for TypedRowLendingStream.
///
/// It also works with borrowed types! For example, &str is supported.
/// However, this is not a Stream. To create a Stream, use `into_stream()`.
#[inline]
pub async fn try_next(&'frame mut self) -> Result<Option<RowT>, QueryError> {
self.next().await.transpose()
}
}

/// Returned by [QueryPager::rows_stream].
///
/// Implements [Stream], but only permits deserialization of owned types.
/// To use [Stream] API (only accessible for owned types), use [QueryPager::rows_stream].
pub struct TypedRowStream<RowT: 'static> {
typed_row_lending_stream: TypedRowLendingStream<RowT>,
}

impl<RowT> Unpin for TypedRowStream<RowT> {}

impl<RowT> TypedRowStream<RowT> {
/// If tracing was enabled, returns tracing ids of all finished page queries.
#[inline]
pub fn tracing_ids(&self) -> &[Uuid] {
self.typed_row_lending_stream.tracing_ids()
self.raw_row_lending_stream.tracing_ids()
}

/// Returns specification of row columns
#[inline]
pub fn column_specs(&self) -> ColumnSpecs {
self.typed_row_lending_stream.column_specs()
self.raw_row_lending_stream.column_specs()
}
}

Expand All @@ -1079,9 +1014,15 @@ where
type Item = Result<RowT, QueryError>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut s = self.as_mut();
let next_fut = async {
self.raw_row_lending_stream.next().await.map(|res| {
res.and_then(|column_iterator| {
<RowT as DeserializeRow>::deserialize(column_iterator)
.map_err(|err| RowsParseError::from(err).into())
})
})
};

let next_fut = s.typed_row_lending_stream.next();
futures::pin_mut!(next_fut);
let value = ready_some_ok!(next_fut.poll(cx));
Poll::Ready(Some(Ok(value)))
Expand Down

0 comments on commit 27ce6b5

Please sign in to comment.