From 24c87077d2b36d1482f3f590a3f5d5375cd8619b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Tue, 13 Aug 2024 17:21:36 +0200 Subject: [PATCH] iterator: reorder code for better grouping Now, worker-related code comes strictly before iterator/stream-related code. This locality aids readability. --- scylla/src/transport/iterator.rs | 1452 +++++++++++++++--------------- 1 file changed, 726 insertions(+), 726 deletions(-) diff --git a/scylla/src/transport/iterator.rs b/scylla/src/transport/iterator.rs index c3a81c08c..28d3c8dd3 100644 --- a/scylla/src/transport/iterator.rs +++ b/scylla/src/transport/iterator.rs @@ -52,15 +52,6 @@ macro_rules! ready_some_ok { }; } -/// Iterator over rows returned by paged queries\ -/// Allows to easily access rows without worrying about handling multiple pages -pub struct LegacyRowIterator { - current_row_idx: usize, - current_page: Rows, - page_receiver: mpsc::Receiver>, - tracing_ids: Vec, -} - struct ReceivedPage { rows: RawRows, tracing_id: Option, @@ -74,849 +65,858 @@ pub(crate) struct PreparedIteratorConfig { pub(crate) metrics: Arc, } -/// Fetching pages is asynchronous so `RowIterator` does not implement the `Iterator` trait.\ -/// Instead it uses the asynchronous `Stream` trait -impl Stream for LegacyRowIterator { - type Item = Result; +// A separate module is used here so that the parent module cannot construct +// SendAttemptedProof directly. 
+mod checked_channel_sender { + use scylla_cql::frame::response::result::RawRows; + use std::marker::PhantomData; + use tokio::sync::mpsc; + use uuid::Uuid; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.poll_next_internal(cx) - } -} + use crate::transport::errors::QueryError; -impl LegacyRowIterator { - fn poll_next_internal( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { - if self.as_ref().is_current_page_exhausted() { - ready_some_ok!(self.as_mut().poll_next_page(cx)); - } + use super::ReceivedPage; - let mut s = self.as_mut(); + /// A value whose existence proves that there was an attempt + /// to send an item of type T through a channel. + /// Can only be constructed by ProvingSender::send. + pub(crate) struct SendAttemptedProof(PhantomData); - let idx = s.current_row_idx; - if idx < s.current_page.rows.len() { - let row = mem::take(&mut s.current_page.rows[idx]); - s.current_row_idx += 1; - return Poll::Ready(Some(Ok(row))); + /// An mpsc::Sender which returns proofs that it attempted to send items. + pub(crate) struct ProvingSender(mpsc::Sender); + + impl From> for ProvingSender { + fn from(s: mpsc::Sender) -> Self { + Self(s) } - // We probably got a zero-sized page - // Yield, but tell that we are ready - cx.waker().wake_by_ref(); - Poll::Pending } - /// Makes an attempt to acquire the next page (which may be empty). - /// - /// On success, returns Some(Ok()). - /// On failure, returns Some(Err()). - /// If there are no more pages, returns None. - fn poll_next_page<'r>( - mut self: Pin<&'r mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { - let mut s = self.as_mut(); - - let received_page = ready_some_ok!(Pin::new(&mut s.page_receiver).poll_recv(cx)); - let rows = match received_page - .rows - // As RowIteratorWorker manages paging itself, the paging state response - // returned to the user is always NoMorePages. It used to be so before - // the deserialization refactor, too. 
- .into_legacy_rows(PagingStateResponse::NoMorePages) - { - Ok(rows) => rows, - Err(err) => return Poll::Ready(Some(Err(err.into()))), - }; - s.current_page = rows; - s.current_row_idx = 0; - - if let Some(tracing_id) = received_page.tracing_id { - s.tracing_ids.push(tracing_id); + impl ProvingSender { + pub(crate) async fn send( + &self, + value: T, + ) -> (SendAttemptedProof, Result<(), mpsc::error::SendError>) { + (SendAttemptedProof(PhantomData), self.0.send(value).await) } - - Poll::Ready(Some(Ok(()))) } - /// Converts this iterator into an iterator over rows parsed as given type - pub fn into_typed(self) -> LegacyTypedRowIterator { - LegacyTypedRowIterator { - row_iterator: self, - phantom_data: Default::default(), + type ResultPage = Result; + + impl ProvingSender { + pub(crate) async fn send_empty_page( + &self, + tracing_id: Option, + ) -> ( + SendAttemptedProof, + Result<(), mpsc::error::SendError>, + ) { + let empty_page = ReceivedPage { + rows: RawRows::mock_empty(), + tracing_id, + }; + self.send(Ok(empty_page)).await } } +} - pub(crate) async fn new_for_query( - query: Query, - execution_profile: Arc, - cluster_data: Arc, - metrics: Arc, - ) -> Result { - let (sender, receiver) = mpsc::channel(1); +use checked_channel_sender::{ProvingSender, SendAttemptedProof}; - let consistency = query - .config - .consistency - .unwrap_or(execution_profile.consistency); - let serial_consistency = query - .config - .serial_consistency - .unwrap_or(execution_profile.serial_consistency); +type PageSendAttemptedProof = SendAttemptedProof>; - let page_size = query.get_validated_page_size(); +// RowIteratorWorker works in the background to fetch pages +// RowIterator receives them through a channel +struct RowIteratorWorker<'a, QueryFunc, SpanCreatorFunc> { + sender: ProvingSender>, - let routing_info = RoutingInfo { - consistency, - serial_consistency, - ..Default::default() - }; + // Closure used to perform a single page query + // AsyncFn(Arc, Option>) -> Result + 
page_query: QueryFunc, - let retry_session = query - .get_retry_policy() - .map(|rp| &**rp) - .unwrap_or(&*execution_profile.retry_policy) - .new_session(); + statement_info: RoutingInfo<'a>, + query_is_idempotent: bool, + query_consistency: Consistency, + retry_session: Box, + execution_profile: Arc, + metrics: Arc, - let parent_span = tracing::Span::current(); - let worker_task = async move { - let query_ref = &query; + paging_state: PagingState, - let page_query = |connection: Arc, - consistency: Consistency, - paging_state: PagingState| { - async move { - connection - .query_raw_with_consistency( - query_ref, - consistency, - serial_consistency, - Some(page_size), - paging_state, - ) - .await - } - }; + history_listener: Option>, + current_query_id: Option, + current_attempt_id: Option, - let query_ref = &query; + parent_span: tracing::Span, + span_creator: SpanCreatorFunc, +} - let span_creator = move || { - let span = RequestSpan::new_query(&query_ref.contents); - span.record_request_size(0); - span - }; +impl RowIteratorWorker<'_, QueryFunc, SpanCreator> +where + QueryFunc: Fn(Arc, Consistency, PagingState) -> QueryFut, + QueryFut: Future>, + SpanCreator: Fn() -> RequestSpan, +{ + // Contract: this function MUST send at least one item through self.sender + async fn work(mut self, cluster_data: Arc) -> PageSendAttemptedProof { + let load_balancer = self.execution_profile.load_balancing_policy.clone(); + let statement_info = self.statement_info.clone(); + let query_plan = + load_balancing::Plan::new(load_balancer.as_ref(), &statement_info, &cluster_data); - let worker = RowIteratorWorker { - sender: sender.into(), - page_query, - statement_info: routing_info, - query_is_idempotent: query.config.is_idempotent, - query_consistency: consistency, - retry_session, - execution_profile, - metrics, - paging_state: PagingState::start(), - history_listener: query.config.history_listener.clone(), - current_query_id: None, - current_attempt_id: None, - parent_span, - 
span_creator, - }; + let mut last_error: QueryError = QueryError::EmptyPlan; + let mut current_consistency: Consistency = self.query_consistency; - worker.work(cluster_data).await - }; + self.log_query_start(); - Self::new_from_worker_future(worker_task, receiver).await - } + 'nodes_in_plan: for (node, shard) in query_plan { + let span = + trace_span!(parent: &self.parent_span, "Executing query", node = %node.address); + // For each node in the plan choose a connection to use + // This connection will be reused for same node retries to preserve paging cache on the shard + let connection: Arc = match node + .connection_for_shard(shard) + .instrument(span.clone()) + .await + { + Ok(connection) => connection, + Err(e) => { + trace!( + parent: &span, + error = %e, + "Choosing connection failed" + ); + last_error = e.into(); + // Broken connection doesn't count as a failed query, don't log in metrics + continue 'nodes_in_plan; + } + }; - pub(crate) async fn new_for_prepared_statement( - config: PreparedIteratorConfig, - ) -> Result { - let (sender, receiver) = mpsc::channel(1); + 'same_node_retries: loop { + trace!(parent: &span, "Execution started"); + // Query pages until an error occurs + let queries_result: Result = self + .query_pages(&connection, current_consistency, node) + .instrument(span.clone()) + .await; - let consistency = config - .prepared - .config - .consistency - .unwrap_or(config.execution_profile.consistency); - let serial_consistency = config - .prepared - .config - .serial_consistency - .unwrap_or(config.execution_profile.serial_consistency); - - let page_size = config.prepared.get_validated_page_size(); - - let retry_session = config - .prepared - .get_retry_policy() - .map(|rp| &**rp) - .unwrap_or(&*config.execution_profile.retry_policy) - .new_session(); - - let parent_span = tracing::Span::current(); - let worker_task = async move { - let prepared_ref = &config.prepared; - let values_ref = &config.values; - - let (partition_key, token) = match 
prepared_ref - .extract_partition_key_and_calculate_token( - prepared_ref.get_partitioner_name(), - values_ref, - ) { - Ok(res) => res.unzip(), - Err(err) => { - let (proof, _res) = ProvingSender::from(sender).send(Err(err)).await; - return proof; - } - }; - - let table_spec = config.prepared.get_table_spec(); - let statement_info = RoutingInfo { - consistency, - serial_consistency, - token, - table: table_spec, - is_confirmed_lwt: config.prepared.is_confirmed_lwt(), - }; - - let page_query = |connection: Arc, - consistency: Consistency, - paging_state: PagingState| async move { - connection - .execute_raw_with_consistency( - prepared_ref, - values_ref, - consistency, - serial_consistency, - Some(page_size), - paging_state, - ) - .await - }; - - let serialized_values_size = config.values.buffer_size(); + last_error = match queries_result { + Ok(proof) => { + trace!(parent: &span, "Query succeeded"); + // query_pages returned Ok, so we are guaranteed + // that it attempted to send at least one page + // through self.sender and we can safely return now. 
+ return proof; + } + Err(error) => { + trace!( + parent: &span, + error = %error, + "Query failed" + ); + error + } + }; - let replicas: Option> = - if let (Some(table_spec), Some(token)) = - (statement_info.table, statement_info.token) - { - Some( - config - .cluster_data - .get_token_endpoints_iter(table_spec, token) - .map(|(node, shard)| (node.clone(), shard)) - .collect(), - ) - } else { - None + // Use retry policy to decide what to do next + let query_info = QueryInfo { + error: &last_error, + is_idempotent: self.query_is_idempotent, + consistency: self.query_consistency, }; - let span_creator = move || { - let span = RequestSpan::new_prepared( - partition_key.as_ref().map(|pk| pk.iter()), - token, - serialized_values_size, + let retry_decision = self.retry_session.decide_should_retry(query_info); + trace!( + parent: &span, + retry_decision = format!("{:?}", retry_decision).as_str() ); - if let Some(replicas) = replicas.as_ref() { - span.record_replicas(replicas); - } - span - }; - - let worker = RowIteratorWorker { - sender: sender.into(), - page_query, - statement_info, - query_is_idempotent: config.prepared.config.is_idempotent, - query_consistency: consistency, - retry_session, - execution_profile: config.execution_profile, - metrics: config.metrics, - paging_state: PagingState::start(), - history_listener: config.prepared.config.history_listener.clone(), - current_query_id: None, - current_attempt_id: None, - parent_span, - span_creator, - }; + self.log_attempt_error(&last_error, &retry_decision); + match retry_decision { + RetryDecision::RetrySameNode(cl) => { + self.metrics.inc_retries_num(); + current_consistency = cl.unwrap_or(current_consistency); + continue 'same_node_retries; + } + RetryDecision::RetryNextNode(cl) => { + self.metrics.inc_retries_num(); + current_consistency = cl.unwrap_or(current_consistency); + continue 'nodes_in_plan; + } + RetryDecision::DontRetry => break 'nodes_in_plan, + RetryDecision::IgnoreWriteError => { + 
warn!("Ignoring error during fetching pages; stopping fetching."); + // If we are here then, most likely, we didn't send + // anything through the self.sender channel. + // Although we are in an awkward situation (_iter + // interface isn't meant for sending writes), + // we must attempt to send something because + // the iterator expects it. + let (proof, _) = self.sender.send_empty_page(None).await; + return proof; + } + }; + } + } - worker.work(config.cluster_data).await - }; + // Send last_error to RowIterator - query failed fully + self.log_query_error(&last_error); + let (proof, _) = self.sender.send(Err(last_error)).await; + proof + } - Self::new_from_worker_future(worker_task, receiver).await + // Given a working connection query as many pages as possible until the first error. + // + // Contract: this function must either: + // - Return an error + // - Return Ok but have attempted to send a page via self.sender + async fn query_pages( + &mut self, + connection: &Arc, + consistency: Consistency, + node: NodeRef<'_>, + ) -> Result { + loop { + let request_span = (self.span_creator)(); + match self + .query_one_page(connection, consistency, node, &request_span) + .instrument(request_span.span().clone()) + .await? 
+ { + ControlFlow::Break(proof) => return Ok(proof), + ControlFlow::Continue(_) => {} + } + } } - pub(crate) async fn new_for_connection_query_iter( - query: Query, - connection: Arc, + async fn query_one_page( + &mut self, + connection: &Arc, consistency: Consistency, - serial_consistency: Option, - ) -> Result { - let (sender, receiver) = mpsc::channel::>(1); + node: NodeRef<'_>, + request_span: &RequestSpan, + ) -> Result, QueryError> { + self.metrics.inc_total_paged_queries(); + let query_start = std::time::Instant::now(); - let page_size = query.get_validated_page_size(); + trace!( + connection = %connection.get_connect_address(), + "Sending" + ); + self.log_attempt_start(connection.get_connect_address()); - let worker_task = async move { - let worker = SingleConnectionRowIteratorWorker { - sender: sender.into(), - fetcher: |paging_state| { - connection.query_raw_with_consistency( - &query, - consistency, - serial_consistency, - Some(page_size), - paging_state, - ) - }, - }; - worker.work().await - }; + let query_response = + (self.page_query)(connection.clone(), consistency, self.paging_state.clone()) + .await + .and_then(QueryResponse::into_non_error_query_response); - Self::new_from_worker_future(worker_task, receiver).await - } + let elapsed = query_start.elapsed(); - pub(crate) async fn new_for_connection_execute_iter( - prepared: PreparedStatement, - values: SerializedValues, - connection: Arc, - consistency: Consistency, - serial_consistency: Option, - ) -> Result { - let (sender, receiver) = mpsc::channel::>(1); + request_span.record_shard_id(connection); - let page_size = prepared.get_validated_page_size(); + match query_response { + Ok(NonErrorQueryResponse { + response: + NonErrorResponse::Result(result::Result::Rows((rows, paging_state_response))), + tracing_id, + .. 
+ }) => { + let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64); + self.log_attempt_success(); + self.log_query_success(); + self.execution_profile + .load_balancing_policy + .on_query_success(&self.statement_info, elapsed, node); - let worker_task = async move { - let worker = SingleConnectionRowIteratorWorker { - sender: sender.into(), - fetcher: |paging_state| { - connection.execute_raw_with_consistency( - &prepared, - &values, - consistency, - serial_consistency, - Some(page_size), - paging_state, - ) - }, - }; - worker.work().await - }; + let received_page = ReceivedPage { rows, tracing_id }; - Self::new_from_worker_future(worker_task, receiver).await - } + // Send next page to RowIterator + let (proof, res) = self.sender.send(Ok(received_page)).await; + if res.is_err() { + // channel was closed, RowIterator was dropped - should shutdown + return Ok(ControlFlow::Break(proof)); + } - async fn new_from_worker_future( - worker_task: impl Future + Send + 'static, - mut receiver: mpsc::Receiver>, - ) -> Result { - tokio::task::spawn(worker_task); + match paging_state_response.into_paging_control_flow() { + ControlFlow::Continue(paging_state) => { + self.paging_state = paging_state; + } + ControlFlow::Break(()) => { + // Reached the last query, shutdown + return Ok(ControlFlow::Break(proof)); + } + } - // This unwrap is safe because: - // - The future returned by worker.work sends at least one item - // to the channel (the PageSendAttemptedProof helps enforce this) - // - That future is polled in a tokio::task which isn't going to be - // cancelled - let pages_received = receiver.recv().await.unwrap()?; - let rows = pages_received - .rows - .into_legacy_rows(PagingStateResponse::NoMorePages)?; + // Query succeeded, reset retry policy for future retries + self.retry_session.reset(); + self.log_query_start(); - Ok(Self { - current_row_idx: 0, - current_page: rows, - page_receiver: receiver, - tracing_ids: if let Some(tracing_id) = 
pages_received.tracing_id { - vec![tracing_id] - } else { - Vec::new() - }, - }) - } + Ok(ControlFlow::Continue(())) + } + Err(err) => { + let err = err.into(); + self.metrics.inc_failed_paged_queries(); + self.execution_profile + .load_balancing_policy + .on_query_failure(&self.statement_info, elapsed, node, &err); + Err(err) + } + Ok(NonErrorQueryResponse { + response: NonErrorResponse::Result(_), + tracing_id, + .. + }) => { + // We have most probably sent a modification statement (e.g. INSERT or UPDATE), + // so let's return an empty iterator as suggested in #631. - /// If tracing was enabled returns tracing ids of all finished page queries - pub fn get_tracing_ids(&self) -> &[Uuid] { - &self.tracing_ids + // We must attempt to send something because the iterator expects it. + let (proof, _) = self.sender.send_empty_page(tracing_id).await; + Ok(ControlFlow::Break(proof)) + } + Ok(response) => { + self.metrics.inc_failed_paged_queries(); + let err = + ProtocolError::UnexpectedResponse(response.response.to_response_kind()).into(); + self.execution_profile + .load_balancing_policy + .on_query_failure(&self.statement_info, elapsed, node, &err); + Err(err) + } + } } - /// Returns specification of row columns - pub fn get_column_specs(&self) -> &[ColumnSpec] { - self.current_page.metadata.col_specs() - } + fn log_query_start(&mut self) { + let history_listener: &dyn HistoryListener = match &self.history_listener { + Some(hl) => &**hl, + None => return, + }; - fn is_current_page_exhausted(&self) -> bool { - self.current_row_idx >= self.current_page.rows.len() + self.current_query_id = Some(history_listener.log_query_start()); } -} - -// A separate module is used here so that the parent module cannot construct -// SendAttemptedProof directly. 
-mod checked_channel_sender { - use scylla_cql::frame::response::result::RawRows; - use std::marker::PhantomData; - use tokio::sync::mpsc; - use uuid::Uuid; - use crate::transport::errors::QueryError; + fn log_query_success(&mut self) { + let history_listener: &dyn HistoryListener = match &self.history_listener { + Some(hl) => &**hl, + None => return, + }; - use super::ReceivedPage; + let query_id: history::QueryId = match &self.current_query_id { + Some(id) => *id, + None => return, + }; - /// A value whose existence proves that there was an attempt - /// to send an item of type T through a channel. - /// Can only be constructed by ProvingSender::send. - pub(crate) struct SendAttemptedProof(PhantomData); + history_listener.log_query_success(query_id); + } - /// An mpsc::Sender which returns proofs that it attempted to send items. - pub(crate) struct ProvingSender(mpsc::Sender); + fn log_query_error(&mut self, error: &QueryError) { + let history_listener: &dyn HistoryListener = match &self.history_listener { + Some(hl) => &**hl, + None => return, + }; - impl From> for ProvingSender { - fn from(s: mpsc::Sender) -> Self { - Self(s) - } - } + let query_id: history::QueryId = match &self.current_query_id { + Some(id) => *id, + None => return, + }; - impl ProvingSender { - pub(crate) async fn send( - &self, - value: T, - ) -> (SendAttemptedProof, Result<(), mpsc::error::SendError>) { - (SendAttemptedProof(PhantomData), self.0.send(value).await) - } + history_listener.log_query_error(query_id, error); } - type ResultPage = Result; + fn log_attempt_start(&mut self, node_addr: SocketAddr) { + let history_listener: &dyn HistoryListener = match &self.history_listener { + Some(hl) => &**hl, + None => return, + }; - impl ProvingSender { - pub(crate) async fn send_empty_page( - &self, - tracing_id: Option, - ) -> ( - SendAttemptedProof, - Result<(), mpsc::error::SendError>, - ) { - let empty_page = ReceivedPage { - rows: RawRows::mock_empty(), - tracing_id, - }; - 
self.send(Ok(empty_page)).await - } - } -} + let query_id: history::QueryId = match &self.current_query_id { + Some(id) => *id, + None => return, + }; -use checked_channel_sender::{ProvingSender, SendAttemptedProof}; + self.current_attempt_id = + Some(history_listener.log_attempt_start(query_id, None, node_addr)); + } -type PageSendAttemptedProof = SendAttemptedProof>; + fn log_attempt_success(&mut self) { + let history_listener: &dyn HistoryListener = match &self.history_listener { + Some(hl) => &**hl, + None => return, + }; -// RowIteratorWorker works in the background to fetch pages -// RowIterator receives them through a channel -struct RowIteratorWorker<'a, QueryFunc, SpanCreatorFunc> { - sender: ProvingSender>, + let attempt_id: history::AttemptId = match &self.current_attempt_id { + Some(id) => *id, + None => return, + }; - // Closure used to perform a single page query - // AsyncFn(Arc, Option>) -> Result - page_query: QueryFunc, + history_listener.log_attempt_success(attempt_id); + } - statement_info: RoutingInfo<'a>, - query_is_idempotent: bool, - query_consistency: Consistency, - retry_session: Box, - execution_profile: Arc, - metrics: Arc, + fn log_attempt_error(&mut self, error: &QueryError, retry_decision: &RetryDecision) { + let history_listener: &dyn HistoryListener = match &self.history_listener { + Some(hl) => &**hl, + None => return, + }; - paging_state: PagingState, + let attempt_id: history::AttemptId = match &self.current_attempt_id { + Some(id) => *id, + None => return, + }; - history_listener: Option>, - current_query_id: Option, - current_attempt_id: Option, + history_listener.log_attempt_error(attempt_id, error, retry_decision); + } +} - parent_span: tracing::Span, - span_creator: SpanCreatorFunc, +/// A massively simplified version of the RowIteratorWorker. It does not have +/// any complicated logic related to retries, it just fetches pages from +/// a single connection. 
+struct SingleConnectionRowIteratorWorker { + sender: ProvingSender>, + fetcher: Fetcher, } -impl RowIteratorWorker<'_, QueryFunc, SpanCreator> +impl SingleConnectionRowIteratorWorker where - QueryFunc: Fn(Arc, Consistency, PagingState) -> QueryFut, - QueryFut: Future>, - SpanCreator: Fn() -> RequestSpan, + Fetcher: Fn(PagingState) -> FetchFut + Send + Sync, + FetchFut: Future> + Send, { - // Contract: this function MUST send at least one item through self.sender - async fn work(mut self, cluster_data: Arc) -> PageSendAttemptedProof { - let load_balancer = self.execution_profile.load_balancing_policy.clone(); - let statement_info = self.statement_info.clone(); - let query_plan = - load_balancing::Plan::new(load_balancer.as_ref(), &statement_info, &cluster_data); + async fn work(mut self) -> PageSendAttemptedProof { + match self.do_work().await { + Ok(proof) => proof, + Err(err) => { + let (proof, _) = self.sender.send(Err(err)).await; + proof + } + } + } - let mut last_error: QueryError = QueryError::EmptyPlan; - let mut current_consistency: Consistency = self.query_consistency; + async fn do_work(&mut self) -> Result { + let mut paging_state = PagingState::start(); + loop { + let result = (self.fetcher)(paging_state).await?; + let response = result.into_non_error_query_response()?; + match response.response { + NonErrorResponse::Result(result::Result::Rows((rows, paging_state_response))) => { + let (proof, send_result) = self + .sender + .send(Ok(ReceivedPage { + rows, + tracing_id: response.tracing_id, + })) + .await; - self.log_query_start(); + if send_result.is_err() { + // channel was closed, RowIterator was dropped - should shutdown + return Ok(proof); + } - 'nodes_in_plan: for (node, shard) in query_plan { - let span = - trace_span!(parent: &self.parent_span, "Executing query", node = %node.address); - // For each node in the plan choose a connection to use - // This connection will be reused for same node retries to preserve paging cache on the shard - let 
connection: Arc = match node - .connection_for_shard(shard) - .instrument(span.clone()) - .await - { - Ok(connection) => connection, - Err(e) => { - trace!( - parent: &span, - error = %e, - "Choosing connection failed" - ); - last_error = e.into(); - // Broken connection doesn't count as a failed query, don't log in metrics - continue 'nodes_in_plan; + match paging_state_response.into_paging_control_flow() { + ControlFlow::Continue(new_paging_state) => { + paging_state = new_paging_state; + } + ControlFlow::Break(()) => { + // Reached the last query, shutdown + return Ok(proof); + } + } } - }; + NonErrorResponse::Result(_) => { + // We have most probably sent a modification statement (e.g. INSERT or UPDATE), + // so let's return an empty iterator as suggested in #631. - 'same_node_retries: loop { - trace!(parent: &span, "Execution started"); - // Query pages until an error occurs - let queries_result: Result = self - .query_pages(&connection, current_consistency, node) - .instrument(span.clone()) - .await; + // We must attempt to send something because the iterator expects it. + let (proof, _) = self.sender.send_empty_page(response.tracing_id).await; + return Ok(proof); + } + _ => { + return Err(ProtocolError::UnexpectedResponse( + response.response.to_response_kind(), + ) + .into()); + } + } + } + } +} - last_error = match queries_result { - Ok(proof) => { - trace!(parent: &span, "Query succeeded"); - // query_pages returned Ok, so we are guaranteed - // that it attempted to send at least one page - // through self.sender and we can safely return now. 
- return proof; - } - Err(error) => { - trace!( - parent: &span, - error = %error, - "Query failed" - ); - error - } - }; +/// Iterator over rows returned by paged queries\ +/// Allows to easily access rows without worrying about handling multiple pages +pub struct LegacyRowIterator { + current_row_idx: usize, + current_page: Rows, + page_receiver: mpsc::Receiver>, + tracing_ids: Vec, +} - // Use retry policy to decide what to do next - let query_info = QueryInfo { - error: &last_error, - is_idempotent: self.query_is_idempotent, - consistency: self.query_consistency, - }; +/// Fetching pages is asynchronous so `RowIterator` does not implement the `Iterator` trait.\ +/// Instead it uses the asynchronous `Stream` trait +impl Stream for LegacyRowIterator { + type Item = Result; - let retry_decision = self.retry_session.decide_should_retry(query_info); - trace!( - parent: &span, - retry_decision = format!("{:?}", retry_decision).as_str() - ); - self.log_attempt_error(&last_error, &retry_decision); - match retry_decision { - RetryDecision::RetrySameNode(cl) => { - self.metrics.inc_retries_num(); - current_consistency = cl.unwrap_or(current_consistency); - continue 'same_node_retries; - } - RetryDecision::RetryNextNode(cl) => { - self.metrics.inc_retries_num(); - current_consistency = cl.unwrap_or(current_consistency); - continue 'nodes_in_plan; - } - RetryDecision::DontRetry => break 'nodes_in_plan, - RetryDecision::IgnoreWriteError => { - warn!("Ignoring error during fetching pages; stopping fetching."); - // If we are here then, most likely, we didn't send - // anything through the self.sender channel. - // Although we are in an awkward situation (_iter - // interface isn't meant for sending writes), - // we must attempt to send something because - // the iterator expects it. 
- let (proof, _) = self.sender.send_empty_page(None).await; - return proof; - } - }; - } + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_next_internal(cx) + } +} + +impl LegacyRowIterator { + fn poll_next_internal( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + if self.as_ref().is_current_page_exhausted() { + ready_some_ok!(self.as_mut().poll_next_page(cx)); + } + + let mut s = self.as_mut(); + + let idx = s.current_row_idx; + if idx < s.current_page.rows.len() { + let row = mem::take(&mut s.current_page.rows[idx]); + s.current_row_idx += 1; + return Poll::Ready(Some(Ok(row))); + } + // We probably got a zero-sized page + // Yield, but tell that we are ready + cx.waker().wake_by_ref(); + Poll::Pending + } + + /// Makes an attempt to acquire the next page (which may be empty). + /// + /// On success, returns Some(Ok()). + /// On failure, returns Some(Err()). + /// If there are no more pages, returns None. + fn poll_next_page<'r>( + mut self: Pin<&'r mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + let mut s = self.as_mut(); + + let received_page = ready_some_ok!(Pin::new(&mut s.page_receiver).poll_recv(cx)); + let rows = match received_page + .rows + // As RowIteratorWorker manages paging itself, the paging state response + // returned to the user is always NoMorePages. It used to be so before + // the deserialization refactor, too. + .into_legacy_rows(PagingStateResponse::NoMorePages) + { + Ok(rows) => rows, + Err(err) => return Poll::Ready(Some(Err(err.into()))), + }; + s.current_page = rows; + s.current_row_idx = 0; + + if let Some(tracing_id) = received_page.tracing_id { + s.tracing_ids.push(tracing_id); } - // Send last_error to RowIterator - query failed fully - self.log_query_error(&last_error); - let (proof, _) = self.sender.send(Err(last_error)).await; - proof + Poll::Ready(Some(Ok(()))) } - // Given a working connection query as many pages as possible until the first error. 
- // - // Contract: this function must either: - // - Return an error - // - Return Ok but have attempted to send a page via self.sender - async fn query_pages( - &mut self, - connection: &Arc, - consistency: Consistency, - node: NodeRef<'_>, - ) -> Result { - loop { - let request_span = (self.span_creator)(); - match self - .query_one_page(connection, consistency, node, &request_span) - .instrument(request_span.span().clone()) - .await? - { - ControlFlow::Break(proof) => return Ok(proof), - ControlFlow::Continue(_) => {} - } + /// Converts this iterator into an iterator over rows parsed as given type + pub fn into_typed(self) -> LegacyTypedRowIterator { + LegacyTypedRowIterator { + row_iterator: self, + phantom_data: Default::default(), } } - async fn query_one_page( - &mut self, - connection: &Arc, - consistency: Consistency, - node: NodeRef<'_>, - request_span: &RequestSpan, - ) -> Result, QueryError> { - self.metrics.inc_total_paged_queries(); - let query_start = std::time::Instant::now(); - - trace!( - connection = %connection.get_connect_address(), - "Sending" - ); - self.log_attempt_start(connection.get_connect_address()); - - let query_response = - (self.page_query)(connection.clone(), consistency, self.paging_state.clone()) - .await - .and_then(QueryResponse::into_non_error_query_response); + pub(crate) async fn new_for_query( + query: Query, + execution_profile: Arc, + cluster_data: Arc, + metrics: Arc, + ) -> Result { + let (sender, receiver) = mpsc::channel(1); - let elapsed = query_start.elapsed(); + let consistency = query + .config + .consistency + .unwrap_or(execution_profile.consistency); + let serial_consistency = query + .config + .serial_consistency + .unwrap_or(execution_profile.serial_consistency); - request_span.record_shard_id(connection); + let page_size = query.get_validated_page_size(); - match query_response { - Ok(NonErrorQueryResponse { - response: - NonErrorResponse::Result(result::Result::Rows((rows, paging_state_response))), - 
tracing_id, - .. - }) => { - let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64); - self.log_attempt_success(); - self.log_query_success(); - self.execution_profile - .load_balancing_policy - .on_query_success(&self.statement_info, elapsed, node); + let routing_info = RoutingInfo { + consistency, + serial_consistency, + ..Default::default() + }; - let received_page = ReceivedPage { rows, tracing_id }; + let retry_session = query + .get_retry_policy() + .map(|rp| &**rp) + .unwrap_or(&*execution_profile.retry_policy) + .new_session(); - // Send next page to RowIterator - let (proof, res) = self.sender.send(Ok(received_page)).await; - if res.is_err() { - // channel was closed, RowIterator was dropped - should shutdown - return Ok(ControlFlow::Break(proof)); - } + let parent_span = tracing::Span::current(); + let worker_task = async move { + let query_ref = &query; - match paging_state_response.into_paging_control_flow() { - ControlFlow::Continue(paging_state) => { - self.paging_state = paging_state; - } - ControlFlow::Break(()) => { - // Reached the last query, shutdown - return Ok(ControlFlow::Break(proof)); - } + let page_query = |connection: Arc, + consistency: Consistency, + paging_state: PagingState| { + async move { + connection + .query_raw_with_consistency( + query_ref, + consistency, + serial_consistency, + Some(page_size), + paging_state, + ) + .await } + }; - // Query succeeded, reset retry policy for future retries - self.retry_session.reset(); - self.log_query_start(); + let query_ref = &query; - Ok(ControlFlow::Continue(())) - } - Err(err) => { - let err = err.into(); - self.metrics.inc_failed_paged_queries(); - self.execution_profile - .load_balancing_policy - .on_query_failure(&self.statement_info, elapsed, node, &err); - Err(err) - } - Ok(NonErrorQueryResponse { - response: NonErrorResponse::Result(_), - tracing_id, - .. - }) => { - // We have most probably sent a modification statement (e.g. 
INSERT or UPDATE), - // so let's return an empty iterator as suggested in #631. + let span_creator = move || { + let span = RequestSpan::new_query(&query_ref.contents); + span.record_request_size(0); + span + }; - // We must attempt to send something because the iterator expects it. - let (proof, _) = self.sender.send_empty_page(tracing_id).await; - Ok(ControlFlow::Break(proof)) - } - Ok(response) => { - self.metrics.inc_failed_paged_queries(); - let err = - ProtocolError::UnexpectedResponse(response.response.to_response_kind()).into(); - self.execution_profile - .load_balancing_policy - .on_query_failure(&self.statement_info, elapsed, node, &err); - Err(err) - } - } - } + let worker = RowIteratorWorker { + sender: sender.into(), + page_query, + statement_info: routing_info, + query_is_idempotent: query.config.is_idempotent, + query_consistency: consistency, + retry_session, + execution_profile, + metrics, + paging_state: PagingState::start(), + history_listener: query.config.history_listener.clone(), + current_query_id: None, + current_attempt_id: None, + parent_span, + span_creator, + }; - fn log_query_start(&mut self) { - let history_listener: &dyn HistoryListener = match &self.history_listener { - Some(hl) => &**hl, - None => return, + worker.work(cluster_data).await }; - self.current_query_id = Some(history_listener.log_query_start()); + Self::new_from_worker_future(worker_task, receiver).await } - fn log_query_success(&mut self) { - let history_listener: &dyn HistoryListener = match &self.history_listener { - Some(hl) => &**hl, - None => return, - }; + pub(crate) async fn new_for_prepared_statement( + config: PreparedIteratorConfig, + ) -> Result { + let (sender, receiver) = mpsc::channel(1); + + let consistency = config + .prepared + .config + .consistency + .unwrap_or(config.execution_profile.consistency); + let serial_consistency = config + .prepared + .config + .serial_consistency + .unwrap_or(config.execution_profile.serial_consistency); + + let 
page_size = config.prepared.get_validated_page_size(); + + let retry_session = config + .prepared + .get_retry_policy() + .map(|rp| &**rp) + .unwrap_or(&*config.execution_profile.retry_policy) + .new_session(); + + let parent_span = tracing::Span::current(); + let worker_task = async move { + let prepared_ref = &config.prepared; + let values_ref = &config.values; + + let (partition_key, token) = match prepared_ref + .extract_partition_key_and_calculate_token( + prepared_ref.get_partitioner_name(), + values_ref, + ) { + Ok(res) => res.unzip(), + Err(err) => { + let (proof, _res) = ProvingSender::from(sender).send(Err(err)).await; + return proof; + } + }; + + let table_spec = config.prepared.get_table_spec(); + let statement_info = RoutingInfo { + consistency, + serial_consistency, + token, + table: table_spec, + is_confirmed_lwt: config.prepared.is_confirmed_lwt(), + }; + + let page_query = |connection: Arc, + consistency: Consistency, + paging_state: PagingState| async move { + connection + .execute_raw_with_consistency( + prepared_ref, + values_ref, + consistency, + serial_consistency, + Some(page_size), + paging_state, + ) + .await + }; + + let serialized_values_size = config.values.buffer_size(); - let query_id: history::QueryId = match &self.current_query_id { - Some(id) => *id, - None => return, - }; + let replicas: Option> = + if let (Some(table_spec), Some(token)) = + (statement_info.table, statement_info.token) + { + Some( + config + .cluster_data + .get_token_endpoints_iter(table_spec, token) + .map(|(node, shard)| (node.clone(), shard)) + .collect(), + ) + } else { + None + }; - history_listener.log_query_success(query_id); - } + let span_creator = move || { + let span = RequestSpan::new_prepared( + partition_key.as_ref().map(|pk| pk.iter()), + token, + serialized_values_size, + ); + if let Some(replicas) = replicas.as_ref() { + span.record_replicas(replicas); + } + span + }; - fn log_query_error(&mut self, error: &QueryError) { - let history_listener: 
&dyn HistoryListener = match &self.history_listener { - Some(hl) => &**hl, - None => return, - }; + let worker = RowIteratorWorker { + sender: sender.into(), + page_query, + statement_info, + query_is_idempotent: config.prepared.config.is_idempotent, + query_consistency: consistency, + retry_session, + execution_profile: config.execution_profile, + metrics: config.metrics, + paging_state: PagingState::start(), + history_listener: config.prepared.config.history_listener.clone(), + current_query_id: None, + current_attempt_id: None, + parent_span, + span_creator, + }; - let query_id: history::QueryId = match &self.current_query_id { - Some(id) => *id, - None => return, + worker.work(config.cluster_data).await }; - history_listener.log_query_error(query_id, error); + Self::new_from_worker_future(worker_task, receiver).await } - fn log_attempt_start(&mut self, node_addr: SocketAddr) { - let history_listener: &dyn HistoryListener = match &self.history_listener { - Some(hl) => &**hl, - None => return, - }; + pub(crate) async fn new_for_connection_query_iter( + query: Query, + connection: Arc, + consistency: Consistency, + serial_consistency: Option, + ) -> Result { + let (sender, receiver) = mpsc::channel::>(1); - let query_id: history::QueryId = match &self.current_query_id { - Some(id) => *id, - None => return, + let page_size = query.get_validated_page_size(); + + let worker_task = async move { + let worker = SingleConnectionRowIteratorWorker { + sender: sender.into(), + fetcher: |paging_state| { + connection.query_raw_with_consistency( + &query, + consistency, + serial_consistency, + Some(page_size), + paging_state, + ) + }, + }; + worker.work().await }; - self.current_attempt_id = - Some(history_listener.log_attempt_start(query_id, None, node_addr)); + Self::new_from_worker_future(worker_task, receiver).await } - fn log_attempt_success(&mut self) { - let history_listener: &dyn HistoryListener = match &self.history_listener { - Some(hl) => &**hl, - None => return, - 
}; + pub(crate) async fn new_for_connection_execute_iter( + prepared: PreparedStatement, + values: SerializedValues, + connection: Arc, + consistency: Consistency, + serial_consistency: Option, + ) -> Result { + let (sender, receiver) = mpsc::channel::>(1); - let attempt_id: history::AttemptId = match &self.current_attempt_id { - Some(id) => *id, - None => return, + let page_size = prepared.get_validated_page_size(); + + let worker_task = async move { + let worker = SingleConnectionRowIteratorWorker { + sender: sender.into(), + fetcher: |paging_state| { + connection.execute_raw_with_consistency( + &prepared, + &values, + consistency, + serial_consistency, + Some(page_size), + paging_state, + ) + }, + }; + worker.work().await }; - history_listener.log_attempt_success(attempt_id); + Self::new_from_worker_future(worker_task, receiver).await } - fn log_attempt_error(&mut self, error: &QueryError, retry_decision: &RetryDecision) { - let history_listener: &dyn HistoryListener = match &self.history_listener { - Some(hl) => &**hl, - None => return, - }; + async fn new_from_worker_future( + worker_task: impl Future + Send + 'static, + mut receiver: mpsc::Receiver>, + ) -> Result { + tokio::task::spawn(worker_task); - let attempt_id: history::AttemptId = match &self.current_attempt_id { - Some(id) => *id, - None => return, - }; + // This unwrap is safe because: + // - The future returned by worker.work sends at least one item + // to the channel (the PageSendAttemptedProof helps enforce this) + // - That future is polled in a tokio::task which isn't going to be + // cancelled + let pages_received = receiver.recv().await.unwrap()?; + let rows = pages_received + .rows + .into_legacy_rows(PagingStateResponse::NoMorePages)?; - history_listener.log_attempt_error(attempt_id, error, retry_decision); + Ok(Self { + current_row_idx: 0, + current_page: rows, + page_receiver: receiver, + tracing_ids: if let Some(tracing_id) = pages_received.tracing_id { + vec![tracing_id] + } else { + 
Vec::new() + }, + }) } -} - -/// A massively simplified version of the RowIteratorWorker. It does not have -/// any complicated logic related to retries, it just fetches pages from -/// a single connection. -struct SingleConnectionRowIteratorWorker { - sender: ProvingSender>, - fetcher: Fetcher, -} -impl SingleConnectionRowIteratorWorker -where - Fetcher: Fn(PagingState) -> FetchFut + Send + Sync, - FetchFut: Future> + Send, -{ - async fn work(mut self) -> PageSendAttemptedProof { - match self.do_work().await { - Ok(proof) => proof, - Err(err) => { - let (proof, _) = self.sender.send(Err(err)).await; - proof - } - } + /// If tracing was enabled returns tracing ids of all finished page queries + pub fn get_tracing_ids(&self) -> &[Uuid] { + &self.tracing_ids } - async fn do_work(&mut self) -> Result { - let mut paging_state = PagingState::start(); - loop { - let result = (self.fetcher)(paging_state).await?; - let response = result.into_non_error_query_response()?; - match response.response { - NonErrorResponse::Result(result::Result::Rows((rows, paging_state_response))) => { - let (proof, send_result) = self - .sender - .send(Ok(ReceivedPage { - rows, - tracing_id: response.tracing_id, - })) - .await; - - if send_result.is_err() { - // channel was closed, RowIterator was dropped - should shutdown - return Ok(proof); - } - - match paging_state_response.into_paging_control_flow() { - ControlFlow::Continue(new_paging_state) => { - paging_state = new_paging_state; - } - ControlFlow::Break(()) => { - // Reached the last query, shutdown - return Ok(proof); - } - } - } - NonErrorResponse::Result(_) => { - // We have most probably sent a modification statement (e.g. INSERT or UPDATE), - // so let's return an empty iterator as suggested in #631. + /// Returns specification of row columns + pub fn get_column_specs(&self) -> &[ColumnSpec] { + self.current_page.metadata.col_specs() + } - // We must attempt to send something because the iterator expects it. 
- let (proof, _) = self.sender.send_empty_page(response.tracing_id).await; - return Ok(proof); - } - _ => { - return Err(ProtocolError::UnexpectedResponse( - response.response.to_response_kind(), - ) - .into()); - } - } - } + fn is_current_page_exhausted(&self) -> bool { + self.current_row_idx >= self.current_page.rows.len() } }