Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use prepared statement result metadata to decode rows #925

Merged
merged 6 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions scylla-cql/benches/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ fn make_query(contents: &str, values: SerializedValues) -> query::Query<'_> {
consistency: scylla_cql::Consistency::LocalQuorum,
serial_consistency: None,
values: Cow::Owned(values),
skip_metadata: false,
page_size: None,
paging_state: None,
timestamp: None,
Expand Down
3 changes: 3 additions & 0 deletions scylla-cql/src/frame/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ mod tests {
timestamp: None,
page_size: Some(323),
paging_state: Some(vec![2, 1, 3, 7].into()),
skip_metadata: false,
values: {
let mut vals = SerializedValues::new();
vals.add_value(&2137, &ColumnType::Int).unwrap();
Expand Down Expand Up @@ -177,6 +178,7 @@ mod tests {
timestamp: Some(3423434),
page_size: None,
paging_state: None,
skip_metadata: false,
values: {
let mut vals = SerializedValues::new();
vals.add_value(&42, &ColumnType::Int).unwrap();
Expand Down Expand Up @@ -234,6 +236,7 @@ mod tests {
timestamp: None,
page_size: None,
paging_state: None,
skip_metadata: false,
values: Cow::Borrowed(SerializedValues::EMPTY),
};
let query = Query {
Expand Down
8 changes: 8 additions & 0 deletions scylla-cql/src/frame/request/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub struct QueryParameters<'a> {
pub timestamp: Option<i64>,
pub page_size: Option<i32>,
pub paging_state: Option<Bytes>,
pub skip_metadata: bool,
pub values: Cow<'a, SerializedValues>,
}

Expand All @@ -74,6 +75,7 @@ impl Default for QueryParameters<'_> {
timestamp: None,
page_size: None,
paging_state: None,
skip_metadata: false,
values: Cow::Borrowed(SerializedValues::EMPTY),
}
}
Expand All @@ -88,6 +90,10 @@ impl QueryParameters<'_> {
flags |= FLAG_VALUES;
}

if self.skip_metadata {
flags |= FLAG_SKIP_METADATA;
}

if self.page_size.is_some() {
flags |= FLAG_PAGE_SIZE;
}
Expand Down Expand Up @@ -143,6 +149,7 @@ impl<'q> QueryParameters<'q> {
)));
}
let values_flag = (flags & FLAG_VALUES) != 0;
let skip_metadata = (flags & FLAG_SKIP_METADATA) != 0;
let page_size_flag = (flags & FLAG_PAGE_SIZE) != 0;
let paging_state_flag = (flags & FLAG_WITH_PAGING_STATE) != 0;
let serial_consistency_flag = (flags & FLAG_WITH_SERIAL_CONSISTENCY) != 0;
Expand Down Expand Up @@ -192,6 +199,7 @@ impl<'q> QueryParameters<'q> {
timestamp,
page_size,
paging_state,
skip_metadata,
values,
})
}
Expand Down
11 changes: 6 additions & 5 deletions scylla-cql/src/frame/response/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ pub mod event;
pub mod result;
pub mod supported;

use crate::{errors::QueryError, frame::frame_errors::ParseError};

use crate::frame::protocol_features::ProtocolFeatures;
pub use error::Error;
pub use supported::Supported;

use super::TryFromPrimitiveError;
use crate::frame::protocol_features::ProtocolFeatures;
use crate::frame::response::result::ResultMetadata;
use crate::frame::TryFromPrimitiveError;
use crate::{errors::QueryError, frame::frame_errors::ParseError};

