Skip to content

Commit

Permalink
Move handling of MissingUserDefinedType to ClusterData::new
Browse files Browse the repository at this point in the history
This commit changes type of `keyspaces` field in `Metadata` from
`HashMap<String, Keyspace>` to `HashMap<String, Result<Keyspace, MissingUserDefinedType>>`.
Because of that, it also removed `MissingUserDefinedType` handling from
`query_metadata`. Now handling this error is done in `ClusterData::new`.
This has an advantage: we can use older version of the keyspace metadata
if the new version has this error.
  • Loading branch information
Lorak-mmk committed Jan 5, 2025
1 parent be9fe86 commit 3ec27b9
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 32 deletions.
26 changes: 24 additions & 2 deletions scylla/src/transport/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ impl Cluster {
&None,
host_filter.as_deref(),
TabletsInfo::new(),
&HashMap::new(),
)
.await;
cluster_data.wait_until_all_pools_are_initialized().await;
Expand Down Expand Up @@ -278,6 +279,7 @@ impl ClusterData {
used_keyspace: &Option<VerifiedKeyspaceName>,
host_filter: Option<&dyn HostFilter>,
mut tablets: TabletsInfo,
old_keyspaces: &HashMap<String, Keyspace>,
) -> Self {
// Create new updated known_peers and ring
let mut new_known_peers: HashMap<Uuid, Arc<Node>> =
Expand Down Expand Up @@ -323,6 +325,26 @@ impl ClusterData {
}
}

let keyspaces: HashMap<String, Keyspace> = metadata
.keyspaces
.into_iter()
.filter_map(|(ks_name, ks)| match ks {
Ok(ks) => Some((ks_name, ks)),
Err(e) => {
if let Some(old_ks) = old_keyspaces.get(&ks_name) {
warn!("Encountered an error while processing metadata of keyspace \"{ks_name}\": {e}.\
Re-using older version of this keyspace metadata");
Some((ks_name, old_ks.clone()))
} else {
warn!("Encountered an error while processing metadata of keyspace \"{ks_name}\": {e}.\
No previous version of this keyspace metadata found, so it will not be\
present in ClusterData until next refresh.");
None
}
}
})
.collect();

