Skip to content

Commit

Permalink
Merge pull request #925 from muzarski/use_prepared_metadata_to_decode…
Browse files Browse the repository at this point in the history
…_rows

Use prepared statement result metadata to decode rows
  • Loading branch information
piodul authored Feb 16, 2024
2 parents 29f6744 + 58a9885 commit 8a5be0a
Show file tree
Hide file tree
Showing 11 changed files with 224 additions and 32 deletions.
1 change: 1 addition & 0 deletions scylla-cql/benches/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ fn make_query(contents: &str, values: SerializedValues) -> query::Query<'_> {
consistency: scylla_cql::Consistency::LocalQuorum,
serial_consistency: None,
values: Cow::Owned(values),
skip_metadata: false,
page_size: None,
paging_state: None,
timestamp: None,
Expand Down
3 changes: 3 additions & 0 deletions scylla-cql/src/frame/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ mod tests {
timestamp: None,
page_size: Some(323),
paging_state: Some(vec![2, 1, 3, 7].into()),
skip_metadata: false,
values: {
let mut vals = SerializedValues::new();
vals.add_value(&2137, &ColumnType::Int).unwrap();
Expand Down Expand Up @@ -177,6 +178,7 @@ mod tests {
timestamp: Some(3423434),
page_size: None,
paging_state: None,
skip_metadata: false,
values: {
let mut vals = SerializedValues::new();
vals.add_value(&42, &ColumnType::Int).unwrap();
Expand Down Expand Up @@ -234,6 +236,7 @@ mod tests {
timestamp: None,
page_size: None,
paging_state: None,
skip_metadata: false,
values: Cow::Borrowed(SerializedValues::EMPTY),
};
let query = Query {
Expand Down
8 changes: 8 additions & 0 deletions scylla-cql/src/frame/request/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub struct QueryParameters<'a> {
pub timestamp: Option<i64>,
pub page_size: Option<i32>,
pub paging_state: Option<Bytes>,
pub skip_metadata: bool,
pub values: Cow<'a, SerializedValues>,
}

Expand All @@ -74,6 +75,7 @@ impl Default for QueryParameters<'_> {
timestamp: None,
page_size: None,
paging_state: None,
skip_metadata: false,
values: Cow::Borrowed(SerializedValues::EMPTY),
}
}
Expand All @@ -88,6 +90,10 @@ impl QueryParameters<'_> {
flags |= FLAG_VALUES;
}

if self.skip_metadata {
flags |= FLAG_SKIP_METADATA;
}

if self.page_size.is_some() {
flags |= FLAG_PAGE_SIZE;
}
Expand Down Expand Up @@ -143,6 +149,7 @@ impl<'q> QueryParameters<'q> {
)));
}
let values_flag = (flags & FLAG_VALUES) != 0;
let skip_metadata = (flags & FLAG_SKIP_METADATA) != 0;
let page_size_flag = (flags & FLAG_PAGE_SIZE) != 0;
let paging_state_flag = (flags & FLAG_WITH_PAGING_STATE) != 0;
let serial_consistency_flag = (flags & FLAG_WITH_SERIAL_CONSISTENCY) != 0;
Expand Down Expand Up @@ -192,6 +199,7 @@ impl<'q> QueryParameters<'q> {
timestamp,
page_size,
paging_state,
skip_metadata,
values,
})
}
Expand Down
11 changes: 6 additions & 5 deletions scylla-cql/src/frame/response/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ pub mod event;
pub mod result;
pub mod supported;

use crate::{errors::QueryError, frame::frame_errors::ParseError};

use crate::frame::protocol_features::ProtocolFeatures;
pub use error::Error;
pub use supported::Supported;

use super::TryFromPrimitiveError;
use crate::frame::protocol_features::ProtocolFeatures;
use crate::frame::response::result::ResultMetadata;
use crate::frame::TryFromPrimitiveError;
use crate::{errors::QueryError, frame::frame_errors::ParseError};

