diff --git a/scylla/Cargo.toml b/scylla/Cargo.toml index 2460e020b9..6b017c9d10 100644 --- a/scylla/Cargo.toml +++ b/scylla/Cargo.toml @@ -27,7 +27,7 @@ bytes = "1.0.1" futures = "0.3.6" histogram = "0.6.9" num_enum = "0.6" -tokio = { version = "1.27", features = ["net", "time", "io-util", "sync", "rt", "macros"] } +tokio = { version = "1.27", features = ["net", "time", "io-util", "sync", "rt", "macros", "test-util"] } snap = "1.0" uuid = { version = "1.0", features = ["v4"] } rand = "0.8.3" diff --git a/scylla/src/transport/load_balancing/default.rs b/scylla/src/transport/load_balancing/default.rs index 3c2097c9dc..d1554babf1 100644 --- a/scylla/src/transport/load_balancing/default.rs +++ b/scylla/src/transport/load_balancing/default.rs @@ -2132,7 +2132,8 @@ mod latency_awareness { use futures::{future::RemoteHandle, FutureExt}; use itertools::Either; use scylla_cql::errors::{DbError, QueryError}; - use tracing::{instrument::WithSubscriber, trace}; + use tokio::time::{Duration, Instant}; + use tracing::{instrument::WithSubscriber, trace, warn}; use uuid::Uuid; use crate::{load_balancing::NodeRef, transport::node::Node}; @@ -2143,7 +2144,6 @@ mod latency_awareness { atomic::{AtomicU64, Ordering}, Arc, RwLock, }, - time::{Duration, Instant}, }; #[derive(Debug)] @@ -2168,7 +2168,7 @@ mod latency_awareness { } } - #[derive(Debug, Clone, Copy)] + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(super) struct TimestampedAverage { pub(super) timestamp: Instant, pub(super) average: Duration, @@ -2190,15 +2190,41 @@ mod latency_awareness { timestamp: now, }), Some(prev_avg) => Some({ - let delay = (now - prev_avg.timestamp).as_secs_f64(); + let delay = now + .saturating_duration_since(prev_avg.timestamp) + .as_secs_f64(); let scaled_delay = delay / scale_secs; - let prev_weight = (scaled_delay + 1.).ln() / scaled_delay; + let prev_weight = if scaled_delay <= 0. { + 1. + } else { + (scaled_delay + 1.).ln() / scaled_delay + }; let last_latency_secs = last_latency.as_secs_f64(); let prev_avg_secs = prev_avg.average.as_secs_f64(); - let average = Duration::from_secs_f64( + let average = match Duration::try_from_secs_f64( (1. - prev_weight) * last_latency_secs + prev_weight * prev_avg_secs, - ); + ) { + Ok(ts) => ts, + Err(e) => { + warn!( + "Error while calculating average: {e}. \ + prev_avg_secs: {prev_avg_secs}, \ + last_latency_secs: {last_latency_secs}, \ + prev_weight: {prev_weight}, \ + scaled_delay: {scaled_delay}, \ + delay: {delay}, \ + prev_avg.timestamp: {:?}, \ + now: {now:?}", + prev_avg.timestamp + ); + + // Not sure when we could enter this branch, + // so I have no idea what would be a sensible value to return here, + // this does not seem like a very bad choice. + prev_avg.average + } + }; Self { num_measures: prev_avg.num_measures + 1, timestamp: now, @@ -2733,7 +2759,7 @@ mod latency_awareness { }, ExecutionProfile, }; - use std::time::Instant; + use tokio::time::Instant; trait DefaultPolicyTestExt { fn set_nodes_latency_stats( @@ -3453,5 +3479,24 @@ mod latency_awareness { session.query("whatever", ()).await.unwrap_err(); } + + #[tokio::test] + async fn timestamped_average_works_when_clock_stops() { + tokio::time::pause(); + let avg = Some(TimestampedAverage { + timestamp: Instant::now(), + average: Duration::from_secs(123), + num_measures: 1, + }); + let new_avg = TimestampedAverage::compute_next(avg, Duration::from_secs(456), 10.0); + assert_eq!( + new_avg, + Some(TimestampedAverage { + timestamp: Instant::now(), + average: Duration::from_secs(123), + num_measures: 2, + }), + ); + } } }