#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[repr(u8)]
Expand Down Expand Up @@ -64,6 +64,7 @@ impl Response {
features: &ProtocolFeatures,
opcode: ResponseOpcode,
buf: &mut &[u8],
cached_metadata: Option<&ResultMetadata>,
) -> Result<Response, ParseError> {
let response = match opcode {
ResponseOpcode::Error => Response::Error(Error::deserialize(features, buf)?),
Expand All @@ -72,7 +73,7 @@ impl Response {
Response::Authenticate(authenticate::Authenticate::deserialize(buf)?)
}
ResponseOpcode::Supported => Response::Supported(Supported::deserialize(buf)?),
ResponseOpcode::Result => Response::Result(result::deserialize(buf)?),
ResponseOpcode::Result => Response::Result(result::deserialize(buf, cached_metadata)?),
ResponseOpcode::Event => Response::Event(event::Event::deserialize(buf)?),
ResponseOpcode::AuthChallenge => {
Response::AuthChallenge(authenticate::AuthChallenge::deserialize(buf)?)
Expand Down
37 changes: 26 additions & 11 deletions scylla-cql/src/frame/response/result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ pub struct ColumnSpec {
pub typ: ColumnType,
}

#[derive(Debug, Default)]
#[derive(Debug, Clone, Default)]
muzarski marked this conversation as resolved.
Show resolved Hide resolved
pub struct ResultMetadata {
col_count: usize,
pub paging_state: Option<Bytes>,
Expand Down Expand Up @@ -886,17 +886,29 @@ pub fn deser_cql_value(typ: &ColumnType, buf: &mut &[u8]) -> StdResult<CqlValue,
})
}

fn deser_rows(buf: &mut &[u8]) -> StdResult<Rows, ParseError> {
let metadata = deser_result_metadata(buf)?;
fn deser_rows(
buf: &mut &[u8],
cached_metadata: Option<&ResultMetadata>,
) -> StdResult<Rows, ParseError> {
let server_metadata = deser_result_metadata(buf)?;

let metadata = match cached_metadata {
Some(metadata) => metadata.clone(),
muzarski marked this conversation as resolved.
Show resolved Hide resolved
None => {
// No cached_metadata provided. Server is supposed to provide the result metadata.
if server_metadata.col_count != server_metadata.col_specs.len() {
return Err(ParseError::BadIncomingData(format!(
"Bad result metadata provided in the response. Expected {} column specifications, received: {}",
server_metadata.col_count,
server_metadata.col_specs.len()
)));
}
server_metadata
}
};

let original_size = buf.len();

// TODO: the protocol allows an optimization (which must be explicitly requested on query by
// the driver) where the column metadata is not sent with the result.
// Implement this optimization. We'll then need to take the column types by a parameter.
// Beware of races; our column types may be outdated.
assert!(metadata.col_count == metadata.col_specs.len());

let rows_count: usize = types::read_int(buf)?.try_into()?;

let mut rows = Vec::with_capacity(rows_count);
Expand Down Expand Up @@ -946,11 +958,14 @@ fn deser_schema_change(buf: &mut &[u8]) -> StdResult<SchemaChange, ParseError> {
})
}

pub fn deserialize(buf: &mut &[u8]) -> StdResult<Result, ParseError> {
pub fn deserialize(
buf: &mut &[u8],
cached_metadata: Option<&ResultMetadata>,
) -> StdResult<Result, ParseError> {
use self::Result::*;
Ok(match types::read_int(buf)? {
0x0001 => Void,
0x0002 => Rows(deser_rows(buf)?),
0x0002 => Rows(deser_rows(buf, cached_metadata)?),
0x0003 => SetKeyspace(deser_set_keyspace(buf)?),
0x0004 => Prepared(deser_prepared(buf)?),
0x0005 => SchemaChange(deser_schema_change(buf)?),
Expand Down
1 change: 1 addition & 0 deletions scylla/src/statement/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub(crate) struct StatementConfig {

pub(crate) is_idempotent: bool,

pub(crate) skip_result_metadata: bool,
pub(crate) tracing: bool,
pub(crate) timestamp: Option<i64>,
pub(crate) request_timeout: Option<Duration>,
Expand Down
50 changes: 47 additions & 3 deletions scylla/src/statement/prepared_statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::time::Duration;
use thiserror::Error;
use uuid::Uuid;

use scylla_cql::frame::response::result::ColumnSpec;
use scylla_cql::frame::response::result::{ColumnSpec, PartitionKeyIndex, ResultMetadata};

use super::StatementConfig;
use crate::frame::response::result::PreparedMetadata;
Expand All @@ -37,6 +37,7 @@ pub struct PreparedStatement {
#[derive(Debug)]
struct PreparedStatementSharedData {
metadata: PreparedMetadata,
result_metadata: ResultMetadata,
statement: String,
}

Expand All @@ -59,6 +60,7 @@ impl PreparedStatement {
id: Bytes,
is_lwt: bool,
metadata: PreparedMetadata,
result_metadata: ResultMetadata,
statement: String,
page_size: Option<i32>,
config: StatementConfig,
Expand All @@ -67,6 +69,7 @@ impl PreparedStatement {
id,
shared: Arc::new(PreparedStatementSharedData {
metadata,
result_metadata,
statement,
}),
prepare_tracing_ids: Vec::new(),
Expand Down Expand Up @@ -270,6 +273,27 @@ impl PreparedStatement {
self.config.tracing
}

/// Make use of cached metadata to decode results
/// of the statement's execution.
///
/// If true, the driver will request the server not to
/// attach the result metadata in response to the statement execution.
///
/// The driver will cache the result metadata received from the server
/// after statement preparation and will use it
/// to deserialize the results of statement execution.
///
/// This option is false by default.
pub fn set_use_cached_result_metadata(&mut self, use_cached_metadata: bool) {
self.config.skip_result_metadata = use_cached_metadata;
}

/// Gets the information whether the driver uses cached metadata
/// to decode the results of the statement's execution.
pub fn get_use_cached_result_metadata(&self) -> bool {
self.config.skip_result_metadata
}

/// Sets the default timestamp for this statement in microseconds.
/// If not None, it will replace the server side assigned timestamp as default timestamp
/// If a statement contains a `USING TIMESTAMP` clause, calling this method won't change
Expand Down Expand Up @@ -301,11 +325,31 @@ impl PreparedStatement {
self.partitioner_name = partitioner_name;
}

/// Access metadata about this prepared statement as returned by the database
pub fn get_prepared_metadata(&self) -> &PreparedMetadata {
/// Access metadata about the bind variables of this statement as returned by the database
pub(crate) fn get_prepared_metadata(&self) -> &PreparedMetadata {
&self.shared.metadata
}

/// Access column specifications of the bind variables of this statement
pub fn get_variable_col_specs(&self) -> &[ColumnSpec] {
&self.shared.metadata.col_specs
}

/// Access info about partition key indexes of the bind variables of this statement
pub fn get_variable_pk_indexes(&self) -> &[PartitionKeyIndex] {
&self.shared.metadata.pk_indexes
}

/// Access metadata about the result of prepared statement returned by the database
pub(crate) fn get_result_metadata(&self) -> &ResultMetadata {
&self.shared.result_metadata
}

/// Access column specifications of the result set returned after the execution of this statement
pub fn get_result_set_col_specs(&self) -> &[ColumnSpec] {
&self.shared.result_metadata.col_specs
}
muzarski marked this conversation as resolved.
Show resolved Hide resolved

/// Get the name of the partitioner used for this statement.
pub(crate) fn get_partitioner_name(&self) -> &PartitionerName {
&self.partitioner_name
Expand Down
5 changes: 4 additions & 1 deletion scylla/src/transport/caching_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{QueryResult, Session};
use bytes::Bytes;
use dashmap::DashMap;
use futures::future::try_join_all;
use scylla_cql::frame::response::result::PreparedMetadata;
use scylla_cql::frame::response::result::{PreparedMetadata, ResultMetadata};
use scylla_cql::types::serialize::batch::BatchValues;
use scylla_cql::types::serialize::row::SerializeRow;
use std::collections::hash_map::RandomState;
Expand All @@ -23,6 +23,7 @@ struct RawPreparedStatementData {
id: Bytes,
is_confirmed_lwt: bool,
metadata: PreparedMetadata,
result_metadata: ResultMetadata,
partitioner_name: PartitionerName,
}

Expand Down Expand Up @@ -168,6 +169,7 @@ where
raw.id.clone(),
raw.is_confirmed_lwt,
raw.metadata.clone(),
raw.result_metadata.clone(),
query.contents,
page_size,
query.config,
Expand Down Expand Up @@ -195,6 +197,7 @@ where
id: prepared.get_id().clone(),
is_confirmed_lwt: prepared.is_confirmed_lwt(),
metadata: prepared.get_prepared_metadata().clone(),
result_metadata: prepared.get_result_metadata().clone(),
partitioner_name: prepared.get_partitioner_name().clone(),
};
self.cache.insert(query_contents, raw);
Expand Down
Loading
Loading