From 545e8295aaec9fff22bef03c3ada9d8eca6f7b85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Fri, 23 Aug 2024 12:38:22 +0200 Subject: [PATCH] codewide: share ResultMetadata by an Arc ResultMetadata is now passed behind an Arc, enabling shared ownership between PreparedStatement and QueryResult and RowIterator. Thanks to that, if `use_cached_metadata` flag is set on PreparedStatement, no new allocations occur when deserializing ResultMetadata on a request result. --- scylla-cql/src/frame/response/mod.rs | 4 +++- scylla-cql/src/frame/response/result.rs | 14 ++++++-------- scylla/src/statement/prepared_statement.rs | 6 +++--- scylla/src/transport/caching_session.rs | 3 ++- scylla/src/transport/connection.rs | 6 +++--- scylla/src/transport/iterator.rs | 12 ++++++------ scylla/src/transport/query_result.rs | 4 +++- 7 files changed, 26 insertions(+), 23 deletions(-) diff --git a/scylla-cql/src/frame/response/mod.rs b/scylla-cql/src/frame/response/mod.rs index 690e945048..d084eb71c9 100644 --- a/scylla-cql/src/frame/response/mod.rs +++ b/scylla-cql/src/frame/response/mod.rs @@ -5,6 +5,8 @@ pub mod event; pub mod result; pub mod supported; +use std::sync::Arc; + pub use error::Error; pub use supported::Supported; @@ -66,7 +68,7 @@ impl Response { features: &ProtocolFeatures, opcode: ResponseOpcode, buf_bytes: bytes::Bytes, - cached_metadata: Option<&ResultMetadata>, + cached_metadata: Option<&Arc>, ) -> Result { let buf = &mut &*buf_bytes; let response = match opcode { diff --git a/scylla-cql/src/frame/response/result.rs b/scylla-cql/src/frame/response/result.rs index 87b6ec91b1..0daf5d15ed 100644 --- a/scylla-cql/src/frame/response/result.rs +++ b/scylla-cql/src/frame/response/result.rs @@ -16,6 +16,7 @@ use crate::types::deserialize::value::{ use crate::types::deserialize::{DeserializationError, FrameSlice}; use bytes::{Buf, Bytes}; use std::borrow::Cow; +use std::sync::Arc; use std::{net::IpAddr, result::Result as StdResult, str}; use uuid::Uuid; @@ -475,7 +476,7 @@ impl Row { #[derive(Debug)] pub struct Rows { - pub metadata: ResultMetadata, + pub metadata: Arc, pub paging_state: Option, pub rows_count: usize, pub rows: Vec, @@ -862,16 +863,13 @@ pub fn deser_cql_value( fn deser_rows( buf_bytes: Bytes, - cached_metadata: Option<&ResultMetadata>, + cached_metadata: Option<&Arc>, ) -> StdResult { let buf = &mut &*buf_bytes; let (server_metadata, paging_state) = deser_result_metadata(buf)?; let metadata = match cached_metadata { - Some(cached) => ResultMetadata { - col_count: cached.col_count, - col_specs: cached.col_specs.clone(), - }, + Some(cached) => Arc::clone(cached), None => { // No cached_metadata provided. Server is supposed to provide the result metadata. if server_metadata.col_count != server_metadata.col_specs.len() { @@ -880,7 +878,7 @@ fn deser_rows( col_specs_count: server_metadata.col_specs.len(), }); } - server_metadata + Arc::new(server_metadata) } }; @@ -943,7 +941,7 @@ fn deser_schema_change(buf: &mut &[u8]) -> StdResult, + cached_metadata: Option<&Arc>, ) -> StdResult { let buf = &mut &*buf_bytes; use self::Result::*; diff --git a/scylla/src/statement/prepared_statement.rs b/scylla/src/statement/prepared_statement.rs index 01656bdc26..532bfa2702 100644 --- a/scylla/src/statement/prepared_statement.rs +++ b/scylla/src/statement/prepared_statement.rs @@ -100,7 +100,7 @@ pub struct PreparedStatement { #[derive(Debug)] struct PreparedStatementSharedData { metadata: PreparedMetadata, - result_metadata: ResultMetadata, + result_metadata: Arc, statement: String, } @@ -123,7 +123,7 @@ impl PreparedStatement { id: Bytes, is_lwt: bool, metadata: PreparedMetadata, - result_metadata: ResultMetadata, + result_metadata: Arc, statement: String, page_size: Option, config: StatementConfig, @@ -412,7 +412,7 @@ impl PreparedStatement { } /// Access metadata about the result of prepared statement returned by the database - pub(crate) fn get_result_metadata(&self) -> &ResultMetadata { + pub(crate) fn get_result_metadata(&self) -> &Arc { &self.shared.result_metadata } diff --git a/scylla/src/transport/caching_session.rs b/scylla/src/transport/caching_session.rs index 903eb4e346..fa3a0c56f3 100644 --- a/scylla/src/transport/caching_session.rs +++ b/scylla/src/transport/caching_session.rs @@ -13,6 +13,7 @@ use scylla_cql::types::serialize::batch::BatchValues; use scylla_cql::types::serialize::row::SerializeRow; use std::collections::hash_map::RandomState; use std::hash::BuildHasher; +use std::sync::Arc; /// Contains just the parts of a prepared statement that were returned /// from the database. All remaining parts (query string, page size, @@ -23,7 +24,7 @@ struct RawPreparedStatementData { id: Bytes, is_confirmed_lwt: bool, metadata: PreparedMetadata, - result_metadata: ResultMetadata, + result_metadata: Arc, partitioner_name: PartitionerName, } diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index f92cdf8f27..6c7b46d5c6 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -754,7 +754,7 @@ impl Connection { .protocol_features .prepared_flags_contain_lwt_mark(p.prepared_metadata.flags as u32), p.prepared_metadata, - p.result_metadata, + Arc::new(p.result_metadata), query.contents.clone(), query.get_page_size(), query.config.clone(), @@ -1203,7 +1203,7 @@ impl Connection { request: &impl SerializableRequest, compress: bool, tracing: bool, - cached_metadata: Option<&ResultMetadata>, + cached_metadata: Option<&Arc>, ) -> Result { let compression = if compress { self.config.compression @@ -1228,7 +1228,7 @@ impl Connection { task_response: TaskResponse, compression: Option, features: &ProtocolFeatures, - cached_metadata: Option<&ResultMetadata>, + cached_metadata: Option<&Arc>, ) -> Result { let body_with_ext = frame::parse_response_body_extensions( task_response.params.flags, diff --git a/scylla/src/transport/iterator.rs b/scylla/src/transport/iterator.rs index c85570c53f..c9e4efaeeb 100644 --- a/scylla/src/transport/iterator.rs +++ b/scylla/src/transport/iterator.rs @@ -429,7 +429,7 @@ mod checked_channel_sender { errors::QueryError, frame::response::result::{ResultMetadata, Rows}, }; - use std::marker::PhantomData; + use std::{marker::PhantomData, sync::Arc}; use tokio::sync::mpsc; use uuid::Uuid; @@ -470,7 +470,7 @@ mod checked_channel_sender { ) { let empty_page = ReceivedPage { rows: Rows { - metadata: ResultMetadata::mock_empty(), + metadata: Arc::new(ResultMetadata::mock_empty()), paging_state: None, rows_count: 0, rows: Vec::new(), @@ -678,7 +678,7 @@ where match query_response { Ok(NonErrorQueryResponse { - response: NonErrorResponse::Result(result::Result::Rows(mut rows)), + response: NonErrorResponse::Result(result::Result::Rows(rows)), tracing_id, .. }) => { @@ -689,7 +689,7 @@ where .load_balancing_policy .on_query_success(&self.statement_info, elapsed, node); - self.paging_state = rows.paging_state.take(); + self.paging_state = rows.paging_state.clone(); request_span.record_rows_fields(&rows); @@ -853,8 +853,8 @@ where let result = (self.fetcher)(paging_state).await?; let response = result.into_non_error_query_response()?; match response.response { - NonErrorResponse::Result(result::Result::Rows(mut rows)) => { - paging_state = rows.paging_state.take(); + NonErrorResponse::Result(result::Result::Rows(rows)) => { + paging_state = rows.paging_state.clone(); let (proof, send_result) = self .sender .send(Ok(ReceivedPage { diff --git a/scylla/src/transport/query_result.rs b/scylla/src/transport/query_result.rs index f9edb93624..a7a9adf852 100644 --- a/scylla/src/transport/query_result.rs +++ b/scylla/src/transport/query_result.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::frame::response::cql_to_rust::{FromRow, FromRowError}; use crate::frame::response::result::ColumnSpec; use crate::frame::response::result::Row; @@ -22,7 +24,7 @@ pub struct QueryResult { /// Paging state returned from the server pub paging_state: Option, /// Metadata returned along with this response. - pub(crate) metadata: Option, + pub(crate) metadata: Option>, /// The original size of the serialized rows in request pub serialized_size: usize, }