Skip to content

Commit

Permalink
conn: narrow return type for execute and query
Browse files Browse the repository at this point in the history
Unfortunately, I didn't know how to decouple this commit
into two separate commits. This is due to the trait bounds
on e.g. RowIteratorWorker::work implementation.
  • Loading branch information
muzarski committed Aug 26, 2024
1 parent e28dffc commit 3cca52c
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 7 deletions.
8 changes: 4 additions & 4 deletions scylla/src/transport/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1031,7 +1031,7 @@ impl Connection {
&self,
query: &Query,
paging_state: Option<Bytes>,
) -> Result<QueryResponse, QueryError> {
) -> Result<QueryResponse, UserRequestError> {
// This method is used only for driver internal queries, so no need to consult execution profile here.
self.query_with_consistency(
query,
Expand All @@ -1050,7 +1050,7 @@ impl Connection {
consistency: Consistency,
serial_consistency: Option<SerialConsistency>,
paging_state: Option<Bytes>,
) -> Result<QueryResponse, QueryError> {
) -> Result<QueryResponse, UserRequestError> {
let query_frame = query::Query {
contents: Cow::Borrowed(&query.contents),
parameters: query::QueryParameters {
Expand All @@ -1075,7 +1075,7 @@ impl Connection {
prepared: PreparedStatement,
values: SerializedValues,
paging_state: Option<Bytes>,
) -> Result<QueryResponse, QueryError> {
) -> Result<QueryResponse, UserRequestError> {
// This method is used only for driver internal queries, so no need to consult execution profile here.
self.execute_with_consistency(
&prepared,
Expand All @@ -1096,7 +1096,7 @@ impl Connection {
consistency: Consistency,
serial_consistency: Option<SerialConsistency>,
paging_state: Option<Bytes>,
) -> Result<QueryResponse, QueryError> {
) -> Result<QueryResponse, UserRequestError> {
let execute_frame = execute::Execute {
id: prepared_statement.get_id().to_owned(),
parameters: query::QueryParameters {
Expand Down
8 changes: 5 additions & 3 deletions scylla/src/transport/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::task::{Context, Poll};

use bytes::Bytes;
use futures::Stream;
use scylla_cql::errors::UserRequestError;
use scylla_cql::frame::response::NonErrorResponse;
use scylla_cql::frame::types::SerialConsistency;
use scylla_cql::types::serialize::row::SerializedValues;
Expand Down Expand Up @@ -489,7 +490,7 @@ struct RowIteratorWorker<'a, QueryFunc, SpanCreatorFunc> {
sender: ProvingSender<Result<ReceivedPage, QueryError>>,

// Closure used to perform a single page query
// AsyncFn(Arc<Connection>, Option<Bytes>) -> Result<QueryResponse, QueryError>
// AsyncFn(Arc<Connection>, Option<Bytes>) -> Result<QueryResponse, UserRequestError>
page_query: QueryFunc,

statement_info: RoutingInfo<'a>,
Expand All @@ -512,7 +513,7 @@ struct RowIteratorWorker<'a, QueryFunc, SpanCreatorFunc> {
impl<QueryFunc, QueryFut, SpanCreator> RowIteratorWorker<'_, QueryFunc, SpanCreator>
where
QueryFunc: Fn(Arc<Connection>, Consistency, Option<Bytes>) -> QueryFut,
QueryFut: Future<Output = Result<QueryResponse, QueryError>>,
QueryFut: Future<Output = Result<QueryResponse, UserRequestError>>,
SpanCreator: Fn() -> RequestSpan,
{
// Contract: this function MUST send at least one item through self.sender
Expand Down Expand Up @@ -666,6 +667,7 @@ where
let query_response =
(self.page_query)(connection.clone(), consistency, self.paging_state.clone())
.await
.map_err(Into::into)
.and_then(QueryResponse::into_non_error_query_response);

let elapsed = query_start.elapsed();
Expand Down Expand Up @@ -831,7 +833,7 @@ struct SingleConnectionRowIteratorWorker<Fetcher> {
impl<Fetcher, FetchFut> SingleConnectionRowIteratorWorker<Fetcher>
where
Fetcher: Fn(Option<Bytes>) -> FetchFut + Send + Sync,
FetchFut: Future<Output = Result<QueryResponse, QueryError>> + Send,
FetchFut: Future<Output = Result<QueryResponse, UserRequestError>> + Send,
{
async fn work(mut self) -> PageSendAttemptedProof {
match self.do_work().await {
Expand Down
3 changes: 3 additions & 0 deletions scylla/src/transport/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,7 @@ impl Session {
paging_state_ref.clone(),
)
.await
.map_err(Into::into)
.and_then(QueryResponse::into_non_error_query_response)
} else {
let prepared = connection.prepare(query_ref).await?;
Expand All @@ -704,6 +705,7 @@ impl Session {
paging_state_ref.clone(),
)
.await
.map_err(Into::into)
.and_then(QueryResponse::into_non_error_query_response)
}
}
Expand Down Expand Up @@ -1057,6 +1059,7 @@ impl Session {
paging_state_ref.clone(),
)
.await
.map_err(Into::into)
.and_then(QueryResponse::into_non_error_query_response)
}
},
Expand Down

0 comments on commit 3cca52c

Please sign in to comment.