diff --git a/examples/compare-tokens.rs b/examples/compare-tokens.rs index 294dc7842b..e160383a4b 100644 --- a/examples/compare-tokens.rs +++ b/examples/compare-tokens.rs @@ -33,13 +33,13 @@ async fn main() -> Result<()> { ) .await?; - let t = prepared.calculate_token(&(pk,))?.unwrap().value; + let t = prepared.calculate_token(&(pk,))?.unwrap().value(); println!( "Token endpoints for query: {:?}", session .get_cluster_data() - .get_token_endpoints("examples_ks", Token { value: t }) + .get_token_endpoints("examples_ks", Token::new(t)) .iter() .map(|n| n.address) .collect::>() diff --git a/scylla/src/routing.rs b/scylla/src/routing.rs index e54dfcaec9..740fbfda79 100644 --- a/scylla/src/routing.rs +++ b/scylla/src/routing.rs @@ -5,8 +5,43 @@ use std::num::NonZeroU16; use thiserror::Error; #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)] + +/// Token is a result of computing a hash of a primary key +/// +/// It is basically an i64 with one caveat: i64::MIN is not +/// a valid token. It is used to represent infinity. +/// For this reason tokens are normalized - i64::MIN +/// is replaced with i64::MAX. See this fragment of +/// Scylla code for more information: +/// +/// +/// This struct is a wrapper over i64 that performs this normalization +/// when initialized using `new()` method. pub struct Token { - pub value: i64, + value: i64, +} + +impl Token { + /// Creates a new token with given value, normalizing the value if necessary + #[inline] + pub fn new(value: i64) -> Self { + Self { + value: if value == i64::MIN { i64::MAX } else { value }, + } + } + + /// Creates a new token with given value, but skips the normalization. + /// + /// This can be used to create incorrect token value - i64::MIN. + #[inline] + pub fn new_denormalized(value: i64) -> Self { + Self { value } + } + + #[inline] + pub fn value(&self) -> i64 { + self.value + } } pub type Shard = u32; diff --git a/scylla/src/transport/connection_pool.rs b/scylla/src/transport/connection_pool.rs index f26ea36ac2..3e7bb340c4 100644 --- a/scylla/src/transport/connection_pool.rs +++ b/scylla/src/transport/connection_pool.rs @@ -236,7 +236,7 @@ impl NodeConnectionPool { } pub(crate) fn connection_for_token(&self, token: Token) -> Result, QueryError> { - trace!(token = token.value, "Selecting connection for token"); + trace!(token = token.value(), "Selecting connection for token"); self.with_connections(|pool_conns| match pool_conns { PoolConnections::NotSharded(conns) => { Self::choose_random_connection_from_slice(conns).unwrap() diff --git a/scylla/src/transport/load_balancing/default.rs b/scylla/src/transport/load_balancing/default.rs index 3fdeef18ef..f89504e567 100644 --- a/scylla/src/transport/load_balancing/default.rs +++ b/scylla/src/transport/load_balancing/default.rs @@ -1102,9 +1102,7 @@ mod tests { datacenter: Some(dc.to_string()), rack: None, address: id_to_invalid_addr(*id), - tokens: vec![Token { - value: *id as i64 * 100, - }], + tokens: vec![Token::new(*id as i64 * 100)], host_id: Uuid::new_v4(), }) .collect::>(); @@ -1218,7 +1216,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::Two, ..Default::default() @@ -1243,7 +1241,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::Two, ..Default::default() @@ -1267,7 +1265,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::LocalOne, // local Consistency forbids datacenter failover ..Default::default() @@ -1289,7 +1287,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::One, ..Default::default() @@ -1311,7 +1309,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_3), consistency: Consistency::Quorum, ..Default::default() @@ -1336,7 +1334,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_3), consistency: Consistency::Quorum, ..Default::default() @@ -1360,7 +1358,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_3), consistency: Consistency::Quorum, ..Default::default() @@ -1382,7 +1380,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_SS_RF_2), consistency: Consistency::Two, ..Default::default() @@ -1406,7 +1404,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_SS_RF_2), consistency: Consistency::LocalOne, // local Consistency forbids datacenter failover ..Default::default() @@ -1447,7 +1445,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: None, // no keyspace consistency: Consistency::Quorum, ..Default::default() @@ -1466,7 +1464,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::Quorum, ..Default::default() @@ -1488,7 +1486,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::Quorum, ..Default::default() @@ -1507,7 +1505,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::Quorum, ..Default::default() @@ -1529,7 +1527,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::Quorum, ..Default::default() @@ -1554,7 +1552,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_3), consistency: Consistency::One, ..Default::default() @@ -1582,7 +1580,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 560 }), + token: Some(Token::new(560)), keyspace: Some(KEYSPACE_SS_RF_2), consistency: Consistency::Two, ..Default::default() @@ -1609,7 +1607,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_SS_RF_2), consistency: Consistency::One, ..Default::default() @@ -1688,7 +1686,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::Two, is_confirmed_lwt: true, @@ -1714,7 +1712,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::Two, is_confirmed_lwt: true, @@ -1739,7 +1737,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::LocalOne, // local Consistency forbids datacenter failover is_confirmed_lwt: true, @@ -1762,7 +1760,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::One, is_confirmed_lwt: true, @@ -1785,7 +1783,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_3), consistency: Consistency::Quorum, is_confirmed_lwt: true, @@ -1811,7 +1809,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_3), consistency: Consistency::Quorum, is_confirmed_lwt: true, @@ -1836,7 +1834,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_3), consistency: Consistency::Quorum, is_confirmed_lwt: true, @@ -1859,7 +1857,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_SS_RF_2), consistency: Consistency::Two, is_confirmed_lwt: true, @@ -1884,7 +1882,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_SS_RF_2), consistency: Consistency::LocalOne, // local Consistency forbids datacenter failover is_confirmed_lwt: true, @@ -1927,7 +1925,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: None, // no keyspace consistency: Consistency::Quorum, is_confirmed_lwt: true, @@ -1947,7 +1945,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::Quorum, is_confirmed_lwt: true, @@ -1970,7 +1968,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::Quorum, is_confirmed_lwt: true, @@ -1990,7 +1988,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::Quorum, is_confirmed_lwt: true, @@ -2013,7 +2011,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::Quorum, is_confirmed_lwt: true, @@ -2039,7 +2037,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_3), consistency: Consistency::One, is_confirmed_lwt: true, @@ -2068,7 +2066,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 760 }), + token: Some(Token::new(760)), keyspace: Some(KEYSPACE_SS_RF_2), consistency: Consistency::Two, is_confirmed_lwt: true, @@ -2095,7 +2093,7 @@ mod tests { ..Default::default() }, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_SS_RF_2), consistency: Consistency::One, is_confirmed_lwt: true, @@ -3342,7 +3340,7 @@ mod latency_awareness { (E, too_few_measurements_slow()), ], routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_3), consistency: Consistency::Quorum, ..Default::default() @@ -3362,7 +3360,7 @@ mod latency_awareness { // Latency-awareness has old minimum average cached, so does not fire. preset_min_avg: Some(100 * min_avg), routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_3), consistency: Consistency::Quorum, ..Default::default() @@ -3391,7 +3389,7 @@ mod latency_awareness { (C, too_few_measurements_fast_leader()), ], routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some(KEYSPACE_NTS_RF_2), consistency: Consistency::Quorum, ..Default::default() @@ -3410,7 +3408,7 @@ mod latency_awareness { // No latency stats, so latency-awareness is a no-op. preset_min_avg: None, routing_info: RoutingInfo { - token: Some(Token { value: 160 }), + token: Some(Token::new(160)), keyspace: Some("invalid"), consistency: Consistency::Quorum, ..Default::default() diff --git a/scylla/src/transport/locator/mod.rs b/scylla/src/transport/locator/mod.rs index 4ff44891d1..84e3513dc9 100644 --- a/scylla/src/transport/locator/mod.rs +++ b/scylla/src/transport/locator/mod.rs @@ -750,8 +750,7 @@ mod tests { // For each case (token, limit_to_dc, strategy), we are checking // that ReplicasOrdered yields replicas in the expected order. let check = |token, limit_to_dc, strategy, expected| { - let replica_set = - locator.replicas_for_token(Token { value: token }, strategy, limit_to_dc); + let replica_set = locator.replicas_for_token(Token::new(token), strategy, limit_to_dc); let replicas_ordered = replica_set.into_replicas_ordered(); let ids: Vec<_> = replicas_ordered .into_iter() diff --git a/scylla/src/transport/locator/precomputed_replicas.rs b/scylla/src/transport/locator/precomputed_replicas.rs index 0a9256b7e8..84acc04aad 100644 --- a/scylla/src/transport/locator/precomputed_replicas.rs +++ b/scylla/src/transport/locator/precomputed_replicas.rs @@ -253,10 +253,8 @@ mod tests { ); let check = |token, replication_factor, expected_node_ids| { - let replicas = precomputed_replicas.get_precomputed_simple_strategy_replicas( - Token { value: token }, - replication_factor, - ); + let replicas = precomputed_replicas + .get_precomputed_simple_strategy_replicas(Token::new(token), replication_factor); let ids: Vec = replicas .unwrap() @@ -271,7 +269,7 @@ mod tests { check(160, 1, vec![F]); check(160, 2, vec![F, A]); assert_eq!( - precomputed_replicas.get_precomputed_simple_strategy_replicas(Token { value: 160 }, 3), + precomputed_replicas.get_precomputed_simple_strategy_replicas(Token::new(160), 3), None ); @@ -297,7 +295,7 @@ mod tests { let check = |token, dc, replication_factor, expected_node_ids| { let replicas = precomputed_replicas.get_precomputed_network_strategy_replicas( - Token { value: token }, + Token::new(token), dc, replication_factor, ); @@ -317,7 +315,7 @@ mod tests { check(160, "eu", 3, vec![A, C, G]); assert_eq!( precomputed_replicas.get_precomputed_network_strategy_replicas( - Token { value: 160 }, + Token::new(160), "eu", 4 ), @@ -330,7 +328,7 @@ mod tests { check(160, "us", 3, vec![F, D, E]); assert_eq!( precomputed_replicas.get_precomputed_network_strategy_replicas( - Token { value: 160 }, + Token::new(160), "us", 4 ), diff --git a/scylla/src/transport/locator/replication_info.rs b/scylla/src/transport/locator/replication_info.rs index d17ed361ba..179372471b 100644 --- a/scylla/src/transport/locator/replication_info.rs +++ b/scylla/src/transport/locator/replication_info.rs @@ -218,8 +218,8 @@ mod tests { let replication_info = ReplicationInfo::new(ring); let check = |token, replication_factor, expected_node_ids| { - let replicas = replication_info - .simple_strategy_replicas(Token { value: token }, replication_factor); + let replicas = + replication_info.simple_strategy_replicas(Token::new(token), replication_factor); let ids: Vec = replicas.map(|node| node.address.port()).collect(); assert_eq!(ids, expected_node_ids); @@ -252,8 +252,7 @@ mod tests { let replication_info = ReplicationInfo::new(ring); let check = |token, dc, rf, expected| { - let replicas = - replication_info.nts_replicas_in_datacenter(Token { value: token }, dc, rf); + let replicas = replication_info.nts_replicas_in_datacenter(Token::new(token), dc, rf); let ids: Vec = replicas.map(|node| node.address.port()).collect(); assert_eq!(ids, expected); diff --git a/scylla/src/transport/locator/test.rs b/scylla/src/transport/locator/test.rs index bb74ee0469..c452fec292 100644 --- a/scylla/src/transport/locator/test.rs +++ b/scylla/src/transport/locator/test.rs @@ -51,11 +51,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata { datacenter: Some("eu".into()), rack: Some("r1".to_owned()), address: id_to_invalid_addr(1), - tokens: vec![ - Token { value: 50 }, - Token { value: 250 }, - Token { value: 400 }, - ], + tokens: vec![Token::new(50), Token::new(250), Token::new(400)], host_id: Uuid::new_v4(), }, Peer { @@ -63,11 +59,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata { datacenter: Some("eu".into()), rack: Some("r1".to_owned()), address: id_to_invalid_addr(2), - tokens: vec![ - Token { value: 100 }, - Token { value: 600 }, - Token { value: 900 }, - ], + tokens: vec![Token::new(100), Token::new(600), Token::new(900)], host_id: Uuid::new_v4(), }, Peer { @@ -75,11 +67,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata { datacenter: Some("eu".into()), rack: Some("r1".to_owned()), address: id_to_invalid_addr(3), - tokens: vec![ - Token { value: 300 }, - Token { value: 650 }, - Token { value: 700 }, - ], + tokens: vec![Token::new(300), Token::new(650), Token::new(700)], host_id: Uuid::new_v4(), }, Peer { @@ -87,7 +75,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata { datacenter: Some("us".into()), rack: Some("r1".to_owned()), address: id_to_invalid_addr(4), - tokens: vec![Token { value: 350 }, Token { value: 550 }], + tokens: vec![Token::new(350), Token::new(550)], host_id: Uuid::new_v4(), }, Peer { @@ -95,7 +83,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata { datacenter: Some("us".into()), rack: Some("r1".to_owned()), address: id_to_invalid_addr(5), - tokens: vec![Token { value: 150 }, Token { value: 750 }], + tokens: vec![Token::new(150), Token::new(750)], host_id: Uuid::new_v4(), }, Peer { @@ -103,7 +91,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata { datacenter: Some("us".into()), rack: Some("r2".to_owned()), address: id_to_invalid_addr(6), - tokens: vec![Token { value: 200 }, Token { value: 450 }], + tokens: vec![Token::new(200), Token::new(450)], host_id: Uuid::new_v4(), }, Peer { @@ -111,7 +99,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata { datacenter: Some("eu".into()), rack: Some("r2".to_owned()), address: id_to_invalid_addr(7), - tokens: vec![Token { value: 500 }, Token { value: 800 }], + tokens: vec![Token::new(500), Token::new(800)], host_id: Uuid::new_v4(), }, ]; @@ -250,7 +238,7 @@ fn test_datacenter_info(locator: &ReplicaLocator) { fn test_simple_strategy_replicas(locator: &ReplicaLocator) { assert_replica_set_equal_to( locator.replicas_for_token( - Token { value: 450 }, + Token::new(450), &Strategy::SimpleStrategy { replication_factor: 3, }, @@ -261,7 +249,7 @@ fn test_simple_strategy_replicas(locator: &ReplicaLocator) { assert_replica_set_equal_to( locator.replicas_for_token( - Token { value: 450 }, + Token::new(450), &Strategy::SimpleStrategy { replication_factor: 4, }, @@ -272,7 +260,7 @@ fn test_simple_strategy_replicas(locator: &ReplicaLocator) { assert_replica_set_equal_to( locator.replicas_for_token( - Token { value: 201 }, + Token::new(201), &Strategy::SimpleStrategy { replication_factor: 4, }, @@ -283,7 +271,7 @@ fn test_simple_strategy_replicas(locator: &ReplicaLocator) { assert_replica_set_equal_to( locator.replicas_for_token( - Token { value: 201 }, + Token::new(201), &Strategy::SimpleStrategy { replication_factor: 0, }, @@ -296,7 +284,7 @@ fn test_simple_strategy_replicas(locator: &ReplicaLocator) { // in that dc. assert_replica_set_equal_to( locator.replicas_for_token( - Token { value: 50 }, + Token::new(50), &Strategy::SimpleStrategy { replication_factor: 1, }, @@ -307,7 +295,7 @@ fn test_simple_strategy_replicas(locator: &ReplicaLocator) { assert_replica_set_equal_to( locator.replicas_for_token( - Token { value: 50 }, + Token::new(50), &Strategy::SimpleStrategy { replication_factor: 3, }, @@ -318,7 +306,7 @@ fn test_simple_strategy_replicas(locator: &ReplicaLocator) { assert_replica_set_equal_to( locator.replicas_for_token( - Token { value: 50 }, + Token::new(50), &Strategy::SimpleStrategy { replication_factor: 3, }, @@ -331,7 +319,7 @@ fn test_simple_strategy_replicas(locator: &ReplicaLocator) { fn test_network_topology_strategy_replicas(locator: &ReplicaLocator) { assert_replica_set_equal_to( locator.replicas_for_token( - Token { value: 75 }, + Token::new(75), &Strategy::NetworkTopologyStrategy { datacenter_repfactors: [("eu".to_owned(), 1), ("us".to_owned(), 1)] .into_iter() @@ -344,7 +332,7 @@ fn test_network_topology_strategy_replicas(locator: &ReplicaLocator) { assert_replica_set_equal_to( locator.replicas_for_token( - Token { value: 75 }, + Token::new(75), &Strategy::NetworkTopologyStrategy { datacenter_repfactors: [("eu".to_owned(), 1), ("us".to_owned(), 1)] .into_iter() @@ -357,7 +345,7 @@ fn test_network_topology_strategy_replicas(locator: &ReplicaLocator) { assert_replica_set_equal_to( locator.replicas_for_token( - Token { value: 75 }, + Token::new(75), &Strategy::NetworkTopologyStrategy { datacenter_repfactors: [("eu".to_owned(), 1), ("us".to_owned(), 1)] .into_iter() @@ -370,7 +358,7 @@ fn test_network_topology_strategy_replicas(locator: &ReplicaLocator) { assert_replica_set_equal_to( locator.replicas_for_token( - Token { value: 75 }, + Token::new(75), &Strategy::NetworkTopologyStrategy { datacenter_repfactors: [("eu".to_owned(), 2), ("us".to_owned(), 1)] .into_iter() @@ -386,7 +374,7 @@ fn test_network_topology_strategy_replicas(locator: &ReplicaLocator) { assert_replica_set_equal_to( locator.replicas_for_token( - Token { value: 75 }, + Token::new(75), &Strategy::NetworkTopologyStrategy { datacenter_repfactors: [("unknown".to_owned(), 2), ("us".to_owned(), 1)] .into_iter() @@ -399,7 +387,7 @@ fn test_network_topology_strategy_replicas(locator: &ReplicaLocator) { assert_replica_set_equal_to( locator.replicas_for_token( - Token { value: 800 }, + Token::new(800), &Strategy::NetworkTopologyStrategy { datacenter_repfactors: [("eu".to_owned(), 1), ("us".to_owned(), 1)] .into_iter() @@ -414,7 +402,7 @@ fn test_network_topology_strategy_replicas(locator: &ReplicaLocator) { fn test_replica_set_len(locator: &ReplicaLocator) { let merged_nts_len = locator .replicas_for_token( - Token { value: 75 }, + Token::new(75), &Strategy::NetworkTopologyStrategy { datacenter_repfactors: [("eu".to_owned(), 2), ("us".to_owned(), 1)] .into_iter() @@ -429,7 +417,7 @@ fn test_replica_set_len(locator: &ReplicaLocator) { // replica set length was limited. let capped_merged_nts_len = locator .replicas_for_token( - Token { value: 75 }, + Token::new(75), &Strategy::NetworkTopologyStrategy { datacenter_repfactors: [("eu".to_owned(), 69), ("us".to_owned(), 1)] .into_iter() @@ -442,7 +430,7 @@ fn test_replica_set_len(locator: &ReplicaLocator) { let filtered_nts_len = locator .replicas_for_token( - Token { value: 450 }, + Token::new(450), &Strategy::NetworkTopologyStrategy { datacenter_repfactors: [("eu".to_owned(), 2), ("us".to_owned(), 1)] .into_iter() @@ -455,7 +443,7 @@ fn test_replica_set_len(locator: &ReplicaLocator) { let ss_len = locator .replicas_for_token( - Token { value: 75 }, + Token::new(75), &Strategy::SimpleStrategy { replication_factor: 3, }, @@ -467,7 +455,7 @@ fn test_replica_set_len(locator: &ReplicaLocator) { // Test if the replica set length was capped when a datacenter name was provided. let filtered_ss_len = locator .replicas_for_token( - Token { value: 75 }, + Token::new(75), &Strategy::SimpleStrategy { replication_factor: 3, }, @@ -492,8 +480,7 @@ fn test_replica_set_choose(locator: &ReplicaLocator) { let mut rng = ChaCha8Rng::seed_from_u64(69); for strategy in strategies { - let replica_set_generator = - || locator.replicas_for_token(Token { value: 75 }, &strategy, None); + let replica_set_generator = || locator.replicas_for_token(Token::new(75), &strategy, None); // Verify that after a certain number of random selections, the set of selected replicas // will contain all nodes in the ring (replica set was created using a strategy with @@ -533,8 +520,7 @@ fn test_replica_set_choose_filtered(locator: &ReplicaLocator) { let mut rng = ChaCha8Rng::seed_from_u64(69); for strategy in strategies { - let replica_set_generator = - || locator.replicas_for_token(Token { value: 75 }, &strategy, None); + let replica_set_generator = || locator.replicas_for_token(Token::new(75), &strategy, None); // Verify that after a certain number of random selections with a dc filter, the set of // selected replicas will contain all nodes in the specified dc ring. @@ -561,7 +547,7 @@ fn test_replica_set_choose_filtered(locator: &ReplicaLocator) { // Check that choosing from an empty set yields no value. let empty = locator .replicas_for_token( - Token { value: 75 }, + Token::new(75), &Strategy::LocalStrategy, Some("unknown_dc_name"), ) diff --git a/scylla/src/transport/locator/token_ring.rs b/scylla/src/transport/locator/token_ring.rs index cd5b4de8f3..deb90b5d60 100644 --- a/scylla/src/transport/locator/token_ring.rs +++ b/scylla/src/transport/locator/token_ring.rs @@ -77,117 +77,117 @@ mod tests { #[test] fn test_token_ring() { let ring_data = [ - (Token { value: -30 }, -3), - (Token { value: -20 }, -2), - (Token { value: -10 }, -1), - (Token { value: 0 }, 0), - (Token { value: 10 }, 1), - (Token { value: 20 }, 2), - (Token { value: 30 }, 3), + (Token::new(-30), -3), + (Token::new(-20), -2), + (Token::new(-10), -1), + (Token::new(0), 0), + (Token::new(10), 1), + (Token::new(20), 2), + (Token::new(30), 3), ]; let ring: TokenRing = TokenRing::new(ring_data.into_iter()); assert_eq!( - ring.ring_range(Token { value: -35 }) + ring.ring_range(Token::new(-35)) .cloned() .collect::>(), vec![-3, -2, -1, 0, 1, 2, 3] ); assert_eq!( - ring.ring_range(Token { value: -30 }) + ring.ring_range(Token::new(-30)) .cloned() .collect::>(), vec![-3, -2, -1, 0, 1, 2, 3] ); assert_eq!( - ring.ring_range(Token { value: -25 }) + ring.ring_range(Token::new(-25)) .cloned() .collect::>(), vec![-2, -1, 0, 1, 2, 3, -3] ); assert_eq!( - ring.ring_range(Token { value: -20 }) + ring.ring_range(Token::new(-20)) .cloned() .collect::>(), vec![-2, -1, 0, 1, 2, 3, -3] ); assert_eq!( - ring.ring_range(Token { value: -15 }) + ring.ring_range(Token::new(-15)) .cloned() .collect::>(), vec![-1, 0, 1, 2, 3, -3, -2] ); assert_eq!( - ring.ring_range(Token { value: -10 }) + ring.ring_range(Token::new(-10)) .cloned() .collect::>(), vec![-1, 0, 1, 2, 3, -3, -2] ); assert_eq!( - ring.ring_range(Token { value: -5 }) + ring.ring_range(Token::new(-5)) .cloned() .collect::>(), vec![0, 1, 2, 3, -3, -2, -1] ); assert_eq!( - ring.ring_range(Token { value: 0 }) + ring.ring_range(Token::new(0)) .cloned() .collect::>(), vec![0, 1, 2, 3, -3, -2, -1] ); assert_eq!( - ring.ring_range(Token { value: 5 }) + ring.ring_range(Token::new(5)) .cloned() .collect::>(), vec![1, 2, 3, -3, -2, -1, 0] ); assert_eq!( - ring.ring_range(Token { value: 10 }) + ring.ring_range(Token::new(10)) .cloned() .collect::>(), vec![1, 2, 3, -3, -2, -1, 0] ); assert_eq!( - ring.ring_range(Token { value: 15 }) + ring.ring_range(Token::new(15)) .cloned() .collect::>(), vec![2, 3, -3, -2, -1, 0, 1] ); assert_eq!( - ring.ring_range(Token { value: 20 }) + ring.ring_range(Token::new(20)) .cloned() .collect::>(), vec![2, 3, -3, -2, -1, 0, 1] ); assert_eq!( - ring.ring_range(Token { value: 25 }) + ring.ring_range(Token::new(25)) .cloned() .collect::>(), vec![3, -3, -2, -1, 0, 1, 2] ); assert_eq!( - ring.ring_range(Token { value: 30 }) + ring.ring_range(Token::new(30)) .cloned() .collect::>(), vec![3, -3, -2, -1, 0, 1, 2] ); assert_eq!( - ring.ring_range(Token { value: 35 }) + ring.ring_range(Token::new(35)) .cloned() .collect::>(), vec![-3, -2, -1, 0, 1, 2, 3] diff --git a/scylla/src/transport/partitioner.rs b/scylla/src/transport/partitioner.rs index 7a9f4b083a..d5cff1c37a 100644 --- a/scylla/src/transport/partitioner.rs +++ b/scylla/src/transport/partitioner.rs @@ -255,9 +255,7 @@ impl PartitionerHasher for Murmur3PartitionerHasher { h1 += h2; h2 += h1; - Token { - value: (((h2.0 as i128) << 64) | h1.0 as i128) as i64, - } + Token::new((((h2.0 as i128) << 64) | h1.0 as i128) as i64) } } @@ -303,9 +301,7 @@ impl PartitionerHasher for CDCPartitionerHasher { // If the buffer is full, we can compute and fix the token. if *len == Self::BUF_CAPACITY { - let token = Token { - value: (&mut &buf[..]).get_i64(), - }; + let token = Token::new((&mut &buf[..]).get_i64()); self.state = CDCPartitionerHasherState::Computed(token); } } @@ -315,7 +311,12 @@ impl PartitionerHasher for CDCPartitionerHasher { fn finish(&self) -> Token { match self.state { - CDCPartitionerHasherState::Feeding { .. } => Token { value: i64::MIN }, + // Looking at Scylla code it seems that here we actually want token with this value. + // If the value is too short Scylla returns `dht::minimum_token()`: + // https://github.com/scylladb/scylladb/blob/4be70bfc2bc7f133cab492b4aac7bab9c790a48c/cdc/cdc_partitioner.cc#L32 + // When you call `long_token` on `minimum_token` it will actually return `i64::MIN`: + // https://github.com/scylladb/scylladb/blob/0a7854ea4de04f20b71326ba5940b5fac6f7241a/dht/token.cc#L21-L35 + CDCPartitionerHasherState::Feeding { .. } => Token::new_denormalized(i64::MIN), CDCPartitionerHasherState::Computed(token) => token, } } @@ -373,7 +374,7 @@ mod tests { use super::{CDCPartitioner, Murmur3Partitioner, Partitioner}; fn assert_correct_murmur3_hash(pk: &'static str, expected_hash: i64) { - let hash = Murmur3Partitioner.hash_one(pk.as_bytes()).value; + let hash = Murmur3Partitioner.hash_one(pk.as_bytes()).value(); assert_eq!(hash, expected_hash); } @@ -390,7 +391,7 @@ mod tests { } fn assert_correct_cdc_hash(pk: &'static str, expected_hash: i64) { - let hash = CDCPartitioner.hash_one(pk.as_bytes()).value; + let hash = CDCPartitioner.hash_one(pk.as_bytes()).value(); assert_eq!(hash, expected_hash); } diff --git a/scylla/src/transport/session.rs b/scylla/src/transport/session.rs index f4f5ab2365..4586c378b9 100644 --- a/scylla/src/transport/session.rs +++ b/scylla/src/transport/session.rs @@ -1979,7 +1979,7 @@ impl RequestSpan { ); } if let Some(token) = token { - span.record("token", token.value); + span.record("token", token.value()); } Self { diff --git a/scylla/src/transport/session_test.rs b/scylla/src/transport/session_test.rs index b6c9c20ba4..8ff9675cb6 100644 --- a/scylla/src/transport/session_test.rs +++ b/scylla/src/transport/session_test.rs @@ -227,13 +227,13 @@ async fn test_prepared_statement() { .unwrap() .rows .unwrap(); - let token = Token { - value: rs.first().unwrap().columns[0] + let token = Token::new( + rs.first().unwrap().columns[0] .as_ref() .unwrap() .as_bigint() .unwrap(), - }; + ); let prepared_token = Murmur3Partitioner .hash_one(&prepared_statement.compute_partition_key(&values).unwrap()); assert_eq!(token, prepared_token); @@ -252,13 +252,13 @@ async fn test_prepared_statement() { .unwrap() .rows .unwrap(); - let token = Token { - value: rs.first().unwrap().columns[0] + let token = Token::new( + rs.first().unwrap().columns[0] .as_ref() .unwrap() .as_bigint() .unwrap(), - }; + ); let prepared_token = Murmur3Partitioner.hash_one( &prepared_complex_pk_statement .compute_partition_key(&values) @@ -525,13 +525,13 @@ async fn test_token_calculation() { .unwrap() .rows .unwrap(); - let token = Token { - value: rs.first().unwrap().columns[0] + let token = Token::new( + rs.first().unwrap().columns[0] .as_ref() .unwrap() .as_bigint() .unwrap(), - }; + ); let prepared_token = Murmur3Partitioner .hash_one(&prepared_statement.compute_partition_key(&values).unwrap()); assert_eq!(token, prepared_token); @@ -2799,7 +2799,8 @@ async fn test_manual_primary_key_computation() { .unwrap(); println!( "by_prepared: {}, by_hand: {}", - token_by_prepared.value, token_by_hand.value + token_by_prepared.value(), + token_by_hand.value() ); assert_eq!(token_by_prepared, token_by_hand); } diff --git a/scylla/src/transport/topology.rs b/scylla/src/transport/topology.rs index 667e164fc9..78b9a09e2f 100644 --- a/scylla/src/transport/topology.rs +++ b/scylla/src/transport/topology.rs @@ -429,9 +429,7 @@ impl Metadata { Peer { address: endpoint.address(), - tokens: vec![Token { - value: token as i64, - }], + tokens: vec![Token::new(token as i64)], datacenter: None, rack: None, host_id: Uuid::new_v4(), @@ -887,9 +885,7 @@ async fn create_peer_from_row( // Also, we could implement support for Cassandra's other standard partitioners // like RandomPartitioner or ByteOrderedPartitioner. trace!("Couldn't parse tokens as 64-bit integers: {}, proceeding with a dummy token. If you're using a partitioner with different token size, consider migrating to murmur3", e); - vec![Token { - value: rand::thread_rng().gen::(), - }] + vec![Token::new(rand::thread_rng().gen::())] } };