diff --git a/docs/source/data-types/udt.md b/docs/source/data-types/udt.md index ceb8ae8ca6..e79ad3feae 100644 --- a/docs/source/data-types/udt.md +++ b/docs/source/data-types/udt.md @@ -37,10 +37,10 @@ Now it can be sent and received just like any other CQL value: # use std::error::Error; # async fn check_only_compiles(session: &Session) -> Result<(), Box> { use scylla::IntoTypedRows; -use scylla::macros::{FromUserType, IntoUserType}; +use scylla::macros::{FromUserType, IntoUserType, SerializeCql}; use scylla::cql_to_rust::FromCqlVal; -#[derive(Debug, IntoUserType, FromUserType)] +#[derive(Debug, IntoUserType, FromUserType, SerializeCql)] struct MyType { int_val: i32, text_val: Option, diff --git a/docs/source/queries/values.md b/docs/source/queries/values.md index 09b369b689..400e7139ab 100644 --- a/docs/source/queries/values.md +++ b/docs/source/queries/values.md @@ -12,7 +12,7 @@ or a custom struct which derives from `ValueList`. A few examples: ```rust # extern crate scylla; -# use scylla::{Session, ValueList, frame::response::result::CqlValue}; +# use scylla::{Session, ValueList, SerializeRow, frame::response::result::CqlValue}; # use std::error::Error; # use std::collections::HashMap; # async fn check_only_compiles(session: &Session) -> Result<(), Box> { @@ -34,7 +34,7 @@ session // Sending an integer and a string using a named struct. // The values will be passed in the order from the struct definition -#[derive(ValueList)] +#[derive(ValueList, SerializeRow)] struct IntString { first_col: i32, second_col: String, diff --git a/examples/compare-tokens.rs b/examples/compare-tokens.rs index 5c56aa5f4d..3e46d20b44 100644 --- a/examples/compare-tokens.rs +++ b/examples/compare-tokens.rs @@ -1,5 +1,4 @@ use anyhow::Result; -use scylla::frame::value::ValueList; use scylla::routing::Token; use scylla::transport::NodeAddr; use scylla::{Session, SessionBuilder}; @@ -29,8 +28,7 @@ async fn main() -> Result<()> { .query("INSERT INTO ks.t (pk) VALUES (?)", (pk,)) .await?; - let serialized_pk = (pk,).serialized()?.into_owned(); - let t = prepared.calculate_token(&serialized_pk)?.unwrap().value; + let t = prepared.calculate_token(&(pk,))?.unwrap().value; println!( "Token endpoints for query: {:?}", diff --git a/examples/user-defined-type.rs b/examples/user-defined-type.rs index 53465b2f10..5a0f1b55f5 100644 --- a/examples/user-defined-type.rs +++ b/examples/user-defined-type.rs @@ -1,6 +1,6 @@ use anyhow::Result; -use scylla::macros::{FromUserType, IntoUserType}; -use scylla::{IntoTypedRows, Session, SessionBuilder}; +use scylla::macros::FromUserType; +use scylla::{IntoTypedRows, SerializeCql, Session, SessionBuilder}; use std::env; #[tokio::main] @@ -29,7 +29,7 @@ async fn main() -> Result<()> { // Define custom struct that matches User Defined Type created earlier // wrapping field in Option will gracefully handle null field values - #[derive(Debug, IntoUserType, FromUserType)] + #[derive(Debug, FromUserType, SerializeCql)] struct MyType { int_val: i32, text_val: Option, diff --git a/examples/value_list.rs b/examples/value_list.rs index 44b388dcbc..409f8a5208 100644 --- a/examples/value_list.rs +++ b/examples/value_list.rs @@ -19,7 +19,7 @@ async fn main() { .await .unwrap(); - #[derive(scylla::ValueList)] + #[derive(scylla::SerializeRow)] struct MyType<'a> { k: i32, my: Option<&'a str>, @@ -36,8 +36,8 @@ async fn main() { .unwrap(); // You can also use type generics: - #[derive(scylla::ValueList)] - struct MyTypeWithGenerics { + #[derive(scylla::SerializeRow)] + struct MyTypeWithGenerics { k: i32, my: Option, } diff --git a/scylla-cql/benches/benchmark.rs b/scylla-cql/benches/benchmark.rs index 0aa6c89102..2ab15f5051 100644 --- a/scylla-cql/benches/benchmark.rs +++ b/scylla-cql/benches/benchmark.rs @@ -3,17 +3,17 @@ use std::borrow::Cow; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; use scylla_cql::frame::request::SerializableRequest; -use scylla_cql::frame::value::SerializedValues; -use scylla_cql::frame::value::ValueList; +use scylla_cql::frame::response::result::ColumnType; use scylla_cql::frame::{request::query, Compression, SerializedRequest}; +use scylla_cql::types::serialize::row::SerializedValues; -fn make_query<'a>(contents: &'a str, values: &'a SerializedValues) -> query::Query<'a> { +fn make_query(contents: &str, values: SerializedValues) -> query::Query<'_> { query::Query { contents: Cow::Borrowed(contents), parameters: query::QueryParameters { consistency: scylla_cql::Consistency::LocalQuorum, serial_consistency: None, - values: Cow::Borrowed(values), + values: Cow::Owned(values), page_size: None, paging_state: None, timestamp: None, @@ -22,13 +22,24 @@ fn make_query<'a>(contents: &'a str, values: &'a SerializedValues) -> query::Que } fn serialized_request_make_bench(c: &mut Criterion) { + let mut values = SerializedValues::new(); let mut group = c.benchmark_group("LZ4Compression.SerializedRequest"); let query_args = [ - ("INSERT foo INTO ks.table_name (?)", &(1234,).serialized().unwrap()), - ("INSERT foo, bar, baz INTO ks.table_name (?, ?, ?)", &(1234, "a value", "i am storing a string").serialized().unwrap()), + ("INSERT foo INTO ks.table_name (?)", { + values.add_value(&1234, &ColumnType::Int).unwrap(); + values.clone() + }), + ("INSERT foo, bar, baz INTO ks.table_name (?, ?, ?)", { + values.add_value(&"a value", &ColumnType::Text).unwrap(); + values.add_value(&"i am storing a string", &ColumnType::Text).unwrap(); + values.clone() + }), ( "INSERT foo, bar, baz, boop, blah INTO longer_keyspace.a_big_table_name (?, ?, ?, ?, 1000)", - &(1234, "a value", "i am storing a string", "dc0c8cd7-d954-47c1-8722-a857941c43fb").serialized().unwrap() + { + values.add_value(&"dc0c8cd7-d954-47c1-8722-a857941c43fb", &ColumnType::Text).unwrap(); + values.clone() + } ), ]; let queries = query_args.map(|(q, v)| make_query(q, v)); diff --git a/scylla-cql/src/errors.rs b/scylla-cql/src/errors.rs index 9e80247e20..e884e37ad5 100644 --- a/scylla-cql/src/errors.rs +++ b/scylla-cql/src/errors.rs @@ -3,6 +3,7 @@ use crate::frame::frame_errors::{FrameError, ParseError}; use crate::frame::protocol_features::ProtocolFeatures; use crate::frame::value::SerializeValuesError; +use crate::types::serialize::SerializationError; use crate::Consistency; use bytes::Bytes; use std::io::ErrorKind; @@ -340,6 +341,9 @@ pub enum BadQuery { #[error("Serializing values failed: {0} ")] SerializeValuesError(#[from] SerializeValuesError), + #[error("Serializing values failed: {0} ")] + SerializationError(#[from] SerializationError), + /// Serialized values are too long to compute partition key #[error("Serialized values are too long to compute partition key! Length: {0}, Max allowed length: {1}")] ValuesTooLongForKey(usize, usize), @@ -443,6 +447,12 @@ impl From for QueryError { } } +impl From for QueryError { + fn from(serialized_err: SerializationError) -> QueryError { + QueryError::BadQuery(BadQuery::SerializationError(serialized_err)) + } +} + impl From for QueryError { fn from(parse_error: ParseError) -> QueryError { QueryError::InvalidMessage(format!("Error parsing message: {}", parse_error)) diff --git a/scylla-cql/src/frame/frame_errors.rs b/scylla-cql/src/frame/frame_errors.rs index 3da4e26d01..9a3b228505 100644 --- a/scylla-cql/src/frame/frame_errors.rs +++ b/scylla-cql/src/frame/frame_errors.rs @@ -1,6 +1,7 @@ use super::response; use crate::cql_to_rust::CqlTypeError; use crate::frame::value::SerializeValuesError; +use crate::types::serialize::SerializationError; use thiserror::Error; #[derive(Error, Debug)] @@ -44,5 +45,7 @@ pub enum ParseError { #[error(transparent)] SerializeValuesError(#[from] SerializeValuesError), #[error(transparent)] + SerializationError(#[from] SerializationError), + #[error(transparent)] CqlTypeError(#[from] CqlTypeError), } diff --git a/scylla-cql/src/frame/request/batch.rs b/scylla-cql/src/frame/request/batch.rs index 35dd8c3c3b..5b5c2f84b6 100644 --- a/scylla-cql/src/frame/request/batch.rs +++ b/scylla-cql/src/frame/request/batch.rs @@ -5,7 +5,7 @@ use crate::frame::{ frame_errors::ParseError, request::{RequestOpcode, SerializableRequest}, types::{self, SerialConsistency}, - value::{BatchValues, BatchValuesIterator, SerializedValues}, + value::{BatchValues, BatchValuesIterator, LegacySerializedValues}, }; use super::DeserializableRequest; @@ -186,7 +186,7 @@ impl<'s, 'b> From<&'s BatchStatement<'b>> for BatchStatement<'s> { } } -impl<'b> DeserializableRequest for Batch<'b, BatchStatement<'b>, Vec> { +impl<'b> DeserializableRequest for Batch<'b, BatchStatement<'b>, Vec> { fn deserialize(buf: &mut &[u8]) -> Result { let batch_type = buf.get_u8().try_into()?; @@ -196,7 +196,7 @@ impl<'b> DeserializableRequest for Batch<'b, BatchStatement<'b>, Vec DeserializableRequest for Batch<'b, BatchStatement<'b>, Vec, Vec) = + let (statements, values): (Vec, Vec) = statements_with_values.into_iter().unzip(); Ok(Self { diff --git a/scylla-cql/src/frame/request/mod.rs b/scylla-cql/src/frame/request/mod.rs index cd41d6bce1..8a625f2806 100644 --- a/scylla-cql/src/frame/request/mod.rs +++ b/scylla-cql/src/frame/request/mod.rs @@ -22,7 +22,7 @@ pub use startup::Startup; use self::batch::BatchStatement; use super::types::SerialConsistency; -use super::value::SerializedValues; +use super::value::LegacySerializedValues; #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, TryFromPrimitive)] #[repr(u8)] @@ -59,7 +59,7 @@ pub trait DeserializableRequest: SerializableRequest + Sized { pub enum Request<'r> { Query(Query<'r>), Execute(Execute<'r>), - Batch(Batch<'r, BatchStatement<'r>, Vec>), + Batch(Batch<'r, BatchStatement<'r>, Vec>), } impl<'r> Request<'r> { @@ -112,9 +112,10 @@ mod tests { query::{Query, QueryParameters}, DeserializableRequest, SerializableRequest, }, + response::result::ColumnType, types::{self, SerialConsistency}, - value::SerializedValues, }, + types::serialize::row::SerializedValues, Consistency, }; @@ -130,7 +131,7 @@ mod tests { paging_state: Some(vec![2, 1, 3, 7].into()), values: { let mut vals = SerializedValues::new(); - vals.add_value(&2137).unwrap(); + vals.add_value(&2137, &ColumnType::Int).unwrap(); Cow::Owned(vals) }, }; @@ -157,8 +158,8 @@ mod tests { paging_state: None, values: { let mut vals = SerializedValues::new(); - vals.add_named_value("the_answer", &42).unwrap(); - vals.add_named_value("really?", &2137).unwrap(); + vals.add_value(&42, &ColumnType::Int).unwrap(); + vals.add_value(&2137, &ColumnType::Int).unwrap(); Cow::Owned(vals) }, }; @@ -189,8 +190,8 @@ mod tests { // Not execute's values, because named values are not supported in batches. values: vec![ - query.parameters.values.deref().clone(), - query.parameters.values.deref().clone(), + query.parameters.values.deref().to_old_serialized_values(), + query.parameters.values.deref().to_old_serialized_values(), ], }; { @@ -212,7 +213,7 @@ mod tests { timestamp: None, page_size: None, paging_state: None, - values: Cow::Owned(SerializedValues::new()), + values: Cow::Borrowed(SerializedValues::EMPTY), }; let query = Query { contents: contents.clone(), @@ -261,7 +262,7 @@ mod tests { serial_consistency: None, timestamp: None, - values: vec![query.parameters.values.deref().clone()], + values: vec![query.parameters.values.deref().to_old_serialized_values()], }; { let mut buf = Vec::new(); diff --git a/scylla-cql/src/frame/request/query.rs b/scylla-cql/src/frame/request/query.rs index ff0b0cc867..348127eda7 100644 --- a/scylla-cql/src/frame/request/query.rs +++ b/scylla-cql/src/frame/request/query.rs @@ -1,12 +1,14 @@ use std::borrow::Cow; -use crate::frame::{frame_errors::ParseError, types::SerialConsistency}; +use crate::{ + frame::{frame_errors::ParseError, types::SerialConsistency}, + types::serialize::row::SerializedValues, +}; use bytes::{Buf, BufMut, Bytes}; use crate::{ frame::request::{RequestOpcode, SerializableRequest}, frame::types, - frame::value::SerializedValues, }; use super::DeserializableRequest; @@ -102,10 +104,6 @@ impl QueryParameters<'_> { flags |= FLAG_WITH_DEFAULT_TIMESTAMP; } - if self.values.has_names() { - flags |= FLAG_WITH_NAMES_FOR_VALUES; - } - buf.put_u8(flags); if !self.values.is_empty() { @@ -151,8 +149,14 @@ impl<'q> QueryParameters<'q> { let default_timestamp_flag = (flags & FLAG_WITH_DEFAULT_TIMESTAMP) != 0; let values_have_names_flag = (flags & FLAG_WITH_NAMES_FOR_VALUES) != 0; + if values_have_names_flag { + return Err(ParseError::BadIncomingData( + "Named values in frame are currently unsupported".to_string(), + )); + } + let values = Cow::Owned(if values_flag { - SerializedValues::new_from_frame(buf, values_have_names_flag)? + SerializedValues::new_from_frame(buf)? } else { SerializedValues::new() }); diff --git a/scylla-cql/src/frame/value.rs b/scylla-cql/src/frame/value.rs index fa3355f4a8..e4be751635 100644 --- a/scylla-cql/src/frame/value.rs +++ b/scylla-cql/src/frame/value.rs @@ -256,7 +256,7 @@ impl TryInto for CqlTime { /// Keeps a buffer with serialized Values /// Allows adding new Values and iterating over serialized ones #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] -pub struct SerializedValues { +pub struct LegacySerializedValues { serialized_values: Vec, values_num: u16, contains_names: bool, @@ -282,27 +282,27 @@ pub enum SerializeValuesError { ParseError, } -pub type SerializedResult<'a> = Result, SerializeValuesError>; +pub type SerializedResult<'a> = Result, SerializeValuesError>; /// Represents list of values to be sent in a query /// gets serialized and but into request pub trait ValueList { - /// Provides a view of ValueList as SerializedValues - /// returns `Cow` to make impl ValueList for SerializedValues efficient + /// Provides a view of ValueList as LegacySerializedValues + /// returns `Cow` to make impl ValueList for LegacySerializedValues efficient fn serialized(&self) -> SerializedResult<'_>; fn write_to_request(&self, buf: &mut impl BufMut) -> Result<(), SerializeValuesError> { let serialized = self.serialized()?; - SerializedValues::write_to_request(&serialized, buf); + LegacySerializedValues::write_to_request(&serialized, buf); Ok(()) } } -impl SerializedValues { +impl LegacySerializedValues { /// Creates empty value list pub const fn new() -> Self { - SerializedValues { + LegacySerializedValues { serialized_values: Vec::new(), values_num: 0, contains_names: false, @@ -310,7 +310,7 @@ impl SerializedValues { } pub fn with_capacity(capacity: usize) -> Self { - SerializedValues { + LegacySerializedValues { serialized_values: Vec::with_capacity(capacity), values_num: 0, contains_names: false, @@ -322,7 +322,7 @@ impl SerializedValues { } /// A const empty instance, useful for taking references - pub const EMPTY: &'static SerializedValues = &SerializedValues::new(); + pub const EMPTY: &'static LegacySerializedValues = &LegacySerializedValues::new(); /// Serializes value and appends it to the list pub fn add_value(&mut self, val: &impl Value) -> Result<(), SerializeValuesError> { @@ -372,7 +372,7 @@ impl SerializedValues { } pub fn iter(&self) -> impl Iterator { - SerializedValuesIterator { + LegacySerializedValuesIterator { serialized_values: &self.serialized_values, contains_names: self.contains_names, } @@ -408,7 +408,7 @@ impl SerializedValues { let values_len_in_buf = values_beg.len() - buf.len(); let values_in_frame = &values_beg[0..values_len_in_buf]; - Ok(SerializedValues { + Ok(LegacySerializedValues { serialized_values: values_in_frame.to_vec(), values_num, contains_names, @@ -418,7 +418,7 @@ impl SerializedValues { pub fn iter_name_value_pairs(&self) -> impl Iterator, RawValue)> { let mut buf = &self.serialized_values[..]; (0..self.values_num).map(move |_| { - // `unwrap()`s here are safe, as we assume type-safety: if `SerializedValues` exits, + // `unwrap()`s here are safe, as we assume type-safety: if `LegacySerializedValues` exits, // we have a guarantee that the layout of the serialized values is valid. let name = self .contains_names @@ -430,12 +430,12 @@ impl SerializedValues { } #[derive(Clone, Copy)] -pub struct SerializedValuesIterator<'a> { +pub struct LegacySerializedValuesIterator<'a> { serialized_values: &'a [u8], contains_names: bool, } -impl<'a> Iterator for SerializedValuesIterator<'a> { +impl<'a> Iterator for LegacySerializedValuesIterator<'a> { type Item = RawValue<'a>; fn next(&mut self) -> Option { @@ -1030,14 +1030,14 @@ impl_value_for_tuple!(T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13 // Implement ValueList for the unit type impl ValueList for () { fn serialized(&self) -> SerializedResult<'_> { - Ok(Cow::Owned(SerializedValues::new())) + Ok(Cow::Owned(LegacySerializedValues::new())) } } // Implement ValueList for &[] - u8 because otherwise rust can't infer type impl ValueList for [u8; 0] { fn serialized(&self) -> SerializedResult<'_> { - Ok(Cow::Owned(SerializedValues::new())) + Ok(Cow::Owned(LegacySerializedValues::new())) } } @@ -1045,7 +1045,7 @@ impl ValueList for [u8; 0] { impl ValueList for &[T] { fn serialized(&self) -> SerializedResult<'_> { let size = std::mem::size_of_val(*self); - let mut result = SerializedValues::with_capacity(size); + let mut result = LegacySerializedValues::with_capacity(size); for val in *self { result.add_value(val)?; } @@ -1059,7 +1059,7 @@ impl ValueList for Vec { fn serialized(&self) -> SerializedResult<'_> { let slice = self.as_slice(); let size = std::mem::size_of_val(slice); - let mut result = SerializedValues::with_capacity(size); + let mut result = LegacySerializedValues::with_capacity(size); for val in self { result.add_value(val)?; } @@ -1073,7 +1073,7 @@ macro_rules! impl_value_list_for_btree_map { ($key_type:ty) => { impl ValueList for BTreeMap<$key_type, T> { fn serialized(&self) -> SerializedResult<'_> { - let mut result = SerializedValues::with_capacity(self.len()); + let mut result = LegacySerializedValues::with_capacity(self.len()); for (key, val) in self { result.add_named_value(key, val)?; } @@ -1089,7 +1089,7 @@ macro_rules! impl_value_list_for_hash_map { ($key_type:ty) => { impl ValueList for HashMap<$key_type, T, S> { fn serialized(&self) -> SerializedResult<'_> { - let mut result = SerializedValues::with_capacity(self.len()); + let mut result = LegacySerializedValues::with_capacity(self.len()); for (key, val) in self { result.add_named_value(key, val)?; } @@ -1112,7 +1112,7 @@ impl_value_list_for_btree_map!(&str); impl ValueList for (T0,) { fn serialized(&self) -> SerializedResult<'_> { let size = std::mem::size_of_val(self); - let mut result = SerializedValues::with_capacity(size); + let mut result = LegacySerializedValues::with_capacity(size); result.add_value(&self.0)?; Ok(Cow::Owned(result)) } @@ -1126,7 +1126,7 @@ macro_rules! impl_value_list_for_tuple { { fn serialized(&self) -> SerializedResult<'_> { let size = std::mem::size_of_val(self); - let mut result = SerializedValues::with_capacity(size); + let mut result = LegacySerializedValues::with_capacity(size); $( result.add_value(&self.$FieldI) ?; )* @@ -1165,13 +1165,13 @@ impl ValueList for &T { } } -impl ValueList for SerializedValues { +impl ValueList for LegacySerializedValues { fn serialized(&self) -> SerializedResult<'_> { Ok(Cow::Borrowed(self)) } } -impl<'b> ValueList for Cow<'b, SerializedValues> { +impl<'b> ValueList for Cow<'b, LegacySerializedValues> { fn serialized(&self) -> SerializedResult<'_> { Ok(Cow::Borrowed(self.as_ref())) } @@ -1346,17 +1346,20 @@ impl<'a, T: BatchValues + ?Sized> BatchValues for &'a T { /// Allows reusing already-serialized first value /// -/// We'll need to build a `SerializedValues` for the first ~`ValueList` of a batch to figure out the shard (#448). +/// We'll need to build a `LegacySerializedValues` for the first ~`ValueList` of a batch to figure out the shard (#448). /// Once that is done, we can use that instead of re-serializing. /// /// This struct implements both `BatchValues` and `BatchValuesIterator` for that purpose pub struct BatchValuesFirstSerialized<'f, T> { - first: Option<&'f SerializedValues>, + first: Option<&'f LegacySerializedValues>, rest: T, } impl<'f, T: BatchValues> BatchValuesFirstSerialized<'f, T> { - pub fn new(batch_values: T, already_serialized_first: Option<&'f SerializedValues>) -> Self { + pub fn new( + batch_values: T, + already_serialized_first: Option<&'f LegacySerializedValues>, + ) -> Self { Self { first: already_serialized_first, rest: batch_values, diff --git a/scylla-cql/src/frame/value_tests.rs b/scylla-cql/src/frame/value_tests.rs index 0ded4b4ed0..adcdcdf0b2 100644 --- a/scylla-cql/src/frame/value_tests.rs +++ b/scylla-cql/src/frame/value_tests.rs @@ -5,8 +5,8 @@ use crate::types::serialize::{CellWriter, RowWriter}; use super::response::result::{ColumnSpec, ColumnType, TableSpec}; use super::value::{ - BatchValues, CqlDate, CqlDuration, CqlTime, CqlTimestamp, MaybeUnset, SerializeValuesError, - SerializedValues, Unset, Value, ValueList, ValueTooBig, + BatchValues, CqlDate, CqlDuration, CqlTime, CqlTimestamp, LegacySerializedValues, MaybeUnset, + SerializeValuesError, Unset, Value, ValueList, ValueTooBig, }; use bigdecimal::BigDecimal; use bytes::BufMut; @@ -832,7 +832,7 @@ fn ref_value() { #[test] fn empty_serialized_values() { - const EMPTY: SerializedValues = SerializedValues::new(); + const EMPTY: LegacySerializedValues = LegacySerializedValues::new(); assert_eq!(EMPTY.len(), 0); assert!(EMPTY.is_empty()); assert_eq!(EMPTY.iter().next(), None); @@ -844,7 +844,7 @@ fn empty_serialized_values() { #[test] fn serialized_values() { - let mut values = SerializedValues::new(); + let mut values = LegacySerializedValues::new(); assert!(values.is_empty()); // Add first value @@ -920,14 +920,14 @@ fn serialized_values() { #[test] fn unit_value_list() { - let serialized_unit: SerializedValues = + let serialized_unit: LegacySerializedValues = <() as ValueList>::serialized(&()).unwrap().into_owned(); assert!(serialized_unit.is_empty()); } #[test] fn empty_array_value_list() { - let serialized_arr: SerializedValues = <[u8; 0] as ValueList>::serialized(&[]) + let serialized_arr: LegacySerializedValues = <[u8; 0] as ValueList>::serialized(&[]) .unwrap() .into_owned(); assert!(serialized_arr.is_empty()); @@ -987,7 +987,7 @@ fn col_spec(name: &str, typ: ColumnType) -> ColumnSpec { fn serialize_values( vl: T, columns: &[ColumnSpec], -) -> SerializedValues { +) -> LegacySerializedValues { let serialized = ::serialized(&vl).unwrap().into_owned(); let mut old_serialized = Vec::new(); serialized.write_to_request(&mut old_serialized); @@ -1158,11 +1158,11 @@ fn ref_value_list() { #[test] fn serialized_values_value_list() { - let mut ser_values = SerializedValues::new(); + let mut ser_values = LegacySerializedValues::new(); ser_values.add_value(&1_i32).unwrap(); ser_values.add_value(&"qwertyuiop").unwrap(); - let ser_ser_values: Cow = ser_values.serialized().unwrap(); + let ser_ser_values: Cow = ser_values.serialized().unwrap(); assert!(matches!(ser_ser_values, Cow::Borrowed(_))); assert_eq!(&ser_values, ser_ser_values.as_ref()); @@ -1170,9 +1170,9 @@ fn serialized_values_value_list() { #[test] fn cow_serialized_values_value_list() { - let cow_ser_values: Cow = Cow::Owned(SerializedValues::new()); + let cow_ser_values: Cow = Cow::Owned(LegacySerializedValues::new()); - let serialized: Cow = cow_ser_values.serialized().unwrap(); + let serialized: Cow = cow_ser_values.serialized().unwrap(); assert!(matches!(serialized, Cow::Borrowed(_))); assert_eq!(cow_ser_values.as_ref(), serialized.as_ref()); diff --git a/scylla-cql/src/lib.rs b/scylla-cql/src/lib.rs index 6d74b680ba..83b6f3751e 100644 --- a/scylla-cql/src/lib.rs +++ b/scylla-cql/src/lib.rs @@ -17,7 +17,7 @@ pub mod _macro_internal { }; pub use crate::frame::response::result::{CqlValue, Row}; pub use crate::frame::value::{ - SerializedResult, SerializedValues, Value, ValueList, ValueTooBig, + LegacySerializedValues, SerializedResult, Value, ValueList, ValueTooBig, }; pub use crate::macros::*; diff --git a/scylla-cql/src/types/serialize/row.rs b/scylla-cql/src/types/serialize/row.rs index 213af49c0f..9ad7d0e56c 100644 --- a/scylla-cql/src/types/serialize/row.rs +++ b/scylla-cql/src/types/serialize/row.rs @@ -4,13 +4,19 @@ use std::fmt::Display; use std::hash::BuildHasher; use std::{collections::HashMap, sync::Arc}; +use bytes::BufMut; use thiserror::Error; -use crate::frame::value::{SerializedValues, ValueList}; +use crate::frame::frame_errors::ParseError; +use crate::frame::response::result::ColumnType; +use crate::frame::response::result::PreparedMetadata; +use crate::frame::types; +use crate::frame::value::SerializeValuesError; +use crate::frame::value::{LegacySerializedValues, ValueList}; use crate::frame::{response::result::ColumnSpec, types::RawValue}; use super::value::SerializeCql; -use super::{RowWriter, SerializationError}; +use super::{CellWriter, RowWriter, SerializationError}; /// Contains information needed to serialize a row. pub struct RowSerializationContext<'a> { @@ -18,6 +24,13 @@ pub struct RowSerializationContext<'a> { } impl<'a> RowSerializationContext<'a> { + #[inline] + pub fn from_prepared(prepared: &'a PreparedMetadata) -> Self { + Self { + columns: prepared.col_specs.as_slice(), + } + } + /// Returns column/bind marker specifications for given query. #[inline] pub fn columns(&self) -> &'a [ColumnSpec] { @@ -54,7 +67,7 @@ macro_rules! fallback_impl_contents { } #[inline] fn is_empty(&self) -> bool { - SerializedValues::is_empty(self) + LegacySerializedValues::is_empty(self) } }; } @@ -225,11 +238,11 @@ impl SerializeRow for &T { } } -impl SerializeRow for SerializedValues { +impl SerializeRow for LegacySerializedValues { fallback_impl_contents!(); } -impl<'b> SerializeRow for Cow<'b, SerializedValues> { +impl<'b> SerializeRow for Cow<'b, LegacySerializedValues> { fallback_impl_contents!(); } @@ -329,7 +342,7 @@ impl_tuples!( /// /// ```rust /// # use std::borrow::Cow; -/// # use scylla_cql::frame::value::{Value, ValueList, SerializedResult, SerializedValues}; +/// # use scylla_cql::frame::value::{Value, ValueList, SerializedResult, LegacySerializedValues}; /// # use scylla_cql::impl_serialize_row_via_value_list; /// struct NoGenerics {} /// impl ValueList for NoGenerics { @@ -344,7 +357,7 @@ impl_tuples!( /// struct WithGenerics(T, U); /// impl ValueList for WithGenerics { /// fn serialized(&self) -> SerializedResult<'_> { -/// let mut values = SerializedValues::new(); +/// let mut values = LegacySerializedValues::new(); /// values.add_value(&self.0); /// values.add_value(&self.1.clone()); /// Ok(Cow::Owned(values)) @@ -565,10 +578,159 @@ pub enum ValueListToSerializeRowAdapterError { NoBindMarkerWithName { name: String }, } +/// A buffer containing already serialized values. +/// +/// It is not aware of the types of contained values, +/// it is basically a byte buffer in the format expected by the CQL protocol. +/// Usually there is no need for a user of a driver to use this struct, it is mostly internal. +/// The exception are APIs like `ClusterData::compute_token` / `ClusterData::get_endpoints`. +/// Allows adding new values to the buffer and iterating over the content. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub struct SerializedValues { + serialized_values: Vec, + element_count: u16, +} + +impl SerializedValues { + pub const fn new() -> Self { + SerializedValues { + serialized_values: Vec::new(), + element_count: 0, + } + } + + /// A const empty instance, useful for taking references + pub const EMPTY: &'static SerializedValues = &SerializedValues::new(); + + pub fn from_serializable( + ctx: &RowSerializationContext, + row: &T, + ) -> Result { + let mut data = Vec::new(); + let element_count = { + let mut writer = RowWriter::new(&mut data); + row.serialize(ctx, &mut writer)?; + match writer.value_count().try_into() { + Ok(n) => n, + Err(_) => { + return Err(SerializationError(Arc::new( + SerializeValuesError::TooManyValues, + ))) + } + } + }; + + Ok(SerializedValues { + serialized_values: data, + element_count, + }) + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.element_count() == 0 + } + + #[inline] + pub fn iter(&self) -> impl Iterator { + SerializedValuesIterator { + serialized_values: &self.serialized_values, + } + } + + #[inline] + pub fn element_count(&self) -> u16 { + // We initialize first two bytes in new() and BufBackedRowWriter does too, + // so this unwrap is safe + self.element_count + } + + #[inline] + pub fn buffer_size(&self) -> usize { + self.serialized_values.len() + } + + pub(crate) fn write_to_request(&self, buf: &mut impl BufMut) { + buf.put_u16(self.element_count); + buf.put(self.serialized_values.as_slice()) + } + + /// Serializes value and appends it to the list + pub fn add_value( + &mut self, + val: &T, + typ: &ColumnType, + ) -> Result<(), SerializationError> { + if self.element_count() == u16::MAX { + return Err(SerializationError(Arc::new( + SerializeValuesError::TooManyValues, + ))); + } + + let len_before_serialize: usize = self.serialized_values.len(); + + let writer = CellWriter::new(&mut self.serialized_values); + if let Err(e) = val.serialize(typ, writer) { + self.serialized_values.resize(len_before_serialize, 0); + Err(e) + } else { + self.element_count += 1; + Ok(()) + } + } + + /// Creates value list from the request frame + pub(crate) fn new_from_frame(buf: &mut &[u8]) -> Result { + let values_num = types::read_short(buf)?; + let values_beg = *buf; + for _ in 0..values_num { + let _serialized = types::read_value(buf)?; + } + + let values_len_in_buf = values_beg.len() - buf.len(); + let values_in_frame = &values_beg[0..values_len_in_buf]; + Ok(SerializedValues { + serialized_values: values_in_frame.to_vec(), + element_count: values_num, + }) + } + + // Temporary function, to be removed when we implement new batching API (right now it is needed in frame::request::mod.rs tests) + pub fn to_old_serialized_values(&self) -> LegacySerializedValues { + let mut frame = Vec::new(); + self.write_to_request(&mut frame); + LegacySerializedValues::new_from_frame(&mut frame.as_slice(), false).unwrap() + } +} + +impl Default for SerializedValues { + fn default() -> Self { + Self::new() + } +} + +#[derive(Clone, Copy)] +pub struct SerializedValuesIterator<'a> { + serialized_values: &'a [u8], +} + +impl<'a> Iterator for SerializedValuesIterator<'a> { + type Item = RawValue<'a>; + + fn next(&mut self) -> Option { + if self.serialized_values.is_empty() { + return None; + } + + Some(types::read_value(&mut self.serialized_values).expect("badly encoded value")) + } +} + #[cfg(test)] mod tests { use crate::frame::response::result::{ColumnSpec, ColumnType, TableSpec}; - use crate::frame::value::{MaybeUnset, SerializedValues, ValueList}; + use crate::frame::types::RawValue; + use crate::frame::value::{LegacySerializedValues, MaybeUnset, ValueList}; use crate::types::serialize::RowWriter; use super::{ @@ -576,6 +738,7 @@ mod tests { BuiltinTypeCheckErrorKind, RowSerializationContext, SerializeCql, SerializeRow, }; + use super::SerializedValues; use scylla_macros::SerializeRow; fn col_spec(name: &str, typ: ColumnType) -> ColumnSpec { @@ -630,7 +793,7 @@ mod tests { let mut sorted_row_data = Vec::new(); <_ as ValueList>::write_to_request(&sorted_row, &mut sorted_row_data).unwrap(); - let mut unsorted_row = SerializedValues::new(); + let mut unsorted_row = LegacySerializedValues::new(); unsorted_row.add_named_value("a", &1i32).unwrap(); unsorted_row.add_named_value("b", &"Ala ma kota").unwrap(); unsorted_row @@ -948,4 +1111,67 @@ mod tests { BuiltinSerializationErrorKind::ColumnSerializationFailed { .. } )); } + + #[test] + fn test_empty_serialized_values() { + let values = SerializedValues::new(); + assert!(values.is_empty()); + assert_eq!(values.element_count(), 0); + assert_eq!(values.buffer_size(), 0); + assert_eq!(values.iter().count(), 0); + } + + #[test] + fn test_serialized_values_content() { + let mut values = SerializedValues::new(); + values.add_value(&1234i32, &ColumnType::Int).unwrap(); + values.add_value(&"abcdefg", &ColumnType::Ascii).unwrap(); + let mut buf = Vec::new(); + values.write_to_request(&mut buf); + assert_eq!( + buf, + [ + 0, 2, // element count + 0, 0, 0, 4, // size of int + 0, 0, 4, 210, // content of int (1234) + 0, 0, 0, 7, // size of string + 97, 98, 99, 100, 101, 102, 103, // content of string ('abcdefg') + ] + ) + } + + #[test] + fn test_serialized_values_iter() { + let mut values = SerializedValues::new(); + values.add_value(&1234i32, &ColumnType::Int).unwrap(); + values.add_value(&"abcdefg", &ColumnType::Ascii).unwrap(); + + let mut iter = values.iter(); + assert_eq!(iter.next(), Some(RawValue::Value(&[0, 0, 4, 210]))); + assert_eq!( + iter.next(), + Some(RawValue::Value(&[97, 98, 99, 100, 101, 102, 103])) + ); + assert_eq!(iter.next(), None); + } + + #[test] + fn test_serialized_values_max_capacity() { + let mut values = SerializedValues::new(); + for _ in 0..65535 { + values + .add_value(&123456789i64, &ColumnType::BigInt) + .unwrap(); + } + + // Adding this value should fail, we reached max capacity + values + .add_value(&123456789i64, &ColumnType::BigInt) + .unwrap_err(); + + assert_eq!(values.iter().count(), 65535); + assert!(values + .iter() + .all(|v| v == RawValue::Value(&[0, 0, 0, 0, 0x07, 0x5b, 0xcd, 0x15]))) + } } diff --git a/scylla-macros/src/value_list.rs b/scylla-macros/src/value_list.rs index bf6fc38e9d..bc9de23c8a 100644 --- a/scylla-macros/src/value_list.rs +++ b/scylla-macros/src/value_list.rs @@ -17,7 +17,7 @@ pub fn value_list_derive(tokens_input: TokenStream) -> Result #path::SerializedResult { - let mut result = #path::SerializedValues::with_capacity(#values_len); + let mut result = #path::LegacySerializedValues::with_capacity(#values_len); #( result.add_value(&self.#field_name)?; )* diff --git a/scylla/benches/benchmark.rs b/scylla/benches/benchmark.rs index b33b08a21b..20440ea0b7 100644 --- a/scylla/benches/benchmark.rs +++ b/scylla/benches/benchmark.rs @@ -3,9 +3,9 @@ use criterion::{criterion_group, criterion_main, Criterion}; use bytes::BytesMut; use scylla::{ frame::types, - frame::value::ValueList, transport::partitioner::{calculate_token_for_partition_key, Murmur3Partitioner}, }; +use scylla_cql::{frame::response::result::ColumnType, types::serialize::row::SerializedValues}; fn types_benchmark(c: &mut Criterion) { let mut buf = BytesMut::with_capacity(64); @@ -40,23 +40,49 @@ fn types_benchmark(c: &mut Criterion) { } fn calculate_token_bench(c: &mut Criterion) { - let simple_pk = ("I'm prepared!!!",); - let serialized_simple_pk = simple_pk.serialized().unwrap().into_owned(); - let simple_pk_long_column = ( - 17_i32, - 16_i32, - String::from_iter(std::iter::repeat('.').take(2000)), - ); - let serialized_simple_pk_long_column = simple_pk_long_column.serialized().unwrap().into_owned(); + let mut serialized_simple_pk = SerializedValues::new(); + serialized_simple_pk + .add_value(&"I'm prepared!!!", &ColumnType::Text) + .unwrap(); - let complex_pk = (17_i32, 16_i32, "I'm prepared!!!"); - let serialized_complex_pk = complex_pk.serialized().unwrap().into_owned(); - let complex_pk_long_column = ( - 17_i32, - 16_i32, - String::from_iter(std::iter::repeat('.').take(2000)), - ); - let serialized_values_long_column = complex_pk_long_column.serialized().unwrap().into_owned(); + let mut serialized_simple_pk_long_column = SerializedValues::new(); + serialized_simple_pk_long_column + .add_value(&17_i32, &ColumnType::Int) + .unwrap(); + serialized_simple_pk_long_column + .add_value(&16_i32, &ColumnType::Int) + .unwrap(); + serialized_simple_pk_long_column + .add_value( + &String::from_iter(std::iter::repeat('.').take(2000)), + &ColumnType::Text, + ) + .unwrap(); + + let mut serialized_complex_pk = SerializedValues::new(); + serialized_complex_pk + .add_value(&17_i32, &ColumnType::Int) + .unwrap(); + serialized_complex_pk + .add_value(&16_i32, &ColumnType::Int) + .unwrap(); + serialized_complex_pk + .add_value(&"I'm prepared!!!", &ColumnType::Text) + .unwrap(); + + let mut serialized_values_long_column = SerializedValues::new(); + serialized_values_long_column + .add_value(&17_i32, &ColumnType::Int) + .unwrap(); + serialized_values_long_column + .add_value(&16_i32, &ColumnType::Int) + .unwrap(); + serialized_values_long_column + .add_value( + &String::from_iter(std::iter::repeat('.').take(2000)), + &ColumnType::Text, + ) + .unwrap(); c.bench_function("calculate_token_from_partition_key simple pk", |b| { b.iter(|| calculate_token_for_partition_key(&serialized_simple_pk, &Murmur3Partitioner)) diff --git a/scylla/src/lib.rs b/scylla/src/lib.rs index 27a3c57471..5bf9bc69e8 100644 --- a/scylla/src/lib.rs +++ b/scylla/src/lib.rs @@ -100,6 +100,7 @@ pub mod _macro_internal { pub use scylla_cql::frame; pub use scylla_cql::macros::{self, *}; +pub use scylla_cql::types::serialize; pub mod authentication; #[cfg(feature = "cloud")] diff --git a/scylla/src/statement/prepared_statement.rs b/scylla/src/statement/prepared_statement.rs index 8abdf6bd91..f61fab901e 100644 --- a/scylla/src/statement/prepared_statement.rs +++ b/scylla/src/statement/prepared_statement.rs @@ -1,6 +1,8 @@ use bytes::{Bytes, BytesMut}; use scylla_cql::errors::{BadQuery, QueryError}; use scylla_cql::frame::types::RawValue; +use scylla_cql::types::serialize::row::{RowSerializationContext, SerializeRow, SerializedValues}; +use scylla_cql::types::serialize::SerializationError; use smallvec::{smallvec, SmallVec}; use std::convert::TryInto; use std::sync::Arc; @@ -13,7 +15,6 @@ use scylla_cql::frame::response::result::ColumnSpec; use super::StatementConfig; use crate::frame::response::result::PreparedMetadata; use crate::frame::types::{Consistency, SerialConsistency}; -use crate::frame::value::SerializedValues; use crate::history::HistoryListener; use crate::retry_policy::RetryPolicy; use crate::routing::Token; @@ -134,9 +135,10 @@ impl PreparedStatement { /// [Self::calculate_token()]. pub fn compute_partition_key( &self, - bound_values: &SerializedValues, + bound_values: &impl SerializeRow, ) -> Result { - let partition_key = self.extract_partition_key(bound_values)?; + let serialized = self.serialize_values(bound_values)?; + let partition_key = self.extract_partition_key(&serialized)?; let mut buf = BytesMut::new(); let mut writer = |chunk: &[u8]| buf.extend_from_slice(chunk); @@ -182,19 +184,19 @@ impl PreparedStatement { Ok(Some((partition_key, token))) } - /// Calculates the token for given prepared statement and serialized values. + /// Calculates the token for given prepared statement and values. /// /// Returns the token that would be computed for executing the provided /// prepared statement with the provided values. // As this function creates a `PartitionKey`, it is intended rather for external usage (by users). // For internal purposes, `PartitionKey::calculate_token()` is preferred, as `PartitionKey` // is either way used internally, among others for display in traces. - pub fn calculate_token( - &self, - serialized_values: &SerializedValues, - ) -> Result, QueryError> { - self.extract_partition_key_and_calculate_token(&self.partitioner_name, serialized_values) - .map(|opt| opt.map(|(_pk, token)| token)) + pub fn calculate_token(&self, values: &impl SerializeRow) -> Result, QueryError> { + self.extract_partition_key_and_calculate_token( + &self.partitioner_name, + &self.serialize_values(values)?, + ) + .map(|opt| opt.map(|(_pk, token)| token)) } /// Returns the name of the keyspace this statement is operating on. @@ -335,6 +337,14 @@ impl PreparedStatement { pub fn get_execution_profile_handle(&self) -> Option<&ExecutionProfileHandle> { self.config.execution_profile_handle.as_ref() } + + pub(crate) fn serialize_values( + &self, + values: &impl SerializeRow, + ) -> Result { + let ctx = RowSerializationContext::from_prepared(self.get_prepared_metadata()); + SerializedValues::from_serializable(&ctx, values) + } } #[derive(Clone, Debug, Error, PartialEq, Eq, PartialOrd, Ord)] @@ -349,12 +359,14 @@ pub enum TokenCalculationError { ValueTooLong(usize), } -#[derive(Clone, Debug, Error, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Clone, Debug, Error)] pub enum PartitionKeyError { #[error(transparent)] PartitionKeyExtraction(PartitionKeyExtractionError), #[error(transparent)] TokenCalculation(TokenCalculationError), + #[error(transparent)] + Serialization(SerializationError), } impl From for PartitionKeyError { @@ -369,6 +381,12 @@ impl From for PartitionKeyError { } } +impl From for PartitionKeyError { + fn from(err: SerializationError) -> Self { + Self::Serialization(err) + } +} + pub(crate) type PartitionKeyValue<'ps> = (&'ps [u8], &'ps ColumnSpec); pub(crate) struct PartitionKey<'ps> { @@ -397,7 +415,10 @@ impl<'ps> PartitionKey<'ps> { let next_val = values_iter .nth((pk_index.index - values_iter_offset) as usize) .ok_or_else(|| { - PartitionKeyExtractionError::NoPkIndexValue(pk_index.index, bound_values.len()) + PartitionKeyExtractionError::NoPkIndexValue( + pk_index.index, + bound_values.element_count(), + ) })?; // Add it in sequence order to pk_values if let RawValue::Value(v) = next_val { @@ -456,11 +477,11 @@ impl<'ps> PartitionKey<'ps> { #[cfg(test)] mod tests { - use scylla_cql::frame::{ - response::result::{ + use scylla_cql::{ + frame::response::result::{ ColumnSpec, ColumnType, PartitionKeyIndex, PreparedMetadata, TableSpec, }, - value::SerializedValues, + types::serialize::row::SerializedValues, }; use crate::prepared_statement::PartitionKey; @@ -512,11 +533,13 @@ mod tests { [4, 0, 3], ); let mut values = SerializedValues::new(); - values.add_value(&67i8).unwrap(); - values.add_value(&42i16).unwrap(); - values.add_value(&23i32).unwrap(); - values.add_value(&89i64).unwrap(); - values.add_value(&[1u8, 2, 3, 4, 5]).unwrap(); + values.add_value(&67i8, &ColumnType::TinyInt).unwrap(); + values.add_value(&42i16, &ColumnType::SmallInt).unwrap(); + values.add_value(&23i32, &ColumnType::Int).unwrap(); + values.add_value(&89i64, &ColumnType::BigInt).unwrap(); + values + .add_value(&[1u8, 2, 3, 4, 5], &ColumnType::Blob) + .unwrap(); let pk = PartitionKey::new(&meta, &values).unwrap(); let pk_cols = Vec::from_iter(pk.iter()); diff --git a/scylla/src/transport/caching_session.rs b/scylla/src/transport/caching_session.rs index 3d3dfa0e17..14546b93e4 100644 --- a/scylla/src/transport/caching_session.rs +++ b/scylla/src/transport/caching_session.rs @@ -1,5 +1,5 @@ use crate::batch::{Batch, BatchStatement}; -use crate::frame::value::{BatchValues, ValueList}; +use crate::frame::value::BatchValues; use crate::prepared_statement::PreparedStatement; use crate::query::Query; use crate::transport::errors::QueryError; @@ -10,6 +10,7 @@ use bytes::Bytes; use dashmap::DashMap; use futures::future::try_join_all; use scylla_cql::frame::response::result::PreparedMetadata; +use scylla_cql::types::serialize::row::SerializeRow; use std::collections::hash_map::RandomState; use std::hash::BuildHasher; @@ -70,38 +71,35 @@ where pub async fn execute( &self, query: impl Into, - values: impl ValueList, + values: impl SerializeRow, ) -> Result { let query = query.into(); let prepared = self.add_prepared_statement_owned(query).await?; - let values = values.serialized()?; - self.session.execute(&prepared, values.clone()).await + self.session.execute(&prepared, values).await } /// Does the same thing as [`Session::execute_iter`] but uses the prepared statement cache pub async fn execute_iter( &self, query: impl Into, - values: impl ValueList, + values: impl SerializeRow, ) -> Result { let query = query.into(); let prepared = self.add_prepared_statement_owned(query).await?; - let values = values.serialized()?; - self.session.execute_iter(prepared, values.clone()).await + self.session.execute_iter(prepared, values).await } /// Does the same thing as [`Session::execute_paged`] but uses the prepared statement cache pub async fn execute_paged( &self, query: impl Into, - values: impl ValueList, + values: impl SerializeRow, paging_state: Option, ) -> Result { let query = query.into(); let prepared = self.add_prepared_statement_owned(query).await?; - let values = values.serialized()?; self.session - .execute_paged(&prepared, values.clone(), paging_state.clone()) + .execute_paged(&prepared, values, paging_state.clone()) .await } diff --git a/scylla/src/transport/cluster.rs b/scylla/src/transport/cluster.rs index 503d14519d..0098391854 100644 --- a/scylla/src/transport/cluster.rs +++ b/scylla/src/transport/cluster.rs @@ -1,6 +1,5 @@ /// Cluster manages up to date information and connections to database nodes use crate::frame::response::event::{Event, StatusChangeEvent}; -use crate::frame::value::ValueList; use crate::prepared_statement::TokenCalculationError; use crate::routing::Token; use crate::transport::host_filter::HostFilter; @@ -18,6 +17,7 @@ use futures::future::join_all; use futures::{future::RemoteHandle, FutureExt}; use itertools::Itertools; use scylla_cql::errors::{BadQuery, NewSessionError}; +use scylla_cql::types::serialize::row::SerializedValues; use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; @@ -390,7 +390,7 @@ impl ClusterData { &self, keyspace: &str, table: &str, - partition_key: impl ValueList, + partition_key: &SerializedValues, ) -> Result { let partitioner = self .keyspaces @@ -400,12 +400,11 @@ impl ClusterData { .and_then(PartitionerName::from_str) .unwrap_or_default(); - calculate_token_for_partition_key(&partition_key.serialized().unwrap(), &partitioner) - .map_err(|err| match err { - TokenCalculationError::ValueTooLong(values_len) => { - BadQuery::ValuesTooLongForKey(values_len, u16::MAX.into()) - } - }) + calculate_token_for_partition_key(partition_key, &partitioner).map_err(|err| match err { + TokenCalculationError::ValueTooLong(values_len) => { + BadQuery::ValuesTooLongForKey(values_len, u16::MAX.into()) + } + }) } /// Access to replicas owning a given token @@ -436,7 +435,7 @@ impl ClusterData { &self, keyspace: &str, table: &str, - partition_key: impl ValueList, + partition_key: &SerializedValues, ) -> Result>, BadQuery> { Ok(self.get_token_endpoints( keyspace, diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index 2ecd37b290..3faa6a5f0a 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -4,6 +4,7 @@ use scylla_cql::errors::TranslationError; use scylla_cql::frame::request::options::Options; use scylla_cql::frame::response::Error; use scylla_cql::frame::types::SerialConsistency; +use scylla_cql::types::serialize::row::SerializedValues; use socket2::{SockRef, TcpKeepalive}; use tokio::io::{split, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}; use tokio::net::{TcpSocket, TcpStream}; @@ -52,7 +53,7 @@ use crate::frame::{ request::{self, batch, execute, query, register, SerializableRequest}, response::{event::Event, result, NonErrorResponse, Response, ResponseOpcode}, server_event_type::EventType, - value::{BatchValues, BatchValuesIterator, ValueList}, + value::{BatchValues, BatchValuesIterator}, FrameParams, SerializedRequest, }; use crate::query::Query; @@ -596,7 +597,6 @@ impl Connection { pub(crate) async fn query_single_page( &self, query: impl Into, - values: impl ValueList, ) -> Result { let query: Query = query.into(); @@ -606,24 +606,18 @@ impl Connection { .determine_consistency(self.config.default_consistency); let serial_consistency = query.config.serial_consistency; - self.query_single_page_with_consistency( - query, - &values, - consistency, - serial_consistency.flatten(), - ) - .await + self.query_single_page_with_consistency(query, consistency, serial_consistency.flatten()) + .await } pub(crate) async fn query_single_page_with_consistency( &self, query: impl Into, - values: impl ValueList, consistency: Consistency, serial_consistency: Option, ) -> Result { let query: Query = query.into(); - self.query_with_consistency(&query, &values, consistency, serial_consistency, None) + self.query_with_consistency(&query, consistency, serial_consistency, None) .await? .into_query_result() } @@ -631,13 +625,11 @@ impl Connection { pub(crate) async fn query( &self, query: &Query, - values: impl ValueList, paging_state: Option, ) -> Result { // This method is used only for driver internal queries, so no need to consult execution profile here. self.query_with_consistency( query, - values, query .config .determine_consistency(self.config.default_consistency), @@ -650,33 +642,16 @@ impl Connection { pub(crate) async fn query_with_consistency( &self, query: &Query, - values: impl ValueList, consistency: Consistency, serial_consistency: Option, paging_state: Option, ) -> Result { - let serialized_values = values.serialized()?; - - let values_size = serialized_values.size(); - if values_size != 0 { - let prepared = self.prepare(query).await?; - return self - .execute_with_consistency( - &prepared, - values, - consistency, - serial_consistency, - paging_state, - ) - .await; - } - let query_frame = query::Query { contents: Cow::Borrowed(&query.contents), parameters: query::QueryParameters { consistency, serial_consistency, - values: serialized_values, + values: Cow::Borrowed(SerializedValues::EMPTY), page_size: query.get_page_size(), paging_state, timestamp: query.get_timestamp(), @@ -687,22 +662,40 @@ impl Connection { .await } + #[allow(dead_code)] + pub(crate) async fn execute( + &self, + prepared: PreparedStatement, + values: SerializedValues, + paging_state: Option, + ) -> Result { + // This method is used only for driver internal queries, so no need to consult execution profile here. + self.execute_with_consistency( + &prepared, + &values, + prepared + .config + .determine_consistency(self.config.default_consistency), + prepared.config.serial_consistency.flatten(), + paging_state, + ) + .await + } + pub(crate) async fn execute_with_consistency( &self, prepared_statement: &PreparedStatement, - values: impl ValueList, + values: &SerializedValues, consistency: Consistency, serial_consistency: Option, paging_state: Option, ) -> Result { - let serialized_values = values.serialized()?; - let execute_frame = execute::Execute { id: prepared_statement.get_id().to_owned(), parameters: query::QueryParameters { consistency, serial_consistency, - values: serialized_values, + values: Cow::Borrowed(values), page_size: prepared_statement.get_page_size(), timestamp: prepared_statement.get_timestamp(), paging_state, @@ -734,19 +727,32 @@ impl Connection { pub(crate) async fn query_iter( self: Arc, query: Query, - values: impl ValueList, ) -> Result { - let serialized_values = values.serialized()?.into_owned(); - let consistency = query .config .determine_consistency(self.config.default_consistency); let serial_consistency = query.config.serial_consistency.flatten(); - RowIterator::new_for_connection_query_iter( - query, + RowIterator::new_for_connection_query_iter(query, self, consistency, serial_consistency) + .await + } + + /// Executes a prepared statements and fetches its results over multiple pages, using + /// the asynchronous iterator interface. + pub(crate) async fn execute_iter( + self: Arc, + prepared_statement: PreparedStatement, + values: SerializedValues, + ) -> Result { + let consistency = prepared_statement + .config + .determine_consistency(self.config.default_consistency); + let serial_consistency = prepared_statement.config.serial_consistency.flatten(); + + RowIterator::new_for_connection_execute_iter( + prepared_statement, + values, self, - serialized_values, consistency, serial_consistency, ) @@ -885,7 +891,7 @@ impl Connection { false => format!("USE {}", keyspace_name.as_str()).into(), }; - let query_response = self.query(&query, (), None).await?; + let query_response = self.query(&query, None).await?; match query_response.response { Response::Result(result::Result::SetKeyspace(set_keyspace)) => { @@ -929,7 +935,7 @@ impl Connection { pub(crate) async fn fetch_schema_version(&self) -> Result { let (version_id,): (Uuid,) = self - .query_single_page(LOCAL_VERSION, &[]) + .query_single_page(LOCAL_VERSION) .await? .rows .ok_or(QueryError::ProtocolError("Version query returned not rows"))? @@ -1833,7 +1839,6 @@ mod tests { use super::ConnectionConfig; use crate::query::Query; use crate::transport::connection::open_connection; - use crate::transport::connection::QueryResponse; use crate::transport::node::ResolvedContactPoint; use crate::transport::topology::UntranslatedEndpoint; use crate::utils::test_utils::unique_keyspace_name; @@ -1914,7 +1919,7 @@ mod tests { let select_query = Query::new("SELECT p FROM connection_query_iter_tab").with_page_size(7); let empty_res = connection .clone() - .query_iter(select_query.clone(), &[]) + .query_iter(select_query.clone()) .await .unwrap() .try_collect::>() @@ -1927,15 +1932,19 @@ mod tests { let mut insert_futures = Vec::new(); let insert_query = Query::new("INSERT INTO connection_query_iter_tab (p) VALUES (?)").with_page_size(7); + let prepared = connection.prepare(&insert_query).await.unwrap(); for v in &values { - insert_futures.push(connection.query_single_page(insert_query.clone(), (v,))); + let prepared_clone = prepared.clone(); + let values = prepared_clone.serialize_values(&(*v,)).unwrap(); + let fut = async { connection.execute(prepared_clone, values, None).await }; + insert_futures.push(fut); } futures::future::try_join_all(insert_futures).await.unwrap(); let mut results: Vec = connection .clone() - .query_iter(select_query.clone(), &[]) + .query_iter(select_query.clone()) .await .unwrap() .into_typed::<(i32,)>() @@ -1947,7 +1956,9 @@ mod tests { // 3. INSERT query_iter should work and not return any rows. let insert_res1 = connection - .query_iter(insert_query, (0,)) + .query_iter(Query::new( + "INSERT INTO connection_query_iter_tab (p) VALUES (0)", + )) .await .unwrap() .try_collect::>() @@ -2007,10 +2018,7 @@ mod tests { .await .unwrap(); - connection - .query(&"TRUNCATE t".into(), (), None) - .await - .unwrap(); + connection.query(&"TRUNCATE t".into(), None).await.unwrap(); let mut futs = Vec::new(); @@ -2025,10 +2033,12 @@ mod tests { let q = Query::new("INSERT INTO t (p, v) VALUES (?, ?)"); let conn = conn.clone(); async move { - let response: QueryResponse = conn - .query(&q, (j, vec![j as u8; j as usize]), None) - .await + let prepared = conn.prepare(&q).await.unwrap(); + let values = prepared + .serialize_values(&(j, vec![j as u8; j as usize])) .unwrap(); + let response = + conn.execute(prepared.clone(), values, None).await.unwrap(); // QueryResponse might contain an error - make sure that there were no errors let _nonerror_response = response.into_non_error_query_response().unwrap(); @@ -2045,7 +2055,7 @@ mod tests { // Check that everything was written properly let range_end = arithmetic_sequence_sum(NUM_BATCHES); let mut results = connection - .query(&"SELECT p, v FROM t".into(), (), None) + .query(&"SELECT p, v FROM t".into(), None) .await .unwrap() .into_query_result() @@ -2198,7 +2208,7 @@ mod tests { // As everything is normal, these queries should succeed. for _ in 0..3 { tokio::time::sleep(Duration::from_millis(500)).await; - conn.query_single_page("SELECT host_id FROM system.local", ()) + conn.query_single_page("SELECT host_id FROM system.local") .await .unwrap(); } @@ -2218,7 +2228,7 @@ mod tests { // As the router is invalidated, all further queries should immediately // return error. - conn.query_single_page("SELECT host_id FROM system.local", ()) + conn.query_single_page("SELECT host_id FROM system.local") .await .unwrap_err(); diff --git a/scylla/src/transport/cql_collections_test.rs b/scylla/src/transport/cql_collections_test.rs index cd89443271..a3e8c76089 100644 --- a/scylla/src/transport/cql_collections_test.rs +++ b/scylla/src/transport/cql_collections_test.rs @@ -1,8 +1,8 @@ use crate::cql_to_rust::FromCqlVal; -use crate::frame::value::Value; use crate::test_utils::create_new_session_builder; use crate::utils::test_utils::unique_keyspace_name; use crate::{frame::response::result::CqlValue, IntoTypedRows, Session}; +use scylla_cql::types::serialize::value::SerializeCql; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; async fn connect() -> Session { @@ -33,7 +33,7 @@ async fn insert_and_select( to_insert: &InsertT, expected: &SelectT, ) where - InsertT: Value, + InsertT: SerializeCql, SelectT: FromCqlVal> + PartialEq + std::fmt::Debug, { session diff --git a/scylla/src/transport/cql_types_test.rs b/scylla/src/transport/cql_types_test.rs index ced5075918..6c05fc90f2 100644 --- a/scylla/src/transport/cql_types_test.rs +++ b/scylla/src/transport/cql_types_test.rs @@ -1,14 +1,16 @@ use crate as scylla; use crate::cql_to_rust::FromCqlVal; use crate::frame::response::result::CqlValue; -use crate::frame::value::{Counter, CqlDate, CqlTime, CqlTimestamp, Value}; -use crate::macros::{FromUserType, IntoUserType}; +use crate::frame::value::{Counter, CqlDate, CqlTime, CqlTimestamp}; +use crate::macros::FromUserType; use crate::test_utils::create_new_session_builder; use crate::transport::session::IntoTypedRows; use crate::transport::session::Session; use crate::utils::test_utils::unique_keyspace_name; use bigdecimal::BigDecimal; use num_bigint::BigInt; +use scylla_cql::types::serialize::value::SerializeCql; +use scylla_macros::SerializeCql; use std::cmp::PartialEq; use std::fmt::Debug; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; @@ -64,7 +66,7 @@ async fn init_test(table_name: &str, type_name: &str) -> Session { // Expected values and bound values are computed using T::from_str async fn run_tests(tests: &[&str], type_name: &str) where - T: Value + FromCqlVal + FromStr + Debug + Clone + PartialEq, + T: SerializeCql + FromCqlVal + FromStr + Debug + Clone + PartialEq, { let session: Session = init_test(type_name, type_name).await; session.await_schema_agreement().await.unwrap(); @@ -1361,7 +1363,8 @@ async fn test_udt_after_schema_update() { .await .unwrap(); - #[derive(IntoUserType, FromUserType, Debug, PartialEq)] + #[derive(SerializeCql, FromUserType, Debug, PartialEq)] + #[scylla(crate = crate)] struct UdtV1 { pub first: i32, pub second: bool, diff --git a/scylla/src/transport/iterator.rs b/scylla/src/transport/iterator.rs index e9389992ed..366a7ccb4a 100644 --- a/scylla/src/transport/iterator.rs +++ b/scylla/src/transport/iterator.rs @@ -12,6 +12,7 @@ use bytes::Bytes; use futures::Stream; use scylla_cql::frame::response::NonErrorResponse; use scylla_cql::frame::types::SerialConsistency; +use scylla_cql::types::serialize::row::SerializedValues; use std::result::Result; use thiserror::Error; use tokio::sync::mpsc; @@ -22,12 +23,9 @@ use super::execution_profile::ExecutionProfileInner; use super::session::RequestSpan; use crate::cql_to_rust::{FromRow, FromRowError}; -use crate::frame::{ - response::{ - result, - result::{ColumnSpec, Row, Rows}, - }, - value::SerializedValues, +use crate::frame::response::{ + result, + result::{ColumnSpec, Row, Rows}, }; use crate::history::{self, HistoryListener}; use crate::statement::Consistency; @@ -128,7 +126,6 @@ impl RowIterator { pub(crate) async fn new_for_query( mut query: Query, - values: SerializedValues, execution_profile: Arc, cluster_data: Arc, metrics: Arc, @@ -162,29 +159,31 @@ impl RowIterator { let parent_span = tracing::Span::current(); let worker_task = async move { let query_ref = &query; - let values_ref = &values; let choose_connection = |node: Arc| async move { node.random_connection().await }; let page_query = |connection: Arc, consistency: Consistency, - paging_state: Option| async move { - connection - .query_with_consistency( - query_ref, - values_ref, - consistency, - serial_consistency, - paging_state, - ) - .await + paging_state: Option| { + async move { + connection + .query_with_consistency( + query_ref, + consistency, + serial_consistency, + paging_state, + ) + .await + } }; let query_ref = &query; - let serialized_values_size = values.size(); - let span_creator = - move || RequestSpan::new_query(&query_ref.contents, serialized_values_size); + let span_creator = move || { + let span = RequestSpan::new_query(&query_ref.contents); + span.record_request_size(0); + span + }; let worker = RowIteratorWorker { sender: sender.into(), @@ -281,7 +280,7 @@ impl RowIterator { .await }; - let serialized_values_size = config.values.size(); + let serialized_values_size = config.values.buffer_size(); let replicas: Option> = if let (Some(keyspace), Some(token)) = @@ -337,7 +336,6 @@ impl RowIterator { pub(crate) async fn new_for_connection_query_iter( mut query: Query, connection: Arc, - values: SerializedValues, consistency: Consistency, serial_consistency: Option, ) -> Result { @@ -352,6 +350,36 @@ impl RowIterator { fetcher: |paging_state| { connection.query_with_consistency( &query, + consistency, + serial_consistency, + paging_state, + ) + }, + }; + worker.work().await + }; + + Self::new_from_worker_future(worker_task, receiver).await + } + + pub(crate) async fn new_for_connection_execute_iter( + mut prepared: PreparedStatement, + values: SerializedValues, + connection: Arc, + consistency: Consistency, + serial_consistency: Option, + ) -> Result { + if prepared.get_page_size().is_none() { + prepared.set_page_size(DEFAULT_ITER_PAGE_SIZE); + } + let (sender, receiver) = mpsc::channel::>(1); + + let worker_task = async move { + let worker = SingleConnectionRowIteratorWorker { + sender: sender.into(), + fetcher: |paging_state| { + connection.execute_with_consistency( + &prepared, &values, consistency, serial_consistency, diff --git a/scylla/src/transport/partitioner.rs b/scylla/src/transport/partitioner.rs index 4526715ab2..7a9f4b083a 100644 --- a/scylla/src/transport/partitioner.rs +++ b/scylla/src/transport/partitioner.rs @@ -1,10 +1,8 @@ use bytes::Buf; -use scylla_cql::frame::types::RawValue; +use scylla_cql::{frame::types::RawValue, types::serialize::row::SerializedValues}; use std::num::Wrapping; -use crate::{ - frame::value::SerializedValues, prepared_statement::TokenCalculationError, routing::Token, -}; +use crate::{prepared_statement::TokenCalculationError, routing::Token}; #[allow(clippy::upper_case_acronyms)] #[derive(Clone, PartialEq, Debug, Default)] @@ -342,7 +340,7 @@ pub fn calculate_token_for_partition_key( ) -> Result { let mut partitioner_hasher = partitioner.build_hasher(); - if serialized_partition_key_values.len() == 1 { + if serialized_partition_key_values.element_count() == 1 { let val = serialized_partition_key_values.iter().next().unwrap(); if let RawValue::Value(val) = val { partitioner_hasher.write(val); diff --git a/scylla/src/transport/session.rs b/scylla/src/transport/session.rs index 39fbdf78b5..bf8c8f5200 100644 --- a/scylla/src/transport/session.rs +++ b/scylla/src/transport/session.rs @@ -16,6 +16,7 @@ use itertools::{Either, Itertools}; pub use scylla_cql::errors::TranslationError; use scylla_cql::frame::response::result::{deser_cql_value, ColumnSpec, Rows}; use scylla_cql::frame::response::NonErrorResponse; +use scylla_cql::types::serialize::row::SerializeRow; use std::borrow::Borrow; use std::collections::HashMap; use std::fmt::Display; @@ -46,9 +47,7 @@ use super::NodeRef; use crate::cql_to_rust::FromRow; use crate::frame::response::cql_to_rust::FromRowError; use crate::frame::response::result; -use crate::frame::value::{ - BatchValues, BatchValuesFirstSerialized, BatchValuesIterator, ValueList, -}; +use crate::frame::value::{BatchValues, BatchValuesFirstSerialized, BatchValuesIterator}; use crate::prepared_statement::PreparedStatement; use crate::query::Query; use crate::routing::Token; @@ -603,7 +602,7 @@ impl Session { pub async fn query( &self, query: impl Into, - values: impl ValueList, + values: impl SerializeRow, ) -> Result { self.query_paged(query, values, None).await } @@ -617,11 +616,10 @@ impl Session { pub async fn query_paged( &self, query: impl Into, - values: impl ValueList, + values: impl SerializeRow, paging_state: Option, ) -> Result { let query: Query = query.into(); - let serialized_values = values.serialized()?; let execution_profile = query .get_execution_profile_handle() @@ -640,7 +638,8 @@ impl Session { ..Default::default() }; - let span = RequestSpan::new_query(&query.contents, serialized_values.size()); + let span = RequestSpan::new_query(&query.contents); + let span_ref = &span; let run_query_result = self .run_query( statement_info, @@ -656,19 +655,35 @@ impl Session { .unwrap_or(execution_profile.serial_consistency); // Needed to avoid moving query and values into async move block let query_ref = &query; - let values_ref = &serialized_values; + let values_ref = &values; let paging_state_ref = &paging_state; async move { - connection - .query_with_consistency( - query_ref, - values_ref, - consistency, - serial_consistency, - paging_state_ref.clone(), - ) - .await - .and_then(QueryResponse::into_non_error_query_response) + if values_ref.is_empty() { + span_ref.record_request_size(0); + connection + .query_with_consistency( + query_ref, + consistency, + serial_consistency, + paging_state_ref.clone(), + ) + .await + .and_then(QueryResponse::into_non_error_query_response) + } else { + let prepared = connection.prepare(query_ref).await?; + let serialized = prepared.serialize_values(values_ref)?; + span_ref.record_request_size(serialized.buffer_size()); + connection + .execute_with_consistency( + &prepared, + &serialized, + consistency, + serial_consistency, + paging_state_ref.clone(), + ) + .await + .and_then(QueryResponse::into_non_error_query_response) + } } }, &span, @@ -764,24 +779,38 @@ impl Session { pub async fn query_iter( &self, query: impl Into, - values: impl ValueList, + values: impl SerializeRow, ) -> Result { let query: Query = query.into(); - let serialized_values = values.serialized()?; let execution_profile = query .get_execution_profile_handle() .unwrap_or_else(|| self.get_default_execution_profile_handle()) .access(); - RowIterator::new_for_query( - query, - serialized_values.into_owned(), - execution_profile, - self.cluster.get_data(), - self.metrics.clone(), - ) - .await + if values.is_empty() { + RowIterator::new_for_query( + query, + execution_profile, + self.cluster.get_data(), + self.metrics.clone(), + ) + .await + } else { + // Making RowIterator::new_for_query work with values is too hard (if even possible) + // so instead of sending one prepare to a specific connection on each iterator query, + // we fully prepare a statement beforehand. + let prepared = self.prepare(query).await?; + let values = prepared.serialize_values(&values)?; + RowIterator::new_for_prepared_statement(PreparedIteratorConfig { + prepared, + values, + execution_profile, + cluster_data: self.cluster.get_data(), + metrics: self.metrics.clone(), + }) + .await + } } /// Prepares a statement on the server side and returns a prepared statement, @@ -916,7 +945,7 @@ impl Session { pub async fn execute( &self, prepared: &PreparedStatement, - values: impl ValueList, + values: impl SerializeRow, ) -> Result { self.execute_paged(prepared, values, None).await } @@ -930,18 +959,15 @@ impl Session { pub async fn execute_paged( &self, prepared: &PreparedStatement, - values: impl ValueList, + values: impl SerializeRow, paging_state: Option, ) -> Result { - let serialized_values = values.serialized()?; + let serialized_values = prepared.serialize_values(&values)?; let values_ref = &serialized_values; let paging_state_ref = &paging_state; let (partition_key, token) = prepared - .extract_partition_key_and_calculate_token( - prepared.get_partitioner_name(), - &serialized_values, - )? + .extract_partition_key_and_calculate_token(prepared.get_partitioner_name(), values_ref)? .unzip(); let execution_profile = prepared @@ -966,7 +992,7 @@ impl Session { let span = RequestSpan::new_prepared( partition_key.as_ref().map(|pk| pk.iter()), token, - serialized_values.size(), + serialized_values.buffer_size(), ); if !span.span().is_disabled() { @@ -1076,10 +1102,10 @@ impl Session { pub async fn execute_iter( &self, prepared: impl Into, - values: impl ValueList, + values: impl SerializeRow, ) -> Result { let prepared = prepared.into(); - let serialized_values = values.serialized()?; + let serialized_values = prepared.serialize_values(&values)?; let execution_profile = prepared .get_execution_profile_handle() @@ -1088,7 +1114,7 @@ impl Session { RowIterator::new_for_prepared_statement(PreparedIteratorConfig { prepared, - values: serialized_values.into_owned(), + values: serialized_values, execution_profile, cluster_data: self.cluster.get_data(), metrics: self.metrics.clone(), @@ -1189,7 +1215,7 @@ impl Session { let first_value_token = statement_info.token; // Reuse first serialized value when serializing query, and delegate to `BatchValues::write_next_to_request` - // directly for others (if they weren't already serialized, possibly don't even allocate the `SerializedValues`) + // directly for others (if they weren't already serialized, possibly don't even allocate the `LegacySerializedValues`) let values = BatchValuesFirstSerialized::new(&values, first_serialized_value); let values_ref = &values; @@ -1891,7 +1917,7 @@ pub(crate) struct RequestSpan { } impl RequestSpan { - pub(crate) fn new_query(contents: &str, request_size: usize) -> Self { + pub(crate) fn new_query(contents: &str) -> Self { use tracing::field::Empty; let span = trace_span!( @@ -1899,7 +1925,7 @@ impl RequestSpan { kind = "unprepared", contents = contents, // - request_size = request_size, + request_size = Empty, result_size = Empty, result_rows = Empty, replicas = Empty, @@ -2013,6 +2039,10 @@ impl RequestSpan { .record("replicas", tracing::field::display(&ReplicaIps(replicas))); } + pub(crate) fn record_request_size(&self, size: usize) { + self.span.record("request_size", size); + } + pub(crate) fn inc_speculative_executions(&self) { self.speculative_executions.fetch_add(1, Ordering::Relaxed); } diff --git a/scylla/src/transport/session_test.rs b/scylla/src/transport/session_test.rs index 79df0834e3..b6c9c20ba4 100644 --- a/scylla/src/transport/session_test.rs +++ b/scylla/src/transport/session_test.rs @@ -1,7 +1,6 @@ use crate as scylla; use crate::batch::{Batch, BatchStatement}; use crate::frame::response::result::Row; -use crate::frame::value::ValueList; use crate::prepared_statement::PreparedStatement; use crate::query::Query; use crate::retry_policy::{QueryInfo, RetryDecision, RetryPolicy, RetrySession}; @@ -28,7 +27,9 @@ use assert_matches::assert_matches; use bytes::Bytes; use futures::{FutureExt, StreamExt, TryStreamExt}; use itertools::Itertools; -use scylla_cql::frame::value::Value; +use scylla_cql::frame::response::result::ColumnType; +use scylla_cql::types::serialize::row::{SerializeRow, SerializedValues}; +use scylla_cql::types::serialize::value::SerializeCql; use std::collections::BTreeSet; use std::collections::{BTreeMap, HashMap}; use std::sync::atomic::{AtomicBool, Ordering}; @@ -208,7 +209,9 @@ async fn test_prepared_statement() { .unwrap(); let values = (17_i32, 16_i32, "I'm prepared!!!"); - let serialized_values = values.serialized().unwrap().into_owned(); + let serialized_values_complex_pk = prepared_complex_pk_statement + .serialize_values(&values) + .unwrap(); session.execute(&prepared_statement, &values).await.unwrap(); session @@ -231,15 +234,14 @@ async fn test_prepared_statement() { .as_bigint() .unwrap(), }; - let prepared_token = Murmur3Partitioner.hash_one( - &prepared_statement - .compute_partition_key(&serialized_values) - .unwrap(), - ); + let prepared_token = Murmur3Partitioner + .hash_one(&prepared_statement.compute_partition_key(&values).unwrap()); assert_eq!(token, prepared_token); + let mut pk = SerializedValues::new(); + pk.add_value(&17_i32, &ColumnType::Int).unwrap(); let cluster_data_token = session .get_cluster_data() - .compute_token(&ks, "t2", (17_i32,)) + .compute_token(&ks, "t2", &pk) .unwrap(); assert_eq!(token, cluster_data_token); } @@ -259,13 +261,13 @@ async fn test_prepared_statement() { }; let prepared_token = Murmur3Partitioner.hash_one( &prepared_complex_pk_statement - .compute_partition_key(&serialized_values) + .compute_partition_key(&values) .unwrap(), ); assert_eq!(token, prepared_token); let cluster_data_token = session .get_cluster_data() - .compute_token(&ks, "complex_pk", &serialized_values) + .compute_token(&ks, "complex_pk", &serialized_values_complex_pk) .unwrap(); assert_eq!(token, cluster_data_token); } @@ -319,9 +321,10 @@ async fn test_prepared_statement() { assert!(e.is_none()); assert_eq!((a, b, c, d), (17, 16, &String::from("I'm prepared!!!"), 7)) } - // Check that ValueList macro works + // Check that SerializeRow macro works { - #[derive(scylla::ValueList, scylla::FromRow, PartialEq, Debug, Clone)] + #[derive(scylla::SerializeRow, scylla::FromRow, PartialEq, Debug, Clone)] + #[scylla(crate = crate)] struct ComplexPk { a: i32, b: i32, @@ -510,7 +513,7 @@ async fn test_token_calculation() { s.push('a'); } let values = (&s,); - let serialized_values = values.serialized().unwrap().into_owned(); + let serialized_values = prepared_statement.serialize_values(&values).unwrap(); session.execute(&prepared_statement, &values).await.unwrap(); let rs = session @@ -529,11 +532,8 @@ async fn test_token_calculation() { .as_bigint() .unwrap(), }; - let prepared_token = Murmur3Partitioner.hash_one( - &prepared_statement - .compute_partition_key(&serialized_values) - .unwrap(), - ); + let prepared_token = Murmur3Partitioner + .hash_one(&prepared_statement.compute_partition_key(&values).unwrap()); assert_eq!(token, prepared_token); let cluster_data_token = session .get_cluster_data() @@ -1991,14 +1991,17 @@ async fn test_unusual_valuelists() { .await .unwrap(); - let values_dyn: Vec<&dyn Value> = - vec![&1 as &dyn Value, &2 as &dyn Value, &"&dyn" as &dyn Value]; + let values_dyn: Vec<&dyn SerializeCql> = vec![ + &1 as &dyn SerializeCql, + &2 as &dyn SerializeCql, + &"&dyn" as &dyn SerializeCql, + ]; session.execute(&insert_a_b_c, values_dyn).await.unwrap(); - let values_box_dyn: Vec> = vec![ - Box::new(1) as Box, - Box::new(3) as Box, - Box::new("Box dyn") as Box, + let values_box_dyn: Vec> = vec![ + Box::new(1) as Box, + Box::new(3) as Box, + Box::new("Box dyn") as Box, ]; session .execute(&insert_a_b_c, values_box_dyn) @@ -2776,26 +2779,24 @@ async fn test_manual_primary_key_computation() { async fn assert_tokens_equal( session: &Session, prepared: &PreparedStatement, - pk_values_in_pk_order: impl ValueList, - all_values_in_query_order: impl ValueList, + serialized_pk_values_in_pk_order: &SerializedValues, + all_values_in_query_order: impl SerializeRow, ) { - let serialized_values_in_pk_order = - pk_values_in_pk_order.serialized().unwrap().into_owned(); - let serialized_values_in_query_order = - all_values_in_query_order.serialized().unwrap().into_owned(); + let token_by_prepared = prepared + .calculate_token(&all_values_in_query_order) + .unwrap() + .unwrap(); session - .execute(prepared, &serialized_values_in_query_order) + .execute(prepared, all_values_in_query_order) .await .unwrap(); - let token_by_prepared = prepared - .calculate_token(&serialized_values_in_query_order) - .unwrap() - .unwrap(); - let token_by_hand = - calculate_token_for_partition_key(&serialized_values_in_pk_order, &Murmur3Partitioner) - .unwrap(); + let token_by_hand = calculate_token_for_partition_key( + serialized_pk_values_in_pk_order, + &Murmur3Partitioner, + ) + .unwrap(); println!( "by_prepared: {}, by_hand: {}", token_by_prepared.value, token_by_hand.value @@ -2819,13 +2820,16 @@ async fn test_manual_primary_key_computation() { .await .unwrap(); - let pk_values_in_pk_order = (17_i32,); + let mut pk_values_in_pk_order = SerializedValues::new(); + pk_values_in_pk_order + .add_value(&17_i32, &ColumnType::Int) + .unwrap(); let all_values_in_query_order = (17_i32, 16_i32, "I'm prepared!!!"); assert_tokens_equal( &session, &prepared_simple_pk, - pk_values_in_pk_order, + &pk_values_in_pk_order, all_values_in_query_order, ) .await; @@ -2845,13 +2849,22 @@ async fn test_manual_primary_key_computation() { .await .unwrap(); - let pk_values_in_pk_order = (17_i32, 16_i32, "I'm prepared!!!"); + let mut pk_values_in_pk_order = SerializedValues::new(); + pk_values_in_pk_order + .add_value(&17_i32, &ColumnType::Int) + .unwrap(); + pk_values_in_pk_order + .add_value(&16_i32, &ColumnType::Int) + .unwrap(); + pk_values_in_pk_order + .add_value(&"I'm prepared!!!", &ColumnType::Ascii) + .unwrap(); let all_values_in_query_order = (17_i32, "I'm prepared!!!", 16_i32); assert_tokens_equal( &session, &prepared_complex_pk, - pk_values_in_pk_order, + &pk_values_in_pk_order, all_values_in_query_order, ) .await; diff --git a/scylla/src/transport/topology.rs b/scylla/src/transport/topology.rs index 63ee14f5b2..e7a2adcff2 100644 --- a/scylla/src/transport/topology.rs +++ b/scylla/src/transport/topology.rs @@ -15,7 +15,6 @@ use rand::seq::SliceRandom; use rand::{thread_rng, Rng}; use scylla_cql::errors::NewSessionError; use scylla_cql::frame::response::result::Row; -use scylla_cql::frame::value::ValueList; use scylla_macros::FromRow; use std::borrow::BorrowMut; use std::cell::Cell; @@ -751,7 +750,7 @@ async fn query_peers(conn: &Arc, connect_port: u16) -> Result, connect_port: u16) -> Result( conn: &Arc, - query_str: &str, - keyspaces_to_fetch: &[String], -) -> impl Stream> { - let keyspaces = &[keyspaces_to_fetch] as &[&[String]]; - let (query_str, query_values) = if !keyspaces_to_fetch.is_empty() { - (format!("{query_str} where keyspace_name in ?"), keyspaces) - } else { - (query_str.into(), &[] as &[&[String]]) - }; - let query_values = query_values.serialized().map(|sv| sv.into_owned()); - let mut query = Query::new(query_str); + query_str: &'a str, + keyspaces_to_fetch: &'a [String], +) -> impl Stream> + 'a { let conn = conn.clone(); - query.set_page_size(1024); + let fut = async move { - let query_values = query_values?; - conn.query_iter(query, query_values).await + if keyspaces_to_fetch.is_empty() { + let mut query = Query::new(query_str); + query.set_page_size(1024); + + conn.query_iter(query).await + } else { + let keyspaces = &[keyspaces_to_fetch] as &[&[String]]; + let query_str = format!("{query_str} where keyspace_name in ?"); + + let mut query = Query::new(query_str); + query.set_page_size(1024); + + let prepared = conn.prepare(&query).await?; + let serialized_values = prepared.serialize_values(&keyspaces)?; + conn.execute_iter(prepared, serialized_values).await + } }; fut.into_stream().try_flatten() } @@ -1601,7 +1606,7 @@ async fn query_table_partitioners( let rows = conn .clone() - .query_iter(partitioner_query, &[]) + .query_iter(partitioner_query) .into_stream() .try_flatten();