From 74b7fa3b65c28b79a5a5aa4eeeef4725294f9e30 Mon Sep 17 00:00:00 2001 From: Dawid Pawlik <501149991dp@gmail.com> Date: Mon, 9 Dec 2024 21:41:14 +0100 Subject: [PATCH] metrics: make metrics optional under crate feature I've added "metrics" crate feature which enables usage and gathering of metrics. Therefore everyone willing to use metrics in their code is required to add "metrics" feature in their Cargo.toml file or compile otherwise with --features metrics flag. This change was requested in #330 --- examples/Cargo.toml | 1 + scylla/Cargo.toml | 3 +- scylla/src/lib.rs | 1 + scylla/src/transport/iterator.rs | 13 +++++++- scylla/src/transport/mod.rs | 1 + scylla/src/transport/session.rs | 32 +++++++++++++++++++ scylla/src/transport/speculative_execution.rs | 13 +++++++- 7 files changed, 61 insertions(+), 3 deletions(-) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 5b335fd34f..8b6bb8407a 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -18,6 +18,7 @@ scylla = { path = "../scylla", features = [ "num-bigint-03", "num-bigint-04", "bigdecimal-04", + "metrics", ] } tokio = { version = "1.34", features = ["full"] } tracing = { version = "0.1.25", features = ["log"] } diff --git a/scylla/Cargo.toml b/scylla/Cargo.toml index 92b7dfafb9..6580430add 100644 --- a/scylla/Cargo.toml +++ b/scylla/Cargo.toml @@ -39,6 +39,7 @@ full-serialization = [ "num-bigint-04", "bigdecimal-04", ] +metrics = ["dep:histogram"] [dependencies] scylla-macros = { version = "0.7.0", path = "../scylla-macros" } @@ -47,7 +48,7 @@ byteorder = "1.3.4" bytes = "1.0.1" futures = "0.3.6" hashbrown = "0.14" -histogram = "0.11.1" +histogram = { version = "0.11.1", optional = true } tokio = { version = "1.34", features = [ "net", "time", diff --git a/scylla/src/lib.rs b/scylla/src/lib.rs index f03edf7a9e..3d46c1a816 100644 --- a/scylla/src/lib.rs +++ b/scylla/src/lib.rs @@ -286,4 +286,5 @@ pub use transport::load_balancing; pub use transport::retry_policy; pub use transport::speculative_execution; +#[cfg(feature = "metrics")] pub use transport::metrics::{Metrics, MetricsError}; diff --git a/scylla/src/transport/iterator.rs b/scylla/src/transport/iterator.rs index 8d7c07be35..cd3874b359 100644 --- a/scylla/src/transport/iterator.rs +++ b/scylla/src/transport/iterator.rs @@ -37,6 +37,7 @@ use crate::transport::cluster::ClusterData; use crate::transport::connection::{Connection, NonErrorQueryResponse, QueryResponse}; use crate::transport::errors::{ProtocolError, QueryError, UserRequestError}; use crate::transport::load_balancing::{self, RoutingInfo}; +#[cfg(feature = "metrics")] use crate::transport::metrics::Metrics; use crate::transport::retry_policy::{QueryInfo, RetryDecision, RetrySession}; use crate::transport::NodeRef; @@ -67,6 +68,7 @@ pub(crate) struct PreparedIteratorConfig { pub(crate) values: SerializedValues, pub(crate) execution_profile: Arc, pub(crate) cluster_data: Arc, + #[cfg(feature = "metrics")] pub(crate) metrics: Arc, } @@ -142,6 +144,7 @@ struct PagerWorker<'a, QueryFunc, SpanCreatorFunc> { query_consistency: Consistency, retry_session: Box, execution_profile: Arc, + #[cfg(feature = "metrics")] metrics: Arc, paging_state: PagingState, @@ -236,11 +239,13 @@ where self.log_attempt_error(&last_error, &retry_decision); match retry_decision { RetryDecision::RetrySameNode(cl) => { + #[cfg(feature = "metrics")] self.metrics.inc_retries_num(); current_consistency = cl.unwrap_or(current_consistency); continue 'same_node_retries; } RetryDecision::RetryNextNode(cl) => { + #[cfg(feature = "metrics")] self.metrics.inc_retries_num(); current_consistency = cl.unwrap_or(current_consistency); continue 'nodes_in_plan; @@ -298,6 +303,7 @@ where node: NodeRef<'_>, request_span: &RequestSpan, ) -> Result, QueryError> { + #[cfg(feature = "metrics")] self.metrics.inc_total_paged_queries(); let query_start = std::time::Instant::now(); @@ -323,6 +329,7 @@ where tracing_id, .. }) => { + #[cfg(feature = "metrics")] let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64); self.log_attempt_success(); self.log_query_success(); @@ -359,6 +366,7 @@ where } Err(err) => { let err = err.into(); + #[cfg(feature = "metrics")] self.metrics.inc_failed_paged_queries(); self.execution_profile .load_balancing_policy @@ -378,6 +386,7 @@ where Ok(ControlFlow::Break(proof)) } Ok(response) => { + #[cfg(feature = "metrics")] self.metrics.inc_failed_paged_queries(); let err = ProtocolError::UnexpectedResponse(response.response.to_response_kind()).into(); @@ -680,7 +689,7 @@ impl QueryPager { query: Query, execution_profile: Arc, cluster_data: Arc, - metrics: Arc, + #[cfg(feature = "metrics")] metrics: Arc, ) -> Result { let (sender, receiver) = mpsc::channel(1); @@ -743,6 +752,7 @@ impl QueryPager { query_consistency: consistency, retry_session, execution_profile, + #[cfg(feature = "metrics")] metrics, paging_state: PagingState::start(), history_listener: query.config.history_listener.clone(), @@ -861,6 +871,7 @@ impl QueryPager { query_consistency: consistency, retry_session, execution_profile: config.execution_profile, + #[cfg(feature = "metrics")] metrics: config.metrics, paging_state: PagingState::start(), history_listener: config.prepared.config.history_listener.clone(), diff --git a/scylla/src/transport/mod.rs b/scylla/src/transport/mod.rs index 544e5644aa..d62e93c10c 100644 --- a/scylla/src/transport/mod.rs +++ b/scylla/src/transport/mod.rs @@ -11,6 +11,7 @@ pub mod iterator; pub mod legacy_query_result; pub mod load_balancing; pub mod locator; +#[cfg(feature = "metrics")] pub(crate) mod metrics; mod node; pub mod partitioner; diff --git a/scylla/src/transport/session.rs b/scylla/src/transport/session.rs index b3efa7e076..5222f808c3 100644 --- a/scylla/src/transport/session.rs +++ b/scylla/src/transport/session.rs @@ -68,6 +68,7 @@ use crate::transport::host_filter::HostFilter; #[allow(deprecated)] use crate::transport::iterator::{LegacyRowIterator, PreparedIteratorConfig}; use crate::transport::load_balancing::{self, RoutingInfo}; +#[cfg(feature = "metrics")] use crate::transport::metrics::Metrics; use crate::transport::node::Node; use crate::transport::query_result::QueryResult; @@ -190,6 +191,7 @@ where cluster: Cluster, default_execution_profile_handle: ExecutionProfileHandle, schema_agreement_interval: Duration, + #[cfg(feature = "metrics")] metrics: Arc, schema_agreement_timeout: Duration, schema_agreement_automatic_waiting: bool, @@ -215,6 +217,7 @@ impl std::fmt::Debug for GenericSession where DeserApi: DeserializationApiKind, { + #[cfg(feature = "metrics")] fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Session") .field("cluster", &ClusterNeatDebug(&self.cluster)) @@ -230,6 +233,22 @@ where ) .finish() } + + #[cfg(not(feature = "metrics"))] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Session") + .field("cluster", &ClusterNeatDebug(&self.cluster)) + .field( + "default_execution_profile_handle", + &self.default_execution_profile_handle, + ) + .field("schema_agreement_interval", &self.schema_agreement_interval) + .field( + "auto_await_schema_agreement_timeout", + &self.schema_agreement_timeout, + ) + .finish() + } } /// Configuration options for [`Session`]. @@ -883,6 +902,7 @@ impl GenericSession { LegacySession { cluster: self.cluster.clone(), default_execution_profile_handle: self.default_execution_profile_handle.clone(), + #[cfg(feature = "metrics")] metrics: self.metrics.clone(), refresh_metadata_on_auto_schema_agreement: self .refresh_metadata_on_auto_schema_agreement, @@ -993,6 +1013,7 @@ impl GenericSession { Session { cluster: self.cluster.clone(), default_execution_profile_handle: self.default_execution_profile_handle.clone(), + #[cfg(feature = "metrics")] metrics: self.metrics.clone(), refresh_metadata_on_auto_schema_agreement: self .refresh_metadata_on_auto_schema_agreement, @@ -1114,6 +1135,7 @@ where cluster, default_execution_profile_handle, schema_agreement_interval: config.schema_agreement_interval, + #[cfg(feature = "metrics")] metrics: Arc::new(Metrics::new()), schema_agreement_timeout: config.schema_agreement_timeout, schema_agreement_automatic_waiting: config.schema_agreement_automatic_waiting, @@ -1324,6 +1346,7 @@ where query, execution_profile, self.cluster.get_data(), + #[cfg(feature = "metrics")] self.metrics.clone(), ) .await @@ -1338,6 +1361,7 @@ where values, execution_profile, cluster_data: self.cluster.get_data(), + #[cfg(feature = "metrics")] metrics: self.metrics.clone(), }) .await @@ -1592,6 +1616,7 @@ where values: serialized_values, execution_profile, cluster_data: self.cluster.get_data(), + #[cfg(feature = "metrics")] metrics: self.metrics.clone(), }) .await @@ -1802,6 +1827,7 @@ where /// Access metrics collected by the driver\ /// Driver collects various metrics like number of queries or query latencies. /// They can be read using this method + #[cfg(feature = "metrics")] pub fn get_metrics(&self) -> Arc { self.metrics.clone() } @@ -2020,6 +2046,7 @@ where }; let context = speculative_execution::Context { + #[cfg(feature = "metrics")] metrics: self.metrics.clone(), }; @@ -2119,6 +2146,7 @@ where }; context.request_span.record_shard_id(&connection); + #[cfg(feature = "metrics")] self.metrics.inc_total_nonpaged_queries(); let query_start = std::time::Instant::now(); @@ -2138,6 +2166,7 @@ where last_error = match query_result { Ok(response) => { trace!(parent: &span, "Query succeeded"); + #[cfg(feature = "metrics")] let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64); context.log_attempt_success(&attempt_id); execution_profile.load_balancing_policy.on_query_success( @@ -2153,6 +2182,7 @@ where last_error = %e, "Query failed" ); + #[cfg(feature = "metrics")] self.metrics.inc_failed_nonpaged_queries(); execution_profile.load_balancing_policy.on_query_failure( context.query_info, @@ -2182,11 +2212,13 @@ where context.log_attempt_error(&attempt_id, the_error, &retry_decision); match retry_decision { RetryDecision::RetrySameNode(new_cl) => { + #[cfg(feature = "metrics")] self.metrics.inc_retries_num(); current_consistency = new_cl.unwrap_or(current_consistency); continue 'same_node_retries; } RetryDecision::RetryNextNode(new_cl) => { + #[cfg(feature = "metrics")] self.metrics.inc_retries_num(); current_consistency = new_cl.unwrap_or(current_consistency); continue 'nodes_in_plan; diff --git a/scylla/src/transport/speculative_execution.rs b/scylla/src/transport/speculative_execution.rs index 60344d0a02..e2168c5b5f 100644 --- a/scylla/src/transport/speculative_execution.rs +++ b/scylla/src/transport/speculative_execution.rs @@ -3,15 +3,19 @@ use futures::{ stream::{FuturesUnordered, StreamExt}, }; use scylla_cql::frame::response::error::DbError; -use std::{future::Future, sync::Arc, time::Duration}; +#[cfg(feature = "metrics")] +use std::sync::Arc; +use std::{future::Future, time::Duration}; use tracing::{trace_span, warn, Instrument}; use crate::transport::errors::QueryError; +#[cfg(feature = "metrics")] use super::metrics::Metrics; /// Context is passed as an argument to `SpeculativeExecutionPolicy` methods pub struct Context { + #[cfg(feature = "metrics")] pub metrics: Arc, } @@ -66,6 +70,7 @@ impl SpeculativeExecutionPolicy for PercentileSpeculativeExecutionPolicy { self.max_retry_count } + #[cfg(feature = "metrics")] fn retry_interval(&self, context: &Context) -> Duration { let interval = context.metrics.get_latency_percentile_ms(self.percentile); let ms = match interval { @@ -80,6 +85,12 @@ impl SpeculativeExecutionPolicy for PercentileSpeculativeExecutionPolicy { }; Duration::from_millis(ms) } + + #[cfg(not(feature = "metrics"))] + fn retry_interval(&self, _: &Context) -> Duration { + warn!("PercentileSpeculativeExecutionPolicy requires the 'metrics' feature to work as intended, defaulting to 100 ms"); + Duration::from_millis(100) + } } /// Checks if a result created in a speculative execution branch can be ignored.