Skip to content

Commit

Permalink
iterator: adjust to the new deserialization framework
Browse files Browse the repository at this point in the history
This commit makes the RowIteratorWorker pass raw rows to the main tokio
task, instead of the eagerly deserialized Rows.

The equivalent of the old RowIterator is now QueryPager. It cannot be
conveniently iterated on, as it does not have any information about the
column types. It features (yet not exposes) a `next()` method for
deserializing consecutive `ColumnIterator`s. Users cannot manually
perform deserialization using this method directly, because will be able
to utilise the preferred (typed) API that will be added in the next
commit. If they prefer manual deserialization, they will be be able to
specify `ColumnIterator` as the deserialized row type, and proceed with
manual deserialization from there.

The legacy iterators are preserved by wrapping around QueryPager.

Co-authored-by: Wojciech Przytuła <[email protected]>
  • Loading branch information
piodul and wprzytula committed Nov 6, 2024
1 parent 00c1754 commit ac7121b
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 77 deletions.
15 changes: 6 additions & 9 deletions scylla/src/transport/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use std::{
};

use super::errors::{ProtocolError, UseKeyspaceProtocolError};
use super::iterator::LegacyRowIterator;
use super::iterator::{LegacyRowIterator, QueryPager};
use super::locator::tablets::{RawTablet, TabletParsingError};
use super::query_result::QueryResult;
use super::session::AddressTranslator;
Expand Down Expand Up @@ -1188,13 +1188,9 @@ impl Connection {
.determine_consistency(self.config.default_consistency);
let serial_consistency = query.config.serial_consistency.flatten();

LegacyRowIterator::new_for_connection_query_iter(
query,
self,
consistency,
serial_consistency,
)
.await
QueryPager::new_for_connection_query_iter(query, self, consistency, serial_consistency)
.await
.map(QueryPager::into_legacy)
}

/// Executes a prepared statements and fetches its results over multiple pages, using
Expand All @@ -1209,14 +1205,15 @@ impl Connection {
.determine_consistency(self.config.default_consistency);
let serial_consistency = prepared_statement.config.serial_consistency.flatten();

LegacyRowIterator::new_for_connection_execute_iter(
QueryPager::new_for_connection_execute_iter(
prepared_statement,
values,
self,
consistency,
serial_consistency,
)
.await
.map(QueryPager::into_legacy)
}

#[allow(dead_code)]
Expand Down
188 changes: 125 additions & 63 deletions scylla/src/transport/iterator.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,31 @@
//! Iterators over rows returned by paged queries
use std::future::Future;
use std::mem;
use std::net::SocketAddr;
use std::ops::ControlFlow;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use futures::Stream;
use scylla_cql::frame::request::query::PagingStateResponse;
use scylla_cql::frame::frame_errors::RowsParseError;
use scylla_cql::frame::response::result::RawMetadataAndRawRows;
use scylla_cql::frame::response::NonErrorResponse;
use scylla_cql::types::deserialize::result::RawRowLendingIterator;
use scylla_cql::types::deserialize::row::{ColumnIterator, DeserializeRow};
use scylla_cql::types::serialize::row::SerializedValues;
use std::result::Result;
use thiserror::Error;
use tokio::sync::mpsc;

use super::execution_profile::ExecutionProfileInner;
use super::query_result::ColumnSpecs;
use super::session::RequestSpan;
use crate::cql_to_rust::{FromRow, FromRowError};

use crate::frame::response::{
result,
result::{ColumnSpec, Row, Rows},
result::{ColumnSpec, Row},
};
use crate::history::{self, HistoryListener};
use crate::statement::{prepared_statement::PreparedStatement, query::Query};
Expand Down Expand Up @@ -535,46 +537,70 @@ where
}
}

