Skip to content

Commit

Permalink
Serialization refactor: WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Lorak-mmk committed Nov 3, 2023
1 parent 25677ba commit a78e757
Show file tree
Hide file tree
Showing 17 changed files with 556 additions and 256 deletions.
25 changes: 18 additions & 7 deletions scylla-cql/benches/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ use std::borrow::Cow;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};

use scylla_cql::frame::request::SerializableRequest;
use scylla_cql::frame::value::SerializedValues;
use scylla_cql::frame::value::ValueList;
use scylla_cql::frame::response::result::ColumnType;
use scylla_cql::frame::{request::query, Compression, SerializedRequest};
use scylla_cql::types::serialize::row::NewSerializedValues;

fn make_query<'a>(contents: &'a str, values: &'a SerializedValues) -> query::Query<'a> {
fn make_query(contents: &str, values: NewSerializedValues) -> query::Query<'_> {
query::Query {
contents: Cow::Borrowed(contents),
parameters: query::QueryParameters {
consistency: scylla_cql::Consistency::LocalQuorum,
serial_consistency: None,
values: Cow::Borrowed(values),
values: Cow::Owned(values),
page_size: None,
paging_state: None,
timestamp: None,
Expand All @@ -22,13 +22,24 @@ fn make_query<'a>(contents: &'a str, values: &'a SerializedValues) -> query::Que
}

fn serialized_request_make_bench(c: &mut Criterion) {
let mut values = NewSerializedValues::new();
let mut group = c.benchmark_group("LZ4Compression.SerializedRequest");
let query_args = [
("INSERT foo INTO ks.table_name (?)", &(1234,).serialized().unwrap()),
("INSERT foo, bar, baz INTO ks.table_name (?, ?, ?)", &(1234, "a value", "i am storing a string").serialized().unwrap()),
("INSERT foo INTO ks.table_name (?)", {
values.add_value(&1234, &ColumnType::Int).unwrap();
values.clone()
}),
("INSERT foo, bar, baz INTO ks.table_name (?, ?, ?)", {
values.add_value(&"a value", &ColumnType::Text).unwrap();
values.add_value(&"i am storing a string", &ColumnType::Text).unwrap();
values.clone()
}),
(
"INSERT foo, bar, baz, boop, blah INTO longer_keyspace.a_big_table_name (?, ?, ?, ?, 1000)",
&(1234, "a value", "i am storing a string", "dc0c8cd7-d954-47c1-8722-a857941c43fb").serialized().unwrap()
{
values.add_value(&"dc0c8cd7-d954-47c1-8722-a857941c43fb", &ColumnType::Text).unwrap();
values.clone()
}
),
];
let queries = query_args.map(|(q, v)| make_query(q, v));
Expand Down
10 changes: 10 additions & 0 deletions scylla-cql/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use crate::frame::frame_errors::{FrameError, ParseError};
use crate::frame::protocol_features::ProtocolFeatures;
use crate::frame::value::SerializeValuesError;
use crate::types::serialize::SerializationError;
use crate::Consistency;
use bytes::Bytes;
use std::io::ErrorKind;
Expand Down Expand Up @@ -340,6 +341,9 @@ pub enum BadQuery {
#[error("Serializing values failed: {0} ")]
SerializeValuesError(#[from] SerializeValuesError),

#[error("Serializing values failed: {0} ")]
SerializationError(#[from] SerializationError),

/// Serialized values are too long to compute partition key
#[error("Serialized values are too long to compute partition key! Length: {0}, Max allowed length: {1}")]
ValuesTooLongForKey(usize, usize),
Expand Down Expand Up @@ -443,6 +447,12 @@ impl From<SerializeValuesError> for QueryError {
}
}

impl From<SerializationError> for QueryError {
fn from(serialized_err: SerializationError) -> QueryError {
QueryError::BadQuery(BadQuery::SerializationError(serialized_err))
}
}

impl From<ParseError> for QueryError {
fn from(parse_error: ParseError) -> QueryError {
QueryError::InvalidMessage(format!("Error parsing message: {}", parse_error))
Expand Down
3 changes: 3 additions & 0 deletions scylla-cql/src/frame/frame_errors.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::response;
use crate::cql_to_rust::CqlTypeError;
use crate::frame::value::SerializeValuesError;
use crate::types::serialize::SerializationError;
use thiserror::Error;

#[derive(Error, Debug)]
Expand Down Expand Up @@ -44,5 +45,7 @@ pub enum ParseError {
#[error(transparent)]
SerializeValuesError(#[from] SerializeValuesError),
#[error(transparent)]
SerializationError(#[from] SerializationError),
#[error(transparent)]
CqlTypeError(#[from] CqlTypeError),
}
36 changes: 26 additions & 10 deletions scylla-cql/src/frame/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,10 @@ mod tests {
query::{Query, QueryParameters},
DeserializableRequest, SerializableRequest,
},
response::result::ColumnType,
types::{self, SerialConsistency},
value::SerializedValues,
},
types::serialize::row::NewSerializedValues,
Consistency,
};

Expand All @@ -129,8 +130,8 @@ mod tests {
page_size: Some(323),
paging_state: Some(vec![2, 1, 3, 7].into()),
values: {
let mut vals = SerializedValues::new();
vals.add_value(&2137).unwrap();
let mut vals = NewSerializedValues::new();
vals.add_value(&2137, &ColumnType::Int).unwrap();
Cow::Owned(vals)
},
};
Expand All @@ -156,9 +157,9 @@ mod tests {
page_size: None,
paging_state: None,
values: {
let mut vals = SerializedValues::new();
vals.add_named_value("the_answer", &42).unwrap();
vals.add_named_value("really?", &2137).unwrap();
let mut vals = NewSerializedValues::new();
vals.add_value(&42, &ColumnType::Int).unwrap();
vals.add_value(&2137, &ColumnType::Int).unwrap();
Cow::Owned(vals)
},
};
Expand Down Expand Up @@ -189,8 +190,18 @@ mod tests {

// Not execute's values, because named values are not supported in batches.
values: vec![
query.parameters.values.deref().clone(),
query.parameters.values.deref().clone(),
query
.parameters
.values
.deref()
.clone()
.into_old_serialized_values(),
query
.parameters
.values
.deref()
.clone()
.into_old_serialized_values(),
],
};
{
Expand All @@ -212,7 +223,7 @@ mod tests {
timestamp: None,
page_size: None,
paging_state: None,
values: Cow::Owned(SerializedValues::new()),
values: Cow::Owned(NewSerializedValues::new()),
};
let query = Query {
contents: contents.clone(),
Expand Down Expand Up @@ -261,7 +272,12 @@ mod tests {
serial_consistency: None,
timestamp: None,

values: vec![query.parameters.values.deref().clone()],
values: vec![query
.parameters
.values
.deref()
.clone()
.into_old_serialized_values()],
};
{
let mut buf = Vec::new();
Expand Down
24 changes: 14 additions & 10 deletions scylla-cql/src/frame/request/query.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use std::borrow::Cow;

use crate::frame::{frame_errors::ParseError, types::SerialConsistency};
use crate::{
frame::{frame_errors::ParseError, types::SerialConsistency},
types::serialize::row::NewSerializedValues,
};
use bytes::{Buf, BufMut, Bytes};

use crate::{
frame::request::{RequestOpcode, SerializableRequest},
frame::types,
frame::value::SerializedValues,
};

use super::DeserializableRequest;
Expand Down Expand Up @@ -61,7 +63,7 @@ pub struct QueryParameters<'a> {
pub timestamp: Option<i64>,
pub page_size: Option<i32>,
pub paging_state: Option<Bytes>,
pub values: Cow<'a, SerializedValues>,
pub values: Cow<'a, NewSerializedValues>,
}

impl Default for QueryParameters<'_> {
Expand All @@ -72,7 +74,7 @@ impl Default for QueryParameters<'_> {
timestamp: None,
page_size: None,
paging_state: None,
values: Cow::Borrowed(SerializedValues::EMPTY),
values: Cow::Owned(NewSerializedValues::new()),
}
}
}
Expand Down Expand Up @@ -102,10 +104,6 @@ impl QueryParameters<'_> {
flags |= FLAG_WITH_DEFAULT_TIMESTAMP;
}

if self.values.has_names() {
flags |= FLAG_WITH_NAMES_FOR_VALUES;
}

buf.put_u8(flags);

if !self.values.is_empty() {
Expand Down Expand Up @@ -151,10 +149,16 @@ impl<'q> QueryParameters<'q> {
let default_timestamp_flag = (flags & FLAG_WITH_DEFAULT_TIMESTAMP) != 0;
let values_have_names_flag = (flags & FLAG_WITH_NAMES_FOR_VALUES) != 0;

if values_have_names_flag {
return Err(ParseError::BadIncomingData(
"Named values in frame are currently unsupported".to_string(),
));
}

let values = Cow::Owned(if values_flag {
SerializedValues::new_from_frame(buf, values_have_names_flag)?
NewSerializedValues::new_from_frame(buf)?
} else {
SerializedValues::new()
NewSerializedValues::new()
});

let page_size = page_size_flag.then(|| types::read_int(buf)).transpose()?;
Expand Down
4 changes: 2 additions & 2 deletions scylla-cql/src/types/serialize/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::{any::Any, sync::Arc};
use std::{error::Error, sync::Arc};

pub mod row;
pub mod value;

type SerializationError = Arc<dyn Any + Send + Sync>;
pub type SerializationError = Arc<dyn Error + Send + Sync>;

/// An interface that facilitates writing values for a CQL query.
pub trait RowWriter {
Expand Down
Loading

0 comments on commit a78e757

Please sign in to comment.