From b3468f91ff2b99352670ec195b93fab5df564255 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Karol=20Bary=C5=82a?= Date: Sat, 4 Jan 2025 18:23:36 +0100 Subject: [PATCH 1/6] topology: accept keyspace udts in into_cql_type Previously this method accepted a map with UDTs for all keyspaces. This is not necessary: UDTs in one keyspace can not reference UDTs in another keyspace. --- scylla/src/cluster/metadata.rs | 39 +++++++++++++++++----------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/scylla/src/cluster/metadata.rs b/scylla/src/cluster/metadata.rs index 922c4170f..d27961326 100644 --- a/scylla/src/cluster/metadata.rs +++ b/scylla/src/cluster/metadata.rs @@ -221,28 +221,25 @@ impl PreCqlType { pub(crate) fn into_cql_type( self, keyspace_name: &String, - udts: &HashMap>>, + keyspace_udts: &HashMap>, ) -> CqlType { match self { PreCqlType::Native(n) => CqlType::Native(n), PreCqlType::Collection { frozen, type_ } => CqlType::Collection { frozen, - type_: type_.into_collection_type(keyspace_name, udts), + type_: type_.into_collection_type(keyspace_name, keyspace_udts), }, PreCqlType::Tuple(t) => CqlType::Tuple( t.into_iter() - .map(|t| t.into_cql_type(keyspace_name, udts)) + .map(|t| t.into_cql_type(keyspace_name, keyspace_udts)) .collect(), ), PreCqlType::Vector { type_, dimensions } => CqlType::Vector { - type_: Box::new(type_.into_cql_type(keyspace_name, udts)), + type_: Box::new(type_.into_cql_type(keyspace_name, keyspace_udts)), dimensions, }, PreCqlType::UserDefinedType { frozen, name } => { - let definition = match udts - .get(keyspace_name) - .and_then(|per_keyspace_udts| per_keyspace_udts.get(&name)) - { + let definition = match keyspace_udts.get(&name) { Some(def) => Ok(def.clone()), None => Err(MissingUserDefinedType { name, @@ -360,18 +357,18 @@ impl PreCollectionType { pub(crate) fn into_collection_type( self, keyspace_name: &String, - udts: &HashMap>>, + keyspace_udts: &HashMap>, ) -> CollectionType { match self { PreCollectionType::List(t) => { - CollectionType::List(Box::new(t.into_cql_type(keyspace_name, udts))) + CollectionType::List(Box::new(t.into_cql_type(keyspace_name, keyspace_udts))) } PreCollectionType::Map(tk, tv) => CollectionType::Map( - Box::new(tk.into_cql_type(keyspace_name, udts)), - Box::new(tv.into_cql_type(keyspace_name, udts)), + Box::new(tk.into_cql_type(keyspace_name, keyspace_udts)), + Box::new(tv.into_cql_type(keyspace_name, keyspace_udts)), ), PreCollectionType::Set(t) => { - CollectionType::Set(Box::new(t.into_cql_type(keyspace_name, udts))) + CollectionType::Set(Box::new(t.into_cql_type(keyspace_name, keyspace_udts))) } } } @@ -1129,22 +1126,23 @@ async fn query_user_defined_types( field_types, } = udt_row; + let keyspace_name_clone = keyspace_name.clone(); + let keyspace_entry = udts.entry(keyspace_name).or_insert_with(HashMap::new); + let mut fields = Vec::with_capacity(field_names.len()); for (field_name, field_type) in field_names.into_iter().zip(field_types.into_iter()) { - let cql_type = field_type.into_cql_type(&keyspace_name, &udts); + let cql_type = field_type.into_cql_type(&keyspace_name_clone, keyspace_entry); fields.push((field_name, cql_type)); } let udt = Arc::new(UserDefinedType { name: type_name.clone(), - keyspace: keyspace_name.clone(), + keyspace: keyspace_name_clone, field_types: fields, }); - udts.entry(keyspace_name) - .or_insert_with(HashMap::new) - .insert(type_name, udt); + keyspace_entry.insert(type_name, udt); } Ok(udts) @@ -1509,6 +1507,8 @@ async fn query_tables_schema( } ); + let empty_map = 
HashMap::new(); + let mut tables_schema = HashMap::new(); rows.map(|row_result| { @@ -1518,8 +1518,9 @@ async fn query_tables_schema( return Ok::<_, QueryError>(()); } + let keyspace_udts = udts.get(&keyspace_name).unwrap_or(&empty_map); let pre_cql_type = map_string_to_cql_type(&type_)?; - let cql_type = pre_cql_type.into_cql_type(&keyspace_name, udts); + let cql_type = pre_cql_type.into_cql_type(&keyspace_name, keyspace_udts); let kind = ColumnKind::from_str(&kind).map_err(|_| { MetadataError::Tables(TablesMetadataError::UnknownColumnKind { From 1cdbf8e769aa51d06d88e74f961dca43f7a1fe4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Karol=20Bary=C5=82a?= Date: Sat, 4 Jan 2025 18:03:10 +0100 Subject: [PATCH 2/6] topology: De-duplicate query_tables_schema calls This function performs a request that fetches columns for all tables and views. It is potentially the most performance-impactful part of the schema fetching process, and it was unnecessarily called twice: in query_tables and in query_views. It can be easily prevented by calling the function earlier and passing the result to query_tables and query_views. --- scylla/src/cluster/metadata.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/scylla/src/cluster/metadata.rs b/scylla/src/cluster/metadata.rs index d27961326..cd051bc50 100644 --- a/scylla/src/cluster/metadata.rs +++ b/scylla/src/cluster/metadata.rs @@ -1012,9 +1012,17 @@ async fn query_keyspaces( let (mut all_tables, mut all_views, mut all_user_defined_types) = if fetch_schema { let udts = query_user_defined_types(conn, keyspaces_to_fetch).await?; + let mut tables_schema = query_tables_schema(conn, keyspaces_to_fetch, &udts).await?; ( - query_tables(conn, keyspaces_to_fetch, &udts).await?, - query_views(conn, keyspaces_to_fetch, &udts).await?, + // We pass the mutable reference to the same map to the both functions. + // First function fetches `system_schema.tables`, and removes found + // table from `tables_schema`. + // Second does the same for `system_schema.views`. + // The assumption here is that no keys (table names) can appear in both + // of those schema table. + // As far as we know this assumption is true for Scylla and Cassandra. 
+ query_tables(conn, keyspaces_to_fetch, &mut tables_schema).await?, + query_views(conn, keyspaces_to_fetch, &mut tables_schema).await?, udts, ) } else { @@ -1411,7 +1419,7 @@ mod toposort_tests { async fn query_tables( conn: &Arc, keyspaces_to_fetch: &[String], - udts: &HashMap>>, + tables: &mut HashMap<(String, String), Table>, ) -> Result>, QueryError> { let rows = query_filter_keyspace_name::<(String, String)>( conn, @@ -1420,7 +1428,6 @@ async fn query_tables( |err| MetadataError::Tables(TablesMetadataError::SchemaTablesInvalidColumnType(err)), ); let mut result = HashMap::new(); - let mut tables = query_tables_schema(conn, keyspaces_to_fetch, udts).await?; rows.map(|row_result| { let keyspace_and_table_name = row_result?; @@ -1448,7 +1455,7 @@ async fn query_tables( async fn query_views( conn: &Arc, keyspaces_to_fetch: &[String], - udts: &HashMap>>, + tables: &mut HashMap<(String, String), Table>, ) -> Result>, QueryError> { let rows = query_filter_keyspace_name::<(String, String, String)>( conn, @@ -1458,7 +1465,6 @@ async fn query_views( ); let mut result = HashMap::new(); - let mut tables = query_tables_schema(conn, keyspaces_to_fetch, udts).await?; rows.map(|row_result| { let (keyspace_name, view_name, base_table_name) = row_result?; From 7fa1b78c17a05b94369b71e3cceba87ee39021ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Karol=20Bary=C5=82a?= Date: Sat, 4 Jan 2025 17:52:37 +0100 Subject: [PATCH 3/6] topology: Introduce type aliases to increase readability --- scylla/src/cluster/metadata.rs | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/scylla/src/cluster/metadata.rs b/scylla/src/cluster/metadata.rs index cd051bc50..7c412696a 100644 --- a/scylla/src/cluster/metadata.rs +++ b/scylla/src/cluster/metadata.rs @@ -54,6 +54,10 @@ use crate::errors::{ ProtocolError, TablesMetadataError, UdtMetadataError, ViewsMetadataError, }; +type PerKeyspace = HashMap; +type PerTable = HashMap; +type PerKsTable = HashMap<(String, String), T>; + /// Allows to read current metadata from the cluster pub(crate) struct MetadataReader { connection_config: ConnectionConfig, @@ -221,7 +225,7 @@ impl PreCqlType { pub(crate) fn into_cql_type( self, keyspace_name: &String, - keyspace_udts: &HashMap>, + keyspace_udts: &PerTable>, ) -> CqlType { match self { PreCqlType::Native(n) => CqlType::Native(n), @@ -357,7 +361,7 @@ impl PreCollectionType { pub(crate) fn into_collection_type( self, keyspace_name: &String, - keyspace_udts: &HashMap>, + keyspace_udts: &PerTable>, ) -> CollectionType { match self { PreCollectionType::List(t) => { @@ -998,7 +1002,7 @@ async fn query_keyspaces( conn: &Arc, keyspaces_to_fetch: &[String], fetch_schema: bool, -) -> Result, QueryError> { +) -> Result, QueryError> { let rows = query_filter_keyspace_name::<(String, HashMap)>( conn, "select keyspace_name, replication from system_schema.keyspaces", @@ -1099,7 +1103,7 @@ impl TryFrom for UdtRowWithParsedFieldTypes { async fn query_user_defined_types( conn: &Arc, keyspaces_to_fetch: &[String], -) -> Result>>, QueryError> { +) -> Result>>, QueryError> { let rows = query_filter_keyspace_name::( conn, "select keyspace_name, type_name, field_names, field_types from system_schema.types", @@ -1419,8 +1423,8 @@ mod toposort_tests { async fn query_tables( conn: &Arc, keyspaces_to_fetch: &[String], - tables: &mut HashMap<(String, String), Table>, -) -> Result>, QueryError> { + tables: &mut PerKsTable, +) -> Result>, QueryError> { let rows = query_filter_keyspace_name::<(String, String)>( conn, "SELECT 
keyspace_name, table_name FROM system_schema.tables", @@ -1455,8 +1459,8 @@ async fn query_tables( async fn query_views( conn: &Arc, keyspaces_to_fetch: &[String], - tables: &mut HashMap<(String, String), Table>, -) -> Result>, QueryError> { + tables: &mut PerKsTable
, +) -> Result>, QueryError> { let rows = query_filter_keyspace_name::<(String, String, String)>( conn, "SELECT keyspace_name, view_name, base_table_name FROM system_schema.views", @@ -1498,8 +1502,8 @@ async fn query_views( async fn query_tables_schema( conn: &Arc, keyspaces_to_fetch: &[String], - udts: &HashMap>>, -) -> Result, QueryError> { + udts: &PerKeyspace>>, +) -> Result, QueryError> { // Upon migration from thrift to CQL, Cassandra internally creates a surrogate column "value" of // type EmptyType for dense tables. This resolves into this CQL type name. // This column shouldn't be exposed to the user but is currently exposed in system tables. @@ -1736,7 +1740,7 @@ fn freeze_type(type_: PreCqlType) -> PreCqlType { async fn query_table_partitioners( conn: &Arc, -) -> Result>, QueryError> { +) -> Result>, QueryError> { let mut partitioner_query = Query::new( "select keyspace_name, table_name, partitioner from system_schema.scylla_tables", ); From 71476b78e3ff5a7846bd1aa2ecb903b28a79359a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Karol=20Bary=C5=82a?= Date: Sun, 5 Jan 2025 12:08:49 +0100 Subject: [PATCH 4/6] topology: Don't use MissingUserDefinedType in CqlType MissingUserDefinedType was an obstacle preventing us from unifying ColumnType and CqlType. This commit changes the topology fetching code to remove the keyspace where such an error happened from the metadata. A warning is printed in such case. --- scylla/src/client/session_test.rs | 8 +- scylla/src/cluster/metadata.rs | 283 +++++++++++++++++++++--------- 2 files changed, 201 insertions(+), 90 deletions(-) diff --git a/scylla/src/client/session_test.rs b/scylla/src/client/session_test.rs index 5f7e6f90c..b54b7cc78 100644 --- a/scylla/src/client/session_test.rs +++ b/scylla/src/client/session_test.rs @@ -1474,7 +1474,7 @@ fn udt_type_c_def(ks: &str) -> Arc { }), Box::new(CqlType::UserDefinedType { frozen: true, - definition: Ok(udt_type_b_def(ks)), + definition: udt_type_b_def(ks), }), ), }, @@ -1565,7 +1565,7 @@ async fn test_schema_types_in_metadata() { a.type_, CqlType::UserDefinedType { frozen: true, - definition: Ok(udt_type_a_def(&ks)), + definition: udt_type_a_def(&ks), } ); @@ -1575,7 +1575,7 @@ async fn test_schema_types_in_metadata() { b.type_, CqlType::UserDefinedType { frozen: false, - definition: Ok(udt_type_b_def(&ks)), + definition: udt_type_b_def(&ks), } ); @@ -1585,7 +1585,7 @@ async fn test_schema_types_in_metadata() { c.type_, CqlType::UserDefinedType { frozen: true, - definition: Ok(udt_type_c_def(&ks)) + definition: udt_type_c_def(&ks) } ); diff --git a/scylla/src/cluster/metadata.rs b/scylla/src/cluster/metadata.rs index 7c412696a..3ca36d1f1 100644 --- a/scylla/src/cluster/metadata.rs +++ b/scylla/src/cluster/metadata.rs @@ -44,6 +44,7 @@ use std::num::NonZeroUsize; use std::str::FromStr; use std::sync::Arc; use std::time::{Duration, Instant}; +use thiserror::Error; use tokio::sync::{broadcast, mpsc}; use tracing::{debug, error, trace, warn}; use uuid::Uuid; @@ -55,8 +56,10 @@ use crate::errors::{ }; type PerKeyspace = HashMap; +type PerKeyspaceResult = PerKeyspace>; type PerTable = HashMap; type PerKsTable = HashMap<(String, String), T>; +type PerKsTableResult = PerKsTable>; /// Allows to read current metadata from the cluster pub(crate) struct MetadataReader { @@ -226,31 +229,37 @@ impl PreCqlType { self, keyspace_name: &String, keyspace_udts: &PerTable>, - ) -> CqlType { + ) -> Result { match self { - PreCqlType::Native(n) => CqlType::Native(n), - PreCqlType::Collection { frozen, type_ } => 
CqlType::Collection { - frozen, - type_: type_.into_collection_type(keyspace_name, keyspace_udts), - }, - PreCqlType::Tuple(t) => CqlType::Tuple( - t.into_iter() - .map(|t| t.into_cql_type(keyspace_name, keyspace_udts)) - .collect(), - ), - PreCqlType::Vector { type_, dimensions } => CqlType::Vector { - type_: Box::new(type_.into_cql_type(keyspace_name, keyspace_udts)), - dimensions, - }, + PreCqlType::Native(n) => Ok(CqlType::Native(n)), + PreCqlType::Collection { frozen, type_ } => type_ + .into_collection_type(keyspace_name, keyspace_udts) + .map(|inner| CqlType::Collection { + frozen, + type_: inner, + }), + PreCqlType::Tuple(t) => t + .into_iter() + .map(|t| t.into_cql_type(keyspace_name, keyspace_udts)) + .collect::, MissingUserDefinedType>>() + .map(CqlType::Tuple), + PreCqlType::Vector { type_, dimensions } => type_ + .into_cql_type(keyspace_name, keyspace_udts) + .map(|inner| CqlType::Vector { + type_: Box::new(inner), + dimensions, + }), PreCqlType::UserDefinedType { frozen, name } => { let definition = match keyspace_udts.get(&name) { - Some(def) => Ok(def.clone()), - None => Err(MissingUserDefinedType { - name, - keyspace: keyspace_name.clone(), - }), + Some(def) => def.clone(), + None => { + return Err(MissingUserDefinedType { + name, + keyspace: keyspace_name.clone(), + }) + } }; - CqlType::UserDefinedType { frozen, definition } + Ok(CqlType::UserDefinedType { frozen, definition }) } } } @@ -273,7 +282,7 @@ pub enum CqlType { UserDefinedType { frozen: bool, // Using Arc here in order not to have many copies of the same definition - definition: Result, MissingUserDefinedType>, + definition: Arc, }, } @@ -286,7 +295,8 @@ pub struct UserDefinedType { } /// Represents a user defined type whose definition is missing from the metadata. -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, Error)] +#[error("Missing UDT: {keyspace}, {name}")] pub struct MissingUserDefinedType { pub name: String, pub keyspace: String, @@ -362,18 +372,18 @@ impl PreCollectionType { self, keyspace_name: &String, keyspace_udts: &PerTable>, - ) -> CollectionType { + ) -> Result { match self { - PreCollectionType::List(t) => { - CollectionType::List(Box::new(t.into_cql_type(keyspace_name, keyspace_udts))) - } - PreCollectionType::Map(tk, tv) => CollectionType::Map( - Box::new(tk.into_cql_type(keyspace_name, keyspace_udts)), - Box::new(tv.into_cql_type(keyspace_name, keyspace_udts)), - ), - PreCollectionType::Set(t) => { - CollectionType::Set(Box::new(t.into_cql_type(keyspace_name, keyspace_udts))) - } + PreCollectionType::List(t) => t + .into_cql_type(keyspace_name, keyspace_udts) + .map(|inner| CollectionType::List(Box::new(inner))), + PreCollectionType::Map(tk, tv) => Ok(CollectionType::Map( + Box::new(tk.into_cql_type(keyspace_name, keyspace_udts)?), + Box::new(tv.into_cql_type(keyspace_name, keyspace_udts)?), + )), + PreCollectionType::Set(t) => t + .into_cql_type(keyspace_name, keyspace_udts) + .map(|inner| CollectionType::Set(Box::new(inner))), } } } @@ -790,6 +800,17 @@ async fn query_metadata( return Err(MetadataError::Peers(PeersMetadataError::EmptyTokenLists).into()); } + let keyspaces = keyspaces + .into_iter() + .filter_map(|(ks_name, ks)| match ks { + Ok(ks) => Some((ks_name, ks)), + Err(e) => { + warn!("Error while processing keyspace \"{ks_name}\": {e}"); + None + } + }) + .collect(); + Ok(Metadata { peers, keyspaces }) } @@ -1002,7 +1023,7 @@ async fn query_keyspaces( conn: &Arc, keyspaces_to_fetch: &[String], fetch_schema: bool, -) -> Result, QueryError> { +) -> Result, 
QueryError> { let rows = query_filter_keyspace_name::<(String, HashMap)>( conn, "select keyspace_name, replication from system_schema.keyspaces", @@ -1042,11 +1063,28 @@ async fn query_keyspaces( error, }) })?; - let tables = all_tables.remove(&keyspace_name).unwrap_or_default(); - let views = all_views.remove(&keyspace_name).unwrap_or_default(); + let tables = all_tables + .remove(&keyspace_name) + .unwrap_or_else(|| Ok(HashMap::new())); + let views = all_views + .remove(&keyspace_name) + .unwrap_or_else(|| Ok(HashMap::new())); let user_defined_types = all_user_defined_types .remove(&keyspace_name) - .unwrap_or_default(); + .unwrap_or_else(|| Ok(HashMap::new())); + + // As you can notice, in this file we generally operate on two layers of errors: + // - Outer (QueryError) if something went wrong with querying the cluster. + // - Inner (currently MissingUserDefinedType, possibly other variants in the future) if the fetched metadata + // turned out to not be fully consistent. + // If there is an inner error, we want to drop metadata for the whole keyspace. + // This logic checks if either tables views or UDTs have such inner error, and returns it if so. + // Notice that in the error branch, return value is wrapped in `Ok` - but this is the + // outer error, so it just means there was no error while querying the cluster. + let (tables, views, user_defined_types) = match (tables, views, user_defined_types) { + (Ok(t), Ok(v), Ok(u)) => (t, v, u), + (Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)) => return Ok((keyspace_name, Err(e))), + }; let keyspace = Keyspace { strategy, @@ -1055,7 +1093,7 @@ async fn query_keyspaces( user_defined_types, }; - Ok((keyspace_name, keyspace)) + Ok((keyspace_name, Ok(keyspace))) }) .try_collect() .await @@ -1103,7 +1141,7 @@ impl TryFrom for UdtRowWithParsedFieldTypes { async fn query_user_defined_types( conn: &Arc, keyspaces_to_fetch: &[String], -) -> Result>>, QueryError> { +) -> Result>, MissingUserDefinedType>, QueryError> { let rows = query_filter_keyspace_name::( conn, "select keyspace_name, type_name, field_names, field_types from system_schema.types", @@ -1130,7 +1168,7 @@ async fn query_user_defined_types( ); let mut udts = HashMap::new(); - for udt_row in udt_rows { + 'udts_loop: for udt_row in udt_rows { let UdtRowWithParsedFieldTypes { keyspace_name, type_name, @@ -1139,13 +1177,26 @@ async fn query_user_defined_types( } = udt_row; let keyspace_name_clone = keyspace_name.clone(); - let keyspace_entry = udts.entry(keyspace_name).or_insert_with(HashMap::new); + let keyspace_udts_result = udts + .entry(keyspace_name) + .or_insert_with(|| Ok(HashMap::new())); + + // If there was previously an error in this keyspace then it makes no sense to process this UDT. 
+ let keyspace_udts = match keyspace_udts_result { + Ok(udts) => udts, + Err(_) => continue, + }; let mut fields = Vec::with_capacity(field_names.len()); for (field_name, field_type) in field_names.into_iter().zip(field_types.into_iter()) { - let cql_type = field_type.into_cql_type(&keyspace_name_clone, keyspace_entry); - fields.push((field_name, cql_type)); + match field_type.into_cql_type(&keyspace_name_clone, keyspace_udts) { + Ok(cql_type) => fields.push((field_name, cql_type)), + Err(e) => { + *keyspace_udts_result = Err(e); + continue 'udts_loop; + } + } } let udt = Arc::new(UserDefinedType { @@ -1154,7 +1205,7 @@ async fn query_user_defined_types( field_types: fields, }); - keyspace_entry.insert(type_name, udt); + keyspace_udts.insert(type_name, udt); } Ok(udts) @@ -1423,8 +1474,8 @@ mod toposort_tests { async fn query_tables( conn: &Arc, keyspaces_to_fetch: &[String], - tables: &mut PerKsTable
, -) -> Result>, QueryError> { + tables: &mut PerKsTableResult, +) -> Result, MissingUserDefinedType>, QueryError> { let rows = query_filter_keyspace_name::<(String, String)>( conn, "SELECT keyspace_name, table_name FROM system_schema.tables", @@ -1436,17 +1487,23 @@ async fn query_tables( rows.map(|row_result| { let keyspace_and_table_name = row_result?; - let table = tables.remove(&keyspace_and_table_name).unwrap_or(Table { + let table = tables.remove(&keyspace_and_table_name).unwrap_or(Ok(Table { columns: HashMap::new(), partition_key: vec![], clustering_key: vec![], partitioner: None, - }); + })); - result + let mut entry = result .entry(keyspace_and_table_name.0) - .or_insert_with(HashMap::new) - .insert(keyspace_and_table_name.1, table); + .or_insert_with(|| Ok(HashMap::new())); + match (&mut entry, table) { + (Ok(tables), Ok(table)) => { + let _ = tables.insert(keyspace_and_table_name.1, table); + } + (Err(_), _) => (), + (Ok(_), Err(e)) => *entry = Err(e), + }; Ok::<_, QueryError>(()) }) @@ -1459,8 +1516,8 @@ async fn query_tables( async fn query_views( conn: &Arc, keyspaces_to_fetch: &[String], - tables: &mut PerKsTable
, -) -> Result>, QueryError> { + tables: &mut PerKsTableResult, +) -> Result, MissingUserDefinedType>, QueryError> { let rows = query_filter_keyspace_name::<(String, String, String)>( conn, "SELECT keyspace_name, view_name, base_table_name FROM system_schema.views", @@ -1475,21 +1532,30 @@ async fn query_views( let keyspace_and_view_name = (keyspace_name, view_name); - let table = tables.remove(&keyspace_and_view_name).unwrap_or(Table { - columns: HashMap::new(), - partition_key: vec![], - clustering_key: vec![], - partitioner: None, - }); - let materialized_view = MaterializedView { - view_metadata: table, - base_table_name, - }; - - result + let materialized_view = tables + .remove(&keyspace_and_view_name) + .unwrap_or(Ok(Table { + columns: HashMap::new(), + partition_key: vec![], + clustering_key: vec![], + partitioner: None, + })) + .map(|table| MaterializedView { + view_metadata: table, + base_table_name, + }); + + let mut entry = result .entry(keyspace_and_view_name.0) - .or_insert_with(HashMap::new) - .insert(keyspace_and_view_name.1, materialized_view); + .or_insert_with(|| Ok(HashMap::new())); + + match (&mut entry, materialized_view) { + (Ok(views), Ok(view)) => { + let _ = views.insert(keyspace_and_view_name.1, view); + } + (Err(_), _) => (), + (Ok(_), Err(e)) => *entry = Err(e), + }; Ok::<_, QueryError>(()) }) @@ -1502,8 +1568,8 @@ async fn query_views( async fn query_tables_schema( conn: &Arc, keyspaces_to_fetch: &[String], - udts: &PerKeyspace>>, -) -> Result, QueryError> { + udts: &PerKeyspaceResult>, MissingUserDefinedType>, +) -> Result, QueryError> { // Upon migration from thrift to CQL, Cassandra internally creates a surrogate column "value" of // type EmptyType for dense tables. This resolves into this CQL type name. // This column shouldn't be exposed to the user but is currently exposed in system tables. @@ -1517,9 +1583,9 @@ async fn query_tables_schema( } ); - let empty_map = HashMap::new(); + let empty_ok_map = Ok(HashMap::new()); - let mut tables_schema = HashMap::new(); + let mut tables_schema: HashMap<_, Result<_, MissingUserDefinedType>> = HashMap::new(); rows.map(|row_result| { let (keyspace_name, table_name, column_name, kind, position, type_) = row_result?; @@ -1528,9 +1594,43 @@ async fn query_tables_schema( return Ok::<_, QueryError>(()); } - let keyspace_udts = udts.get(&keyspace_name).unwrap_or(&empty_map); + let keyspace_udts: &PerTable> = + match udts.get(&keyspace_name).unwrap_or(&empty_ok_map) { + Ok(udts) => udts, + Err(e) => { + // There are two things we could do here + // 1. Not inserting, just returning. In that case the keyspaces containing + // tables that have a column with a broken UDT will not be present in + // the output of this function at all. + // 2. Inserting an error (which requires cloning it). In that case, + // keyspace containing a table with broken UDT will have the error + // cloned from this UDT. + // + // Solution number 1 seems weird because it can be seen as silencing + // the error: we have data for a keyspace, but we just don't put + // it in the result at all. + // Solution 2 is also not perfect because it: + // - Returns error for the keyspace even if the broken UDT is not used in any table. + // - Doesn't really distinguish between a table using a broken UDT and + // a keyspace just containing some broken UDTs. + // + // I chose solution 2. Its first problem is not really important because + // the caller will error out the entire keyspace anyway. The second problem + // is minor enough to ignore. 
Note that the first issue also applies to + // solution 1: but the keyspace won't be present in the result at all, + // which is arguably worse. + tables_schema.insert((keyspace_name, table_name), Err(e.clone())); + return Ok::<_, QueryError>(()); + } + }; let pre_cql_type = map_string_to_cql_type(&type_)?; - let cql_type = pre_cql_type.into_cql_type(&keyspace_name, keyspace_udts); + let cql_type = match pre_cql_type.into_cql_type(&keyspace_name, keyspace_udts) { + Ok(t) => t, + Err(e) => { + tables_schema.insert((keyspace_name, table_name), Err(e)); + return Ok::<_, QueryError>(()); + } + }; let kind = ColumnKind::from_str(&kind).map_err(|_| { MetadataError::Tables(TablesMetadataError::UnknownColumnKind { @@ -1541,11 +1641,17 @@ async fn query_tables_schema( }) })?; - let entry = tables_schema.entry((keyspace_name, table_name)).or_insert(( - HashMap::new(), // columns - HashMap::new(), // partition key - HashMap::new(), // clustering key - )); + let Ok(entry) = tables_schema + .entry((keyspace_name, table_name)) + .or_insert(Ok(( + HashMap::new(), // columns + HashMap::new(), // partition key + HashMap::new(), // clustering key + ))) + else { + // This table was previously marked as broken, no way to insert anything. + return Ok::<_, QueryError>(()); + }; if kind == ColumnKind::PartitionKey || kind == ColumnKind::Clustering { let key_map = if kind == ColumnKind::PartitionKey { @@ -1572,9 +1678,16 @@ async fn query_tables_schema( let mut all_partitioners = query_table_partitioners(conn).await?; let mut result = HashMap::new(); - for ((keyspace_name, table_name), (columns, partition_key_columns, clustering_key_columns)) in - tables_schema - { + for ((keyspace_name, table_name), table_result) in tables_schema { + let keyspace_and_table_name = (keyspace_name, table_name); + + let (columns, partition_key_columns, clustering_key_columns) = match table_result { + Ok(table) => table, + Err(e) => { + let _ = result.insert(keyspace_and_table_name, Err(e)); + continue; + } + }; let mut partition_key = vec!["".to_string(); partition_key_columns.len()]; for (position, column_name) in partition_key_columns { partition_key[position as usize] = column_name; @@ -1585,20 +1698,18 @@ async fn query_tables_schema( clustering_key[position as usize] = column_name; } - let keyspace_and_table_name = (keyspace_name, table_name); - let partitioner = all_partitioners .remove(&keyspace_and_table_name) .unwrap_or_default(); result.insert( keyspace_and_table_name, - Table { + Ok(Table { columns, partition_key, clustering_key, partitioner, - }, + }), ); } From 82561f1ce3d10467bbfaf94bc71b95d0d8b58125 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Karol=20Bary=C5=82a?= Date: Sun, 29 Dec 2024 23:04:26 +0100 Subject: [PATCH 5/6] Topology: unpub MissingUserDefinedType It is no longer part of any public API. --- scylla/src/cluster/metadata.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/scylla/src/cluster/metadata.rs b/scylla/src/cluster/metadata.rs index 3ca36d1f1..9c9f4f8b3 100644 --- a/scylla/src/cluster/metadata.rs +++ b/scylla/src/cluster/metadata.rs @@ -297,9 +297,9 @@ pub struct UserDefinedType { /// Represents a user defined type whose definition is missing from the metadata. 
#[derive(Clone, Debug, Error)] #[error("Missing UDT: {keyspace}, {name}")] -pub struct MissingUserDefinedType { - pub name: String, - pub keyspace: String, +struct MissingUserDefinedType { + name: String, + keyspace: String, } #[derive(Clone, Debug, PartialEq, Eq)] From 800aabce2d2fd69e9fd7ab446216b559350b6e77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Karol=20Bary=C5=82a?= Date: Sun, 29 Dec 2024 23:46:48 +0100 Subject: [PATCH 6/6] Move handling of MissingUserDefinedType to ClusterData::new This commit changes type of `keyspaces` field in `Metadata` from `HashMap` to `HashMap>`. Because of that, it also removed `MissingUserDefinedType` handling from `query_metadata`. Now handling this error is done in `ClusterData::new`. This has an advantage: we can use older version of the keyspace metadata if the new version has this error. --- scylla/src/cluster/metadata.rs | 15 +----- scylla/src/cluster/state.rs | 26 ++++++++-- scylla/src/cluster/worker.rs | 2 + scylla/src/policies/load_balancing/default.rs | 3 ++ scylla/src/routing/locator/mod.rs | 48 ++++++++++++++++--- .../routing/locator/precomputed_replicas.rs | 8 ++-- scylla/src/routing/locator/test.rs | 17 ++++--- 7 files changed, 86 insertions(+), 33 deletions(-) diff --git a/scylla/src/cluster/metadata.rs b/scylla/src/cluster/metadata.rs index 9c9f4f8b3..00359a340 100644 --- a/scylla/src/cluster/metadata.rs +++ b/scylla/src/cluster/metadata.rs @@ -87,7 +87,7 @@ pub(crate) struct MetadataReader { /// Describes all metadata retrieved from the cluster pub(crate) struct Metadata { pub(crate) peers: Vec, - pub(crate) keyspaces: HashMap, + pub(crate) keyspaces: HashMap>, } #[non_exhaustive] // <- so that we can add more fields in a backwards-compatible way @@ -297,7 +297,7 @@ pub struct UserDefinedType { /// Represents a user defined type whose definition is missing from the metadata. 
#[derive(Clone, Debug, Error)] #[error("Missing UDT: {keyspace}, {name}")] -struct MissingUserDefinedType { +pub(crate) struct MissingUserDefinedType { name: String, keyspace: String, } @@ -800,17 +800,6 @@ async fn query_metadata( return Err(MetadataError::Peers(PeersMetadataError::EmptyTokenLists).into()); } - let keyspaces = keyspaces - .into_iter() - .filter_map(|(ks_name, ks)| match ks { - Ok(ks) => Some((ks_name, ks)), - Err(e) => { - warn!("Error while processing keyspace \"{ks_name}\": {e}"); - None - } - }) - .collect(); - Ok(Metadata { peers, keyspaces }) } diff --git a/scylla/src/cluster/state.rs b/scylla/src/cluster/state.rs index 9a7d89155..973d585b8 100644 --- a/scylla/src/cluster/state.rs +++ b/scylla/src/cluster/state.rs @@ -12,7 +12,7 @@ use scylla_cql::frame::response::result::TableSpec; use scylla_cql::types::serialize::row::SerializedValues; use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use tracing::debug; +use tracing::{debug, warn}; use uuid::Uuid; use super::metadata::{Keyspace, Metadata, Strategy}; @@ -64,6 +64,7 @@ impl ClusterState { used_keyspace: &Option, host_filter: Option<&dyn HostFilter>, mut tablets: TabletsInfo, + old_keyspaces: &HashMap, ) -> Self { // Create new updated known_peers and ring let mut new_known_peers: HashMap> = @@ -109,6 +110,26 @@ impl ClusterState { } } + let keyspaces: HashMap = metadata + .keyspaces + .into_iter() + .filter_map(|(ks_name, ks)| match ks { + Ok(ks) => Some((ks_name, ks)), + Err(e) => { + if let Some(old_ks) = old_keyspaces.get(&ks_name) { + warn!("Encountered an error while processing metadata of keyspace \"{ks_name}\": {e}.\ + Re-using older version of this keyspace metadata"); + Some((ks_name, old_ks.clone())) + } else { + warn!("Encountered an error while processing metadata of keyspace \"{ks_name}\": {e}.\ + No previous version of this keyspace metadata found, so it will not be\ + present in ClusterData until next refresh."); + None + } + } + }) + .collect(); + { let removed_nodes = { let mut removed_nodes = HashSet::new(); @@ -122,7 +143,7 @@ impl ClusterState { }; let table_predicate = |spec: &TableSpec| { - if let Some(ks) = metadata.keyspaces.get(spec.ks_name()) { + if let Some(ks) = keyspaces.get(spec.ks_name()) { ks.tables.contains_key(spec.table_name()) } else { false @@ -150,7 +171,6 @@ impl ClusterState { ) } - let keyspaces = metadata.keyspaces; let (locator, keyspaces) = tokio::task::spawn_blocking(move || { let keyspace_strategies = keyspaces.values().map(|ks| &ks.strategy); let locator = ReplicaLocator::new(ring.into_iter(), keyspace_strategies, tablets); diff --git a/scylla/src/cluster/worker.rs b/scylla/src/cluster/worker.rs index 11ca6a8ca..3dab166fc 100644 --- a/scylla/src/cluster/worker.rs +++ b/scylla/src/cluster/worker.rs @@ -140,6 +140,7 @@ impl Cluster { &None, host_filter.as_deref(), TabletsInfo::new(), + &HashMap::new(), ) .await; cluster_data.wait_until_all_pools_are_initialized().await; @@ -413,6 +414,7 @@ impl ClusterWorker { &self.used_keyspace, self.host_filter.as_deref(), cluster_data.locator.tablets.clone(), + &cluster_data.keyspaces, ) .await, ); diff --git a/scylla/src/policies/load_balancing/default.rs b/scylla/src/policies/load_balancing/default.rs index e15fa943e..13a2540eb 100644 --- a/scylla/src/policies/load_balancing/default.rs +++ b/scylla/src/policies/load_balancing/default.rs @@ -1419,6 +1419,7 @@ mod tests { &None, None, TabletsInfo::new(), + &HashMap::new(), ) .await } @@ -1449,6 +1450,7 @@ mod tests { &None, None, TabletsInfo::new(), + &HashMap::new(), ) 
.await } @@ -2498,6 +2500,7 @@ mod tests { Some(&FHostFilter) }, TabletsInfo::new(), + &HashMap::new(), ) .await; diff --git a/scylla/src/routing/locator/mod.rs b/scylla/src/routing/locator/mod.rs index 0ee5dfb8e..f2d0dd47b 100644 --- a/scylla/src/routing/locator/mod.rs +++ b/scylla/src/routing/locator/mod.rs @@ -860,21 +860,39 @@ mod tests { check( 160, None, - &metadata.keyspaces.get(KEYSPACE_NTS_RF_3).unwrap().strategy, + &metadata + .keyspaces + .get(KEYSPACE_NTS_RF_3) + .unwrap() + .as_ref() + .unwrap() + .strategy, TABLE_NTS_RF_3, vec![F, A, C, D, G, E], ); check( 160, None, - &metadata.keyspaces.get(KEYSPACE_NTS_RF_2).unwrap().strategy, + &metadata + .keyspaces + .get(KEYSPACE_NTS_RF_2) + .unwrap() + .as_ref() + .unwrap() + .strategy, TABLE_NTS_RF_2, vec![F, A, D, G], ); check( 160, None, - &metadata.keyspaces.get(KEYSPACE_SS_RF_2).unwrap().strategy, + &metadata + .keyspaces + .get(KEYSPACE_SS_RF_2) + .unwrap() + .as_ref() + .unwrap() + .strategy, TABLE_SS_RF_2, vec![F, A], ); @@ -882,21 +900,39 @@ mod tests { check( 160, Some("eu"), - &metadata.keyspaces.get(KEYSPACE_NTS_RF_3).unwrap().strategy, + &metadata + .keyspaces + .get(KEYSPACE_NTS_RF_3) + .unwrap() + .as_ref() + .unwrap() + .strategy, TABLE_NTS_RF_3, vec![A, C, G], ); check( 160, Some("us"), - &metadata.keyspaces.get(KEYSPACE_NTS_RF_3).unwrap().strategy, + &metadata + .keyspaces + .get(KEYSPACE_NTS_RF_3) + .unwrap() + .as_ref() + .unwrap() + .strategy, TABLE_NTS_RF_3, vec![F, D, E], ); check( 160, Some("eu"), - &metadata.keyspaces.get(KEYSPACE_SS_RF_2).unwrap().strategy, + &metadata + .keyspaces + .get(KEYSPACE_SS_RF_2) + .unwrap() + .as_ref() + .unwrap() + .strategy, TABLE_SS_RF_2, vec![A], ); diff --git a/scylla/src/routing/locator/precomputed_replicas.rs b/scylla/src/routing/locator/precomputed_replicas.rs index 4121b410e..f37282c20 100644 --- a/scylla/src/routing/locator/precomputed_replicas.rs +++ b/scylla/src/routing/locator/precomputed_replicas.rs @@ -231,14 +231,14 @@ mod tests { let mut metadata = mock_metadata_for_token_aware_tests(); metadata.keyspaces = [( "SimpleStrategy{rf=2}".into(), - Keyspace { + Ok(Keyspace { strategy: Strategy::SimpleStrategy { replication_factor: 2, }, tables: HashMap::new(), views: HashMap::new(), user_defined_types: HashMap::new(), - }, + }), )] .iter() .cloned() @@ -251,7 +251,7 @@ mod tests { metadata .keyspaces .values() - .map(|keyspace| &keyspace.strategy), + .map(|keyspace| &keyspace.as_ref().unwrap().strategy), ); let check = |token, replication_factor, expected_node_ids| { @@ -293,7 +293,7 @@ mod tests { metadata .keyspaces .values() - .map(|keyspace| &keyspace.strategy), + .map(|keyspace| &keyspace.as_ref().unwrap().strategy), ); let check = |token, dc, replication_factor, expected_node_ids| { diff --git a/scylla/src/routing/locator/test.rs b/scylla/src/routing/locator/test.rs index 50084e6c5..e0205c101 100644 --- a/scylla/src/routing/locator/test.rs +++ b/scylla/src/routing/locator/test.rs @@ -118,18 +118,18 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata { let keyspaces = [ ( KEYSPACE_SS_RF_2.into(), - Keyspace { + Ok(Keyspace { strategy: Strategy::SimpleStrategy { replication_factor: 2, }, tables: HashMap::new(), views: HashMap::new(), user_defined_types: HashMap::new(), - }, + }), ), ( KEYSPACE_NTS_RF_2.into(), - Keyspace { + Ok(Keyspace { strategy: Strategy::NetworkTopologyStrategy { datacenter_repfactors: [("eu".to_owned(), 2), ("us".to_owned(), 2)] .into_iter() @@ -138,11 +138,11 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata { 
tables: HashMap::new(), views: HashMap::new(), user_defined_types: HashMap::new(), - }, + }), ), ( KEYSPACE_NTS_RF_3.into(), - Keyspace { + Ok(Keyspace { strategy: Strategy::NetworkTopologyStrategy { datacenter_repfactors: [("eu".to_owned(), 3), ("us".to_owned(), 3)] .into_iter() @@ -151,7 +151,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata { tables: HashMap::new(), views: HashMap::new(), user_defined_types: HashMap::new(), - }, + }), ), ] .iter() @@ -199,7 +199,10 @@ pub(crate) fn create_ring(metadata: &Metadata) -> impl Iterator ReplicaLocator { let ring = create_ring(metadata); - let strategies = metadata.keyspaces.values().map(|ks| &ks.strategy); + let strategies = metadata + .keyspaces + .values() + .map(|ks| &ks.as_ref().unwrap().strategy); ReplicaLocator::new(ring, strategies, TabletsInfo::new()) }
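
The final patch turns `Metadata::keyspaces` into `HashMap<String, Result<Keyspace, MissingUserDefinedType>>` and moves error handling into `ClusterState::new`, which reuses the previously known keyspace metadata whenever a freshly fetched keyspace contains a broken UDT. The sketch below illustrates that fallback pattern in isolation; it is not driver code: `Keyspace` and `MissingUserDefinedType` are reduced to minimal stand-in structs, `merge_keyspaces` is a hypothetical helper, and the warning strings are illustrative rather than the driver's actual log messages.

    use std::collections::HashMap;

    // Simplified stand-ins for the driver's types; the real `Keyspace` and
    // `MissingUserDefinedType` live in scylla/src/cluster/metadata.rs and
    // carry more fields.
    #[derive(Clone, Debug, PartialEq)]
    struct Keyspace {
        tables: Vec<String>,
    }

    #[derive(Debug)]
    struct MissingUserDefinedType {
        name: String,
        keyspace: String,
    }

    // Mirrors the fallback added to `ClusterState::new` in patch 6: keep a
    // freshly fetched keyspace when its metadata parsed cleanly, otherwise
    // fall back to the previously known version (if any) and warn.
    fn merge_keyspaces(
        fetched: HashMap<String, Result<Keyspace, MissingUserDefinedType>>,
        old: &HashMap<String, Keyspace>,
    ) -> HashMap<String, Keyspace> {
        fetched
            .into_iter()
            .filter_map(|(name, result)| match result {
                Ok(ks) => Some((name, ks)),
                Err(e) => match old.get(&name) {
                    Some(old_ks) => {
                        eprintln!(
                            "keyspace {name}: missing UDT {}.{}, reusing old metadata",
                            e.keyspace, e.name
                        );
                        Some((name, old_ks.clone()))
                    }
                    None => {
                        eprintln!(
                            "keyspace {name}: missing UDT {}.{}, no previous metadata to reuse",
                            e.keyspace, e.name
                        );
                        None
                    }
                },
            })
            .collect()
    }

    fn main() {
        let old = HashMap::from([(
            "ks_broken".to_string(),
            Keyspace { tables: vec!["t1".to_string()] },
        )]);
        let fetched = HashMap::from([
            (
                "ks_broken".to_string(),
                Err(MissingUserDefinedType {
                    name: "my_udt".to_string(),
                    keyspace: "ks_broken".to_string(),
                }),
            ),
            ("ks_ok".to_string(), Ok(Keyspace { tables: vec![] })),
        ]);

        let merged = merge_keyspaces(fetched, &old);
        assert_eq!(merged.get("ks_broken"), old.get("ks_broken")); // old version reused
        assert!(merged.contains_key("ks_ok"));
    }

The advantage over the pre-series behaviour (dropping the keyspace inside `query_metadata`) is visible in the assertions: a keyspace whose new metadata is broken keeps serving its last good definition instead of disappearing from `ClusterState` until the next refresh.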