Skip to content

Commit

Permalink
Parallel requests for transactions and transaction details streams
Browse files Browse the repository at this point in the history
  • Loading branch information
xacadil committed Jun 14, 2024
1 parent 73883b3 commit bb84a56
Showing 1 changed file with 24 additions and 13 deletions.
37 changes: 24 additions & 13 deletions tap_restaurant365/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import typing as t
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from typing import Any

Expand Down Expand Up @@ -551,20 +552,30 @@ def parse_response(self, response: requests.Response) -> t.Iterable[dict]:
def get_records(self, context: dict | None) -> t.Iterable[dict[str, t.Any]]:
"""Override the get records to call child stream once batch size is reached we have processed all of the records. .""" # noqa: E501
batch_size = self.batch_size
session = requests.Session()
current_batch = []
for i, record in enumerate(self.request_records(context), 1):
num_records = self.result_count
transformed_record = self.post_process(record, context)
if transformed_record is None:
# Record filtered out during post_process()
continue
current_batch.append(record["transactionId"])
if (
i % batch_size == 0 or i == num_records
): # Check if the batch is full or it's the last record
self._sync_children({"transaction_ids": current_batch})
current_batch = []
yield transformed_record
futures = []
with ThreadPoolExecutor(max_workers=2) as executor:
for i, record in enumerate(self.request_records(context), 1):
num_records = self.result_count
transformed_record = self.post_process(record, context)
if transformed_record is None:
# Record filtered out during post_process()
continue
current_batch.append(record["transactionId"])
if (
i % batch_size == 0 or i == num_records
): # Check if the batch is full or it's the last record
futures.append(executor.submit(self._sync_children, {"transaction_ids": current_batch}))
current_batch = []
for future in as_completed(futures):
try:
future.result()
except Exception as e:
self.logger.error(f"Error in syncing children: {e}")
session.close()
for record in self.request_records(context):
yield self.post_process(record, context)

def _process_record(
self,
Expand Down

0 comments on commit bb84a56

Please sign in to comment.