diff --git a/scylla/src/transport/iterator.rs b/scylla/src/transport/iterator.rs index 92918281d..cb51471b0 100644 --- a/scylla/src/transport/iterator.rs +++ b/scylla/src/transport/iterator.rs @@ -127,9 +127,9 @@ use checked_channel_sender::{ProvingSender, SendAttemptedProof}; type PageSendAttemptedProof = SendAttemptedProof>; -// RowIteratorWorker works in the background to fetch pages -// RowIterator receives them through a channel -struct RowIteratorWorker<'a, QueryFunc, SpanCreatorFunc> { +// PagerWorker works in the background to fetch pages +// QueryPager receives them through a channel +struct PagerWorker<'a, QueryFunc, SpanCreatorFunc> { sender: ProvingSender>, // Closure used to perform a single page query @@ -153,7 +153,7 @@ struct RowIteratorWorker<'a, QueryFunc, SpanCreatorFunc> { span_creator: SpanCreatorFunc, } -impl RowIteratorWorker<'_, QueryFunc, SpanCreator> +impl PagerWorker<'_, QueryFunc, SpanCreator> where QueryFunc: Fn(Arc, Consistency, PagingState) -> QueryFut, QueryFut: Future>, @@ -260,7 +260,7 @@ where } } - // Send last_error to RowIterator - query failed fully + // Send last_error to QueryPager - query failed fully self.log_query_error(&last_error); let (proof, _) = self.sender.send(Err(last_error)).await; proof @@ -333,10 +333,10 @@ where let received_page = ReceivedPage { rows, tracing_id }; - // Send next page to RowIterator + // Send next page to QueryPager let (proof, res) = self.sender.send(Ok(received_page)).await; if res.is_err() { - // channel was closed, RowIterator was dropped - should shutdown + // channel was closed, QueryPager was dropped - should shutdown return Ok(ControlFlow::Break(proof)); } @@ -469,15 +469,15 @@ where } } -/// A massively simplified version of the RowIteratorWorker. It does not have +/// A massively simplified version of the PagerWorker. It does not have /// any complicated logic related to retries, it just fetches pages from /// a single connection. -struct SingleConnectionRowIteratorWorker { +struct SingleConnectionPagerWorker { sender: ProvingSender>, fetcher: Fetcher, } -impl SingleConnectionRowIteratorWorker +impl SingleConnectionPagerWorker where Fetcher: Fn(PagingState) -> FetchFut + Send + Sync, FetchFut: Future> + Send, @@ -508,7 +508,7 @@ where .await; if send_result.is_err() { - // channel was closed, RowIterator was dropped - should shutdown + // channel was closed, QueryPager was dropped - should shutdown return Ok(proof); } @@ -742,7 +742,7 @@ impl QueryPager { span }; - let worker = RowIteratorWorker { + let worker = PagerWorker { sender: sender.into(), page_query, statement_info: routing_info, @@ -860,7 +860,7 @@ impl QueryPager { span }; - let worker = RowIteratorWorker { + let worker = PagerWorker { sender: sender.into(), page_query, statement_info, @@ -894,7 +894,7 @@ impl QueryPager { let page_size = query.get_validated_page_size(); let worker_task = async move { - let worker = SingleConnectionRowIteratorWorker { + let worker = SingleConnectionPagerWorker { sender: sender.into(), fetcher: |paging_state| { connection.query_raw_with_consistency( @@ -924,7 +924,7 @@ impl QueryPager { let page_size = prepared.get_validated_page_size(); let worker_task = async move { - let worker = SingleConnectionRowIteratorWorker { + let worker = SingleConnectionPagerWorker { sender: sender.into(), fetcher: |paging_state| { connection.execute_raw_with_consistency(