From bb84a567c62e455ecf6e3f2f1c80ee312dba1144 Mon Sep 17 00:00:00 2001 From: Adil Ahmed Date: Fri, 14 Jun 2024 14:44:46 -0600 Subject: [PATCH] Parallel requests for transactions and transaction details streams --- tap_restaurant365/streams.py | 37 +++++++++++++++++++++++------------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/tap_restaurant365/streams.py b/tap_restaurant365/streams.py index f2c0c3a..4ea0f81 100644 --- a/tap_restaurant365/streams.py +++ b/tap_restaurant365/streams.py @@ -3,6 +3,7 @@ from __future__ import annotations import typing as t +from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, timedelta from typing import Any @@ -551,20 +552,30 @@ def parse_response(self, response: requests.Response) -> t.Iterable[dict]: def get_records(self, context: dict | None) -> t.Iterable[dict[str, t.Any]]: """Override the get records to call child stream once batch size is reached we have processed all of the records. .""" # noqa: E501 batch_size = self.batch_size + session = requests.Session() current_batch = [] - for i, record in enumerate(self.request_records(context), 1): - num_records = self.result_count - transformed_record = self.post_process(record, context) - if transformed_record is None: - # Record filtered out during post_process() - continue - current_batch.append(record["transactionId"]) - if ( - i % batch_size == 0 or i == num_records - ): # Check if the batch is full or it's the last record - self._sync_children({"transaction_ids": current_batch}) - current_batch = [] - yield transformed_record + futures = [] + with ThreadPoolExecutor(max_workers=2) as executor: + for i, record in enumerate(self.request_records(context), 1): + num_records = self.result_count + transformed_record = self.post_process(record, context) + if transformed_record is None: + # Record filtered out during post_process() + continue + current_batch.append(record["transactionId"]) + if ( + i % batch_size == 0 or i == num_records + ): # Check if the batch is full or it's the last record + futures.append(executor.submit(self._sync_children, {"transaction_ids": current_batch})) + current_batch = [] + for future in as_completed(futures): + try: + future.result() + except Exception as e: + self.logger.error(f"Error in syncing children: {e}") + session.close() + for record in self.request_records(context): + yield self.post_process(record, context) def _process_record( self,