From 02d59ac43f4cb39b5a8b8c49e320d96e94fa58b7 Mon Sep 17 00:00:00 2001
From: Piotr Dulikowski
Date: Fri, 3 Feb 2023 16:38:47 +0100
Subject: [PATCH 01/13] iterator: introduce new_from_worker_future

The common part of new_for_query and new_for_prepared_statement - spawning a task and returning a non-empty RowIterator - is extracted to a shared function. It will also help implement the query_iter method for Connection.
---
 scylla/src/transport/iterator.rs | 39 ++++++++++++--------------------
 1 file changed, 15 insertions(+), 24 deletions(-)

diff --git a/scylla/src/transport/iterator.rs b/scylla/src/transport/iterator.rs
index 5db3c3fd7f..8cd3e88664 100644
--- a/scylla/src/transport/iterator.rs
+++ b/scylla/src/transport/iterator.rs
@@ -136,7 +136,7 @@ impl RowIterator {
         if query.get_page_size().is_none() {
             query.set_page_size(DEFAULT_ITER_PAGE_SIZE);
         }
-        let (sender, mut receiver) = mpsc::channel(1);
+        let (sender, receiver) = mpsc::channel(1);

         let consistency = query
             .config
@@ -185,28 +185,10 @@ impl RowIterator {
                 current_attempt_id: None,
             };

-            let _: PageSendAttemptedProof = worker.work(cluster_data).await;
+            worker.work(cluster_data).await
         };

-        tokio::task::spawn(worker_task);
-
-        // This unwrap is safe because:
-        // - The future returned by worker.work sends at least one item
-        //   to the channel (the PageSendAttemptedProof helps enforce this)
-        // - That future is polled in a tokio::task which isn't going to be
-        //   cancelled
-        let pages_received = receiver.recv().await.unwrap()?;
-
-        Ok(RowIterator {
-            current_row_idx: 0,
-            current_page: pages_received.rows,
-            page_receiver: receiver,
-            tracing_ids: if let Some(tracing_id) = pages_received.tracing_id {
-                vec![tracing_id]
-            } else {
-                Vec::new()
-            },
-        })
+        Self::new_from_worker_future(worker_task, receiver).await
     }

     pub(crate) async fn new_for_prepared_statement(
@@ -215,7 +197,7 @@ impl RowIterator {
         if config.prepared.get_page_size().is_none() {
             config.prepared.set_page_size(DEFAULT_ITER_PAGE_SIZE);
         }
-        let (sender, mut receiver) = mpsc::channel(1);
+        let (sender, receiver) = mpsc::channel(1);

         let consistency = config
             .prepared
@@ -277,10 +259,19 @@ impl RowIterator {
                 current_attempt_id: None,
             };

-            let _: PageSendAttemptedProof = worker.work(config.cluster_data).await;
+            worker.work(config.cluster_data).await
         };

-        tokio::task::spawn(worker_task);
+        Self::new_from_worker_future(worker_task, receiver).await
+    }
+
+    async fn new_from_worker_future(
+        worker_task: impl Future<Output = PageSendAttemptedProof> + Send + 'static,
+        mut receiver: mpsc::Receiver<Result<ReceivedPage, QueryError>>,
+    ) -> Result<RowIterator, QueryError> {
+        tokio::task::spawn(async move {
+            worker_task.await;
+        });

         // This unwrap is safe because:
         // - The future returned by worker.work sends at least one item

From 8ecbd0c91f36b334e60fb155337f178c237ae1df Mon Sep 17 00:00:00 2001
From: Piotr Dulikowski
Date: Fri, 17 Feb 2023 11:48:28 +0100
Subject: [PATCH 02/13] iterator: introduce send_empty_page method

We have two places in the iterator worker code that attempt to send an empty page, and we will have one more in the upcoming commits, so let's abstract this pattern away to a method.
---
 scylla/src/transport/iterator.rs | 50 ++++++++++++++++++--------------
 1 file changed, 28 insertions(+), 22 deletions(-)

diff --git a/scylla/src/transport/iterator.rs b/scylla/src/transport/iterator.rs
index 8cd3e88664..736b0e8ae4 100644
--- a/scylla/src/transport/iterator.rs
+++ b/scylla/src/transport/iterator.rs
@@ -310,8 +310,12 @@ impl RowIterator {

 // A separate module is used here so that the parent module cannot construct
 // SendAttemptedProof directly.
 mod checked_channel_sender {
+    use scylla_cql::{errors::QueryError, frame::response::result::Rows};
     use std::marker::PhantomData;
     use tokio::sync::mpsc;
+    use uuid::Uuid;
+
+    use super::ReceivedPage;

     /// A value whose existence proves that there was an attempt
     /// to send an item of type T through a channel.
@@ -335,6 +339,28 @@ mod checked_channel_sender {
             (SendAttemptedProof(PhantomData), self.0.send(value).await)
         }
     }
+
+    type ResultPage = Result<ReceivedPage, QueryError>;
+
+    impl ProvingSender<ResultPage> {
+        pub(crate) async fn send_empty_page(
+            &self,
+            tracing_id: Option<Uuid>,
+        ) -> (
+            SendAttemptedProof<ResultPage>,
+            Result<(), mpsc::error::SendError<ResultPage>>,
+        ) {
+            let empty_page = ReceivedPage {
+                rows: Rows {
+                    metadata: Default::default(),
+                    rows_count: 0,
+                    rows: Vec::new(),
+                },
+                tracing_id,
+            };
+            self.send(Ok(empty_page)).await
+        }
+    }
 }

 use checked_channel_sender::{ProvingSender, SendAttemptedProof};
@@ -472,17 +498,7 @@ where
                     // interface isn't meant for sending writes),
                     // we must attempt to send something because
                     // the iterator expects it.
-                    let (proof, _) = self
-                        .sender
-                        .send(Ok(ReceivedPage {
-                            rows: Rows {
-                                metadata: Default::default(),
-                                rows_count: 0,
-                                rows: Vec::new(),
-                            },
-                            tracing_id: None,
-                        }))
-                        .await;
+                    let (proof, _) = self.sender.send_empty_page(None).await;
                     return proof;
                 }
             };
@@ -572,17 +588,7 @@ where
                 // so let's return an empty iterator as suggested in #631.

                 // We must attempt to send something because the iterator expects it.
-                let (proof, _) = self
-                    .sender
-                    .send(Ok(ReceivedPage {
-                        rows: Rows {
-                            metadata: Default::default(),
-                            rows_count: 0,
-                            rows: Vec::new(),
-                        },
-                        tracing_id,
-                    }))
-                    .await;
+                let (proof, _) = self.sender.send_empty_page(tracing_id).await;
                 return Ok(proof);
             }
             Ok(_) => {

From e4ee32da92654f358f6fe8b3c40ff3bf3ee810ad Mon Sep 17 00:00:00 2001
From: Piotr Dulikowski
Date: Mon, 6 Feb 2023 09:30:24 +0100
Subject: [PATCH 03/13] iterator: introduce new_for_connection_query_iter

Adds the RowIterator::new_for_connection_query_iter method which will be the main workhorse of the Connection::query_iter method. It uses a new iterator worker which is a much simpler version of the proper, multi-connection iterator worker.
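For illustration, the type-level trick that these workers rely on - a zero-sized "proof" value that can only be produced by attempting a send, so a worker whose return type is the proof cannot forget to send at least one item - looks roughly like this in a minimal, self-contained form (simplified names; not the driver's exact API):

    use std::marker::PhantomData;
    use tokio::sync::mpsc;

    // Zero-sized witness. In the real code a separate private module
    // guarantees it cannot be constructed except inside send().
    struct SendAttemptedProof<T>(PhantomData<T>);

    struct ProvingSender<T>(mpsc::Sender<T>);

    impl<T> ProvingSender<T> {
        async fn send(
            &self,
            value: T,
        ) -> (SendAttemptedProof<T>, Result<(), mpsc::error::SendError<T>>) {
            (SendAttemptedProof(PhantomData), self.0.send(value).await)
        }
    }

    // The return type forces every path through send(): there is no other
    // way to obtain the proof value, so forgetting the send does not compile.
    async fn work(sender: ProvingSender<i32>) -> SendAttemptedProof<i32> {
        let (proof, _) = sender.send(42).await;
        proof
    }

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::channel(1);
        let _proof = work(ProvingSender(tx)).await;
        assert_eq!(rx.recv().await, Some(42));
    }

This is what makes the "This unwrap is safe" comment in new_from_worker_future sound: the worker future's output type proves a send was attempted.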
---
 scylla/src/transport/iterator.rs | 92 ++++++++++++++++++++++++++++
 1 file changed, 92 insertions(+)

diff --git a/scylla/src/transport/iterator.rs b/scylla/src/transport/iterator.rs
index 736b0e8ae4..c94a3235c6 100644
--- a/scylla/src/transport/iterator.rs
+++ b/scylla/src/transport/iterator.rs
@@ -9,6 +9,7 @@ use std::task::{Context, Poll};

 use bytes::Bytes;
 use futures::Stream;
+use scylla_cql::frame::types::SerialConsistency;
 use std::result::Result;
 use thiserror::Error;
 use tokio::sync::mpsc;
@@ -265,6 +266,37 @@ impl RowIterator {
         Self::new_from_worker_future(worker_task, receiver).await
     }

+    pub(crate) async fn new_for_connection_query_iter(
+        mut query: Query,
+        connection: Arc<Connection>,
+        values: SerializedValues,
+        consistency: Consistency,
+        serial_consistency: Option<SerialConsistency>,
+    ) -> Result<RowIterator, QueryError> {
+        if query.get_page_size().is_none() {
+            query.set_page_size(DEFAULT_ITER_PAGE_SIZE);
+        }
+        let (sender, receiver) = mpsc::channel::<Result<ReceivedPage, QueryError>>(1);
+
+        let worker_task = async move {
+            let worker = SingleConnectionRowIteratorWorker {
+                sender: sender.into(),
+                fetcher: |paging_state| {
+                    connection.query_with_consistency(
+                        &query,
+                        &values,
+                        consistency,
+                        serial_consistency,
+                        paging_state,
+                    )
+                },
+            };
+            worker.work().await
+        };
+
+        Self::new_from_worker_future(worker_task, receiver).await
+    }
+
     async fn new_from_worker_future(
         worker_task: impl Future<Output = PageSendAttemptedProof> + Send + 'static,
         mut receiver: mpsc::Receiver<Result<ReceivedPage, QueryError>>,
@@ -683,6 +715,66 @@ where
     }
 }

+/// A massively simplified version of the RowIteratorWorker. It does not have
+/// any complicated logic related to retries, it just fetches pages from
+/// a single connection.
+struct SingleConnectionRowIteratorWorker<Fetcher> {
+    sender: ProvingSender<Result<ReceivedPage, QueryError>>,
+    fetcher: Fetcher,
+}
+
+impl<Fetcher, FetchFut> SingleConnectionRowIteratorWorker<Fetcher>
+where
+    Fetcher: Fn(Option<Bytes>) -> FetchFut + Send + Sync,
+    FetchFut: Future<Output = Result<QueryResponse, QueryError>> + Send,
+{
+    async fn work(mut self) -> PageSendAttemptedProof {
+        match self.do_work().await {
+            Ok(proof) => proof,
+            Err(err) => {
+                let (proof, _) = self.sender.send(Err(err)).await;
+                proof
+            }
+        }
+    }
+
+    async fn do_work(&mut self) -> Result<PageSendAttemptedProof, QueryError> {
+        let mut paging_state = None;
+        loop {
+            let result = (self.fetcher)(paging_state).await?;
+            let response = result.into_non_error_query_response()?;
+            match response.response {
+                NonErrorResponse::Result(result::Result::Rows(mut rows)) => {
+                    paging_state = rows.metadata.paging_state.take();
+                    let (proof, send_result) = self
+                        .sender
+                        .send(Ok(ReceivedPage {
+                            rows,
+                            tracing_id: response.tracing_id,
+                        }))
+                        .await;
+                    if paging_state.is_none() || send_result.is_err() {
+                        return Ok(proof);
+                    }
+                }
+                NonErrorResponse::Result(_) => {
+                    // We have most probably sent a modification statement (e.g. INSERT or UPDATE),
+                    // so let's return an empty iterator as suggested in #631.
+
+                    // We must attempt to send something because the iterator expects it.
+                    let (proof, _) = self.sender.send_empty_page(response.tracing_id).await;
+                    return Ok(proof);
+                }
+                _ => {
+                    return Err(QueryError::ProtocolError(
+                        "Unexpected response to next page query",
+                    ));
+                }
+            }
+        }
+    }
+}
+
 /// Iterator over rows returned by paged queries
 /// where each row is parsed as the given type\
 /// Returned by `RowIterator::into_typed`

From 6ae5870918313779730a35b0bb2631d22db35215 Mon Sep 17 00:00:00 2001
From: Piotr Dulikowski
Date: Fri, 10 Feb 2023 12:15:45 +0100
Subject: [PATCH 04/13] connection: introduce query_iter

The new Connection::query_iter method will serve as a replacement for the query_all method.
Unlike query_all, query_iter will not materialize all query results as a single QueryResult but will allow processing the results row by row.
---
 scylla/src/transport/connection.rs | 25 +++++++++++++++++++++++++
 1 file changed, 25 insertions(+)

diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs
index a9ace5c31a..cb37892a29 100644
--- a/scylla/src/transport/connection.rs
+++ b/scylla/src/transport/connection.rs
@@ -30,6 +30,7 @@ use std::{
 };

 use super::errors::{BadKeyspaceName, BadQuery, DbError, QueryError};
+use super::iterator::RowIterator;

 use crate::batch::{Batch, BatchStatement};
 use crate::frame::protocol_features::ProtocolFeatures;
@@ -628,6 +629,30 @@ impl Connection {
         }
     }

+    /// Executes a query and fetches its results over multiple pages, using
+    /// the asynchronous iterator interface.
+    pub(crate) async fn query_iter(
+        self: Arc<Self>,
+        query: Query,
+        values: impl ValueList,
+    ) -> Result<RowIterator, QueryError> {
+        let serialized_values = values.serialized()?.into_owned();
+
+        let consistency = query
+            .config
+            .determine_consistency(self.config.default_consistency);
+        let serial_consistency = query.config.serial_consistency.flatten();
+
+        RowIterator::new_for_connection_query_iter(
+            query,
+            self,
+            serialized_values,
+            consistency,
+            serial_consistency,
+        )
+        .await
+    }
+
     #[allow(dead_code)]
     pub async fn batch(
         &self,

From 1a28f25843447ac5e543592b93aa4c31a201691d Mon Sep 17 00:00:00 2001
From: Piotr Dulikowski
Date: Thu, 16 Feb 2023 10:53:41 +0100
Subject: [PATCH 05/13] topology: propagate Arc<Connection> to query_peers

In order to use Connection::query_iter it is necessary to pass self wrapped in an Arc - the iterator worker is spawned as a separate tokio task, so it needs to co-own the Connection.
---
 scylla/src/transport/topology.rs | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/scylla/src/transport/topology.rs b/scylla/src/transport/topology.rs
index f73b957af7..57d1177110 100644
--- a/scylla/src/transport/topology.rs
+++ b/scylla/src/transport/topology.rs
@@ -335,7 +335,7 @@ impl MetadataReader {
     async fn fetch_metadata(&self, initial: bool) -> Result<Metadata, QueryError> {
         // TODO: Timeouts?
         self.control_connection.wait_until_initialized().await;
-        let conn = &*self.control_connection.random_connection()?;
+        let conn = &self.control_connection.random_connection()?;

         let res = query_metadata(
             conn,
@@ -448,7 +448,7 @@ impl MetadataReader {
 }

 async fn query_metadata(
-    conn: &Connection,
+    conn: &Arc<Connection>,
     connect_port: u16,
     address_translator: Option<&dyn AddressTranslator>,
     keyspace_to_fetch: &[String],
@@ -477,7 +477,7 @@ async fn query_metadata(
 }

 async fn query_peers(
-    conn: &Connection,
+    conn: &Arc<Connection>,
     connect_port: u16,
     address_translator: Option<&dyn AddressTranslator>,
 ) -> Result<Vec<Peer>, QueryError> {

From cb28052c83505873d2bb3b40e09351c33f171ba9 Mon Sep 17 00:00:00 2001
From: Piotr Dulikowski
Date: Thu, 16 Feb 2023 11:07:30 +0100
Subject: [PATCH 06/13] topology: use a structure to represent a row from system.{local,peers}

Thanks to #628, it is now possible to provide an explicit path to a crate which provides the necessary items for the FromRow macro. This makes it possible to use the macro from the `scylla` crate itself, as we just need to point it at the `scylla-cql` crate instead.

The NodeInfoRow struct is introduced to represent rows fetched from system.local and system.peers. It looks nicer than using a 5-element tuple and will reduce some noise when performing refactors in the commits that follow.
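As a rough, self-contained illustration of what the derive buys (the two-column row type below is made up; only the derive attribute, the scylla_crate attribute, and the from_row call mirror real usage):

    use scylla_cql::cql_to_rust::FromRow as _;
    use scylla_cql::frame::response::result::{CqlValue, Row};
    use scylla_macros::FromRow;

    // Hypothetical row type; #[scylla_crate = "scylla_cql"] points the derive
    // at scylla-cql directly, which is what #628 made possible.
    #[derive(FromRow, Debug)]
    #[scylla_crate = "scylla_cql"]
    struct ExampleRow {
        name: Option<String>,
        value: i32,
    }

    fn main() {
        // A row as it might arrive from the server.
        let row = Row {
            columns: vec![Some(CqlValue::Text("a".into())), Some(CqlValue::Int(1))],
        };
        // Named fields instead of a positional tuple:
        let typed = ExampleRow::from_row(row).unwrap();
        println!("{typed:?}");
    }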
---
 scylla/src/transport/topology.rs | 33 ++++++++++++++----------------
 1 file changed, 16 insertions(+), 17 deletions(-)

diff --git a/scylla/src/transport/topology.rs b/scylla/src/transport/topology.rs
index 57d1177110..b8e948bfab 100644
--- a/scylla/src/transport/topology.rs
+++ b/scylla/src/transport/topology.rs
@@ -13,6 +13,7 @@ use futures::future::try_join_all;
 use itertools::Itertools;
 use rand::seq::SliceRandom;
 use rand::{thread_rng, Rng};
+use scylla_macros::FromRow;
 use std::borrow::BorrowMut;
 use std::collections::HashMap;
 use std::fmt;
@@ -476,6 +477,16 @@ async fn query_metadata(
     Ok(Metadata { peers, keyspaces })
 }

+#[derive(FromRow)]
+#[scylla_crate = "scylla_cql"]
+struct NodeInfoRow {
+    host_id: Option<Uuid>,
+    untranslated_ip_addr: IpAddr,
+    datacenter: Option<String>,
+    rack: Option<String>,
+    tokens: Option<Vec<String>>,
+}
+
 async fn query_peers(
     conn: &Arc<Connection>,
     connect_port: u16,
@@ -501,35 +512,23 @@ async fn query_peers(
         "system.local query response was not Rows",
     ))?;

-    let typed_peers_rows = peers_rows.into_typed::<(
-        Option<Uuid>,
-        IpAddr,
-        Option<String>,
-        Option<String>,
-        Option<Vec<String>>,
-    )>();
+    let typed_peers_rows = peers_rows.into_typed::<NodeInfoRow>();

     let local_ip: IpAddr = conn.get_connect_address().ip();
     let local_address = SocketAddr::new(local_ip, connect_port);

-    let typed_local_rows = local_rows.into_typed::<(
-        Option<Uuid>,
-        IpAddr,
-        Option<String>,
-        Option<String>,
-        Option<Vec<String>>,
-    )>();
+    let typed_local_rows = local_rows.into_typed::<NodeInfoRow>();

     let untranslated_rows = typed_peers_rows
         .map(|res| res.map(|peer_row| (false, peer_row)))
         .chain(typed_local_rows.map(|res| res.map(|local_row| (true, local_row))));

     let translated_peers_futures = untranslated_rows
-        .filter_map_ok(|(is_local, (host_id, ip, dc, rack, tokens))| if let Some(host_id) = host_id {
-            Some((is_local, (host_id, ip, dc, rack, tokens)))
+        .filter_map_ok(|(is_local, NodeInfoRow { host_id, untranslated_ip_addr, datacenter, rack, tokens })| if let Some(host_id) = host_id {
+            Some((is_local, (host_id, untranslated_ip_addr, datacenter, rack, tokens)))
         } else {
             let who = if is_local { "Local node" } else { "Peer" };
-            warn!("{} (untranslated ip: {}, dc: {:?}, rack: {:?}) has Host ID set to null; skipping node.", who, ip, dc, rack);
+            warn!("{} (untranslated ip: {}, dc: {:?}, rack: {:?}) has Host ID set to null; skipping node.", who, untranslated_ip_addr, datacenter, rack);
             None
         })
         .map(|untranslated_row| async {

From d882ce474610185675bef69bf116d27bf0e21ccc Mon Sep 17 00:00:00 2001
From: Piotr Dulikowski
Date: Thu, 16 Feb 2023 11:54:35 +0100
Subject: [PATCH 07/13] topology: replace bool with NodeInfoSource enum

An enum expresses intent a little more clearly than a bool.
--- scylla/src/transport/topology.rs | 38 ++++++++++++++++++++++---------- 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/scylla/src/transport/topology.rs b/scylla/src/transport/topology.rs index b8e948bfab..982bcc3a7f 100644 --- a/scylla/src/transport/topology.rs +++ b/scylla/src/transport/topology.rs @@ -487,6 +487,21 @@ struct NodeInfoRow { tokens: Option>, } +#[derive(Clone, Copy)] +enum NodeInfoSource { + Local, + Peer, +} + +impl NodeInfoSource { + fn describe(&self) -> &'static str { + match self { + Self::Local => "local node", + Self::Peer => "peer", + } + } +} + async fn query_peers( conn: &Arc, connect_port: u16, @@ -520,37 +535,36 @@ async fn query_peers( let typed_local_rows = local_rows.into_typed::(); let untranslated_rows = typed_peers_rows - .map(|res| res.map(|peer_row| (false, peer_row))) - .chain(typed_local_rows.map(|res| res.map(|local_row| (true, local_row)))); + .map(|res| res.map(|peer_row| (NodeInfoSource::Peer, peer_row))) + .chain(typed_local_rows.map(|res| res.map(|local_row| (NodeInfoSource::Local, local_row)))); let translated_peers_futures = untranslated_rows - .filter_map_ok(|(is_local, NodeInfoRow { host_id, untranslated_ip_addr, datacenter, rack, tokens })| if let Some(host_id) = host_id { - Some((is_local, (host_id, untranslated_ip_addr, datacenter, rack, tokens))) + .filter_map_ok(|(source, NodeInfoRow { host_id, untranslated_ip_addr, datacenter, rack, tokens })| if let Some(host_id) = host_id { + Some((source, (host_id, untranslated_ip_addr, datacenter, rack, tokens))) } else { - let who = if is_local { "Local node" } else { "Peer" }; - warn!("{} (untranslated ip: {}, dc: {:?}, rack: {:?}) has Host ID set to null; skipping node.", who, untranslated_ip_addr, datacenter, rack); + warn!("{} (untranslated ip: {}, dc: {:?}, rack: {:?}) has Host ID set to null; skipping node.", source.describe(), untranslated_ip_addr, datacenter, rack); None }) .map(|untranslated_row| async { - let (is_local, (host_id, untranslated_ip_addr, datacenter, rack, tokens)) = untranslated_row.map_err( + let (source, (host_id, untranslated_ip_addr, datacenter, rack, tokens)) = untranslated_row.map_err( |_| QueryError::ProtocolError("system.peers or system.local has invalid column type") )?; let untranslated_address = SocketAddr::new(untranslated_ip_addr, connect_port); - let (untranslated_address, address) = match (is_local, address_translator) { - (true, None) => { + let (untranslated_address, address) = match (source, address_translator) { + (NodeInfoSource::Local, None) => { // We need to replace rpc_address with control connection address. (Some(untranslated_address), local_address) }, - (true, Some(_)) => { + (NodeInfoSource::Local, Some(_)) => { // The address we used to connect is most likely different and we just don't know. (None, local_address) }, - (false, None) => { + (NodeInfoSource::Peer, None) => { // The usual case - no translation. (Some(untranslated_address), untranslated_address) }, - (false, Some(translator)) => { + (NodeInfoSource::Peer, Some(translator)) => { // We use the provided translator and skip the peer if there is no rule for translating it. 
(Some(untranslated_address), match translator.translate_address(&UntranslatedPeer {host_id, untranslated_address}).await { From d882ce474610185675bef69bf116d27bf0e21ccc Mon Sep 17 00:00:00 2001 From: Piotr Dulikowski Date: Thu, 16 Feb 2023 11:54:35 +0100 Subject: [PATCH 08/13] topology: extract Peer creation from db row to a function Conversion from NodeInfoRow to Peer is done across two lambdas applied to the untranslated_rows iterator. Let's simplify it a bit and extract this logic to a new separate function create_peer_from_row. It is recommended to hide whitespace changes when reviewing the diff of this commit. --- scylla/src/transport/topology.rs | 160 ++++++++++++++++++------------- 1 file changed, 93 insertions(+), 67 deletions(-) diff --git a/scylla/src/transport/topology.rs b/scylla/src/transport/topology.rs index 982bcc3a7f..390a13147e 100644 --- a/scylla/src/transport/topology.rs +++ b/scylla/src/transport/topology.rs @@ -538,80 +538,106 @@ async fn query_peers( .map(|res| res.map(|peer_row| (NodeInfoSource::Peer, peer_row))) .chain(typed_local_rows.map(|res| res.map(|local_row| (NodeInfoSource::Local, local_row)))); - let translated_peers_futures = untranslated_rows - .filter_map_ok(|(source, NodeInfoRow { host_id, untranslated_ip_addr, datacenter, rack, tokens })| if let Some(host_id) = host_id { - Some((source, (host_id, untranslated_ip_addr, datacenter, rack, tokens))) - } else { - warn!("{} (untranslated ip: {}, dc: {:?}, rack: {:?}) has Host ID set to null; skipping node.", source.describe(), untranslated_ip_addr, datacenter, rack); - None - }) - .map(|untranslated_row| async { - let (source, (host_id, untranslated_ip_addr, datacenter, rack, tokens)) = untranslated_row.map_err( + let translated_peers_futures = untranslated_rows.map(|untranslated_row| async { + let (source, row) = untranslated_row.map_err( |_| QueryError::ProtocolError("system.peers or system.local has invalid column type") )?; - let untranslated_address = SocketAddr::new(untranslated_ip_addr, connect_port); + create_peer_from_row(source, row, local_address, address_translator).await + }); - let (untranslated_address, address) = match (source, address_translator) { - (NodeInfoSource::Local, None) => { - // We need to replace rpc_address with control connection address. - (Some(untranslated_address), local_address) - }, - (NodeInfoSource::Local, Some(_)) => { - // The address we used to connect is most likely different and we just don't know. - (None, local_address) - }, - (NodeInfoSource::Peer, None) => { - // The usual case - no translation. - (Some(untranslated_address), untranslated_address) - }, - (NodeInfoSource::Peer, Some(translator)) => { - // We use the provided translator and skip the peer if there is no rule for translating it. 
- (Some(untranslated_address), - match translator.translate_address(&UntranslatedPeer {host_id, untranslated_address}).await { - Ok(address) => address, - Err(err) => { - warn!("Could not translate address {}; TranslationError: {:?}; node therefore skipped.", - untranslated_address, err); - return Ok::, QueryError>(None); - } - } - ) - } - }; + let peers = try_join_all(translated_peers_futures).await?; + Ok(peers.into_iter().flatten().collect()) +} - let tokens_str: Vec = tokens.unwrap_or_default(); +async fn create_peer_from_row( + source: NodeInfoSource, + row: NodeInfoRow, + local_address: SocketAddr, + address_translator: Option<&dyn AddressTranslator>, +) -> Result, QueryError> { + let NodeInfoRow { + host_id, + untranslated_ip_addr, + datacenter, + rack, + tokens, + } = row; + + let host_id = match host_id { + Some(host_id) => host_id, + None => { + warn!("{} (untranslated ip: {}, dc: {:?}, rack: {:?}) has Host ID set to null; skipping node.", source.describe(), untranslated_ip_addr, datacenter, rack); + return Ok(None); + } + }; - // Parse string representation of tokens as integer values - let tokens: Vec = match tokens_str - .iter() - .map(|s| Token::from_str(s)) - .collect::, _>>() - { - Ok(parsed) => parsed, - Err(e) => { - // FIXME: we could allow the users to provide custom partitioning information - // in order for it to work with non-standard token sizes. - // Also, we could implement support for Cassandra's other standard partitioners - // like RandomPartitioner or ByteOrderedPartitioner. - trace!("Couldn't parse tokens as 64-bit integers: {}, proceeding with a dummy token. If you're using a partitioner with different token size, consider migrating to murmur3", e); - vec![Token { - value: rand::thread_rng().gen::(), - }] - } - }; + let connect_port = local_address.port(); + let untranslated_address = SocketAddr::new(untranslated_ip_addr, connect_port); - Ok(Some(Peer { - host_id, - untranslated_address, - address, - tokens, - datacenter, - rack, - })) - }); + let (untranslated_address, address) = match (source, address_translator) { + (NodeInfoSource::Local, None) => { + // We need to replace rpc_address with control connection address. + (Some(untranslated_address), local_address) + } + (NodeInfoSource::Local, Some(_)) => { + // The address we used to connect is most likely different and we just don't know. + (None, local_address) + } + (NodeInfoSource::Peer, None) => { + // The usual case - no translation. + (Some(untranslated_address), untranslated_address) + } + (NodeInfoSource::Peer, Some(translator)) => { + // We use the provided translator and skip the peer if there is no rule for translating it. + ( + Some(untranslated_address), + match translator + .translate_address(&UntranslatedPeer { + host_id, + untranslated_address, + }) + .await + { + Ok(address) => address, + Err(err) => { + warn!("Could not translate address {}; TranslationError: {:?}; node therefore skipped.", + untranslated_address, err); + return Ok::, QueryError>(None); + } + }, + ) + } + }; - let peers = try_join_all(translated_peers_futures).await?; - Ok(peers.into_iter().flatten().collect()) + let tokens_str: Vec = tokens.unwrap_or_default(); + + // Parse string representation of tokens as integer values + let tokens: Vec = match tokens_str + .iter() + .map(|s| Token::from_str(s)) + .collect::, _>>() + { + Ok(parsed) => parsed, + Err(e) => { + // FIXME: we could allow the users to provide custom partitioning information + // in order for it to work with non-standard token sizes. 
+ // Also, we could implement support for Cassandra's other standard partitioners + // like RandomPartitioner or ByteOrderedPartitioner. + trace!("Couldn't parse tokens as 64-bit integers: {}, proceeding with a dummy token. If you're using a partitioner with different token size, consider migrating to murmur3", e); + vec![Token { + value: rand::thread_rng().gen::(), + }] + } + }; + + Ok(Some(Peer { + host_id, + untranslated_address, + address, + tokens, + datacenter, + rack, + })) } async fn query_filter_keyspace_name( From b18e118f6df4732ca18457fb2a32d2665127585a Mon Sep 17 00:00:00 2001 From: Piotr Dulikowski Date: Thu, 16 Feb 2023 13:22:53 +0100 Subject: [PATCH 09/13] topology: convert query_peers to use query_iter instead of query_all Previously, query_peers used query_all to fetch topology information and then processed it using iterators. Now, this method is changed to use query_iter and rows are processed on the fly using asynchronous streams. In order to preserve the behavior of system.peers and system.local being queried in parallel, stream::select is used which merges results of both streams as they become available. Previously, we just put results of one query after results of the other one. Because of this, the order of Peers in the final result might be slightly different on each fetch - however, this order was never specified and could actually change e.g. when the control connection changes, so I think this should be OK. --- scylla/src/transport/topology.rs | 50 +++++++++++++++----------------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/scylla/src/transport/topology.rs b/scylla/src/transport/topology.rs index 390a13147e..b1b0c5792f 100644 --- a/scylla/src/transport/topology.rs +++ b/scylla/src/transport/topology.rs @@ -9,8 +9,8 @@ use crate::transport::session::{AddressTranslator, IntoTypedRows}; use crate::utils::parse::{ParseErrorCause, ParseResult, ParserState}; use crate::QueryResult; -use futures::future::try_join_all; -use itertools::Itertools; +use futures::future::{self, FutureExt}; +use futures::stream::{self, StreamExt, TryStreamExt}; use rand::seq::SliceRandom; use rand::{thread_rng, Rng}; use scylla_macros::FromRow; @@ -510,42 +510,40 @@ async fn query_peers( let mut peers_query = Query::new("select host_id, rpc_address, data_center, rack, tokens from system.peers"); peers_query.set_page_size(1024); - let peers_query_future = conn.query_all(&peers_query, &[]); + let peers_query_stream = conn + .clone() + .query_iter(peers_query, &[]) + .into_stream() + .try_flatten() + .and_then(|row_result| future::ok((NodeInfoSource::Peer, row_result))); let mut local_query = Query::new("select host_id, rpc_address, data_center, rack, tokens from system.local"); local_query.set_page_size(1024); - let local_query_future = conn.query_all(&local_query, &[]); + let local_query_stream = conn + .clone() + .query_iter(local_query, &[]) + .into_stream() + .try_flatten() + .and_then(|row_result| future::ok((NodeInfoSource::Local, row_result))); - let (peers_res, local_res) = tokio::try_join!(peers_query_future, local_query_future)?; - - let peers_rows = peers_res.rows.ok_or(QueryError::ProtocolError( - "system.peers query response was not Rows", - ))?; - - let local_rows = local_res.rows.ok_or(QueryError::ProtocolError( - "system.local query response was not Rows", - ))?; - - let typed_peers_rows = peers_rows.into_typed::(); + let untranslated_rows = stream::select(peers_query_stream, local_query_stream); let local_ip: IpAddr = conn.get_connect_address().ip(); let 
local_address = SocketAddr::new(local_ip, connect_port); - let typed_local_rows = local_rows.into_typed::(); - - let untranslated_rows = typed_peers_rows - .map(|res| res.map(|peer_row| (NodeInfoSource::Peer, peer_row))) - .chain(typed_local_rows.map(|res| res.map(|local_row| (NodeInfoSource::Local, local_row)))); - - let translated_peers_futures = untranslated_rows.map(|untranslated_row| async { - let (source, row) = untranslated_row.map_err( - |_| QueryError::ProtocolError("system.peers or system.local has invalid column type") - )?; + let translated_peers_futures = untranslated_rows.map(|row_result| async { + let (source, raw_row) = row_result?; + let row = raw_row.into_typed().map_err(|_| { + QueryError::ProtocolError("system.peers or system.local has invalid column type") + })?; create_peer_from_row(source, row, local_address, address_translator).await }); - let peers = try_join_all(translated_peers_futures).await?; + let peers = translated_peers_futures + .buffer_unordered(256) + .try_collect::>() + .await?; Ok(peers.into_iter().flatten().collect()) } From b8b19a7bf2b68b03b5b72b783bbe0eed3c66099b Mon Sep 17 00:00:00 2001 From: Piotr Dulikowski Date: Thu, 16 Feb 2023 15:14:34 +0100 Subject: [PATCH 10/13] topology: get rid of other uses of query_all There are other functions in topology.rs that use query_all, but they are less complicated and changing them to use query_iter is relatively straightforward. Therefore, we do it in a single commit. --- scylla/src/transport/topology.rs | 207 +++++++++++++++++-------------- 1 file changed, 112 insertions(+), 95 deletions(-) diff --git a/scylla/src/transport/topology.rs b/scylla/src/transport/topology.rs index b1b0c5792f..6e3388d3c3 100644 --- a/scylla/src/transport/topology.rs +++ b/scylla/src/transport/topology.rs @@ -5,14 +5,16 @@ use crate::transport::connection::{Connection, ConnectionConfig}; use crate::transport::connection_pool::{NodeConnectionPool, PoolConfig, PoolSize}; use crate::transport::errors::{DbError, QueryError}; use crate::transport::host_filter::HostFilter; -use crate::transport::session::{AddressTranslator, IntoTypedRows}; +use crate::transport::session::AddressTranslator; use crate::utils::parse::{ParseErrorCause, ParseResult, ParserState}; -use crate::QueryResult; use futures::future::{self, FutureExt}; use futures::stream::{self, StreamExt, TryStreamExt}; +use futures::Stream; use rand::seq::SliceRandom; use rand::{thread_rng, Rng}; +use scylla_cql::frame::response::result::Row; +use scylla_cql::frame::value::ValueList; use scylla_macros::FromRow; use std::borrow::BorrowMut; use std::collections::HashMap; @@ -638,24 +640,30 @@ async fn create_peer_from_row( })) } -async fn query_filter_keyspace_name( - conn: &Connection, +fn query_filter_keyspace_name( + conn: &Arc, query_str: &str, keyspaces_to_fetch: &[String], -) -> Result { +) -> impl Stream> { let keyspaces = &[keyspaces_to_fetch] as &[&[String]]; let (query_str, query_values) = if !keyspaces_to_fetch.is_empty() { (format!("{query_str} where keyspace_name in ?"), keyspaces) } else { (query_str.into(), &[] as &[&[String]]) }; + let query_values = query_values.serialized().map(|sv| sv.into_owned()); let mut query = Query::new(query_str); + let conn = conn.clone(); query.set_page_size(1024); - conn.query_all(&query, query_values).await + let fut = async move { + let query_values = query_values?; + conn.query_iter(query, query_values).await + }; + fut.into_stream().try_flatten() } async fn query_keyspaces( - conn: &Connection, + conn: &Arc, keyspaces_to_fetch: 
&[String], fetch_schema: bool, ) -> Result, QueryError> { @@ -663,14 +671,8 @@ async fn query_keyspaces( conn, "select keyspace_name, replication from system_schema.keyspaces", keyspaces_to_fetch, - ) - .await? - .rows - .ok_or(QueryError::ProtocolError( - "system_schema.keyspaces query response was not Rows", - ))?; - - let mut result = HashMap::with_capacity(rows.len()); + ); + let (mut all_tables, mut all_views, mut all_user_defined_types) = if fetch_schema { ( query_tables(conn, keyspaces_to_fetch).await?, @@ -681,8 +683,9 @@ async fn query_keyspaces( (HashMap::new(), HashMap::new(), HashMap::new()) }; - for row in rows.into_typed::<(String, HashMap)>() { - let (keyspace_name, strategy_map) = row.map_err(|_| { + rows.map(|row_result| { + let row = row_result?; + let (keyspace_name, strategy_map) = row.into_typed().map_err(|_| { QueryError::ProtocolError("system_schema.keyspaces has invalid column type") })?; @@ -693,39 +696,39 @@ async fn query_keyspaces( .remove(&keyspace_name) .unwrap_or_default(); - result.insert( - keyspace_name, - Keyspace { - strategy, - tables, - views, - user_defined_types, - }, - ); - } + let keyspace = Keyspace { + strategy, + tables, + views, + user_defined_types, + }; - Ok(result) + Ok((keyspace_name, keyspace)) + }) + .try_collect() + .await } async fn query_user_defined_types( - conn: &Connection, + conn: &Arc, keyspaces_to_fetch: &[String], ) -> Result>>, QueryError> { let rows = query_filter_keyspace_name( conn, "select keyspace_name, type_name, field_names, field_types from system_schema.types", keyspaces_to_fetch, - ) - .await? - .rows - .ok_or(QueryError::ProtocolError( - "system_schema.types query response was not Rows", - ))?; + ); - let mut result = HashMap::with_capacity(rows.len()); + let mut result = HashMap::new(); - for row in rows.into_typed::<(String, String, Vec, Vec)>() { - let (keyspace_name, type_name, field_names, field_types) = row.map_err(|_| { + rows.map(|row_result| { + let row = row_result?; + let (keyspace_name, type_name, field_names, field_types): ( + String, + String, + Vec, + Vec, + ) = row.into_typed().map_err(|_| { QueryError::ProtocolError("system_schema.types has invalid column type") })?; @@ -739,31 +742,30 @@ async fn query_user_defined_types( .entry(keyspace_name) .or_insert_with(HashMap::new) .insert(type_name, fields); - } + + Ok::<_, QueryError>(()) + }) + .try_for_each(|_| future::ok(())) + .await?; Ok(result) } async fn query_tables( - conn: &Connection, + conn: &Arc, keyspaces_to_fetch: &[String], ) -> Result>, QueryError> { let rows = query_filter_keyspace_name( conn, "SELECT keyspace_name, table_name FROM system_schema.tables", keyspaces_to_fetch, - ) - .await? 
- .rows - .ok_or(QueryError::ProtocolError( - "system_schema.tables query response was not Rows", - ))?; - - let mut result = HashMap::with_capacity(rows.len()); + ); + let mut result = HashMap::new(); let mut tables = query_tables_schema(conn, keyspaces_to_fetch).await?; - for row in rows.into_typed::<(String, String)>() { - let (keyspace_name, table_name) = row.map_err(|_| { + rows.map(|row_result| { + let row = row_result?; + let (keyspace_name, table_name) = row.into_typed().map_err(|_| { QueryError::ProtocolError("system_schema.tables has invalid column type") })?; @@ -780,31 +782,31 @@ async fn query_tables( .entry(keyspace_and_table_name.0) .or_insert_with(HashMap::new) .insert(keyspace_and_table_name.1, table); - } + + Ok::<_, QueryError>(()) + }) + .try_for_each(|_| future::ok(())) + .await?; Ok(result) } async fn query_views( - conn: &Connection, + conn: &Arc, keyspaces_to_fetch: &[String], ) -> Result>, QueryError> { let rows = query_filter_keyspace_name( conn, "SELECT keyspace_name, view_name, base_table_name FROM system_schema.views", keyspaces_to_fetch, - ) - .await? - .rows - .ok_or(QueryError::ProtocolError( - "system_schema.views query response was not Rows", - ))?; - - let mut result = HashMap::with_capacity(rows.len()); + ); + + let mut result = HashMap::new(); let mut tables = query_tables_schema(conn, keyspaces_to_fetch).await?; - for row in rows.into_typed::<(String, String, String)>() { - let (keyspace_name, view_name, base_table_name) = row.map_err(|_| { + rows.map(|row_result| { + let row = row_result?; + let (keyspace_name, view_name, base_table_name) = row.into_typed().map_err(|_| { QueryError::ProtocolError("system_schema.views has invalid column type") })?; @@ -825,13 +827,17 @@ async fn query_views( .entry(keyspace_and_view_name.0) .or_insert_with(HashMap::new) .insert(keyspace_and_view_name.1, materialized_view); - } + + Ok::<_, QueryError>(()) + }) + .try_for_each(|_| future::ok(())) + .await?; Ok(result) } async fn query_tables_schema( - conn: &Connection, + conn: &Arc, keyspaces_to_fetch: &[String], ) -> Result, QueryError> { // Upon migration from thrift to CQL, Cassandra internally creates a surrogate column "value" of @@ -841,23 +847,25 @@ async fn query_tables_schema( let rows = query_filter_keyspace_name(conn, "select keyspace_name, table_name, column_name, kind, position, type from system_schema.columns", keyspaces_to_fetch - ) - .await? 
- .rows - .ok_or(QueryError::ProtocolError( - "system_schema.columns query response was not Rows", - ))?; - - let mut tables_schema = HashMap::with_capacity(rows.len()); + ); - for row in rows.into_typed::<(String, String, String, String, i32, String)>() { - let (keyspace_name, table_name, column_name, kind, position, type_) = - row.map_err(|_| { - QueryError::ProtocolError("system_schema.columns has invalid column type") - })?; + let mut tables_schema = HashMap::new(); + + rows.map(|row_result| { + let row = row_result?; + let (keyspace_name, table_name, column_name, kind, position, type_): ( + String, + String, + String, + String, + i32, + String, + ) = row.into_typed().map_err(|_| { + QueryError::ProtocolError("system_schema.columns has invalid column type") + })?; if type_ == THRIFT_EMPTY_TYPE { - continue; + return Ok::<_, QueryError>(()); } let entry = tables_schema.entry((keyspace_name, table_name)).or_insert(( @@ -888,7 +896,11 @@ async fn query_tables_schema( kind, }, ); - } + + Ok::<_, QueryError>(()) + }) + .try_for_each(|_| future::ok(())) + .await?; let mut all_partitioners = query_table_partitioners(conn).await?; let mut result = HashMap::new(); @@ -1042,33 +1054,38 @@ fn freeze_type(type_: CqlType) -> CqlType { } async fn query_table_partitioners( - conn: &Connection, + conn: &Arc, ) -> Result>, QueryError> { let mut partitioner_query = Query::new( "select keyspace_name, table_name, partitioner from system_schema.scylla_tables", ); partitioner_query.set_page_size(1024); - let rows = match conn.query_all(&partitioner_query, &[]).await { + let rows = conn + .clone() + .query_iter(partitioner_query, &[]) + .into_stream() + .try_flatten(); + + let result = rows + .map(|row_result| { + let (keyspace_name, table_name, partitioner) = + row_result?.into_typed().map_err(|_| { + QueryError::ProtocolError("system_schema.tables has invalid column type") + })?; + Ok::<_, QueryError>(((keyspace_name, table_name), partitioner)) + }) + .try_collect::>() + .await; + + match result { // FIXME: This match catches all database errors with this error code despite the fact // that we are only interested in the ones resulting from non-existent table // system_schema.scylla_tables. // For more information please refer to https://github.com/scylladb/scylla-rust-driver/pull/349#discussion_r762050262 - Err(QueryError::DbError(DbError::Invalid, _)) => return Ok(HashMap::new()), - query_result => query_result?.rows.ok_or(QueryError::ProtocolError( - "system_schema.scylla_tables query response was not Rows", - ))?, - }; - - let mut result = HashMap::with_capacity(rows.len()); - - for row in rows.into_typed::<(String, String, Option)>() { - let (keyspace_name, table_name, partitioner) = row.map_err(|_| { - QueryError::ProtocolError("system_schema.tables has invalid column type") - })?; - result.insert((keyspace_name, table_name), partitioner); + Err(QueryError::DbError(DbError::Invalid, _)) => Ok(HashMap::new()), + result => result, } - Ok(result) } fn strategy_from_string_map( From e183a0f74cc80bb84d3ef2cfd2bfd0b825f68caa Mon Sep 17 00:00:00 2001 From: Piotr Dulikowski Date: Thu, 16 Feb 2023 17:24:25 +0100 Subject: [PATCH 11/13] connection: get rid of execute_all This method is a sibling of query_all that was never actually used. In the future we will consider switching to prepared statements for internal queries, but 1) this will be done via not-yet-existing execute_iter and 2) we might do it by converting query_iter to use prepared statements. Therefore, there is no need to keep the dead code. 
This commit gets rid of execute_all and all its occurrences in the connection_query_all_execute_all_test (now renamed to connection_query_all_test).
---
 scylla/src/transport/connection.rs | 105 +----------------------------
 1 file changed, 3 insertions(+), 102 deletions(-)

diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs
index cb37892a29..ef8c5ff1c7 100644
--- a/scylla/src/transport/connection.rs
+++ b/scylla/src/transport/connection.rs
@@ -521,35 +521,6 @@ impl Connection {
         }
     }

-    pub async fn execute_single_page(
-        &self,
-        prepared_statement: &PreparedStatement,
-        values: impl ValueList,
-        paging_state: Option<Bytes>,
-    ) -> Result<QueryResult, QueryError> {
-        self.execute(prepared_statement, values, paging_state)
-            .await?
-            .into_query_result()
-    }
-
-    pub async fn execute(
-        &self,
-        prepared_statement: &PreparedStatement,
-        values: impl ValueList,
-        paging_state: Option<Bytes>,
-    ) -> Result<QueryResponse, QueryError> {
-        self.execute_with_consistency(
-            prepared_statement,
-            values,
-            prepared_statement
-                .config
-                .determine_consistency(self.config.default_consistency),
-            prepared_statement.config.serial_consistency.flatten(),
-            paging_state,
-        )
-        .await
-    }
-
     pub async fn execute_with_consistency(
         &self,
         prepared_statement: &PreparedStatement,
@@ -592,43 +563,6 @@ impl Connection {
         }
     }

-    /// Performs execute_single_page multiple times to fetch all available pages
-    #[allow(dead_code)]
-    pub async fn execute_all(
-        &self,
-        prepared_statement: &PreparedStatement,
-        values: impl ValueList,
-    ) -> Result<QueryResult, QueryError> {
-        if prepared_statement.get_page_size().is_none() {
-            return Err(QueryError::BadQuery(BadQuery::Other(
-                "Called Connection::execute_all without page size set!".to_string(),
-            )));
-        }
-
-        let mut final_result = QueryResult::default();
-
-        let serialized_values = values.serialized()?;
-        let mut paging_state: Option<Bytes> = None;
-
-        loop {
-            // Send next paged query
-            let mut cur_result: QueryResult = self
-                .execute_single_page(prepared_statement, &serialized_values, paging_state)
-                .await?;
-
-            // Set paging_state for the next query
-            paging_state = cur_result.paging_state.take();
-
-            // Add current query results to the final_result
-            final_result.merge_with_next_page_res(cur_result);
-
-            if paging_state.is_none() {
-                // No more pages to query, we can return the final result
-                return Ok(final_result);
-            }
-        }
-    }
-
     /// Executes a query and fetches its results over multiple pages, using
     /// the asynchronous iterator interface.
     pub(crate) async fn query_iter(
@@ -1621,14 +1555,14 @@ mod tests {
         }
     }

-    /// Tests for Connection::query_all and Connection::execute_all
+    /// Tests for Connection::query_all
     /// 1. SELECT from an empty table.
     /// 2. Create table and insert ints 0..100.
-    ///    Then use query_all and execute_all with page_size set to 7 to select all 100 rows.
+    ///    Then use query_all with page_size set to 7 to select all 100 rows.
     /// 3. INSERT query_all should have None in result rows.
     /// 4. Calling query_all with a Query that doesn't have page_size set should result in an error.
     #[tokio::test]
-    async fn connection_query_all_execute_all_test() {
+    async fn connection_query_all_test() {
         let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
         let addr: SocketAddr = resolve_hostname(&uri).await;

@@ -1670,11 +1604,6 @@ mod tests {
         let empty_res = connection.query_all(&select_query, &[]).await.unwrap();
         assert!(empty_res.rows.unwrap().is_empty());

-        let mut prepared_select = connection.prepare(&select_query).await.unwrap();
-        prepared_select.set_page_size(7);
-        let empty_res_prepared = connection.execute_all(&prepared_select, &[]).await.unwrap();
-        assert!(empty_res_prepared.rows.unwrap().is_empty());
-
         // 2. Insert 100 and select using query_all with page_size 7
         let values: Vec<i32> = (0..100).collect();
         let mut insert_futures = Vec::new();
@@ -1698,29 +1627,10 @@ mod tests {
         results.sort_unstable(); // Clippy recommended to use sort_unstable instead of sort()
         assert_eq!(results, values);

-        let mut results2: Vec<i32> = connection
-            .execute_all(&prepared_select, &[])
-            .await
-            .unwrap()
-            .rows
-            .unwrap()
-            .into_typed::<(i32,)>()
-            .map(|r| r.unwrap().0)
-            .collect();
-        results2.sort_unstable();
-        assert_eq!(results2, values);
-
         // 3. INSERT query_all should have None in result rows.
         let insert_res1 = connection.query_all(&insert_query, (0,)).await.unwrap();
         assert!(insert_res1.rows.is_none());

-        let prepared_insert = connection.prepare(&insert_query).await.unwrap();
-        let insert_res2 = connection
-            .execute_all(&prepared_insert, (0,))
-            .await
-            .unwrap();
-        assert!(insert_res2.rows.is_none(),);
-
         // 4. Calling query_all with a Query that doesn't have page_size set should result in an error.
         let no_page_size_query = Query::new("SELECT p FROM connection_query_all_tab");
         let no_page_res = connection.query_all(&no_page_size_query, &[]).await;
         assert!(matches!(
             no_page_res,
             Err(QueryError::BadQuery(BadQuery::Other(_)))
         ));
     }

     #[tokio::test]

From 3cc3443e56061715bef8b7debcfcaa08460ee04c Mon Sep 17 00:00:00 2001
From: Piotr Dulikowski
Date: Thu, 16 Feb 2023 17:56:07 +0100
Subject: [PATCH 12/13] connection: change the query_all test to query_iter test

The query_iter method is meant to serve a similar purpose to what query_all does currently - fetching data on the control connection for internal queries - so it makes sense to adapt the (only) existing test for the new method. The last assertion (4) had to be removed because the new method does not check that the page size is set by the caller.
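In isolation, the consumption pattern the adapted test relies on looks roughly like this (a hypothetical crate-internal helper around the query_iter method introduced earlier in this series; not the test itself):

    use futures::{StreamExt, TryStreamExt};
    use std::sync::Arc;

    // Connection, Query and QueryError are the driver's own types as used
    // throughout these patches.
    async fn read_all_ints(conn: Arc<Connection>) -> Result<Vec<i32>, QueryError> {
        let query = Query::new("SELECT p FROM connection_query_iter_tab").with_page_size(7);
        let mut values = conn
            .query_iter(query, &[])
            .await?
            // Convert each raw row to a typed tuple while streaming...
            .into_typed::<(i32,)>()
            .map(|row| row.map(|(p,)| p))
            // ...and collect, failing on the first error.
            .try_collect::<Vec<i32>>()
            .await?;
        values.sort_unstable();
        Ok(values)
    }

An INSERT through the same interface yields a stream that completes without emitting any rows, which is what assertion 3 in the rewritten test checks.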
--- scylla/src/transport/connection.rs | 66 ++++++++++++++++-------------- 1 file changed, 35 insertions(+), 31 deletions(-) diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index ef8c5ff1c7..fe56577632 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -1518,7 +1518,6 @@ impl VerifiedKeyspaceName { #[cfg(test)] mod tests { - use scylla_cql::errors::BadQuery; use scylla_cql::frame::protocol_features::{ LWT_OPTIMIZATION_META_BIT_MASK_KEY, SCYLLA_LWT_ADD_METADATA_MARK_EXTENSION, }; @@ -1531,12 +1530,12 @@ mod tests { use tokio::select; use tokio::sync::mpsc; - use super::super::errors::QueryError; use super::ConnectionConfig; use crate::query::Query; use crate::transport::connection::open_connection; use crate::utils::test_utils::unique_keyspace_name; - use crate::{IntoTypedRows, SessionBuilder}; + use crate::SessionBuilder; + use futures::{StreamExt, TryStreamExt}; use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; @@ -1555,20 +1554,20 @@ mod tests { } } - /// Tests for Connection::query_all + /// Tests for Connection::query_iter /// 1. SELECT from an empty table. /// 2. Create table and insert ints 0..100. - /// Then use query_all with page_size set to 7 to select all 100 rows. - /// 3. INSERT query_all should have None in result rows. - /// 4. Calling query_all with a Query that doesn't have page_size set should result in an error. + /// Then use query_iter with page_size set to 7 to select all 100 rows. + /// 3. INSERT query_iter should work and not return any rows. #[tokio::test] - async fn connection_query_all_test() { + async fn connection_query_iter_test() { let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string()); let addr: SocketAddr = resolve_hostname(&uri).await; let (connection, _) = super::open_connection(addr, None, ConnectionConfig::default()) .await .unwrap(); + let connection = Arc::new(connection); let ks = unique_keyspace_name(); @@ -1582,12 +1581,12 @@ mod tests { session.query(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'SimpleStrategy', 'replication_factor' : 1}}", ks.clone()), &[]).await.unwrap(); session.use_keyspace(ks.clone(), false).await.unwrap(); session - .query("DROP TABLE IF EXISTS connection_query_all_tab", &[]) + .query("DROP TABLE IF EXISTS connection_query_iter_tab", &[]) .await .unwrap(); session .query( - "CREATE TABLE IF NOT EXISTS connection_query_all_tab (p int primary key)", + "CREATE TABLE IF NOT EXISTS connection_query_iter_tab (p int primary key)", &[], ) .await @@ -1600,15 +1599,22 @@ mod tests { .unwrap(); // 1. SELECT from an empty table returns query result where rows are Some(Vec::new()) - let select_query = Query::new("SELECT p FROM connection_query_all_tab").with_page_size(7); - let empty_res = connection.query_all(&select_query, &[]).await.unwrap(); - assert!(empty_res.rows.unwrap().is_empty()); + let select_query = Query::new("SELECT p FROM connection_query_iter_tab").with_page_size(7); + let empty_res = connection + .clone() + .query_iter(select_query.clone(), &[]) + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + assert!(empty_res.is_empty()); - // 2. Insert 100 and select using query_all with page_size 7 + // 2. 
Insert 100 and select using query_iter with page_size 7
         let values: Vec<i32> = (0..100).collect();
         let mut insert_futures = Vec::new();
         let insert_query =
-            Query::new("INSERT INTO connection_query_all_tab (p) VALUES (?)").with_page_size(7);
+            Query::new("INSERT INTO connection_query_iter_tab (p) VALUES (?)").with_page_size(7);
         for v in &values {
             insert_futures.push(connection.query_single_page(insert_query.clone(), (v,)));
         }
@@ -1616,28 +1622,26 @@ mod tests {
         futures::future::try_join_all(insert_futures).await.unwrap();

         let mut results: Vec<i32> = connection
-            .query_all(&select_query, &[])
+            .clone()
+            .query_iter(select_query.clone(), &[])
             .await
             .unwrap()
-            .rows
-            .unwrap()
             .into_typed::<(i32,)>()
-            .map(|r| r.unwrap().0)
-            .collect();
+            .map(|ret| ret.unwrap().0)
+            .collect::<Vec<i32>>()
+            .await;
         results.sort_unstable(); // Clippy recommended to use sort_unstable instead of sort()
         assert_eq!(results, values);

-        // 3. INSERT query_all should have None in result rows.
-        let insert_res1 = connection.query_all(&insert_query, (0,)).await.unwrap();
-        assert!(insert_res1.rows.is_none());
-
-        // 4. Calling query_all with a Query that doesn't have page_size set should result in an error.
-        let no_page_size_query = Query::new("SELECT p FROM connection_query_all_tab");
-        let no_page_res = connection.query_all(&no_page_size_query, &[]).await;
-        assert!(matches!(
-            no_page_res,
-            Err(QueryError::BadQuery(BadQuery::Other(_)))
-        ));
+        // 3. INSERT query_iter should work and not return any rows.
+        let insert_res1 = connection
+            .query_iter(insert_query, (0,))
+            .await
+            .unwrap()
+            .try_collect::<Vec<_>>()
+            .await
+            .unwrap();
+        assert!(insert_res1.is_empty());
     }

     #[tokio::test]

From 012caa6e0fabb213c99b808521690636fb71a723 Mon Sep 17 00:00:00 2001
From: Piotr Dulikowski
Date: Thu, 16 Feb 2023 17:56:41 +0100
Subject: [PATCH 13/13] connection: remove query_all

Now that we got rid of all uses of query_all in the code, we can finally get rid of it and of all the other code that depended on it - most notably QueryResult::merge_with_next_page_res, which it won't be possible to translate to the upcoming iterator-based deserialization interface.
---
 scylla/src/transport/connection.rs   | 65 +---------------------------
 scylla/src/transport/query_result.rs | 17 --------
 2 files changed, 1 insertion(+), 81 deletions(-)

diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs
index fe56577632..624b848f20 100644
--- a/scylla/src/transport/connection.rs
+++ b/scylla/src/transport/connection.rs
@@ -29,7 +29,7 @@ use std::{
     net::{Ipv4Addr, Ipv6Addr},
 };

-use super::errors::{BadKeyspaceName, BadQuery, DbError, QueryError};
+use super::errors::{BadKeyspaceName, DbError, QueryError};
 use super::iterator::RowIterator;

 use crate::batch::{Batch, BatchStatement};
@@ -458,69 +458,6 @@ impl Connection {
         .await
     }

-    /// Performs query_single_page multiple times to query all available pages
-    pub async fn query_all(
-        &self,
-        query: &Query,
-        values: impl ValueList,
-    ) -> Result<QueryResult, QueryError> {
-        // This method is used only for driver internal queries, so no need to consult execution profile here.
-        self.query_all_with_consistency(
-            query,
-            values,
-            query
-                .config
-                .determine_consistency(self.config.default_consistency),
-            query.get_serial_consistency(),
-        )
-        .await
-    }
-
-    pub async fn query_all_with_consistency(
-        &self,
-        query: &Query,
-        values: impl ValueList,
-        consistency: Consistency,
-        serial_consistency: Option<SerialConsistency>,
-    ) -> Result<QueryResult, QueryError> {
-        if query.get_page_size().is_none() {
-            // Page size should be set when someone wants to use paging
-            return Err(QueryError::BadQuery(BadQuery::Other(
-                "Called Connection::query_all without page size set!".to_string(),
-            )));
-        }
-
-        let mut final_result = QueryResult::default();
-
-        let serialized_values = values.serialized()?;
-        let mut paging_state: Option<Bytes> = None;
-
-        loop {
-            // Send next paged query
-            let mut cur_result: QueryResult = self
-                .query_with_consistency(
-                    query,
-                    &serialized_values,
-                    consistency,
-                    serial_consistency,
-                    paging_state,
-                )
-                .await?
-                .into_query_result()?;
-
-            // Set paging_state for the next query
-            paging_state = cur_result.paging_state.take();
-
-            // Add current query results to the final_result
-            final_result.merge_with_next_page_res(cur_result);
-
-            if paging_state.is_none() {
-                // No more pages to query, we can return the final result
-                return Ok(final_result);
-            }
-        }
-    }
-
     pub async fn execute_with_consistency(
         &self,
         prepared_statement: &PreparedStatement,
diff --git a/scylla/src/transport/query_result.rs b/scylla/src/transport/query_result.rs
index 426553cbaf..ca861e787b 100644
--- a/scylla/src/transport/query_result.rs
+++ b/scylla/src/transport/query_result.rs
@@ -131,23 +131,6 @@ impl QueryResult {
             .enumerate()
             .find(|(_id, spec)| spec.name == name)
     }
-
-    /// This function is used to merge results of multiple paged queries into one.\
-    /// other is the result of a new paged query.\
-    /// It is merged with current result kept in self.\
-    pub(crate) fn merge_with_next_page_res(&mut self, other: QueryResult) {
-        if let Some(other_rows) = other.rows {
-            match &mut self.rows {
-                Some(self_rows) => self_rows.extend(other_rows),
-                None => self.rows = Some(other_rows),
-            }
-        };
-
-        self.warnings.extend(other.warnings);
-        self.tracing_id = other.tracing_id;
-        self.paging_state = other.paging_state;
-        self.col_specs = other.col_specs;
-    }
 }

 /// [`QueryResult::rows()`](QueryResult::rows) or a similar function called on a bad QueryResult.\
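The stream::select and buffer_unordered behavior that the query_peers change (PATCH 09/13) relies on can be seen in a minimal, self-contained sketch (toy streams and timings, not driver code):

    use futures::stream::{self, StreamExt};

    #[tokio::main]
    async fn main() {
        // Two toy "query" streams standing in for system.peers and system.local.
        let peers = stream::iter(vec![("peer", 1), ("peer", 2)]);
        let local = stream::iter(vec![("local", 1)]);

        // stream::select yields from whichever stream has an item ready,
        // unlike Stream::chain, which drains the first stream entirely first -
        // hence the unspecified order of Peers in the final result.
        let merged: Vec<_> = stream::select(peers, local).collect().await;
        println!("{merged:?}");

        // buffer_unordered(n) drives up to n futures concurrently and yields
        // results as they complete - this is how the translated-peer futures
        // are awaited in query_peers.
        let done: Vec<u64> = stream::iter(vec![30u64, 10, 20])
            .map(|ms| async move {
                tokio::time::sleep(std::time::Duration::from_millis(ms)).await;
                ms
            })
            .buffer_unordered(256)
            .collect()
            .await;
        println!("{done:?}"); // completion order: [10, 20, 30]
    }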