diff --git a/scylla/src/statement/prepared_statement.rs b/scylla/src/statement/prepared_statement.rs index c83043d18..54c752179 100644 --- a/scylla/src/statement/prepared_statement.rs +++ b/scylla/src/statement/prepared_statement.rs @@ -18,7 +18,7 @@ use crate::frame::types::{Consistency, SerialConsistency}; use crate::history::HistoryListener; use crate::retry_policy::RetryPolicy; use crate::routing::Token; -use crate::transport::errors::{BadQuery, QueryError}; +use crate::transport::errors::{BadQuery, ProtocolError, QueryError}; use crate::transport::execution_profile::ExecutionProfileHandle; use crate::transport::partitioner::{Partitioner, PartitionerHasher, PartitionerName}; @@ -241,7 +241,7 @@ impl PreparedStatement { self.extract_partition_key(serialized_values) .map_err(|err| match err { PartitionKeyExtractionError::NoPkIndexValue(_, _) => { - QueryError::ProtocolError("No pk indexes - can't calculate token") + ProtocolError::PartitionKeyExtraction } })?; let token = partition_key diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index 44c59878f..f2fd0eb22 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -46,9 +46,9 @@ use std::{ net::{Ipv4Addr, Ipv6Addr}, }; +use super::errors::{ProtocolError, UseKeyspaceProtocolError}; use super::iterator::RowIterator; use super::locator::tablets::{RawTablet, TabletParsingError}; -use super::query_result::SingleRowTypedError; use super::session::AddressTranslator; use super::topology::{PeerEndpoint, UntranslatedEndpoint, UntranslatedPeer}; use super::NodeAddr; @@ -299,10 +299,11 @@ impl NonErrorQueryResponse { let (result, paging_state) = self.into_query_result_and_paging_state()?; if !paging_state.finished() { - let error_msg = "Internal driver API misuse or a server bug: nonfinished paging state\ - would be discarded by `NonErrorQueryResponse::into_query_result`"; - error!(error_msg); - return Err(QueryError::ProtocolError(error_msg)); + error!( + "Internal driver API misuse or a server bug: nonfinished paging state\ + would be discarded by `NonErrorQueryResponse::into_query_result`" + ); + return Err(ProtocolError::NonfinishedPagingState.into()); } Ok(result) @@ -904,7 +905,11 @@ impl Connection { // Reprepared statement should keep its id - it's the md5 sum // of statement contents if reprepared.get_id() != previous_prepared.get_id() { - Err(UserRequestError::RepreparedIdChanged) + Err(UserRequestError::RepreparedIdChanged { + statement: reprepare_query.contents, + expected_id: previous_prepared.get_id().clone().into(), + reprepared_id: reprepared.get_id().clone().into(), + }) } else { Ok(()) } @@ -1284,17 +1289,16 @@ impl Connection { self.reprepare(p.get_statement(), p).await?; continue; } else { - return Err(QueryError::ProtocolError( - "The server returned a prepared statement Id that did not exist in the batch", - )); + return Err(ProtocolError::RepreparedIdMissingInBatch.into()); } } _ => Err(err.into()), }, Response::Result(_) => Ok(query_response.into_query_result()?), - _ => Err(QueryError::ProtocolError( - "BATCH: Unexpected server response", - )), + _ => Err(ProtocolError::UnexpectedResponse( + query_response.response.to_response_kind(), + ) + .into()), }; } } @@ -1359,23 +1363,40 @@ impl Connection { }; let query_response = self.query_raw_unpaged(&query, PagingState::start()).await?; + Self::verify_use_keyspace_result(keyspace_name, query_response) + } + fn verify_use_keyspace_result( + keyspace_name: &VerifiedKeyspaceName, + query_response: QueryResponse, + ) -> Result<(), QueryError> { match query_response.response { Response::Result(result::Result::SetKeyspace(set_keyspace)) => { - if set_keyspace.keyspace_name.to_lowercase() - != keyspace_name.as_str().to_lowercase() + if !set_keyspace + .keyspace_name + .eq_ignore_ascii_case(keyspace_name.as_str()) { - return Err(QueryError::ProtocolError( - "USE returned response with different keyspace name", - )); + let expected_keyspace_name_lowercase = keyspace_name.as_str().to_lowercase(); + let result_keyspace_name_lowercase = set_keyspace.keyspace_name.to_lowercase(); + + return Err(ProtocolError::UseKeyspace( + UseKeyspaceProtocolError::KeyspaceNameMismatch { + expected_keyspace_name_lowercase, + result_keyspace_name_lowercase, + }, + ) + .into()); } Ok(()) } Response::Error(err) => Err(err.into()), - _ => Err(QueryError::ProtocolError( - "USE returned unexpected response", - )), + _ => Err( + ProtocolError::UseKeyspace(UseKeyspaceProtocolError::UnexpectedResponse( + query_response.response.to_response_kind(), + )) + .into(), + ), } } @@ -1425,17 +1446,7 @@ impl Connection { .query_unpaged(LOCAL_VERSION) .await? .single_row_typed() - .map_err(|err| match err { - SingleRowTypedError::RowsExpected(_) => { - QueryError::ProtocolError("Version query returned not rows") - } - SingleRowTypedError::BadNumberOfRows(_) => { - QueryError::ProtocolError("system.local query returned a wrong number of rows") - } - SingleRowTypedError::FromRowError(_) => { - QueryError::ProtocolError("Row is not uuid type as it should be") - } - })?; + .map_err(ProtocolError::SchemaVersionFetch)?; Ok(version_id) } diff --git a/scylla/src/transport/downgrading_consistency_retry_policy.rs b/scylla/src/transport/downgrading_consistency_retry_policy.rs index abe55caed..51fcdfd3e 100644 --- a/scylla/src/transport/downgrading_consistency_retry_policy.rs +++ b/scylla/src/transport/downgrading_consistency_retry_policy.rs @@ -185,7 +185,9 @@ mod tests { use bytes::Bytes; use crate::test_utils::setup_tracing; - use crate::transport::errors::{BadQuery, BrokenConnectionErrorKind, ConnectionPoolError}; + use crate::transport::errors::{ + BadQuery, BrokenConnectionErrorKind, ConnectionPoolError, ProtocolError, + }; use super::*; @@ -284,7 +286,7 @@ mod tests { cl, ); downgrading_consistency_policy_assert_never_retries( - QueryError::ProtocolError("test"), + ProtocolError::NonfinishedPagingState.into(), cl, ); } diff --git a/scylla/src/transport/errors.rs b/scylla/src/transport/errors.rs index d00e68de2..f6b31b6c9 100644 --- a/scylla/src/transport/errors.rs +++ b/scylla/src/transport/errors.rs @@ -8,10 +8,12 @@ use std::{ error::Error, io::ErrorKind, net::{AddrParseError, IpAddr, SocketAddr}, + num::ParseIntError, sync::Arc, }; use scylla_cql::{ + cql_to_rust::FromRowError, frame::{ frame_errors::{ CqlAuthChallengeParseError, CqlAuthSuccessParseError, CqlAuthenticateParseError, @@ -30,6 +32,8 @@ use thiserror::Error; use crate::{authentication::AuthError, frame::response}; +use super::query_result::{RowsExpectedError, SingleRowTypedError}; + /// Error that occurred during query execution #[derive(Error, Debug, Clone)] #[non_exhaustive] @@ -50,6 +54,15 @@ pub enum QueryError { #[error(transparent)] BodyExtensionsParseError(#[from] FrameBodyExtensionsParseError), + /// Load balancing policy returned an empty plan. + #[error( + "Load balancing policy returned an empty plan.\ + First thing to investigate should be the logic of custom LBP implementation.\ + If you think that your LBP implementation is correct, or you make use of `DefaultPolicy`,\ + then this is most probably a driver bug!" + )] + EmptyPlan, + /// Received a RESULT server response, but failed to deserialize it. #[error(transparent)] CqlResultParseError(#[from] CqlResultParseError), @@ -58,17 +71,17 @@ pub enum QueryError { #[error("Failed to deserialize ERROR response: {0}")] CqlErrorParseError(#[from] CqlErrorParseError), + /// A metadata error occurred during schema agreement. + #[error("Cluster metadata fetch error occurred during automatic schema agreement: {0}")] + MetadataError(#[from] MetadataError), + /// Selected node's connection pool is in invalid state. #[error("No connections in the pool: {0}")] ConnectionPoolError(#[from] ConnectionPoolError), - /// Unexpected message received - #[error("Protocol Error: {0}")] - ProtocolError(&'static str), - - /// Invalid message received - #[error("Invalid message: {0}")] - InvalidMessage(String), + /// Protocol error. + #[error("Protocol error: {0}")] + ProtocolError(#[from] ProtocolError), /// Timeout error has occurred, function didn't complete in time. #[error("Timeout Error")] @@ -113,15 +126,21 @@ impl From for QueryError { UserRequestError::CqlResultParseError(e) => e.into(), UserRequestError::CqlErrorParseError(e) => e.into(), UserRequestError::BrokenConnectionError(e) => e.into(), - UserRequestError::UnexpectedResponse(_) => { - // FIXME: make it typed. It needs to wait for ProtocolError refactor. - QueryError::ProtocolError("Received unexpected response from the server. Expected RESULT or ERROR response.") + UserRequestError::UnexpectedResponse(response) => { + ProtocolError::UnexpectedResponse(response).into() } UserRequestError::BodyExtensionsParseError(e) => e.into(), UserRequestError::UnableToAllocStreamId => QueryError::UnableToAllocStreamId, - UserRequestError::RepreparedIdChanged => QueryError::ProtocolError( - "Prepared statement Id changed, md5 sum should stay the same", - ), + UserRequestError::RepreparedIdChanged { + statement, + expected_id, + reprepared_id, + } => ProtocolError::RepreparedIdChanged { + statement, + expected_id, + reprepared_id, + } + .into(), } } } @@ -135,9 +154,10 @@ impl From for NewSessionError { QueryError::CqlResultParseError(e) => NewSessionError::CqlResultParseError(e), QueryError::CqlErrorParseError(e) => NewSessionError::CqlErrorParseError(e), QueryError::BodyExtensionsParseError(e) => NewSessionError::BodyExtensionsParseError(e), + QueryError::EmptyPlan => NewSessionError::EmptyPlan, + QueryError::MetadataError(e) => NewSessionError::MetadataError(e), QueryError::ConnectionPoolError(e) => NewSessionError::ConnectionPoolError(e), - QueryError::ProtocolError(m) => NewSessionError::ProtocolError(m), - QueryError::InvalidMessage(m) => NewSessionError::InvalidMessage(m), + QueryError::ProtocolError(e) => NewSessionError::ProtocolError(e), QueryError::TimeoutError => NewSessionError::TimeoutError, QueryError::BrokenConnection(e) => NewSessionError::BrokenConnection(e), QueryError::UnableToAllocStreamId => NewSessionError::UnableToAllocStreamId, @@ -183,10 +203,23 @@ pub enum NewSessionError { #[error("Failed to serialize CQL request: {0}")] CqlRequestSerialization(#[from] CqlRequestSerializationError), + /// Load balancing policy returned an empty plan. + #[error( + "Load balancing policy returned an empty plan.\ + First thing to investigate should be the logic of custom LBP implementation.\ + If you think that your LBP implementation is correct, or you make use of `DefaultPolicy`,\ + then this is most probably a driver bug!" + )] + EmptyPlan, + /// Failed to deserialize frame body extensions. #[error(transparent)] BodyExtensionsParseError(#[from] FrameBodyExtensionsParseError), + /// Failed to perform initial cluster metadata fetch. + #[error("Failed to perform initial cluster metadata fetch: {0}")] + MetadataError(#[from] MetadataError), + /// Received a RESULT server response, but failed to deserialize it. #[error(transparent)] CqlResultParseError(#[from] CqlResultParseError), @@ -199,13 +232,9 @@ pub enum NewSessionError { #[error("No connections in the pool: {0}")] ConnectionPoolError(#[from] ConnectionPoolError), - /// Unexpected message received - #[error("Protocol Error: {0}")] - ProtocolError(&'static str), - - /// Invalid message received - #[error("Invalid message: {0}")] - InvalidMessage(String), + /// Protocol error. + #[error("Protocol error: {0}")] + ProtocolError(#[from] ProtocolError), /// Timeout error has occurred, couldn't connect to node in time. #[error("Timeout Error")] @@ -225,6 +254,242 @@ pub enum NewSessionError { RequestTimeout(String), } +/// A protocol error. +/// +/// It indicates an inconsistency between CQL protocol +/// and server's behavior. +/// In some cases, it could also represent a misuse +/// of internal driver API - a driver bug. +#[derive(Error, Debug, Clone)] +#[non_exhaustive] +pub enum ProtocolError { + /// Received an unexpected response when RESULT or ERROR was expected. + #[error( + "Received unexpected response from the server: {0}. Expected RESULT or ERROR response." + )] + UnexpectedResponse(CqlResponseKind), + + /// Prepared statement id mismatch. + #[error( + "Prepared statement id mismatch between multiple connections - all result ids should be equal." + )] + PreparedStatementIdsMismatch, + + /// Prepared statement id changed after repreparation. + #[error( + "Prepared statement id changed after repreparation; md5 sum (computed from the query string) should stay the same;\ + Statement: \"{statement}\"; expected id: {expected_id:?}; reprepared id: {reprepared_id:?}" + )] + RepreparedIdChanged { + statement: String, + expected_id: Vec, + reprepared_id: Vec, + }, + + /// USE KEYSPACE protocol error. + #[error("USE KEYSPACE protocol error: {0}")] + UseKeyspace(#[from] UseKeyspaceProtocolError), + + /// A protocol error appeared during schema version fetch. + #[error("Schema version fetch protocol error: {0}")] + SchemaVersionFetch(SingleRowTypedError), + + /// A result with nonfinished paging state received for unpaged query. + #[error("Unpaged query returned a non-empty paging state! This is a driver-side or server-side bug.")] + NonfinishedPagingState, + + /// Failed to parse CQL type. + #[error("Failed to parse a CQL type '{typ}', at position {position}: {reason}")] + InvalidCqlType { + typ: String, + position: usize, + reason: String, + }, + + /// Unable extract a partition key based on prepared statement's metadata. + #[error("Unable extract a partition key based on prepared statement's metadata")] + PartitionKeyExtraction, + + /// A protocol error occurred during tracing info fetch. + #[error("Tracing info fetch protocol error: {0}")] + Tracing(#[from] TracingProtocolError), + + /// Driver tried to reprepare a statement in the batch, but the reprepared + /// statement's id is not included in the batch. + #[error("Reprepared statement's id does not exist in the batch.")] + RepreparedIdMissingInBatch, +} + +/// A protocol error that occurred during `USE KEYSPACE <>` request. +#[derive(Error, Debug, Clone)] +#[non_exhaustive] +pub enum UseKeyspaceProtocolError { + #[error("Keyspace name mismtach; expected: {expected_keyspace_name_lowercase}, received: {result_keyspace_name_lowercase}")] + KeyspaceNameMismatch { + expected_keyspace_name_lowercase: String, + result_keyspace_name_lowercase: String, + }, + #[error("Received unexpected response: {0}. Expected RESULT:Set_keyspace")] + UnexpectedResponse(CqlResponseKind), +} + +/// A protocol error that occurred during tracing info fetch. +#[derive(Error, Debug, Clone)] +#[non_exhaustive] +pub enum TracingProtocolError { + /// Response to system_traces.session is not RESULT:Rows. + #[error("Response to system_traces.session is not RESULT:Rows: {0}")] + TracesSessionNotRows(RowsExpectedError), + + /// system_traces.session has invalid column type. + #[error("system_traces.session has invalid column type: {0}")] + TracesSessionInvalidColumnType(FromRowError), + + /// Response to system_traces.events is not RESULT:Rows. + #[error("Response to system_traces.events is not RESULT:Rows: {0}")] + TracesEventsNotRows(RowsExpectedError), + + /// system_traces.events has invalid column type. + #[error("system_traces.events has invalid column type: {0}")] + TracesEventsInvalidColumnType(FromRowError), + + /// All tracing queries returned an empty result. + #[error( + "All tracing queries returned an empty result, \ + maybe the trace information didn't propagate yet. \ + Consider configuring Session with \ + a longer fetch interval (tracing_info_fetch_interval)" + )] + EmptyResults, +} + +/// An error that occurred during cluster metadata fetch. +/// +/// An error can occur during metadata fetch of: +/// - peers +/// - keyspaces +/// - UDTs +/// - tables +/// - views +#[derive(Error, Debug, Clone)] +#[non_exhaustive] +pub enum MetadataError { + /// Bad peers metadata. + #[error("Bad peers metadata: {0}")] + Peers(#[from] PeersMetadataError), + + /// Bad keyspaces metadata. + #[error("Bad keyspaces metadata: {0}")] + Keyspaces(#[from] KeyspacesMetadataError), + + /// Bad UDTs metadata. + #[error("Bad UDTs metadata: {0}")] + Udts(#[from] UdtMetadataError), + + /// Bad tables metadata. + #[error("Bad tables metadata: {0}")] + Tables(#[from] TablesMetadataError), + + /// Bad views metadata. + #[error("Bad views metadata: {0}")] + Views(#[from] ViewsMetadataError), +} + +/// An error that occurred during peers metadata fetch. +#[derive(Error, Debug, Clone)] +#[non_exhaustive] +pub enum PeersMetadataError { + /// Empty peers list returned during peers metadata fetch. + #[error("Peers list is empty")] + EmptyPeers, + + /// All peers have empty token lists. + #[error("All peers have empty token lists")] + EmptyTokenLists, +} + +/// An error that occurred during keyspaces metadata fetch. +#[derive(Error, Debug, Clone)] +#[non_exhaustive] +pub enum KeyspacesMetadataError { + /// system_schema.keyspaces has invalid column type. + #[error("system_schema.keyspaces has invalid column type: {0}")] + SchemaKeyspacesInvalidColumnType(FromRowError), + + /// Bad keyspace replication strategy. + #[error("Bad keyspace <{keyspace}> replication strategy: {error}")] + Strategy { + keyspace: String, + error: KeyspaceStrategyError, + }, +} + +/// An error that occurred during specific keyspace's metadata fetch. +#[derive(Error, Debug, Clone)] +#[non_exhaustive] +pub enum KeyspaceStrategyError { + /// Keyspace strategy map missing a `class` field. + #[error("keyspace strategy definition is missing a 'class' field")] + MissingClassForStrategyDefinition, + + /// Missing replication factor for SimpleStrategy. + #[error("Missing replication factor field for SimpleStrategy")] + MissingReplicationFactorForSimpleStrategy, + + /// Replication factor could not be parsed as unsigned integer. + #[error("Failed to parse a replication factor as unsigned integer: {0}")] + ReplicationFactorParseError(ParseIntError), + + /// Received an unexpected NTS option. + /// Driver expects only 'class' and replication factor per dc ('dc': rf) + #[error("Unexpected NetworkTopologyStrategy option: '{key}': '{value}'")] + UnexpectedNetworkTopologyStrategyOption { key: String, value: String }, +} + +/// An error that occurred during UDTs metadata fetch. +#[derive(Error, Debug, Clone)] +#[non_exhaustive] +pub enum UdtMetadataError { + /// system_schema.types has invalid column type. + #[error("system_schema.types has invalid column type: {0}")] + SchemaTypesInvalidColumnType(FromRowError), + + /// Circular UDT dependency detected. + #[error("Detected circular dependency between user defined types - toposort is impossible!")] + CircularTypeDependency, +} + +/// An error that occurred during tables metadata fetch. +#[derive(Error, Debug, Clone)] +#[non_exhaustive] +pub enum TablesMetadataError { + /// system_schema.tables has invalid column type. + #[error("system_schema.tables has invalid column type: {0}")] + SchemaTablesInvalidColumnType(FromRowError), + + /// system_schema.columns has invalid column type. + #[error("system_schema.columns has invalid column type: {0}")] + SchemaColumnsInvalidColumnType(FromRowError), + + /// Unknown column kind. + #[error("Unknown column kind '{column_kind}' for {keyspace_name}.{table_name}.{column_name}")] + UnknownColumnKind { + keyspace_name: String, + table_name: String, + column_name: String, + column_kind: String, + }, +} + +/// An error that occurred during views metadata fetch. +#[derive(Error, Debug, Clone)] +#[non_exhaustive] +pub enum ViewsMetadataError { + /// system_schema.views has invalid column type. + #[error("system_schema.views has invalid column type: {0}")] + SchemaViewsInvalidColumnType(FromRowError), +} + /// Error caused by caller creating an invalid query #[derive(Error, Debug, Clone)] #[error("Invalid query passed to Session")] @@ -574,8 +839,15 @@ pub(crate) enum UserRequestError { BodyExtensionsParseError(#[from] FrameBodyExtensionsParseError), #[error("Unable to allocate stream id")] UnableToAllocStreamId, - #[error("Prepared statement Id changed, md5 sum should stay the same")] - RepreparedIdChanged, + #[error( + "Prepared statement id changed after repreparation; md5 sum (computed from the query string) should stay the same;\ + Statement: \"{statement}\"; expected id: {expected_id:?}; reprepared id: {reprepared_id:?}" + )] + RepreparedIdChanged { + statement: String, + expected_id: Vec, + reprepared_id: Vec, + }, } impl From for UserRequestError { diff --git a/scylla/src/transport/iterator.rs b/scylla/src/transport/iterator.rs index a4404e47e..82c66cb3d 100644 --- a/scylla/src/transport/iterator.rs +++ b/scylla/src/transport/iterator.rs @@ -28,7 +28,7 @@ use crate::statement::{prepared_statement::PreparedStatement, query::Query}; use crate::statement::{Consistency, PagingState, SerialConsistency}; use crate::transport::cluster::ClusterData; use crate::transport::connection::{Connection, NonErrorQueryResponse, QueryResponse}; -use crate::transport::errors::{QueryError, UserRequestError}; +use crate::transport::errors::{ProtocolError, QueryError, UserRequestError}; use crate::transport::load_balancing::{self, RoutingInfo}; use crate::transport::metrics::Metrics; use crate::transport::retry_policy::{QueryInfo, RetryDecision, RetrySession}; @@ -511,8 +511,7 @@ where let query_plan = load_balancing::Plan::new(load_balancer.as_ref(), &statement_info, &cluster_data); - let mut last_error: QueryError = - QueryError::ProtocolError("Empty query plan - driver bug!"); + let mut last_error: QueryError = QueryError::EmptyPlan; let mut current_consistency: Consistency = self.query_consistency; self.log_query_start(); @@ -723,9 +722,10 @@ where let (proof, _) = self.sender.send_empty_page(tracing_id).await; Ok(ControlFlow::Break(proof)) } - Ok(_) => { + Ok(response) => { self.metrics.inc_failed_paged_queries(); - let err = QueryError::ProtocolError("Unexpected response to next page query"); + let err = + ProtocolError::UnexpectedResponse(response.response.to_response_kind()).into(); self.execution_profile .load_balancing_policy .on_query_failure(&self.statement_info, elapsed, node, &err); @@ -879,9 +879,10 @@ where return Ok(proof); } _ => { - return Err(QueryError::ProtocolError( - "Unexpected response to next page query", - )); + return Err(ProtocolError::UnexpectedResponse( + response.response.to_response_kind(), + ) + .into()); } } } diff --git a/scylla/src/transport/load_balancing/default.rs b/scylla/src/transport/load_balancing/default.rs index 20368cb96..51db7f97f 100644 --- a/scylla/src/transport/load_balancing/default.rs +++ b/scylla/src/transport/load_balancing/default.rs @@ -2844,6 +2844,7 @@ mod latency_awareness { | QueryError::CqlRequestSerialization(_) | QueryError::BrokenConnection(_) | QueryError::ConnectionPoolError(_) + | QueryError::EmptyPlan | QueryError::UnableToAllocStreamId | QueryError::DbError(DbError::IsBootstrapping, _) | QueryError::DbError(DbError::Unavailable { .. }, _) @@ -2856,7 +2857,7 @@ mod latency_awareness { | QueryError::CqlResultParseError(_) | QueryError::CqlErrorParseError(_) | QueryError::BodyExtensionsParseError(_) - | QueryError::InvalidMessage(_) + | QueryError::MetadataError(_) | QueryError::ProtocolError(_) | QueryError::TimeoutError | QueryError::RequestTimeout(_) => true, diff --git a/scylla/src/transport/retry_policy.rs b/scylla/src/transport/retry_policy.rs index 686f7643a..44baa4424 100644 --- a/scylla/src/transport/retry_policy.rs +++ b/scylla/src/transport/retry_policy.rs @@ -223,7 +223,7 @@ mod tests { use crate::statement::Consistency; use crate::test_utils::setup_tracing; use crate::transport::errors::{ - BadQuery, BrokenConnectionErrorKind, ConnectionPoolError, QueryError, + BadQuery, BrokenConnectionErrorKind, ConnectionPoolError, ProtocolError, QueryError, }; use crate::transport::errors::{DbError, WriteType}; use bytes::Bytes; @@ -299,7 +299,7 @@ mod tests { (got 1 values, 2 statements)" .to_owned(), ))); - default_policy_assert_never_retries(QueryError::ProtocolError("test")); + default_policy_assert_never_retries(ProtocolError::NonfinishedPagingState.into()); } // Asserts that for this error policy retries on next on idempotent queries only diff --git a/scylla/src/transport/session.rs b/scylla/src/transport/session.rs index f420641d0..116003304 100644 --- a/scylla/src/transport/session.rs +++ b/scylla/src/transport/session.rs @@ -8,7 +8,9 @@ use crate::cloud::CloudConfig; use crate::history; use crate::history::HistoryListener; pub use crate::transport::errors::TranslationError; -use crate::transport::errors::{BadQuery, NewSessionError, QueryError, UserRequestError}; +use crate::transport::errors::{ + BadQuery, NewSessionError, ProtocolError, QueryError, UserRequestError, +}; use crate::utils::pretty::{CommaSeparatedDisplayer, CqlValueDisplayer}; use arc_swap::ArcSwapOption; use async_trait::async_trait; @@ -38,6 +40,7 @@ use super::connection::NonErrorQueryResponse; use super::connection::QueryResponse; #[cfg(feature = "ssl")] use super::connection::SslConfig; +use super::errors::TracingProtocolError; use super::execution_profile::{ExecutionProfile, ExecutionProfileHandle, ExecutionProfileInner}; #[cfg(feature = "cloud")] use super::node::CloudEndpoint; @@ -644,9 +647,8 @@ impl Session { .query(&query, values, None, PagingState::start()) .await?; if !paging_state_response.finished() { - let err_msg = "Unpaged unprepared query returned a non-empty paging state! This is a driver-side or server-side bug."; - error!(err_msg); - return Err(QueryError::ProtocolError(err_msg)); + error!("Unpaged unprepared query returned a non-empty paging state! This is a driver-side or server-side bug."); + return Err(ProtocolError::NonfinishedPagingState.into()); } Ok(result) } @@ -991,9 +993,7 @@ impl Session { // Validate prepared ids equality for statement in results.flatten() { if prepared.get_id() != statement.get_id() { - return Err(QueryError::ProtocolError( - "Prepared statement Ids differ, all should be equal", - )); + return Err(ProtocolError::PreparedStatementIdsMismatch.into()); } // Collect all tracing ids from prepare() queries in the final result @@ -1079,9 +1079,8 @@ impl Session { .execute(prepared, &serialized_values, None, PagingState::start()) .await?; if !paging_state.finished() { - let err_msg = "Unpaged prepared query returned a non-empty paging state! This is a driver-side or server-side bug."; - error!(err_msg); - return Err(QueryError::ProtocolError(err_msg)); + error!("Unpaged prepared query returned a non-empty paging state! This is a driver-side or server-side bug."); + return Err(ProtocolError::NonfinishedPagingState.into()); } Ok(result) } @@ -1599,12 +1598,7 @@ impl Session { }; } - Err(QueryError::ProtocolError( - "All tracing queries returned an empty result, \ - maybe the trace information didn't propagate yet. \ - Consider configuring Session with \ - a longer fetch interval (tracing_info_fetch_interval)", - )) + Err(ProtocolError::Tracing(TracingProtocolError::EmptyResults).into()) } /// Gets the name of the keyspace that is currently set, or `None` if no @@ -1649,12 +1643,12 @@ impl Session { let maybe_tracing_info: Option = traces_session_res .maybe_first_row_typed() .map_err(|err| match err { - MaybeFirstRowTypedError::RowsExpected(_) => QueryError::ProtocolError( - "Response to system_traces.sessions query was not Rows", - ), - MaybeFirstRowTypedError::FromRowError(_) => QueryError::ProtocolError( - "Columns from system_traces.session have an unexpected type", - ), + MaybeFirstRowTypedError::RowsExpected(e) => { + ProtocolError::Tracing(TracingProtocolError::TracesSessionNotRows(e)) + } + MaybeFirstRowTypedError::FromRowError(e) => { + ProtocolError::Tracing(TracingProtocolError::TracesSessionInvalidColumnType(e)) + } })?; let mut tracing_info = match maybe_tracing_info { @@ -1663,15 +1657,13 @@ impl Session { }; // Get tracing events - let tracing_event_rows = traces_events_res.rows_typed().map_err(|_| { - QueryError::ProtocolError("Response to system_traces.events query was not Rows") + let tracing_event_rows = traces_events_res.rows_typed().map_err(|err| { + ProtocolError::Tracing(TracingProtocolError::TracesEventsNotRows(err)) })?; for event in tracing_event_rows { - let tracing_event: TracingEvent = event.map_err(|_| { - QueryError::ProtocolError( - "Columns from system_traces.events have an unexpected type", - ) + let tracing_event: TracingEvent = event.map_err(|err| { + ProtocolError::Tracing(TracingProtocolError::TracesEventsInvalidColumnType(err)) })?; tracing_info.events.push(tracing_event); @@ -1822,9 +1814,7 @@ impl Session { }, ) .await - .unwrap_or(Err(QueryError::ProtocolError( - "Empty query plan - driver bug!", - ))) + .unwrap_or(Err(QueryError::EmptyPlan)) } } }; diff --git a/scylla/src/transport/speculative_execution.rs b/scylla/src/transport/speculative_execution.rs index 65389b5eb..ecc9d02c7 100644 --- a/scylla/src/transport/speculative_execution.rs +++ b/scylla/src/transport/speculative_execution.rs @@ -92,7 +92,7 @@ fn can_be_ignored(result: &Result) -> bool { } } -const EMPTY_PLAN_ERROR: QueryError = QueryError::ProtocolError("Empty query plan - driver bug!"); +const EMPTY_PLAN_ERROR: QueryError = QueryError::EmptyPlan; pub(crate) async fn execute( policy: &dyn SpeculativeExecutionPolicy, diff --git a/scylla/src/transport/topology.rs b/scylla/src/transport/topology.rs index f25935522..0d2671ec9 100644 --- a/scylla/src/transport/topology.rs +++ b/scylla/src/transport/topology.rs @@ -29,6 +29,10 @@ use tokio::sync::{broadcast, mpsc}; use tracing::{debug, error, trace, warn}; use uuid::Uuid; +use super::errors::{ + KeyspaceStrategyError, KeyspacesMetadataError, MetadataError, PeersMetadataError, + ProtocolError, TablesMetadataError, UdtMetadataError, ViewsMetadataError, +}; use super::node::{KnownNode, NodeAddr, ResolvedContactPoint}; /// Allows to read current metadata from the cluster @@ -404,11 +408,12 @@ impl fmt::Display for InvalidCqlType { impl From for QueryError { fn from(e: InvalidCqlType) -> Self { - // FIXME: The correct error type is QueryError:ProtocolError but at the moment it accepts only &'static str - QueryError::InvalidMessage(format!( - "error parsing type \"{:?}\" at position {}: {}", - e.type_, e.position, e.reason - )) + ProtocolError::InvalidCqlType { + typ: e.type_, + position: e.position, + reason: e.reason, + } + .into() } } @@ -750,16 +755,12 @@ async fn query_metadata( // There must be at least one peer if peers.is_empty() { - return Err(QueryError::ProtocolError( - "Bad Metadata: peers list is empty", - )); + return Err(MetadataError::Peers(PeersMetadataError::EmptyPeers).into()); } // At least one peer has to have some tokens if peers.iter().all(|peer| peer.tokens.is_empty()) { - return Err(QueryError::ProtocolError( - "Bad Metadata: All peers have empty token list", - )); + return Err(MetadataError::Peers(PeersMetadataError::EmptyTokenLists).into()); } Ok(Metadata { peers, keyspaces }) @@ -957,11 +958,18 @@ async fn query_keyspaces( rows.map(|row_result| { let row = row_result?; - let (keyspace_name, strategy_map) = row.into_typed().map_err(|_| { - QueryError::ProtocolError("system_schema.keyspaces has invalid column type") + let (keyspace_name, strategy_map) = row.into_typed::<(String, _)>().map_err(|err| { + MetadataError::Keyspaces(KeyspacesMetadataError::SchemaKeyspacesInvalidColumnType( + err, + )) })?; - let strategy: Strategy = strategy_from_string_map(strategy_map)?; + let strategy: Strategy = strategy_from_string_map(strategy_map).map_err(|error| { + MetadataError::Keyspaces(KeyspacesMetadataError::Strategy { + keyspace: keyspace_name.clone(), + error, + }) + })?; let tables = all_tables.remove(&keyspace_name).unwrap_or_default(); let views = all_views.remove(&keyspace_name).unwrap_or_default(); let user_defined_types = all_user_defined_types @@ -1035,8 +1043,8 @@ async fn query_user_defined_types( let row = row_result?; let udt_row = row .into_typed::() - .map_err(|_| { - QueryError::ProtocolError("system_schema.types has invalid column type") + .map_err(|err| { + MetadataError::Udts(UdtMetadataError::SchemaTypesInvalidColumnType(err)) })? .try_into()?; @@ -1166,9 +1174,7 @@ fn topo_sort_udts(udts: &mut Vec) -> Result<(), Quer if sorted.len() < indegs.len() { // Some UDTs could not become leaves in the graph, which implies cycles. - return Err(QueryError::ProtocolError( - "Invalid fetched User Defined Types definitions: circular type dependency detected. Topological sort is thus impossible." - )); + return Err(MetadataError::Udts(UdtMetadataError::CircularTypeDependency).into()); } let owned_sorted = sorted.into_iter().cloned().collect::>(); @@ -1360,8 +1366,8 @@ async fn query_tables( rows.map(|row_result| { let row = row_result?; - let (keyspace_name, table_name) = row.into_typed().map_err(|_| { - QueryError::ProtocolError("system_schema.tables has invalid column type") + let (keyspace_name, table_name) = row.into_typed().map_err(|err| { + MetadataError::Tables(TablesMetadataError::SchemaTablesInvalidColumnType(err)) })?; let keyspace_and_table_name = (keyspace_name, table_name); @@ -1402,8 +1408,8 @@ async fn query_views( rows.map(|row_result| { let row = row_result?; - let (keyspace_name, view_name, base_table_name) = row.into_typed().map_err(|_| { - QueryError::ProtocolError("system_schema.views has invalid column type") + let (keyspace_name, view_name, base_table_name) = row.into_typed().map_err(|err| { + MetadataError::Views(ViewsMetadataError::SchemaViewsInvalidColumnType(err)) })?; let keyspace_and_view_name = (keyspace_name, view_name); @@ -1457,8 +1463,8 @@ async fn query_tables_schema( String, i32, String, - ) = row.into_typed().map_err(|_| { - QueryError::ProtocolError("system_schema.columns has invalid column type") + ) = row.into_typed().map_err(|err| { + MetadataError::Tables(TablesMetadataError::SchemaColumnsInvalidColumnType(err)) })?; if type_ == THRIFT_EMPTY_TYPE { @@ -1468,16 +1474,21 @@ async fn query_tables_schema( let pre_cql_type = map_string_to_cql_type(&type_)?; let cql_type = pre_cql_type.into_cql_type(&keyspace_name, udts); + let kind = ColumnKind::from_str(&kind).map_err(|_| { + MetadataError::Tables(TablesMetadataError::UnknownColumnKind { + keyspace_name: keyspace_name.clone(), + table_name: table_name.clone(), + column_name: column_name.clone(), + column_kind: kind, + }) + })?; + let entry = tables_schema.entry((keyspace_name, table_name)).or_insert(( HashMap::new(), // columns HashMap::new(), // partition key HashMap::new(), // clustering key )); - let kind = ColumnKind::from_str(&kind) - // FIXME: The correct error type is QueryError:ProtocolError but at the moment it accepts only &'static str - .map_err(|_| QueryError::InvalidMessage(format!("invalid column kind {}", kind)))?; - if kind == ColumnKind::PartitionKey || kind == ColumnKind::Clustering { let key_map = if kind == ColumnKind::PartitionKey { entry.1.borrow_mut() @@ -1670,8 +1681,8 @@ async fn query_table_partitioners( let result = rows .map(|row_result| { let (keyspace_name, table_name, partitioner) = - row_result?.into_typed().map_err(|_| { - QueryError::ProtocolError("system_schema.tables has invalid column type") + row_result?.into_typed().map_err(|err| { + MetadataError::Tables(TablesMetadataError::SchemaTablesInvalidColumnType(err)) })?; Ok::<_, QueryError>(((keyspace_name, table_name), partitioner)) }) @@ -1690,25 +1701,19 @@ async fn query_table_partitioners( fn strategy_from_string_map( mut strategy_map: HashMap, -) -> Result { +) -> Result { let strategy_name: String = strategy_map .remove("class") - .ok_or(QueryError::ProtocolError( - "strategy map should have a 'class' field", - ))?; + .ok_or(KeyspaceStrategyError::MissingClassForStrategyDefinition)?; let strategy: Strategy = match strategy_name.as_str() { "org.apache.cassandra.locator.SimpleStrategy" | "SimpleStrategy" => { - let rep_factor_str: String = - strategy_map - .remove("replication_factor") - .ok_or(QueryError::ProtocolError( - "SimpleStrategy in strategy map does not have a replication factor", - ))?; + let rep_factor_str: String = strategy_map + .remove("replication_factor") + .ok_or(KeyspaceStrategyError::MissingReplicationFactorForSimpleStrategy)?; - let replication_factor: usize = usize::from_str(&rep_factor_str).map_err(|_| { - QueryError::ProtocolError("Could not parse replication factor as an integer") - })?; + let replication_factor: usize = usize::from_str(&rep_factor_str) + .map_err(KeyspaceStrategyError::ReplicationFactorParseError)?; Strategy::SimpleStrategy { replication_factor } } @@ -1716,13 +1721,18 @@ fn strategy_from_string_map( let mut datacenter_repfactors: HashMap = HashMap::with_capacity(strategy_map.len()); - for (datacenter, rep_factor_str) in strategy_map.drain() { - let rep_factor: usize = match usize::from_str(&rep_factor_str) { - Ok(number) => number, - Err(_) => continue, // There might be other things in the map, we care only about rep_factors - }; + for (key, value) in strategy_map.drain() { + let rep_factor: usize = usize::from_str(&value).map_err(|_| { + // Unexpected NTS option. + // We only expect 'class' (which is resolved above) + // and replication factors per dc. + KeyspaceStrategyError::UnexpectedNetworkTopologyStrategyOption { + key: key.clone(), + value, + } + })?; - datacenter_repfactors.insert(datacenter, rep_factor); + datacenter_repfactors.insert(key, rep_factor); } Strategy::NetworkTopologyStrategy {