Skip to content

Commit

Permalink
iterator: create module for legacy abstractions
Browse files Browse the repository at this point in the history
Not to clutter iterator.rs with legacy abstractions, they are moved to
a new module: `legacy`.

I recommend viewing this commit **without** whitespace changes.
  • Loading branch information
wprzytula committed Nov 6, 2024
1 parent ac7121b commit ffaf69a
Showing 1 changed file with 81 additions and 72 deletions.
153 changes: 81 additions & 72 deletions scylla/src/transport/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ impl QueryPager {
/// to a middle-man [Row] type.
#[inline]
pub fn into_legacy(self) -> LegacyRowIterator {
LegacyRowIterator { raw_iterator: self }
LegacyRowIterator::new(self)
}

pub(crate) async fn new_for_query(
Expand Down Expand Up @@ -936,97 +936,106 @@ impl QueryPager {
}
}

/// Iterator over rows returned by paged queries.
///
/// Allows to easily access rows without worrying about handling multiple pages.
pub struct LegacyRowIterator {
raw_iterator: QueryPager,
}
mod legacy {
use super::*;

impl Stream for LegacyRowIterator {
type Item = Result<Row, QueryError>;
/// Iterator over rows returned by paged queries.
///
/// Allows to easily access rows without worrying about handling multiple pages.
pub struct LegacyRowIterator {
raw_stream: QueryPager,
}

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut s = self.as_mut();
impl Stream for LegacyRowIterator {
type Item = Result<Row, QueryError>;

let next_fut = s.raw_iterator.next();
futures::pin_mut!(next_fut);
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut s = self.as_mut();

let next_column_iter = ready_some_ok!(next_fut.poll(cx));
let next_fut = s.raw_stream.next();
futures::pin_mut!(next_fut);

let next_ready_row =
Row::deserialize(next_column_iter).map_err(|e| RowsParseError::from(e).into());
let next_column_iter = ready_some_ok!(next_fut.poll(cx));

Poll::Ready(Some(next_ready_row))
}
}
let next_ready_row =
Row::deserialize(next_column_iter).map_err(|e| RowsParseError::from(e).into());

impl LegacyRowIterator {
/// If tracing was enabled returns tracing ids of all finished page queries
pub fn get_tracing_ids(&self) -> &[Uuid] {
self.raw_iterator.tracing_ids()
Poll::Ready(Some(next_ready_row))
}
}

/// Returns specification of row columns
pub fn get_column_specs(&self) -> &[ColumnSpec<'_>] {
self.raw_iterator.column_specs().inner()
}
impl LegacyRowIterator {
pub(super) fn new(raw_stream: QueryPager) -> Self {
Self { raw_stream }
}

pub fn into_typed<RowT>(self) -> LegacyTypedRowIterator<RowT> {
LegacyTypedRowIterator {
row_iterator: self,
_phantom_data: Default::default(),
/// If tracing was enabled returns tracing ids of all finished page queries
pub fn get_tracing_ids(&self) -> &[Uuid] {
self.raw_stream.tracing_ids()
}
}
}

/// Iterator over rows returned by paged queries
/// where each row is parsed as the given type\
/// Returned by `RowIterator::into_typed`
pub struct LegacyTypedRowIterator<RowT> {
row_iterator: LegacyRowIterator,
_phantom_data: std::marker::PhantomData<RowT>,
}
/// Returns specification of row columns
pub fn get_column_specs(&self) -> &[ColumnSpec<'_>] {
self.raw_stream.column_specs().inner()
}

impl<RowT> LegacyTypedRowIterator<RowT> {
/// If tracing was enabled returns tracing ids of all finished page queries
#[inline]
pub fn get_tracing_ids(&self) -> &[Uuid] {
self.row_iterator.get_tracing_ids()
pub fn into_typed<RowT>(self) -> LegacyTypedRowIterator<RowT> {
LegacyTypedRowIterator {
row_iterator: self,
_phantom_data: Default::default(),
}
}
}

/// Returns specification of row columns
#[inline]
pub fn get_column_specs(&self) -> &[ColumnSpec<'_>] {
self.row_iterator.get_column_specs()
/// Iterator over rows returned by paged queries
/// where each row is parsed as the given type\
/// Returned by `RowIterator::into_typed`
pub struct LegacyTypedRowIterator<RowT> {
row_iterator: LegacyRowIterator,
_phantom_data: std::marker::PhantomData<RowT>,
}
}

/// Couldn't get next typed row from the iterator
#[derive(Error, Debug, Clone)]
pub enum NextRowError {
/// Query to fetch next page has failed
#[error(transparent)]
QueryError(#[from] QueryError),
impl<RowT> LegacyTypedRowIterator<RowT> {
/// If tracing was enabled returns tracing ids of all finished page queries
#[inline]
pub fn get_tracing_ids(&self) -> &[Uuid] {
self.row_iterator.get_tracing_ids()
}

/// Parsing values in row as given types failed
#[error(transparent)]
FromRowError(#[from] FromRowError),
}
/// Returns specification of row columns
#[inline]
pub fn get_column_specs(&self) -> &[ColumnSpec<'_>] {
self.row_iterator.get_column_specs()
}
}

/// Fetching pages is asynchronous so `LegacyTypedRowIterator` does not implement the `Iterator` trait.\
/// Instead it uses the asynchronous `Stream` trait
impl<RowT: FromRow> Stream for LegacyTypedRowIterator<RowT> {
type Item = Result<RowT, NextRowError>;
/// Couldn't get next typed row from the iterator
#[derive(Error, Debug, Clone)]
pub enum NextRowError {
/// Query to fetch next page has failed
#[error(transparent)]
QueryError(#[from] QueryError),

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut s = self.as_mut();
/// Parsing values in row as given types failed
#[error(transparent)]
FromRowError(#[from] FromRowError),
}

/// Fetching pages is asynchronous so `LegacyTypedRowIterator` does not implement the `Iterator` trait.\
/// Instead it uses the asynchronous `Stream` trait
impl<RowT: FromRow> Stream for LegacyTypedRowIterator<RowT> {
type Item = Result<RowT, NextRowError>;

let next_row = ready_some_ok!(Pin::new(&mut s.row_iterator).poll_next(cx));
let typed_row_res = RowT::from_row(next_row).map_err(|e| e.into());
Poll::Ready(Some(typed_row_res))
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut s = self.as_mut();

let next_row = ready_some_ok!(Pin::new(&mut s.row_iterator).poll_next(cx));
let typed_row_res = RowT::from_row(next_row).map_err(|e| e.into());
Poll::Ready(Some(typed_row_res))
}
}
}

// LegacyTypedRowIterator can be moved freely for any RowT so it's Unpin
impl<RowT> Unpin for LegacyTypedRowIterator<RowT> {}
// LegacyTypedRowIterator can be moved freely for any RowT so it's Unpin
impl<RowT> Unpin for LegacyTypedRowIterator<RowT> {}
}
pub use legacy::{LegacyRowIterator, LegacyTypedRowIterator, NextRowError};

0 comments on commit ffaf69a

Please sign in to comment.