Skip to content

Commit

Permalink
Merge pull request #1088 from wprzytula/new-deserialization-api-yet-a…
Browse files Browse the repository at this point in the history
…nother-preparatory-bunch

New deserialization API - yet another bunch of preparations
  • Loading branch information
wprzytula authored Oct 11, 2024
2 parents afa91f2 + 5cb42fa commit eb14ca8
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 60 deletions.
4 changes: 3 additions & 1 deletion scylla-cql/src/frame/frame_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub use super::request::{
use super::response::result::TableSpec;
use super::response::CqlResponseKind;
use super::TryFromPrimitiveError;
use crate::types::deserialize::DeserializationError;
use crate::types::deserialize::{DeserializationError, TypeCheckError};
use thiserror::Error;

/// An error returned by `parse_response_body_extensions`.
Expand Down Expand Up @@ -320,6 +320,8 @@ pub enum RowsParseError {
},
#[error("Malformed rows count: {0}")]
RowsCountParseError(LowLevelDeserializationError),
#[error("Data type check prior to deserialization failed: {0}")]
IncomingDataTypeCheckError(#[from] TypeCheckError),
#[error("Data deserialization failed: {0}")]
DataDeserializationError(#[from] DeserializationError),
}
Expand Down
160 changes: 102 additions & 58 deletions scylla-cql/src/frame/response/result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -693,11 +693,10 @@ macro_rules! generate_deser_type {
};
}

generate_deser_type!(deser_type_owned, 'static, |buf| types::read_string(buf).map(ToOwned::to_owned));

// This is going to be used for deserializing borrowed result metadata.
generate_deser_type!(_deser_type_borrowed, 'frame, types::read_string);

generate_deser_type!(deser_type_owned, 'static, |buf| types::read_string(buf).map(ToOwned::to_owned));

/// Deserializes a table spec, be it per-column one or a global one,
/// in the borrowed form.
///
Expand All @@ -721,70 +720,114 @@ fn mk_col_spec_parse_error(
}
}

/// Deserializes col specs (part of ResultMetadata or PreparedMetadata)
/// in the owned form.
/// Deserializes table spec of a column spec in the borrowed form.
///
/// Checks for equality of table specs across columns, because the protocol
/// does not guarantee that and we want to be sure that the assumption
/// of them being all the same is correct.
/// To this end, the first column's table spec is written to `known_table_spec`
/// and compared with remaining columns' table spec.
///
/// To avoid needless allocations, it is advised to pass `global_table_spec`
/// To avoid needless allocations, it is advised to pass `known_table_spec`
/// in the borrowed form, so that cloning it is cheap.
fn deser_col_specs(
buf: &mut &[u8],
global_table_spec: Option<TableSpec<'_>>,
col_count: usize,
) -> StdResult<Vec<ColumnSpec<'static>>, ColumnSpecParseError> {
let global_table_spec_provided = global_table_spec.is_some();
let mut known_table_spec = global_table_spec;

let mut col_specs = Vec::with_capacity(col_count);
for col_idx in 0..col_count {
let table_spec = match known_table_spec {
// If global table spec was provided, we simply clone it to each column spec.
Some(ref known_spec) if global_table_spec_provided => known_spec.clone(),

// Else, we deserialize the table spec for a column and, if we already know some
// previous spec (i.e. that of the first column), we perform equality check
// against it.
Some(_) | None => {
let table_spec =
deser_table_spec(buf).map_err(|err| mk_col_spec_parse_error(col_idx, err))?;

if let Some(ref known_spec) = known_table_spec {
// We assume that for each column, table spec is the same.
// As this is not guaranteed by the CQL protocol specification but only by how
// Cassandra and ScyllaDB work (no support for joins), we perform a sanity check here.
if known_spec.table_name != table_spec.table_name
|| known_spec.ks_name != table_spec.ks_name
{
return Err(mk_col_spec_parse_error(
col_idx,
ColumnSpecParseErrorKind::TableSpecDiffersAcrossColumns(
known_spec.clone().into_owned(),
table_spec.into_owned(),
),
));
}
} else {
// Once we have read the first column spec, we save its table spec
// in order to verify its equality with other columns'.
known_table_spec = Some(table_spec.clone());
fn deser_table_spec_for_col_spec<'frame>(
buf: &'_ mut &'frame [u8],
global_table_spec_provided: bool,
known_table_spec: &'_ mut Option<TableSpec<'frame>>,
col_idx: usize,
) -> StdResult<TableSpec<'frame>, ColumnSpecParseError> {
let table_spec = match known_table_spec {
// If global table spec was provided, we simply clone it to each column spec.
Some(ref known_spec) if global_table_spec_provided => known_spec.clone(),

// Else, we deserialize the table spec for a column and, if we already know some
// previous spec (i.e. that of the first column), we perform equality check
// against it.
Some(_) | None => {
let table_spec =
deser_table_spec(buf).map_err(|err| mk_col_spec_parse_error(col_idx, err))?;

if let Some(ref known_spec) = known_table_spec {
// We assume that for each column, table spec is the same.
// As this is not guaranteed by the CQL protocol specification but only by how
// Cassandra and ScyllaDB work (no support for joins), we perform a sanity check here.
if known_spec.table_name != table_spec.table_name
|| known_spec.ks_name != table_spec.ks_name
{
return Err(mk_col_spec_parse_error(
col_idx,
ColumnSpecParseErrorKind::TableSpecDiffersAcrossColumns(
known_spec.clone().into_owned(),
table_spec.into_owned(),
),
));
}

table_spec
} else {
// Once we have read the first column spec, we save its table spec
// in order to verify its equality with other columns'.
*known_table_spec = Some(table_spec.clone());
}
};

let name = types::read_string(buf)
.map_err(|err| mk_col_spec_parse_error(col_idx, err))?
.to_owned();
let typ = deser_type_owned(buf).map_err(|err| mk_col_spec_parse_error(col_idx, err))?;
col_specs.push(ColumnSpec::owned(name, typ, table_spec.into_owned()));
}
Ok(col_specs)
table_spec
}
};

Ok(table_spec)
}

macro_rules! generate_deser_col_specs {
($deser_col_specs: ident, $l: lifetime, $deser_type: ident, $make_col_spec: expr $(,)?) => {
/// Deserializes col specs (part of ResultMetadata or PreparedMetadata)
/// in the form mentioned by its name.
///
/// Checks for equality of table specs across columns, because the protocol
/// does not guarantee that and we want to be sure that the assumption
/// of them being all the same is correct.
///
/// To avoid needless allocations, it is advised to pass `global_table_spec`
/// in the borrowed form, so that cloning it is cheap.
fn $deser_col_specs<'frame>(
buf: &mut &'frame [u8],
global_table_spec: Option<TableSpec<'frame>>,
col_count: usize,
) -> StdResult<Vec<ColumnSpec<$l>>, ColumnSpecParseError> {
let global_table_spec_provided = global_table_spec.is_some();
let mut known_table_spec = global_table_spec;

let mut col_specs = Vec::with_capacity(col_count);
for col_idx in 0..col_count {
let table_spec = deser_table_spec_for_col_spec(
buf,
global_table_spec_provided,
&mut known_table_spec,
col_idx,
)?;

let name =
types::read_string(buf).map_err(|err| mk_col_spec_parse_error(col_idx, err))?;
let typ = $deser_type(buf).map_err(|err| mk_col_spec_parse_error(col_idx, err))?;
let col_spec = $make_col_spec(name, typ, table_spec);
col_specs.push(col_spec);
}
Ok(col_specs)
}
};
}

generate_deser_col_specs!(
_deser_col_specs_borrowed,
'frame,
_deser_type_borrowed,
ColumnSpec::borrowed,
);

generate_deser_col_specs!(
deser_col_specs_owned,
'static,
deser_type_owned,
|name: &str, typ, table_spec: TableSpec| ColumnSpec::owned(name.to_owned(), typ, table_spec.into_owned()),
);

fn deser_result_metadata(
buf: &mut &[u8],
) -> StdResult<(ResultMetadata<'static>, PagingStateResponse), ResultMetadataParseError> {
Expand All @@ -800,6 +843,7 @@ fn deser_result_metadata(
let raw_paging_state = has_more_pages
.then(|| types::read_bytes(buf).map_err(ResultMetadataParseError::PagingStateParseError))
.transpose()?;

let paging_state = PagingStateResponse::new_from_raw_bytes(raw_paging_state);

let col_specs = if no_metadata {
Expand All @@ -809,7 +853,7 @@ fn deser_result_metadata(
.then(|| deser_table_spec(buf))
.transpose()?;

deser_col_specs(buf, global_table_spec, col_count)?
deser_col_specs_owned(buf, global_table_spec, col_count)?
};

let metadata = ResultMetadata {
Expand Down Expand Up @@ -847,7 +891,7 @@ fn deser_prepared_metadata(
.then(|| deser_table_spec(buf))
.transpose()?;

let col_specs = deser_col_specs(buf, global_table_spec, col_count)?;
let col_specs = deser_col_specs_owned(buf, global_table_spec, col_count)?;

Ok(PreparedMetadata {
flags,
Expand Down
14 changes: 13 additions & 1 deletion scylla/src/transport/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use scylla_cql::{
CqlAuthChallengeParseError, CqlAuthSuccessParseError, CqlAuthenticateParseError,
CqlErrorParseError, CqlEventParseError, CqlRequestSerializationError,
CqlResponseParseError, CqlResultParseError, CqlSupportedParseError,
FrameBodyExtensionsParseError, FrameHeaderParseError,
FrameBodyExtensionsParseError, FrameHeaderParseError, RowsParseError,
},
request::CqlRequestKind,
response::CqlResponseKind,
Expand Down Expand Up @@ -178,6 +178,18 @@ impl From<response::Error> for QueryError {
}
}

impl From<RowsParseError> for QueryError {
fn from(err: RowsParseError) -> Self {
let err: CqlResultParseError = err.into();
let err: CqlResponseParseError = err.into();
let err: RequestError = err.into();
let err: UserRequestError = err.into();
let err: QueryError = err.into();

err
}
}

/// Error that occurred during session creation
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
Expand Down

0 comments on commit eb14ca8

Please sign in to comment.