{
let removed_nodes = {
let mut removed_nodes = HashSet::new();
Expand All @@ -336,7 +358,7 @@ impl ClusterData {
};

let table_predicate = |spec: &TableSpec| {
if let Some(ks) = metadata.keyspaces.get(spec.ks_name()) {
if let Some(ks) = keyspaces.get(spec.ks_name()) {
ks.tables.contains_key(spec.table_name())
} else {
false
Expand Down Expand Up @@ -364,7 +386,6 @@ impl ClusterData {
)
}

let keyspaces = metadata.keyspaces;
let (locator, keyspaces) = tokio::task::spawn_blocking(move || {
let keyspace_strategies = keyspaces.values().map(|ks| &ks.strategy);
let locator = ReplicaLocator::new(ring.into_iter(), keyspace_strategies, tablets);
Expand Down Expand Up @@ -706,6 +727,7 @@ impl ClusterWorker {
&self.used_keyspace,
self.host_filter.as_deref(),
cluster_data.locator.tablets.clone(),
&cluster_data.keyspaces,
)
.await,
);
Expand Down
3 changes: 3 additions & 0 deletions scylla/src/transport/load_balancing/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1410,6 +1410,7 @@ mod tests {
&None,
None,
TabletsInfo::new(),
&HashMap::new(),
)
.await
}
Expand Down Expand Up @@ -1440,6 +1441,7 @@ mod tests {
&None,
None,
TabletsInfo::new(),
&HashMap::new(),
)
.await
}
Expand Down Expand Up @@ -2489,6 +2491,7 @@ mod tests {
Some(&FHostFilter)
},
TabletsInfo::new(),
&HashMap::new(),
)
.await;

Expand Down
48 changes: 42 additions & 6 deletions scylla/src/transport/locator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -857,43 +857,79 @@ mod tests {
check(
160,
None,
&metadata.keyspaces.get(KEYSPACE_NTS_RF_3).unwrap().strategy,
&metadata
.keyspaces
.get(KEYSPACE_NTS_RF_3)
.unwrap()
.as_ref()
.unwrap()
.strategy,
TABLE_NTS_RF_3,
vec![F, A, C, D, G, E],
);
check(
160,
None,
&metadata.keyspaces.get(KEYSPACE_NTS_RF_2).unwrap().strategy,
&metadata
.keyspaces
.get(KEYSPACE_NTS_RF_2)
.unwrap()
.as_ref()
.unwrap()
.strategy,
TABLE_NTS_RF_2,
vec![F, A, D, G],
);
check(
160,
None,
&metadata.keyspaces.get(KEYSPACE_SS_RF_2).unwrap().strategy,
&metadata
.keyspaces
.get(KEYSPACE_SS_RF_2)
.unwrap()
.as_ref()
.unwrap()
.strategy,
TABLE_SS_RF_2,
vec![F, A],
);

check(
160,
Some("eu"),
&metadata.keyspaces.get(KEYSPACE_NTS_RF_3).unwrap().strategy,
&metadata
.keyspaces
.get(KEYSPACE_NTS_RF_3)
.unwrap()
.as_ref()
.unwrap()
.strategy,
TABLE_NTS_RF_3,
vec![A, C, G],
);
check(
160,
Some("us"),
&metadata.keyspaces.get(KEYSPACE_NTS_RF_3).unwrap().strategy,
&metadata
.keyspaces
.get(KEYSPACE_NTS_RF_3)
.unwrap()
.as_ref()
.unwrap()
.strategy,
TABLE_NTS_RF_3,
vec![F, D, E],
);
check(
160,
Some("eu"),
&metadata.keyspaces.get(KEYSPACE_SS_RF_2).unwrap().strategy,
&metadata
.keyspaces
.get(KEYSPACE_SS_RF_2)
.unwrap()
.as_ref()
.unwrap()
.strategy,
TABLE_SS_RF_2,
vec![A],
);
Expand Down
8 changes: 4 additions & 4 deletions scylla/src/transport/locator/precomputed_replicas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,14 +231,14 @@ mod tests {
let mut metadata = mock_metadata_for_token_aware_tests();
metadata.keyspaces = [(
"SimpleStrategy{rf=2}".into(),
Keyspace {
Ok(Keyspace {
strategy: Strategy::SimpleStrategy {
replication_factor: 2,
},
tables: HashMap::new(),
views: HashMap::new(),
user_defined_types: HashMap::new(),
},
}),
)]
.iter()
.cloned()
Expand All @@ -251,7 +251,7 @@ mod tests {
metadata
.keyspaces
.values()
.map(|keyspace| &keyspace.strategy),
.map(|keyspace| &keyspace.as_ref().unwrap().strategy),
);

let check = |token, replication_factor, expected_node_ids| {
Expand Down Expand Up @@ -293,7 +293,7 @@ mod tests {
metadata
.keyspaces
.values()
.map(|keyspace| &keyspace.strategy),
.map(|keyspace| &keyspace.as_ref().unwrap().strategy),
);

let check = |token, dc, replication_factor, expected_node_ids| {
Expand Down
17 changes: 10 additions & 7 deletions scylla/src/transport/locator/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,18 +120,18 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata {
let keyspaces = [
(
KEYSPACE_SS_RF_2.into(),
Keyspace {
Ok(Keyspace {
strategy: Strategy::SimpleStrategy {
replication_factor: 2,
},
tables: HashMap::new(),
views: HashMap::new(),
user_defined_types: HashMap::new(),
},
}),
),
(
KEYSPACE_NTS_RF_2.into(),
Keyspace {
Ok(Keyspace {
strategy: Strategy::NetworkTopologyStrategy {
datacenter_repfactors: [("eu".to_owned(), 2), ("us".to_owned(), 2)]
.into_iter()
Expand All @@ -140,11 +140,11 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata {
tables: HashMap::new(),
views: HashMap::new(),
user_defined_types: HashMap::new(),
},
}),
),
(
KEYSPACE_NTS_RF_3.into(),
Keyspace {
Ok(Keyspace {
strategy: Strategy::NetworkTopologyStrategy {
datacenter_repfactors: [("eu".to_owned(), 3), ("us".to_owned(), 3)]
.into_iter()
Expand All @@ -153,7 +153,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata {
tables: HashMap::new(),
views: HashMap::new(),
user_defined_types: HashMap::new(),
},
}),
),
]
.iter()
Expand Down Expand Up @@ -201,7 +201,10 @@ pub(crate) fn create_ring(metadata: &Metadata) -> impl Iterator<Item = (Token, A

pub(crate) fn create_locator(metadata: &Metadata) -> ReplicaLocator {
let ring = create_ring(metadata);
let strategies = metadata.keyspaces.values().map(|ks| &ks.strategy);
let strategies = metadata
.keyspaces
.values()
.map(|ks| &ks.as_ref().unwrap().strategy);

ReplicaLocator::new(ring, strategies, TabletsInfo::new())
}
Expand Down
15 changes: 2 additions & 13 deletions scylla/src/transport/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub(crate) struct MetadataReader {
/// Describes all metadata retrieved from the cluster
pub(crate) struct Metadata {
pub(crate) peers: Vec<Peer>,
pub(crate) keyspaces: HashMap<String, Keyspace>,
pub(crate) keyspaces: HashMap<String, Result<Keyspace, MissingUserDefinedType>>,
}

#[non_exhaustive] // <- so that we can add more fields in a backwards-compatible way
Expand Down Expand Up @@ -280,7 +280,7 @@ pub struct UserDefinedType {
/// Represents a user defined type whose definition is missing from the metadata.
#[derive(Clone, Debug, Error)]
#[error("Missing UDT: {keyspace}, {name}")]
struct MissingUserDefinedType {
pub(crate) struct MissingUserDefinedType {
name: String,
keyspace: String,
}
Expand Down Expand Up @@ -791,17 +791,6 @@ async fn query_metadata(
return Err(MetadataError::Peers(PeersMetadataError::EmptyTokenLists).into());
}

let keyspaces = keyspaces
.into_iter()
.filter_map(|(ks_name, ks)| match ks {
Ok(ks) => Some((ks_name, ks)),
Err(e) => {
warn!("Error while processing keyspace \"{ks_name}\": {e}");
None
}
})
.collect();

Ok(Metadata { peers, keyspaces })
}

Expand Down

0 comments on commit 3ec27b9

Please sign in to comment.