Skip to content

Commit

Permalink
codewide: rename {query,execute}_paged to %_single_page
Browse files Browse the repository at this point in the history
The name %_paged has long confused users. The new name indicates
explicitly that only a single page is fetched with a single call to
those methods.
  • Loading branch information
wprzytula committed Aug 22, 2024
1 parent 6d760d8 commit 988aeb6
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 84 deletions.
4 changes: 2 additions & 2 deletions docs/source/queries/paged.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ use scylla::query::Query;
let paged_query = Query::new("SELECT a, b, c FROM ks.t").with_page_size(6);
let res1 = session.query(paged_query.clone(), &[]).await?;
let res2 = session
.query_paged(paged_query.clone(), &[], res1.paging_state)
.query_single_page(paged_query.clone(), &[], res1.paging_state)
.await?;
# Ok(())
# }
Expand All @@ -145,7 +145,7 @@ let paged_prepared = session
.await?;
let res1 = session.execute(&paged_prepared, &[]).await?;
let res2 = session
.execute_paged(&paged_prepared, &[], res1.paging_state)
.execute_single_page(&paged_prepared, &[], res1.paging_state)
.await?;
# Ok(())
# }
Expand Down
10 changes: 4 additions & 6 deletions examples/select-paging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,10 @@ async fn main() -> Result<()> {
// Manual paging in a loop, unprepared statement.
let mut paging_state = PagingState::start();
loop {
let res = session
.query_paged(paged_query.clone(), &[], paging_state)
let (res, paging_state_response) = session
.query_single_page(paged_query.clone(), &[], paging_state)
.await?;

let paging_state_response = res.paging_state_response.clone();
println!(
"Paging state: {:#?} ({} rows)",
paging_state_response,
Expand Down Expand Up @@ -82,11 +81,10 @@ async fn main() -> Result<()> {
// Manual paging in a loop, prepared statement.
let mut paging_state = PagingState::default();
loop {
let res = session
.execute_paged(&paged_prepared, &[], paging_state)
let (res, paging_state_response) = session
.execute_single_page(&paged_prepared, &[], paging_state)
.await?;

let paging_state_response = res.paging_state_response.clone();
println!(
"Paging state from the prepared statement execution: {:#?} ({} rows)",
paging_state_response,
Expand Down
4 changes: 2 additions & 2 deletions scylla/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
//! # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
//! // Insert an int and text into the table
//! session
//! .query(
//! .query_unpaged(
//! "INSERT INTO ks.tab (a, b) VALUES(?, ?)",
//! (2_i32, "some text")
//! )
Expand All @@ -76,7 +76,7 @@
//!
//! // Read rows containing an int and text
//! let rows_opt = session
//! .query("SELECT a, b FROM ks.tab", &[])
//! .query_unpaged("SELECT a, b FROM ks.tab", &[])
//! .await?
//! .rows;
//!
Expand Down
20 changes: 10 additions & 10 deletions scylla/src/transport/caching_session.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::batch::{Batch, BatchStatement};
use crate::prepared_statement::PreparedStatement;
use crate::query::Query;
use crate::statement::PagingState;
use crate::statement::{PagingState, PagingStateResponse};
use crate::transport::errors::QueryError;
use crate::transport::iterator::RowIterator;
use crate::transport::partitioner::PartitionerName;
Expand Down Expand Up @@ -91,17 +91,17 @@ where
self.session.execute_iter(prepared, values).await
}

/// Does the same thing as [`Session::execute_paged`] but uses the prepared statement cache
pub async fn execute_paged(
/// Does the same thing as [`Session::execute_single_page`] but uses the prepared statement cache
pub async fn execute_single_page(
&self,
query: impl Into<Query>,
values: impl SerializeRow,
paging_state: PagingState,
) -> Result<QueryResult, QueryError> {
) -> Result<(QueryResult, PagingStateResponse), QueryError> {
let query = query.into();
let prepared = self.add_prepared_statement_owned(query).await?;
self.session
.execute_paged(&prepared, values, paging_state)
.execute_single_page(&prepared, values, paging_state)
.await
}

Expand Down Expand Up @@ -325,7 +325,7 @@ mod tests {

/// Checks that the same prepared statement is reused when executing the same query twice
#[tokio::test]
async fn test_execute_cached() {
async fn test_execute_unpaged_cached() {
setup_tracing();
let session = create_caching_session().await;
let result = session
Expand Down Expand Up @@ -364,16 +364,16 @@ mod tests {
assert_eq!(1, session.cache.len());
}

/// Checks that caching works with execute_paged
/// Checks that caching works with execute_single_page
#[tokio::test]
async fn test_execute_paged_cached() {
async fn test_execute_single_page_cached() {
setup_tracing();
let session = create_caching_session().await;

assert!(session.cache.is_empty());

let result = session
.execute_paged("select * from test_table", &[], PagingState::start())
let (result, _paging_state) = session
.execute_single_page("select * from test_table", &[], PagingState::start())
.await
.unwrap();

Expand Down
79 changes: 53 additions & 26 deletions scylla/src/transport/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,13 @@ impl QueryResponse {
})
}

pub(crate) fn into_query_result_and_paging_state(
self,
) -> Result<(QueryResult, PagingStateResponse), QueryError> {
self.into_non_error_query_response()?
.into_query_result_and_paging_state()
}

pub(crate) fn into_query_result(self) -> Result<QueryResult, QueryError> {
self.into_non_error_query_response()?.into_query_result()
}
Expand All @@ -257,7 +264,9 @@ impl NonErrorQueryResponse {
}
}

pub(crate) fn into_query_result(self) -> Result<QueryResult, QueryError> {
pub(crate) fn into_query_result_and_paging_state(
self,
) -> Result<(QueryResult, PagingStateResponse), QueryError> {
let (rows, paging_state, col_specs, serialized_size) = match self.response {
NonErrorResponse::Result(result::Result::Rows(rs)) => (
Some(rs.rows),
Expand All @@ -273,14 +282,29 @@ impl NonErrorQueryResponse {
}
};

Ok(QueryResult {
rows,
warnings: self.warnings,
tracing_id: self.tracing_id,
paging_state_response: paging_state,
col_specs,
serialized_size,
})
Ok((
QueryResult {
rows,
warnings: self.warnings,
tracing_id: self.tracing_id,
col_specs,
serialized_size,
},
paging_state,
))
}

pub(crate) fn into_query_result(self) -> Result<QueryResult, QueryError> {
let (result, paging_state) = self.into_query_result_and_paging_state()?;

if !paging_state.finished() {
panic!(
"Internal driver API misuse or a server bug: nonfinished paging state\
discarded by `NonErrorQueryResponse::into_query_result`"
);
}

Ok(result)
}
}
#[cfg(feature = "ssl")]
Expand Down Expand Up @@ -802,7 +826,7 @@ impl Connection {
&self,
query: impl Into<Query>,
page_size: PageSize,
) -> Result<QueryResult, QueryError> {
) -> Result<(QueryResult, PagingStateResponse), QueryError> {
let query: Query = query.into();

// This method is used only for driver internal queries, so no need to consult execution profile here.
Expand All @@ -826,7 +850,7 @@ impl Connection {
consistency: Consistency,
serial_consistency: Option<SerialConsistency>,
page_size: PageSize,
) -> Result<QueryResult, QueryError> {
) -> Result<(QueryResult, PagingStateResponse), QueryError> {
let query: Query = query.into();
self.query_with_consistency(
&query,
Expand All @@ -836,7 +860,7 @@ impl Connection {
PagingState::start(),
)
.await?
.into_query_result()
.into_query_result_and_paging_state()
}

pub(crate) async fn query(
Expand Down Expand Up @@ -1199,21 +1223,24 @@ impl Connection {
}

pub(crate) async fn fetch_schema_version(&self) -> Result<Uuid, QueryError> {
let (version_id,) = self
let (result, paging_state) = self
.query_single_page(LOCAL_VERSION, PageSize::default())
.await?
.single_row_typed()
.map_err(|err| match err {
SingleRowTypedError::RowsExpected(_) => {
QueryError::ProtocolError("Version query returned not rows")
}
SingleRowTypedError::BadNumberOfRows(_) => {
QueryError::ProtocolError("system.local query returned a wrong number of rows")
}
SingleRowTypedError::FromRowError(_) => {
QueryError::ProtocolError("Row is not uuid type as it should be")
}
})?;
.await?;
if !paging_state.finished() {
warn!("Fetching local schema version returned nonfinished paging state");
}

let (version_id,): (Uuid,) = result.single_row_typed().map_err(|err| match err {
SingleRowTypedError::RowsExpected(_) => {
QueryError::ProtocolError("Version query returned not rows")
}
SingleRowTypedError::BadNumberOfRows(_) => {
QueryError::ProtocolError("system.local query returned a wrong number of rows")
}
SingleRowTypedError::FromRowError(_) => {
QueryError::ProtocolError("Row is not uuid type as it should be")
}
})?;
Ok(version_id)
}

Expand Down
5 changes: 0 additions & 5 deletions scylla/src/transport/query_result.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::frame::response::cql_to_rust::{FromRow, FromRowError};
use crate::frame::response::result::ColumnSpec;
use crate::frame::response::result::Row;
use crate::statement::PagingStateResponse;
use crate::transport::session::{IntoTypedRows, TypedRowIter};
use thiserror::Error;
use uuid::Uuid;
Expand All @@ -19,8 +18,6 @@ pub struct QueryResult {
pub warnings: Vec<String>,
/// CQL Tracing uuid - can only be Some if tracing is enabled for this query
pub tracing_id: Option<Uuid>,
/// Paging state returned from the server
pub paging_state_response: PagingStateResponse,
/// Column specification returned from the server
pub col_specs: Vec<ColumnSpec>,
/// The original size of the serialized rows in request
Expand All @@ -33,7 +30,6 @@ impl QueryResult {
rows: None,
warnings: Vec::new(),
tracing_id: None,
paging_state_response: PagingStateResponse::NoMorePages,
col_specs: Vec::new(),
serialized_size: 0,
}
Expand Down Expand Up @@ -318,7 +314,6 @@ mod tests {
rows: None,
warnings: vec![],
tracing_id: None,
paging_state_response: PagingStateResponse::NoMorePages,
col_specs: vec![column_spec],
serialized_size: 0,
}
Expand Down
Loading

0 comments on commit 988aeb6

Please sign in to comment.