Skip to content

Commit

Permalink
metrics: make metrics optional under crate feature
Browse files Browse the repository at this point in the history
I've added a "metrics" crate feature, which enables the gathering and use of metrics.

Therefore, anyone who wants to use metrics in their code must either add the "metrics" feature in their Cargo.toml file or compile with the --features metrics flag.

This change was requested in #330
  • Loading branch information
QuerthDP committed Dec 11, 2024
1 parent d983263 commit e1e7201
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 3 deletions.
1 change: 1 addition & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ scylla = { path = "../scylla", features = [
"num-bigint-03",
"num-bigint-04",
"bigdecimal-04",
"metrics",
] }
tokio = { version = "1.34", features = ["full"] }
tracing = { version = "0.1.25", features = ["log"] }
Expand Down
3 changes: 2 additions & 1 deletion scylla/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ full-serialization = [
"num-bigint-04",
"bigdecimal-04",
]
metrics = ["dep:histogram"]

[dependencies]
scylla-macros = { version = "0.7.0", path = "../scylla-macros" }
Expand All @@ -47,7 +48,7 @@ byteorder = "1.3.4"
bytes = "1.0.1"
futures = "0.3.6"
hashbrown = "0.14"
histogram = "0.11.1"
histogram = { version = "0.11.1", optional = true }
tokio = { version = "1.34", features = [
"net",
"time",
Expand Down
1 change: 1 addition & 0 deletions scylla/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,4 +286,5 @@ pub use transport::load_balancing;
pub use transport::retry_policy;
pub use transport::speculative_execution;

#[cfg(feature = "metrics")]
pub use transport::metrics::{Metrics, MetricsError};
13 changes: 12 additions & 1 deletion scylla/src/transport/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::transport::cluster::ClusterData;
use crate::transport::connection::{Connection, NonErrorQueryResponse, QueryResponse};
use crate::transport::errors::{ProtocolError, QueryError, UserRequestError};
use crate::transport::load_balancing::{self, RoutingInfo};
#[cfg(feature = "metrics")]
use crate::transport::metrics::Metrics;
use crate::transport::retry_policy::{QueryInfo, RetryDecision, RetrySession};
use crate::transport::NodeRef;
Expand Down Expand Up @@ -67,6 +68,7 @@ pub(crate) struct PreparedIteratorConfig {
pub(crate) values: SerializedValues,
pub(crate) execution_profile: Arc<ExecutionProfileInner>,
pub(crate) cluster_data: Arc<ClusterData>,
#[cfg(feature = "metrics")]
pub(crate) metrics: Arc<Metrics>,
}

Expand Down Expand Up @@ -142,6 +144,7 @@ struct PagerWorker<'a, QueryFunc, SpanCreatorFunc> {
query_consistency: Consistency,
retry_session: Box<dyn RetrySession>,
execution_profile: Arc<ExecutionProfileInner>,
#[cfg(feature = "metrics")]
metrics: Arc<Metrics>,

paging_state: PagingState,
Expand Down Expand Up @@ -236,11 +239,13 @@ where
self.log_attempt_error(&last_error, &retry_decision);
match retry_decision {
RetryDecision::RetrySameNode(cl) => {
#[cfg(feature = "metrics")]
self.metrics.inc_retries_num();
current_consistency = cl.unwrap_or(current_consistency);
continue 'same_node_retries;
}
RetryDecision::RetryNextNode(cl) => {
#[cfg(feature = "metrics")]
self.metrics.inc_retries_num();
current_consistency = cl.unwrap_or(current_consistency);
continue 'nodes_in_plan;
Expand Down Expand Up @@ -298,6 +303,7 @@ where
node: NodeRef<'_>,
request_span: &RequestSpan,
) -> Result<ControlFlow<PageSendAttemptedProof, ()>, QueryError> {
#[cfg(feature = "metrics")]
self.metrics.inc_total_paged_queries();
let query_start = std::time::Instant::now();

Expand All @@ -323,6 +329,7 @@ where
tracing_id,
..
}) => {
#[cfg(feature = "metrics")]
let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64);
self.log_attempt_success();
self.log_query_success();
Expand Down Expand Up @@ -359,6 +366,7 @@ where
}
Err(err) => {
let err = err.into();
#[cfg(feature = "metrics")]
self.metrics.inc_failed_paged_queries();
self.execution_profile
.load_balancing_policy
Expand All @@ -378,6 +386,7 @@ where
Ok(ControlFlow::Break(proof))
}
Ok(response) => {
#[cfg(feature = "metrics")]
self.metrics.inc_failed_paged_queries();
let err =
ProtocolError::UnexpectedResponse(response.response.to_response_kind()).into();
Expand Down Expand Up @@ -680,7 +689,7 @@ impl QueryPager {
query: Query,
execution_profile: Arc<ExecutionProfileInner>,
cluster_data: Arc<ClusterData>,
metrics: Arc<Metrics>,
#[cfg(feature = "metrics")] metrics: Arc<Metrics>,
) -> Result<Self, QueryError> {
let (sender, receiver) = mpsc::channel(1);

Expand Down Expand Up @@ -743,6 +752,7 @@ impl QueryPager {
query_consistency: consistency,
retry_session,
execution_profile,
#[cfg(feature = "metrics")]
metrics,
paging_state: PagingState::start(),
history_listener: query.config.history_listener.clone(),
Expand Down Expand Up @@ -861,6 +871,7 @@ impl QueryPager {
query_consistency: consistency,
retry_session,
execution_profile: config.execution_profile,
#[cfg(feature = "metrics")]
metrics: config.metrics,
paging_state: PagingState::start(),
history_listener: config.prepared.config.history_listener.clone(),
Expand Down
2 changes: 2 additions & 0 deletions scylla/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ mod connection_pool;
pub mod downgrading_consistency_retry_policy;
pub mod errors;
pub mod execution_profile;
#[cfg(feature = "metrics")]
pub mod histogram;
pub mod host_filter;
pub mod iterator;
pub mod legacy_query_result;
pub mod load_balancing;
pub mod locator;
#[cfg(feature = "metrics")]
pub(crate) mod metrics;
mod node;
pub mod partitioner;
Expand Down
32 changes: 32 additions & 0 deletions scylla/src/transport/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ use crate::transport::host_filter::HostFilter;
#[allow(deprecated)]
use crate::transport::iterator::{LegacyRowIterator, PreparedIteratorConfig};
use crate::transport::load_balancing::{self, RoutingInfo};
#[cfg(feature = "metrics")]
use crate::transport::metrics::Metrics;
use crate::transport::node::Node;
use crate::transport::query_result::QueryResult;
Expand Down Expand Up @@ -190,6 +191,7 @@ where
cluster: Cluster,
default_execution_profile_handle: ExecutionProfileHandle,
schema_agreement_interval: Duration,
#[cfg(feature = "metrics")]
metrics: Arc<Metrics>,
schema_agreement_timeout: Duration,
schema_agreement_automatic_waiting: bool,
Expand All @@ -215,6 +217,7 @@ impl<DeserApi> std::fmt::Debug for GenericSession<DeserApi>
where
DeserApi: DeserializationApiKind,
{
#[cfg(feature = "metrics")]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Session")
.field("cluster", &ClusterNeatDebug(&self.cluster))
Expand All @@ -230,6 +233,22 @@ where
)
.finish()
}

// `Debug` formatting used when the crate is built without the `metrics`
// feature. The session has no `metrics` field in that configuration, so only
// the cluster, the default execution profile handle and the schema agreement
// settings are printed.
#[cfg(not(feature = "metrics"))]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    let mut session_dbg = f.debug_struct("Session");

    // The cluster is wrapped in `ClusterNeatDebug` rather than printed directly.
    session_dbg.field("cluster", &ClusterNeatDebug(&self.cluster));
    session_dbg.field(
        "default_execution_profile_handle",
        &self.default_execution_profile_handle,
    );
    session_dbg.field("schema_agreement_interval", &self.schema_agreement_interval);
    // Note: the printed label differs from the field name on purpose; it is
    // kept exactly as-is so the output stays unchanged.
    session_dbg.field(
        "auto_await_schema_agreement_timeout",
        &self.schema_agreement_timeout,
    );

    session_dbg.finish()
}
}

/// Configuration options for [`Session`].
Expand Down Expand Up @@ -883,6 +902,7 @@ impl GenericSession<CurrentDeserializationApi> {
LegacySession {
cluster: self.cluster.clone(),
default_execution_profile_handle: self.default_execution_profile_handle.clone(),
#[cfg(feature = "metrics")]
metrics: self.metrics.clone(),
refresh_metadata_on_auto_schema_agreement: self
.refresh_metadata_on_auto_schema_agreement,
Expand Down Expand Up @@ -993,6 +1013,7 @@ impl GenericSession<LegacyDeserializationApi> {
Session {
cluster: self.cluster.clone(),
default_execution_profile_handle: self.default_execution_profile_handle.clone(),
#[cfg(feature = "metrics")]
metrics: self.metrics.clone(),
refresh_metadata_on_auto_schema_agreement: self
.refresh_metadata_on_auto_schema_agreement,
Expand Down Expand Up @@ -1114,6 +1135,7 @@ where
cluster,
default_execution_profile_handle,
schema_agreement_interval: config.schema_agreement_interval,
#[cfg(feature = "metrics")]
metrics: Arc::new(Metrics::new()),
schema_agreement_timeout: config.schema_agreement_timeout,
schema_agreement_automatic_waiting: config.schema_agreement_automatic_waiting,
Expand Down Expand Up @@ -1324,6 +1346,7 @@ where
query,
execution_profile,
self.cluster.get_data(),
#[cfg(feature = "metrics")]
self.metrics.clone(),
)
.await
Expand All @@ -1338,6 +1361,7 @@ where
values,
execution_profile,
cluster_data: self.cluster.get_data(),
#[cfg(feature = "metrics")]
metrics: self.metrics.clone(),
})
.await
Expand Down Expand Up @@ -1592,6 +1616,7 @@ where
values: serialized_values,
execution_profile,
cluster_data: self.cluster.get_data(),
#[cfg(feature = "metrics")]
metrics: self.metrics.clone(),
})
.await
Expand Down Expand Up @@ -1802,6 +1827,7 @@ where
/// Returns a handle to the metrics collected by the driver.
///
/// The driver collects various metrics, such as the number of queries or
/// query latencies; they can be read through the returned handle.
///
/// This method exists only when the crate is compiled with the `metrics`
/// feature.
#[cfg(feature = "metrics")]
pub fn get_metrics(&self) -> Arc<Metrics> {
    // Only the reference count is bumped; the same `Metrics` instance is shared.
    Arc::clone(&self.metrics)
}
Expand Down Expand Up @@ -2020,6 +2046,7 @@ where
};

