Merge pull request #1156 from muzarski/rename-run-do-execute-query
session: adjust naming in internal session functions
wprzytula authored Dec 30, 2024
2 parents 089f26f + f6a9439 commit 57ad5ad
Showing 1 changed file with 74 additions and 68 deletions.
scylla/src/transport/session.rs (142 changes: 74 additions & 68 deletions)
@@ -472,7 +472,7 @@ impl Default for SessionConfig {
}
}

- pub(crate) enum RunQueryResult<ResT> {
+ pub(crate) enum RunRequestResult<ResT> {
IgnoredWriteError,
Completed(ResT),
}
@@ -1203,8 +1203,8 @@ where

let span = RequestSpan::new_query(&query.contents);
let span_ref = &span;
- let run_query_result = self
- .run_query(
+ let run_request_result = self
+ .run_request(
statement_info,
&query.config,
execution_profile,
@@ -1257,13 +1257,13 @@ where
.instrument(span.span().clone())
.await?;

- let response = match run_query_result {
- RunQueryResult::IgnoredWriteError => NonErrorQueryResponse {
+ let response = match run_request_result {
+ RunRequestResult::IgnoredWriteError => NonErrorQueryResponse {
response: NonErrorResponse::Result(result::Result::Void),
tracing_id: None,
warnings: Vec::new(),
},
- RunQueryResult::Completed(response) => response,
+ RunRequestResult::Completed(response) => response,
};

self.handle_set_keyspace_response(&response).await?;
@@ -1526,8 +1526,8 @@ where
}
}

- let run_query_result: RunQueryResult<NonErrorQueryResponse> = self
- .run_query(
+ let run_request_result: RunRequestResult<NonErrorQueryResponse> = self
+ .run_request(
statement_info,
&prepared.config,
execution_profile,
@@ -1558,13 +1558,13 @@ where
.instrument(span.span().clone())
.await?;

- let response = match run_query_result {
- RunQueryResult::IgnoredWriteError => NonErrorQueryResponse {
+ let response = match run_request_result {
+ RunRequestResult::IgnoredWriteError => NonErrorQueryResponse {
response: NonErrorResponse::Result(result::Result::Void),
tracing_id: None,
warnings: Vec::new(),
},
- RunQueryResult::Completed(response) => response,
+ RunRequestResult::Completed(response) => response,
};

self.handle_set_keyspace_response(&response).await?;
@@ -1650,8 +1650,8 @@ where

let span = RequestSpan::new_batch();

- let run_query_result = self
- .run_query(
+ let run_request_result = self
+ .run_request(
statement_info,
&batch.config,
execution_profile,
@@ -1678,9 +1678,9 @@ where
.instrument(span.span().clone())
.await?;

- let result = match run_query_result {
- RunQueryResult::IgnoredWriteError => QueryResult::mock_empty(),
- RunQueryResult::Completed(result) => {
+ let result = match run_request_result {
+ RunRequestResult::IgnoredWriteError => QueryResult::mock_empty(),
+ RunRequestResult::Completed(result) => {
span.record_result_fields(&result);
result
}
@@ -1916,26 +1916,27 @@ where
Ok(Some(tracing_info))
}

- // This method allows to easily run a query using load balancing, retry policy etc.
- // Requires some information about the query and a closure.
- // The closure is used to do the query itself on a connection.
- // - query will use connection.query()
- // - execute will use connection.execute()
- // If this query closure fails with some errors retry policy is used to perform retries
- // On success this query's result is returned
+ /// This method allows to easily run a request using load balancing, retry policy etc.
+ /// Requires some information about the request and a closure.
+ /// The closure is used to execute the request once on a chosen connection.
+ /// - query will use connection.query()
+ /// - execute will use connection.execute()
+ ///
+ /// If this closure fails with some errors, retry policy is used to perform retries.
+ /// On success, this request's result is returned.
// I tried to make this closures take a reference instead of an Arc but failed
// maybe once async closures get stabilized this can be fixed
- async fn run_query<'a, QueryFut, ResT>(
+ async fn run_request<'a, QueryFut, ResT>(
&'a self,
statement_info: RoutingInfo<'a>,
statement_config: &'a StatementConfig,
execution_profile: Arc<ExecutionProfileInner>,
- do_query: impl Fn(Arc<Connection>, Consistency, &ExecutionProfileInner) -> QueryFut,
+ run_request_once: impl Fn(Arc<Connection>, Consistency, &ExecutionProfileInner) -> QueryFut,
request_span: &'a RequestSpan,
- ) -> Result<RunQueryResult<ResT>, QueryError>
+ ) -> Result<RunRequestResult<ResT>, QueryError>
where
QueryFut: Future<Output = Result<ResT, QueryError>>,
- ResT: AllowedRunQueryResTType,
+ ResT: AllowedRunRequestResTType,
{
let history_listener_and_id: Option<(&'a dyn HistoryListener, history::QueryId)> =
statement_config
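
As an illustration of the closure shape described in the new doc comment, here is a minimal stand-alone sketch of the same pattern: the runner decides where and how many times an attempt runs, while the caller-supplied closure (playing the role of run_request_once) decides how a single attempt is executed. FakeConnection, run_once and the String error type are invented for this sketch and assume a tokio runtime; they are not driver APIs.

use std::future::Future;
use std::sync::Arc;

// Hypothetical connection type; `query` only mimics the shape of a connection method.
struct FakeConnection;

impl FakeConnection {
    async fn query(&self, cql: &str) -> Result<String, String> {
        Ok(format!("rows for `{cql}`"))
    }
}

// Stand-in for the runner: it receives the per-attempt closure and invokes it
// once on a chosen connection; the closure decides query vs execute.
async fn run_once<F, Fut, T>(conn: Arc<FakeConnection>, run_request_once: F) -> Result<T, String>
where
    F: Fn(Arc<FakeConnection>) -> Fut,
    Fut: Future<Output = Result<T, String>>,
{
    run_request_once(conn).await
}

// Assumes the tokio crate for the async runtime.
#[tokio::main]
async fn main() -> Result<(), String> {
    let conn = Arc::new(FakeConnection);
    // "query" path; an "execute" path would simply call a different method here.
    let rows = run_once(conn, |c| async move { c.query("SELECT 1").await }).await?;
    println!("{rows}");
    Ok(())
}
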
@@ -1947,11 +1948,11 @@ where

let runner = async {
let cluster_data = self.cluster.get_data();
- let query_plan =
+ let request_plan =
load_balancing::Plan::new(load_balancer.as_ref(), &statement_info, &cluster_data);

- // If a speculative execution policy is used to run query, query_plan has to be shared
- // between different async functions. This struct helps to wrap query_plan in mutex so it
+ // If a speculative execution policy is used to run request, request_plan has to be shared
+ // between different async functions. This struct helps to wrap request_plan in mutex so it
// can be shared safely.
struct SharedPlan<'a, I>
where
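
The comment above explains why the plan must be shareable between speculative fibers. Below is a compact stand-alone model of that idea, with a hypothetical Node type instead of the driver's NodeRef and Shard: the iterator lives behind a Mutex, and Iterator is implemented for a shared reference, so concurrent fibers can pull distinct plan entries.

use std::sync::Mutex;

// Hypothetical stand-in for the items of a real load-balancing plan.
#[derive(Debug, PartialEq)]
struct Node(u8);

// Same idea as SharedPlan above: one plan iterator, shared by several fibers.
struct SharedPlan<I: Iterator<Item = Node>> {
    iter: Mutex<I>,
}

// Iterator is implemented for a *shared reference*, so every fiber can hold
// &SharedPlan and each next() hands out one element under a short-lived lock.
impl<'a, I: Iterator<Item = Node>> Iterator for &'a SharedPlan<I> {
    type Item = Node;
    fn next(&mut self) -> Option<Node> {
        self.iter.lock().unwrap().next()
    }
}

fn main() {
    let plan = SharedPlan {
        iter: Mutex::new(vec![Node(1), Node(2), Node(3)].into_iter()),
    };
    let mut fiber_a = &plan;
    let mut fiber_b = &plan;
    // Two fibers draw from the same plan; no node is handed out twice.
    assert_eq!(fiber_a.next(), Some(Node(1)));
    assert_eq!(fiber_b.next(), Some(Node(2)));
}

Locking only inside next keeps the critical section down to handing out a single plan element.
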
@@ -1980,11 +1981,11 @@ where

match speculative_policy {
Some(speculative) if statement_config.is_idempotent => {
- let shared_query_plan = SharedPlan {
- iter: std::sync::Mutex::new(query_plan),
+ let shared_request_plan = SharedPlan {
+ iter: std::sync::Mutex::new(request_plan),
};

- let execute_query_generator = |is_speculative: bool| {
+ let request_runner_generator = |is_speculative: bool| {
let history_data: Option<HistoryData> = history_listener_and_id
.as_ref()
.map(|(history_listener, query_id)| {
@@ -2005,11 +2006,11 @@ where
request_span.inc_speculative_executions();
}

- self.execute_query(
- &shared_query_plan,
- &do_query,
+ self.run_request_speculative_fiber(
+ &shared_request_plan,
+ &run_request_once,
&execution_profile,
- ExecuteQueryContext {
+ ExecuteRequestContext {
is_idempotent: statement_config.is_idempotent,
consistency_set_on_statement: statement_config.consistency,
retry_session: retry_policy.new_session(),
@@ -2027,7 +2028,7 @@ where
speculative_execution::execute(
speculative.as_ref(),
&context,
- execute_query_generator,
+ request_runner_generator,
)
.await
}
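
For readers unfamiliar with the speculative-execution call above, the overall shape is: a generator closure (here mirroring request_runner_generator) yields one attempt future per fiber, the speculative fiber starts only after a delay, and the first fiber to finish wins while the loser is dropped. The sketch below models just that race; attempt, the timings and the tokio::select!-based arbitration are illustrative assumptions, not the driver's speculative_execution::execute.

use std::time::Duration;
use tokio::time::sleep;

// Toy attempt: pretend the primary fiber is slow and the speculative one is fast.
async fn attempt(is_speculative: bool) -> Result<&'static str, &'static str> {
    if is_speculative {
        sleep(Duration::from_millis(10)).await;
        Ok("answer from the speculative fiber")
    } else {
        sleep(Duration::from_millis(500)).await;
        Ok("answer from the primary fiber")
    }
}

// Assumes the tokio crate (rt, time and macros features).
#[tokio::main]
async fn main() {
    // Plays the role of request_runner_generator: one future per fiber.
    let generator = |is_speculative: bool| attempt(is_speculative);

    let primary = generator(false);
    let speculative = async {
        // The speculative fiber only starts after a delay.
        sleep(Duration::from_millis(50)).await;
        generator(true).await
    };

    // Whichever fiber finishes first wins; the other future is simply dropped.
    let winner = tokio::select! {
        res = primary => res,
        res = speculative => res,
    };
    println!("{winner:?}");
}
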
@@ -2040,11 +2041,11 @@ where
query_id: *query_id,
speculative_id: None,
});
- self.execute_query(
- query_plan,
- &do_query,
+ self.run_request_speculative_fiber(
+ request_plan,
+ &run_request_once,
&execution_profile,
- ExecuteQueryContext {
+ ExecuteRequestContext {
is_idempotent: statement_config.is_idempotent,
consistency_set_on_statement: statement_config.consistency,
retry_session: retry_policy.new_session(),
@@ -2085,24 +2086,29 @@ where
result
}

- async fn execute_query<'a, QueryFut, ResT>(
+ /// Executes the closure `run_request_once`, provided the load balancing plan and some information
+ /// about the request, including retry session.
+ /// If request fails, retry session is used to perform retries.
+ ///
+ /// Returns None, if provided plan is empty.
+ async fn run_request_speculative_fiber<'a, QueryFut, ResT>(
&'a self,
- query_plan: impl Iterator<Item = (NodeRef<'a>, Shard)>,
- do_query: impl Fn(Arc<Connection>, Consistency, &ExecutionProfileInner) -> QueryFut,
+ request_plan: impl Iterator<Item = (NodeRef<'a>, Shard)>,
+ run_request_once: impl Fn(Arc<Connection>, Consistency, &ExecutionProfileInner) -> QueryFut,
execution_profile: &ExecutionProfileInner,
- mut context: ExecuteQueryContext<'a>,
- ) -> Option<Result<RunQueryResult<ResT>, QueryError>>
+ mut context: ExecuteRequestContext<'a>,
+ ) -> Option<Result<RunRequestResult<ResT>, QueryError>>
where
QueryFut: Future<Output = Result<ResT, QueryError>>,
- ResT: AllowedRunQueryResTType,
+ ResT: AllowedRunRequestResTType,
{
let mut last_error: Option<QueryError> = None;
let mut current_consistency: Consistency = context
.consistency_set_on_statement
.unwrap_or(execution_profile.consistency);

- 'nodes_in_plan: for (node, shard) in query_plan {
- let span = trace_span!("Executing query", node = %node.address);
+ 'nodes_in_plan: for (node, shard) in request_plan {
+ let span = trace_span!("Executing request", node = %node.address);
'same_node_retries: loop {
trace!(parent: &span, "Execution started");
let connection = match node.connection_for_shard(shard).await {
@@ -2114,14 +2120,14 @@ where
"Choosing connection failed"
);
last_error = Some(e.into());
- // Broken connection doesn't count as a failed query, don't log in metrics
+ // Broken connection doesn't count as a failed request, don't log in metrics
continue 'nodes_in_plan;
}
};
context.request_span.record_shard_id(&connection);

self.metrics.inc_total_nonpaged_queries();
- let query_start = std::time::Instant::now();
+ let request_start = std::time::Instant::now();

trace!(
parent: &span,
@@ -2130,29 +2136,29 @@
);
let attempt_id: Option<history::AttemptId> =
context.log_attempt_start(connection.get_connect_address());
- let query_result: Result<ResT, QueryError> =
- do_query(connection, current_consistency, execution_profile)
+ let request_result: Result<ResT, QueryError> =
+ run_request_once(connection, current_consistency, execution_profile)
.instrument(span.clone())
.await;

- let elapsed = query_start.elapsed();
- last_error = match query_result {
+ let elapsed = request_start.elapsed();
+ last_error = match request_result {
Ok(response) => {
- trace!(parent: &span, "Query succeeded");
+ trace!(parent: &span, "Request succeeded");
let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64);
context.log_attempt_success(&attempt_id);
execution_profile.load_balancing_policy.on_query_success(
context.query_info,
elapsed,
node,
);
- return Some(Ok(RunQueryResult::Completed(response)));
+ return Some(Ok(RunRequestResult::Completed(response)));
}
Err(e) => {
trace!(
parent: &span,
last_error = %e,
- "Query failed"
+ "Request failed"
);
self.metrics.inc_failed_nonpaged_queries();
execution_profile.load_balancing_policy.on_query_failure(
@@ -2195,7 +2201,7 @@ where
RetryDecision::DontRetry => break 'nodes_in_plan,

RetryDecision::IgnoreWriteError => {
- return Some(Ok(RunQueryResult::IgnoredWriteError))
+ return Some(Ok(RunRequestResult::IgnoredWriteError))
}
};
}
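
The labelled loops and the RetryDecision match above form a small state machine: retry on the same node, move to the next node, stop, or ignore a write error. The stand-alone sketch below reproduces only that skeleton; try_on, decide and the node addresses are toy stand-ins, not driver types.

// IgnoreWriteError is never produced by the toy policy below.
#[allow(dead_code)]
#[derive(Debug)]
enum RetryDecision {
    RetrySameNode,
    RetryNextNode,
    DontRetry,
    IgnoreWriteError,
}

#[derive(Debug)]
enum Outcome {
    Completed(&'static str),
    IgnoredWriteError,
    Failed,
}

// Toy stand-in for one attempt on a node: only the second node "works".
fn try_on(node: &str) -> Result<&'static str, ()> {
    if node == "10.0.0.2" { Ok("a row") } else { Err(()) }
}

// Toy retry policy: retry the same node once, then move to the next node, then stop.
fn decide(errors_so_far: u32) -> RetryDecision {
    match errors_so_far {
        1 => RetryDecision::RetrySameNode,
        2 => RetryDecision::RetryNextNode,
        _ => RetryDecision::DontRetry,
    }
}

fn run(plan: &[&'static str]) -> Outcome {
    let mut errors = 0;
    'nodes_in_plan: for node in plan {
        'same_node_retries: loop {
            match try_on(node) {
                Ok(rows) => return Outcome::Completed(rows),
                Err(()) => {
                    errors += 1;
                    match decide(errors) {
                        RetryDecision::RetrySameNode => continue 'same_node_retries,
                        RetryDecision::RetryNextNode => continue 'nodes_in_plan,
                        RetryDecision::DontRetry => break 'nodes_in_plan,
                        RetryDecision::IgnoreWriteError => return Outcome::IgnoredWriteError,
                    }
                }
            }
        }
    }
    Outcome::Failed
}

fn main() {
    // Fails twice on the first node, then succeeds on the second.
    println!("{:?}", run(&["10.0.0.1", "10.0.0.2"]));
}
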
Expand Down Expand Up @@ -2243,22 +2249,22 @@ where
}
}

- // run_query, execute_query, etc have a template type called ResT.
+ // run_request, run_request_speculative_fiber, etc have a template type called ResT.
// There was a bug where ResT was set to QueryResponse, which could
// be an error response. This was not caught by retry policy which
// assumed all errors would come from analyzing Result<ResT, QueryError>.
// This trait is a guard to make sure that this mistake doesn't
// happen again.
- // When using run_query make sure that the ResT type is NOT able
+ // When using run_request make sure that the ResT type is NOT able
// to contain any errors.
// See https://github.com/scylladb/scylla-rust-driver/issues/501
- pub(crate) trait AllowedRunQueryResTType {}
+ pub(crate) trait AllowedRunRequestResTType {}

- impl AllowedRunQueryResTType for Uuid {}
- impl AllowedRunQueryResTType for QueryResult {}
- impl AllowedRunQueryResTType for NonErrorQueryResponse {}
+ impl AllowedRunRequestResTType for Uuid {}
+ impl AllowedRunRequestResTType for QueryResult {}
+ impl AllowedRunRequestResTType for NonErrorQueryResponse {}

- struct ExecuteQueryContext<'a> {
+ struct ExecuteRequestContext<'a> {
is_idempotent: bool,
consistency_set_on_statement: Option<Consistency>,
retry_session: Box<dyn RetrySession>,
Expand All @@ -2273,7 +2279,7 @@ struct HistoryData<'a> {
speculative_id: Option<history::SpeculativeId>,
}

- impl ExecuteQueryContext<'_> {
+ impl ExecuteRequestContext<'_> {
fn log_attempt_start(&self, node_addr: SocketAddr) -> Option<history::AttemptId> {
self.history_data.as_ref().map(|hd| {
hd.listener
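
Tying back to the AllowedRunRequestResTType comment earlier in the diff: the guard is a plain marker trait used as a bound, so only response types that provably cannot carry an error opt in. Below is a stand-alone sketch of the same pattern with invented types (NonErrorResponse, MaybeErrorResponse, AllowedResult, run_generic); none of them are driver types.

// Stand-ins for the real response types.
struct NonErrorResponse;
struct MaybeErrorResponse;

// Marker trait playing the role of AllowedRunRequestResTType.
trait AllowedResult {}
impl AllowedResult for NonErrorResponse {}
// Deliberately NOT implemented for MaybeErrorResponse.

// Generic runner: the bound guarantees ResT cannot be an error-carrying response.
fn run_generic<ResT: AllowedResult>(produce: impl Fn() -> ResT) -> ResT {
    produce()
}

fn main() {
    let _ok = run_generic(|| NonErrorResponse);
    // This would fail to compile, which is exactly the point of the guard:
    // let _bad = run_generic(|| MaybeErrorResponse);
    let _keep = MaybeErrorResponse; // constructed only to keep the sketch warning-free
}

Because the bound is checked at compile time, a regression like the one described in issue 501 cannot slip back in through the generic runner.
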