#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[repr(u8)]
Expand Down Expand Up @@ -64,6 +64,7 @@ impl Response {
features: &ProtocolFeatures,
opcode: ResponseOpcode,
buf: &mut &[u8],
cached_metadata: Option<&ResultMetadata>,
) -> Result<Response, ParseError> {
let response = match opcode {
ResponseOpcode::Error => Response::Error(Error::deserialize(features, buf)?),
Expand All @@ -72,7 +73,7 @@ impl Response {
Response::Authenticate(authenticate::Authenticate::deserialize(buf)?)
}
ResponseOpcode::Supported => Response::Supported(Supported::deserialize(buf)?),
ResponseOpcode::Result => Response::Result(result::deserialize(buf)?),
ResponseOpcode::Result => Response::Result(result::deserialize(buf, cached_metadata)?),
ResponseOpcode::Event => Response::Event(event::Event::deserialize(buf)?),
ResponseOpcode::AuthChallenge => {
Response::AuthChallenge(authenticate::AuthChallenge::deserialize(buf)?)
Expand Down
37 changes: 26 additions & 11 deletions scylla-cql/src/frame/response/result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ pub struct ColumnSpec {
pub typ: ColumnType,
}

#[derive(Debug, Default)]
#[derive(Debug, Clone, Default)]
pub struct ResultMetadata {
col_count: usize,
pub paging_state: Option<Bytes>,
Expand Down Expand Up @@ -886,17 +886,29 @@ pub fn deser_cql_value(typ: &ColumnType, buf: &mut &[u8]) -> StdResult<CqlValue,
})
}

fn deser_rows(buf: &mut &[u8]) -> StdResult<Rows, ParseError> {
let metadata = deser_result_metadata(buf)?;
fn deser_rows(
buf: &mut &[u8],
cached_metadata: Option<&ResultMetadata>,
) -> StdResult<Rows, ParseError> {
let server_metadata = deser_result_metadata(buf)?;

let metadata = match cached_metadata {
Some(metadata) => metadata.clone(),
None => {
// No cached_metadata provided. Server is supposed to provide the result metadata.
if server_metadata.col_count != server_metadata.col_specs.len() {
return Err(ParseError::BadIncomingData(format!(
"Bad result metadata provided in the response. Expected {} column specifications, received: {}",
server_metadata.col_count,
server_metadata.col_specs.len()
)));
}
server_metadata
}
};

let original_size = buf.len();

// TODO: the protocol allows an optimization (which must be explicitly requested on query by
// the driver) where the column metadata is not sent with the result.
// Implement this optimization. We'll then need to take the column types by a parameter.
// Beware of races; our column types may be outdated.
assert!(metadata.col_count == metadata.col_specs.len());

let rows_count: usize = types::read_int(buf)?.try_into()?;

let mut rows = Vec::with_capacity(rows_count);
Expand Down Expand Up @@ -946,11 +958,14 @@ fn deser_schema_change(buf: &mut &[u8]) -> StdResult<SchemaChange, ParseError> {
})
}

pub fn deserialize(buf: &mut &[u8]) -> StdResult<Result, ParseError> {
pub fn deserialize(
buf: &mut &[u8],
cached_metadata: Option<&ResultMetadata>,
) -> StdResult<Result, ParseError> {
use self::Result::*;
Ok(match types::read_int(buf)? {
0x0001 => Void,
0x0002 => Rows(deser_rows(buf)?),
0x0002 => Rows(deser_rows(buf, cached_metadata)?),
0x0003 => SetKeyspace(deser_set_keyspace(buf)?),
0x0004 => Prepared(deser_prepared(buf)?),
0x0005 => SchemaChange(deser_schema_change(buf)?),
Expand Down
1 change: 1 addition & 0 deletions scylla/src/statement/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub(crate) struct StatementConfig {

pub(crate) is_idempotent: bool,

pub(crate) skip_result_metadata: bool,
pub(crate) tracing: bool,
pub(crate) timestamp: Option<i64>,
pub(crate) request_timeout: Option<Duration>,
Expand Down
50 changes: 47 additions & 3 deletions scylla/src/statement/prepared_statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::time::Duration;
use thiserror::Error;
use uuid::Uuid;

use scylla_cql::frame::response::result::ColumnSpec;
use scylla_cql::frame::response::result::{ColumnSpec, PartitionKeyIndex, ResultMetadata};

use super::StatementConfig;
use crate::frame::response::result::PreparedMetadata;
Expand All @@ -37,6 +37,7 @@ pub struct PreparedStatement {
#[derive(Debug)]
struct PreparedStatementSharedData {
metadata: PreparedMetadata,
result_metadata: ResultMetadata,
statement: String,
}

Expand All @@ -59,6 +60,7 @@ impl PreparedStatement {
id: Bytes,
is_lwt: bool,
metadata: PreparedMetadata,
result_metadata: ResultMetadata,
statement: String,
page_size: Option<i32>,
config: StatementConfig,
Expand All @@ -67,6 +69,7 @@ impl PreparedStatement {
id,
shared: Arc::new(PreparedStatementSharedData {
metadata,
result_metadata,
statement,
}),
prepare_tracing_ids: Vec::new(),
Expand Down Expand Up @@ -270,6 +273,27 @@ impl PreparedStatement {
self.config.tracing
}

/// Make use of cached metadata to decode results
/// of the statement's execution.
///
/// If true, the driver will request the server not to
/// attach the result metadata in response to the statement execution.
///
/// The driver will cache the result metadata received from the server
/// after statement preparation and will use it
/// to deserialize the results of statement execution.
///
/// This option is false by default.
pub fn set_use_cached_result_metadata(&mut self, use_cached_metadata: bool) {
self.config.skip_result_metadata = use_cached_metadata;
}

/// Gets the information whether the driver uses cached metadata
/// to decode the results of the statement's execution.
pub fn get_use_cached_result_metadata(&self) -> bool {
self.config.skip_result_metadata
}

/// Sets the default timestamp for this statement in microseconds.
/// If not None, it will replace the server side assigned timestamp as default timestamp
/// If a statement contains a `USING TIMESTAMP` clause, calling this method won't change
Expand Down Expand Up @@ -301,11 +325,31 @@ impl PreparedStatement {
self.partitioner_name = partitioner_name;
}

/// Access metadata about this prepared statement as returned by the database
pub fn get_prepared_metadata(&self) -> &PreparedMetadata {
/// Access metadata about the bind variables of this statement as returned by the database
pub(crate) fn get_prepared_metadata(&self) -> &PreparedMetadata {
&self.shared.metadata
}

/// Access column specifications of the bind variables of this statement
pub fn get_variable_col_specs(&self) -> &[ColumnSpec] {
&self.shared.metadata.col_specs
}

/// Access info about partition key indexes of the bind variables of this statement
pub fn get_variable_pk_indexes(&self) -> &[PartitionKeyIndex] {
&self.shared.metadata.pk_indexes
}

/// Access metadata about the result of prepared statement returned by the database
pub(crate) fn get_result_metadata(&self) -> &ResultMetadata {
&self.shared.result_metadata
}

/// Access column specifications of the result set returned after the execution of this statement
pub fn get_result_set_col_specs(&self) -> &[ColumnSpec] {
&self.shared.result_metadata.col_specs
}

/// Get the name of the partitioner used for this statement.
pub(crate) fn get_partitioner_name(&self) -> &PartitionerName {
&self.partitioner_name
Expand Down
5 changes: 4 additions & 1 deletion scylla/src/transport/caching_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{QueryResult, Session};
use bytes::Bytes;
use dashmap::DashMap;
use futures::future::try_join_all;
use scylla_cql::frame::response::result::PreparedMetadata;
use scylla_cql::frame::response::result::{PreparedMetadata, ResultMetadata};
use scylla_cql::types::serialize::batch::BatchValues;
use scylla_cql::types::serialize::row::SerializeRow;
use std::collections::hash_map::RandomState;
Expand All @@ -23,6 +23,7 @@ struct RawPreparedStatementData {
id: Bytes,
is_confirmed_lwt: bool,
metadata: PreparedMetadata,
result_metadata: ResultMetadata,
partitioner_name: PartitionerName,
}

Expand Down Expand Up @@ -168,6 +169,7 @@ where
raw.id.clone(),
raw.is_confirmed_lwt,
raw.metadata.clone(),
raw.result_metadata.clone(),
query.contents,
page_size,
query.config,
Expand Down Expand Up @@ -195,6 +197,7 @@ where
id: prepared.get_id().clone(),
is_confirmed_lwt: prepared.is_confirmed_lwt(),
metadata: prepared.get_prepared_metadata().clone(),
result_metadata: prepared.get_result_metadata().clone(),
partitioner_name: prepared.get_partitioner_name().clone(),
};
self.cache.insert(query_contents, raw);
Expand Down
Loading

0 comments on commit 8a5be0a

Please sign in to comment.