let context = speculative_execution::Context {
#[cfg(feature = "metrics")]
metrics: self.metrics.clone(),
};

Expand Down Expand Up @@ -2119,6 +2146,7 @@ where
};
context.request_span.record_shard_id(&connection);

#[cfg(feature = "metrics")]
self.metrics.inc_total_nonpaged_queries();
let query_start = std::time::Instant::now();

Expand All @@ -2138,6 +2166,7 @@ where
last_error = match query_result {
Ok(response) => {
trace!(parent: &span, "Query succeeded");
#[cfg(feature = "metrics")]
let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64);
context.log_attempt_success(&attempt_id);
execution_profile.load_balancing_policy.on_query_success(
Expand All @@ -2153,6 +2182,7 @@ where
last_error = %e,
"Query failed"
);
#[cfg(feature = "metrics")]
self.metrics.inc_failed_nonpaged_queries();
execution_profile.load_balancing_policy.on_query_failure(
context.query_info,
Expand Down Expand Up @@ -2182,11 +2212,13 @@ where
context.log_attempt_error(&attempt_id, the_error, &retry_decision);
match retry_decision {
RetryDecision::RetrySameNode(new_cl) => {
#[cfg(feature = "metrics")]
self.metrics.inc_retries_num();
current_consistency = new_cl.unwrap_or(current_consistency);
continue 'same_node_retries;
}
RetryDecision::RetryNextNode(new_cl) => {
#[cfg(feature = "metrics")]
self.metrics.inc_retries_num();
current_consistency = new_cl.unwrap_or(current_consistency);
continue 'nodes_in_plan;
Expand Down
13 changes: 12 additions & 1 deletion scylla/src/transport/speculative_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@ use futures::{
stream::{FuturesUnordered, StreamExt},
};
use scylla_cql::frame::response::error::DbError;
use std::{future::Future, sync::Arc, time::Duration};
#[cfg(feature = "metrics")]
use std::sync::Arc;
use std::{future::Future, time::Duration};
use tracing::{trace_span, warn, Instrument};

use crate::transport::errors::QueryError;

#[cfg(feature = "metrics")]
use super::metrics::Metrics;

/// Context is passed as an argument to `SpeculativeExecutionPolicy` methods
///
/// Its contents depend on the `metrics` crate feature: the `metrics` field
/// exists only when that feature is enabled.
pub struct Context {
/// Shared handle to the driver metrics. `PercentileSpeculativeExecutionPolicy`
/// reads a latency percentile from it to choose its retry interval.
#[cfg(feature = "metrics")]
pub metrics: Arc<Metrics>,
}

Expand Down Expand Up @@ -66,6 +70,7 @@ impl SpeculativeExecutionPolicy for PercentileSpeculativeExecutionPolicy {
self.max_retry_count
}

#[cfg(feature = "metrics")]
fn retry_interval(&self, context: &Context) -> Duration {
let interval = context.metrics.get_latency_percentile_ms(self.percentile);
let ms = match interval {
Expand All @@ -80,6 +85,12 @@ impl SpeculativeExecutionPolicy for PercentileSpeculativeExecutionPolicy {
};
Duration::from_millis(ms)
}

// Fallback used when the crate is built without the `metrics` feature: no
// latency data is available in that configuration, so a warning is logged and
// a fixed interval is returned instead of a measured percentile.
#[cfg(not(feature = "metrics"))]
fn retry_interval(&self, _context: &Context) -> Duration {
    // Fixed interval used in place of the latency percentile.
    const FALLBACK_INTERVAL: Duration = Duration::from_millis(100);

    warn!("PercentileSpeculativeExecutionPolicy requires the 'metrics' feature to work as intended, defaulting to 100 ms");
    FALLBACK_INTERVAL
}
}

/// Checks if a result created in a speculative execution branch can be ignored.
Expand Down

0 comments on commit e1e7201

Please sign in to comment.