/// Iterator over rows returned by paged queries\
/// Allows to easily access rows without worrying about handling multiple pages
pub struct LegacyRowIterator {
current_row_idx: usize,
current_page: Rows,
/// An intermediate object that allows to construct an iterator over a query
/// that is asynchronously paged in the background.
///
/// TODO: implement and describe the new API
///
/// A pre-0.15.0 interface is also available, although deprecated:
/// `into_legacy()` method converts QueryPager to LegacyRowIterator,
/// enabling Stream'ed operation on rows being eagerly deserialized
/// to the middle-man [Row] type. This is inefficient, especially if
/// [Row] is not the intended target type.
pub struct QueryPager {
current_page: RawRowLendingIterator,
page_receiver: mpsc::Receiver<Result<ReceivedPage, QueryError>>,
tracing_ids: Vec<Uuid>,
}

/// Fetching pages is asynchronous so `RowIterator` does not implement the `Iterator` trait.\
/// Instead it uses the asynchronous `Stream` trait
impl Stream for LegacyRowIterator {
type Item = Result<Row, QueryError>;
// QueryPager is not an iterator or a stream! However, it implements
// a `next()` method that returns a [ColumnIterator], which can be used
// to manually deserialize a row.
// The `ColumnIterator` borrows from the `QueryPager`, and the [futures::Stream] trait
// does not allow for such a pattern. Lending streams are not a thing yet.
impl QueryPager {
/// Returns the next item (`ColumnIterator`) from the stream.
///
/// This can be used with `type_check() for manual deserialization - see example below.
///
/// This is not a part of the `Stream` interface because the returned iterator
/// borrows from self.
///
/// This is cancel-safe.
async fn next(&mut self) -> Option<Result<ColumnIterator, QueryError>> {
let res = std::future::poll_fn(|cx| Pin::new(&mut *self).poll_fill_page(cx)).await;
match res {
Some(Ok(())) => {}
Some(Err(err)) => return Some(Err(err)),
None => return None,
}

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.poll_next_internal(cx)
// We are guaranteed here to have a non-empty page, so unwrap
Some(
self.current_page
.next()
.unwrap()
.map_err(|e| RowsParseError::from(e).into()),
)
}
}

impl LegacyRowIterator {
fn poll_next_internal(
mut self: Pin<&mut Self>,
/// Tries to acquire a non-empty page, if current page is exhausted.
fn poll_fill_page<'r>(
mut self: Pin<&'r mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Row, QueryError>>> {
if self.as_ref().is_current_page_exhausted() {
ready_some_ok!(self.as_mut().poll_next_page(cx));
) -> Poll<Option<Result<(), QueryError>>> {
if !self.is_current_page_exhausted() {
return Poll::Ready(Some(Ok(())));
}

let mut s = self.as_mut();

let idx = s.current_row_idx;
if idx < s.current_page.rows.len() {
let row = mem::take(&mut s.current_page.rows[idx]);
s.current_row_idx += 1;
return Poll::Ready(Some(Ok(row)));
ready_some_ok!(self.as_mut().poll_next_page(cx));
if self.is_current_page_exhausted() {
// We most likely got a zero-sized page.
// Try again later.
cx.waker().wake_by_ref();
Poll::Pending
} else {
Poll::Ready(Some(Ok(())))
}
// We probably got a zero-sized page
// Yield, but tell that we are ready
cx.waker().wake_by_ref();
Poll::Pending
}

/// Makes an attempt to acquire the next page (which may be empty).
Expand All @@ -589,18 +615,8 @@ impl LegacyRowIterator {
let mut s = self.as_mut();

let received_page = ready_some_ok!(Pin::new(&mut s.page_receiver).poll_recv(cx));
let rows = match received_page
.rows
// As RowIteratorWorker manages paging itself, the paging state response
// returned to the user is always NoMorePages. It used to be so before
// the deserialization refactor, too.
.into_legacy_rows(PagingStateResponse::NoMorePages)
{
Ok(rows) => rows,
Err(err) => return Poll::Ready(Some(Err(err.into()))),
};
s.current_page = rows;
s.current_row_idx = 0;
let raw_rows_with_deserialized_metadata = received_page.rows.deserialize_metadata()?;
s.current_page = RawRowLendingIterator::new(raw_rows_with_deserialized_metadata);

if let Some(tracing_id) = received_page.tracing_id {
s.tracing_ids.push(tracing_id);
Expand All @@ -609,12 +625,13 @@ impl LegacyRowIterator {
Poll::Ready(Some(Ok(())))
}

/// Converts this iterator into an iterator over rows parsed as given type
pub fn into_typed<RowT: FromRow>(self) -> LegacyTypedRowIterator<RowT> {
LegacyTypedRowIterator {
row_iterator: self,
phantom_data: Default::default(),
}
/// Converts this iterator into an iterator over rows parsed as given type,
/// using the legacy deserialization framework.
/// This is inefficient, because all rows are being eagerly deserialized
/// to a middle-man [Row] type.
#[inline]
pub fn into_legacy(self) -> LegacyRowIterator {
LegacyRowIterator { raw_iterator: self }
}

pub(crate) async fn new_for_query(
Expand Down Expand Up @@ -888,16 +905,13 @@ impl LegacyRowIterator {
// to the channel (the PageSendAttemptedProof helps enforce this)
// - That future is polled in a tokio::task which isn't going to be
// cancelled
let pages_received = receiver.recv().await.unwrap()?;
let rows = pages_received
.rows
.into_legacy_rows(PagingStateResponse::NoMorePages)?;
let page_received = receiver.recv().await.unwrap()?;
let raw_rows_with_deserialized_metadata = page_received.rows.deserialize_metadata()?;

Ok(Self {
current_row_idx: 0,
current_page: rows,
current_page: RawRowLendingIterator::new(raw_rows_with_deserialized_metadata),
page_receiver: receiver,
tracing_ids: if let Some(tracing_id) = pages_received.tracing_id {
tracing_ids: if let Some(tracing_id) = page_received.tracing_id {
vec![tracing_id]
} else {
Vec::new()
Expand All @@ -906,17 +920,63 @@ impl LegacyRowIterator {
}

/// If tracing was enabled returns tracing ids of all finished page queries
pub fn get_tracing_ids(&self) -> &[Uuid] {
#[inline]
pub fn tracing_ids(&self) -> &[Uuid] {
&self.tracing_ids
}

/// Returns specification of row columns
pub fn get_column_specs(&self) -> &[ColumnSpec<'_>] {
self.current_page.metadata.inner().col_specs()
#[inline]
pub fn column_specs(&self) -> ColumnSpecs<'_> {
ColumnSpecs::new(self.current_page.metadata().col_specs())
}

fn is_current_page_exhausted(&self) -> bool {
self.current_row_idx >= self.current_page.rows.len()
self.current_page.rows_remaining() == 0
}
}

/// Iterator over rows returned by paged queries.
///
/// Allows to easily access rows without worrying about handling multiple pages.
pub struct LegacyRowIterator {
raw_iterator: QueryPager,
}

impl Stream for LegacyRowIterator {
type Item = Result<Row, QueryError>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut s = self.as_mut();

let next_fut = s.raw_iterator.next();
futures::pin_mut!(next_fut);

let next_column_iter = ready_some_ok!(next_fut.poll(cx));

let next_ready_row =
Row::deserialize(next_column_iter).map_err(|e| RowsParseError::from(e).into());

Poll::Ready(Some(next_ready_row))
}
}

impl LegacyRowIterator {
/// If tracing was enabled returns tracing ids of all finished page queries
pub fn get_tracing_ids(&self) -> &[Uuid] {
self.raw_iterator.tracing_ids()
}

/// Returns specification of row columns
pub fn get_column_specs(&self) -> &[ColumnSpec<'_>] {
self.raw_iterator.column_specs().inner()
}

pub fn into_typed<RowT>(self) -> LegacyTypedRowIterator<RowT> {
LegacyTypedRowIterator {
row_iterator: self,
_phantom_data: Default::default(),
}
}
}

Expand All @@ -925,16 +985,18 @@ impl LegacyRowIterator {
/// Returned by `RowIterator::into_typed`
pub struct LegacyTypedRowIterator<RowT> {
row_iterator: LegacyRowIterator,
phantom_data: std::marker::PhantomData<RowT>,
_phantom_data: std::marker::PhantomData<RowT>,
}

impl<RowT> LegacyTypedRowIterator<RowT> {
/// If tracing was enabled returns tracing ids of all finished page queries
#[inline]
pub fn get_tracing_ids(&self) -> &[Uuid] {
self.row_iterator.get_tracing_ids()
}

/// Returns specification of row columns
#[inline]
pub fn get_column_specs(&self) -> &[ColumnSpec<'_>] {
self.row_iterator.get_column_specs()
}
Expand Down
15 changes: 10 additions & 5 deletions scylla/src/transport/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use crate::batch::batch_values;
#[cfg(feature = "cloud")]
use crate::cloud::CloudConfig;
use crate::LegacyQueryResult;

use crate::history;
use crate::history::HistoryListener;
Expand Down Expand Up @@ -42,7 +43,8 @@ use super::connection::QueryResponse;
use super::connection::SslConfig;
use super::errors::TracingProtocolError;
use super::execution_profile::{ExecutionProfile, ExecutionProfileHandle, ExecutionProfileInner};
use super::legacy_query_result::{LegacyQueryResult, MaybeFirstRowTypedError};
use super::iterator::QueryPager;
use super::legacy_query_result::MaybeFirstRowTypedError;
#[cfg(feature = "cloud")]
use super::node::CloudEndpoint;
use super::node::{InternalKnownNode, KnownNode};
Expand Down Expand Up @@ -885,27 +887,29 @@ impl Session {
.access();

if values.is_empty() {
LegacyRowIterator::new_for_query(
QueryPager::new_for_query(
query,
execution_profile,
self.cluster.get_data(),
self.metrics.clone(),
)
.await
.map(QueryPager::into_legacy)
} else {
// Making LegacyRowIterator::new_for_query work with values is too hard (if even possible)
// Making QueryPager::new_for_query work with values is too hard (if even possible)
// so instead of sending one prepare to a specific connection on each iterator query,
// we fully prepare a statement beforehand.
let prepared = self.prepare(query).await?;
let values = prepared.serialize_values(&values)?;
LegacyRowIterator::new_for_prepared_statement(PreparedIteratorConfig {
QueryPager::new_for_prepared_statement(PreparedIteratorConfig {
prepared,
values,
execution_profile,
cluster_data: self.cluster.get_data(),
metrics: self.metrics.clone(),
})
.await
.map(QueryPager::into_legacy)
}
}

Expand Down Expand Up @@ -1288,14 +1292,15 @@ impl Session {
.unwrap_or_else(|| self.get_default_execution_profile_handle())
.access();

LegacyRowIterator::new_for_prepared_statement(PreparedIteratorConfig {
QueryPager::new_for_prepared_statement(PreparedIteratorConfig {
prepared,
values: serialized_values,
execution_profile,
cluster_data: self.cluster.get_data(),
metrics: self.metrics.clone(),
})
.await
.map(QueryPager::into_legacy)
}

/// Perform a batch request.\
Expand Down

0 comments on commit ac7121b

Please sign in to comment.