From a47c6a4401f6cd4a303a02c803166f666e9b5701 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Mon, 28 Oct 2024 08:52:27 +0100 Subject: [PATCH] session,iterator: record raw metadata&rows size Even though we can no longer record rows serialized size without accounting metadata, we can record their serialized size together. --- scylla/src/transport/iterator.rs | 2 ++ scylla/src/transport/query_result.rs | 4 ++++ scylla/src/transport/session.rs | 20 +++++++++++++++++++- 3 files changed, 25 insertions(+), 1 deletion(-) diff --git a/scylla/src/transport/iterator.rs b/scylla/src/transport/iterator.rs index 095e4957d..fd8ff17db 100644 --- a/scylla/src/transport/iterator.rs +++ b/scylla/src/transport/iterator.rs @@ -328,6 +328,8 @@ where .load_balancing_policy .on_query_success(&self.statement_info, elapsed, node); + request_span.record_raw_rows_fields(&rows); + let received_page = ReceivedPage { rows, tracing_id }; // Send next page to RowIterator diff --git a/scylla/src/transport/query_result.rs b/scylla/src/transport/query_result.rs index a024ea9b2..eedcb34a1 100644 --- a/scylla/src/transport/query_result.rs +++ b/scylla/src/transport/query_result.rs @@ -167,6 +167,10 @@ impl QueryResult { } } + pub(crate) fn raw_metadata_and_rows(&self) -> Option<&RawMetadataAndRawRows> { + self.raw_metadata_and_rows.as_ref() + } + /// Warnings emitted by the database. #[inline] pub fn warnings(&self) -> impl Iterator { diff --git a/scylla/src/transport/session.rs b/scylla/src/transport/session.rs index be491aa24..1defa514b 100644 --- a/scylla/src/transport/session.rs +++ b/scylla/src/transport/session.rs @@ -18,6 +18,7 @@ use async_trait::async_trait; use futures::future::join_all; use futures::future::try_join_all; use itertools::{Either, Itertools}; +use scylla_cql::frame::response::result::RawMetadataAndRawRows; use scylla_cql::frame::response::result::{deser_cql_value, ColumnSpec}; use scylla_cql::frame::response::NonErrorResponse; use scylla_cql::types::serialize::batch::BatchValues; @@ -798,6 +799,7 @@ impl Session { self.handle_auto_await_schema_agreement(&response).await?; let (result, paging_state) = response.into_query_result_and_paging_state()?; + span.record_result_fields(&result); let result = result.into_legacy_result()?; Ok((result, paging_state)) } @@ -1235,6 +1237,7 @@ impl Session { self.handle_auto_await_schema_agreement(&response).await?; let (result, paging_state) = response.into_query_result_and_paging_state()?; + span.record_result_fields(&result); let result = result.into_legacy_result()?; Ok((result, paging_state)) } @@ -1430,8 +1433,12 @@ impl Session { let result = match run_query_result { RunQueryResult::IgnoredWriteError => LegacyQueryResult::mock_empty(), - RunQueryResult::Completed(response) => response.into_legacy_result()?, + RunQueryResult::Completed(result) => { + span.record_result_fields(&result); + result.into_legacy_result()? + } }; + Ok(result) } @@ -2150,6 +2157,17 @@ impl RequestSpan { } } + pub(crate) fn record_raw_rows_fields(&self, raw_rows: &RawMetadataAndRawRows) { + self.span + .record("raw_result_size", raw_rows.metadata_and_rows_bytes_size()); + } + + pub(crate) fn record_result_fields(&self, query_result: &QueryResult) { + if let Some(raw_metadata_and_rows) = query_result.raw_metadata_and_rows() { + self.record_raw_rows_fields(raw_metadata_and_rows); + } + } + pub(crate) fn record_replicas<'a>(&'a self, replicas: &'a [(impl Borrow>, Shard)]) { struct ReplicaIps<'a, N>(&'a [(N, Shard)]); impl<'a, N> Display for ReplicaIps<'a, N>