From f6b897f463ea13268a932f21bb4b6b081c9bfc06 Mon Sep 17 00:00:00 2001 From: Key Date: Fri, 25 Oct 2024 10:43:52 -0500 Subject: [PATCH 1/6] divide data in date chunks and fetch concurrently --- tap_stripe/client.py | 110 ++++++++++++++++++++++++++++++++++++++++++ tap_stripe/streams.py | 65 ++++++++++++++----------- 2 files changed, 147 insertions(+), 28 deletions(-) diff --git a/tap_stripe/client.py b/tap_stripe/client.py index cf453c5..dd8ec50 100644 --- a/tap_stripe/client.py +++ b/tap_stripe/client.py @@ -14,6 +14,8 @@ from typing import Any, Callable, Dict, Iterable, Optional import backoff from singer_sdk.exceptions import RetriableAPIError, FatalAPIError +import copy +import concurrent.futures import singer from singer import StateMessage @@ -345,3 +347,111 @@ def get_url_params(self, context, next_page_token): if not self.from_invoice_items and next_page_token == 1: del params["starting_after"] return params + + +class ConcurrentStream(stripeStream): + """Class for the streams that need to get data from invoice items in full sync and more than one event in incremental syncs """ + + def prepare_request( + self, context: Optional[dict], next_page_token: Optional[Any] + ) -> requests.PreparedRequest: + http_method = self.rest_method + url: str = self.get_url(context) + # update params with the date chunks for concurrent requests + params: dict = self.get_url_params(context, next_page_token) + if context: + params.update(context) + + request_data = self.prepare_request_payload(context, next_page_token) + headers = self.http_headers + + authenticator = self.authenticator + if authenticator: + headers.update(authenticator.auth_headers or {}) + params.update(authenticator.auth_params or {}) + + request = cast( + requests.PreparedRequest, + self.requests_session.prepare_request( + requests.Request( + method=http_method, + url=url, + params=params, + headers=headers, + json=request_data, + ), + ), + ) + return request + + def concurrent_request(self, context): + next_page_token: Any = None + finished = False + decorated_request = self.request_decorator(self._request) + + consolidated_response = [] + + while not finished: + prepared_request = self.prepare_request( + context, next_page_token=next_page_token + ) + resp = decorated_request(prepared_request, context) + consolidated_response.extend(self.parse_response(resp)) + previous_token = copy.deepcopy(next_page_token) + next_page_token = self.get_next_page_token( + response=resp, previous_token=previous_token + ) + if next_page_token and next_page_token == previous_token: + raise RuntimeError( + f"Loop detected in pagination. " + f"Pagination token {next_page_token} is identical to prior token." 
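+                    # (i.e. Stripe's starting_after cursor did not advance;
+                    # without this guard the surrounding while-loop would
+                    # re-request the same page indefinitely)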
+ ) + # Cycle until get_next_page_token() no longer returns a value + finished = not next_page_token + return consolidated_response + + def get_concurrent_params(self, context, max_requests): + base_params = self.get_url_params(context, None) + + start_date = self.get_starting_time(context) + start_date_timestamp = int(start_date.timestamp()) + end_date_timestamp = int(datetime.utcnow().timestamp()) + + current_start = start_date_timestamp + partition_size = round((end_date_timestamp - start_date_timestamp) / max_requests) + + concurrent_params = [] + + for _ in range(max_requests): + current_end = current_start + partition_size + params = base_params.copy() + params["created[gte]"] = current_start + params["created[lt]"] = current_end + concurrent_params.append(params) + current_start = current_end + + return concurrent_params + + + def request_records(self, context: Optional[dict]) -> Iterable[dict]: + max_requests = 1 + requests_params = [] + + # use sequential requests for incremental syncs + if self.stream_state.get("replication_key_value") or not self.replication_key: + max_requests = 20 + requests_params = self.get_concurrent_params(context, max_requests) + + with concurrent.futures.ThreadPoolExecutor( + max_workers=max_requests + ) as executor: + futures = { + executor.submit(self.concurrent_request, x): x for x in requests_params + } + + # Process each future as it completes + for future in concurrent.futures.as_completed(futures): + context = futures[future] + # Yield records + for result in future.result(): + yield result \ No newline at end of file diff --git a/tap_stripe/streams.py b/tap_stripe/streams.py index fb6a35b..f7c291a 100644 --- a/tap_stripe/streams.py +++ b/tap_stripe/streams.py @@ -1,9 +1,9 @@ """Stream type classes for tap-stripe-v2.""" -from typing import Any, Optional, Iterable, Dict +from typing import Any, Optional, Iterable, Dict, cast from singer_sdk import typing as th from singer_sdk.exceptions import FatalAPIError, RetriableAPIError -from tap_stripe.client import stripeStream, StripeStreamV2 +from tap_stripe.client import stripeStream, StripeStreamV2, ConcurrentStream from urllib.parse import urlencode import requests from singer_sdk.helpers.jsonpath import extract_jsonpath @@ -15,7 +15,7 @@ from datetime import datetime -class Invoices(stripeStream): +class Invoices(ConcurrentStream): """Define Invoices stream.""" name = "invoices" @@ -134,6 +134,7 @@ def get_child_context(self, record: dict, context: Optional[dict]) -> dict: stripeStream.invoice_lines = data return {} + class InvoiceLineItems(stripeStream): name = "invoice_line_items" parent_stream_type = Invoices @@ -196,7 +197,7 @@ def get_next_page_token( return None -class InvoiceItems(stripeStream): +class InvoiceItems(ConcurrentStream): """Define InvoiceItems stream.""" name = "invoice_items" @@ -234,7 +235,7 @@ class InvoiceItems(stripeStream): ).to_dict() -class Subscriptions(stripeStream): +class Subscriptions(ConcurrentStream): """Define Subscriptions stream.""" name = "subscriptions" @@ -360,7 +361,6 @@ def get_child_context(self, record: dict, context: Optional[dict]) -> dict: return {"subscription_item_id": record["id"]} - class Plans(StripeStreamV2): """Define Plans stream.""" @@ -483,8 +483,7 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]: yield record - -class CreditNotes(stripeStream): +class CreditNotes(ConcurrentStream): """Define CreditNotes stream.""" name = "credit_notes" @@ -534,7 +533,7 @@ def get_child_context(self, record: dict, context: 
Optional[dict]) -> dict: } -class Coupons(stripeStream): +class Coupons(ConcurrentStream): """Define Coupons stream.""" name = "coupons" @@ -626,7 +625,7 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]: yield record -class Customers(stripeStream): +class Customers(ConcurrentStream): """Define Customers stream.""" name = "customers" @@ -665,7 +664,7 @@ def path(self): ).to_dict() -class Events(stripeStream): +class Events(ConcurrentStream): """Define Coupons stream.""" name = "events" @@ -685,7 +684,7 @@ class Events(stripeStream): ).to_dict() -class SubscriptionSchedulesStream(stripeStream): +class SubscriptionSchedulesStream(ConcurrentStream): """Define stream.""" name = "subscription_schedules" @@ -739,6 +738,7 @@ def validate_response(self, response: requests.Response) -> None: msg = self.response_error_message(response) raise RetriableAPIError(msg, response) + class TaxRatesStream(stripeStream): name = "tax_rates" @@ -764,7 +764,7 @@ class TaxRatesStream(stripeStream): ).to_dict() -class BalanceTransactionsStream(stripeStream): +class BalanceTransactionsStream(ConcurrentStream): name = "balance_transactions" path = "balance_transactions" @@ -807,7 +807,7 @@ def apply_catalog(self, catalog) -> None: self.forced_replication_method = catalog_entry.replication_method -class ChargesStream(stripeStream): +class ChargesStream(ConcurrentStream): name = "charges" object = "charge" @@ -863,7 +863,8 @@ def path(self): th.Property("transfer_group", th.StringType), ).to_dict() -class CheckoutSessionsStream(stripeStream): + +class CheckoutSessionsStream(ConcurrentStream): name = "checkout_sessions" object = "checkout.session" @@ -949,7 +950,9 @@ class CreditNoteLineItemsStream(stripeStream): th.Property("unit_amount_decimal", th.StringType), th.Property("unit_amount_excluding_tax", th.StringType), ).to_dict() -class DisputesIssuingStream(stripeStream): + + +class DisputesIssuingStream(ConcurrentStream): name = "disputes_issuing" object = "issuing.dispute" @@ -972,8 +975,9 @@ def path(self): th.Property("transaction", th.StringType), ).to_dict() -class PaymentIntentsStream(stripeStream): + +class PaymentIntentsStream(ConcurrentStream): name = "payment_intents" object = "payment_intent" replication_key = "updated" @@ -981,7 +985,6 @@ class PaymentIntentsStream(stripeStream): @property def path(self): return "events" if self.get_from_events else "payment_intents" - schema = th.PropertiesList( th.Property("id", th.StringType), @@ -1022,9 +1025,10 @@ def path(self): th.Property("status", th.StringType), th.Property("transfer_data", th.CustomType({"type": ["object", "string"]})), th.Property("transfer_group", th.StringType), - ).to_dict() -class PayoutsStream(stripeStream): + + +class PayoutsStream(ConcurrentStream): name = "payouts" object = "payout" @@ -1059,9 +1063,10 @@ def path(self): th.Property("statement_descriptor", th.StringType), th.Property("status", th.StringType), th.Property("type", th.StringType), - ).to_dict() -class PromotionCodesStream(stripeStream): + + +class PromotionCodesStream(ConcurrentStream): name = "promotion_codes" object = "promotion_code" @@ -1089,7 +1094,8 @@ def path(self): th.Property("times_redeemed", th.NumberType), ).to_dict() -class TransfersStream(stripeStream): + +class TransfersStream(ConcurrentStream): name = "transfers" object = "transfer" @@ -1118,9 +1124,10 @@ def path(self): th.Property("source_transaction", th.StringType), th.Property("source_type", th.StringType), th.Property("transfer_group", th.StringType), - ).to_dict() -class 
RefundsStream(stripeStream): + + +class RefundsStream(ConcurrentStream): name = "refunds" replication_key = "updated" @@ -1155,8 +1162,9 @@ def path(self): th.Property("source_transfer_reversal", th.StringType), th.Property("status", th.StringType), th.Property("transfer_reversal", th.StringType) -).to_dict() - + ).to_dict() + + class PayoutReportsStream(stripeStream): name = "report_payout_reconciliation" @@ -1367,7 +1375,8 @@ def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]: continue yield transformed_record -class DisputesStream(stripeStream): + +class DisputesStream(ConcurrentStream): name = "disputes" object = "dispute" replication_key = "updated" From f75fd7416dd2bfef9875f1a344284723216559ed Mon Sep 17 00:00:00 2001 From: Key Date: Fri, 25 Oct 2024 14:56:26 -0500 Subject: [PATCH 2/6] add concurrency for fullsync requests --- tap_stripe/client.py | 133 ++++++++++++++++++------------------------- 1 file changed, 56 insertions(+), 77 deletions(-) diff --git a/tap_stripe/client.py b/tap_stripe/client.py index dd8ec50..e492e42 100644 --- a/tap_stripe/client.py +++ b/tap_stripe/client.py @@ -16,6 +16,7 @@ from singer_sdk.exceptions import RetriableAPIError, FatalAPIError import copy import concurrent.futures +import math import singer from singer import StateMessage @@ -323,66 +324,17 @@ def _write_state_message(self) -> None: singer.write_message(StateMessage(value=tap_state)) -class StripeStreamV2(stripeStream): +class ConcurrentStream(stripeStream): """Class for the streams that need to get data from invoice items in full sync and more than one event in incremental syncs """ - def request_records(self, context: Optional[dict]) -> Iterable[dict]: - # activate flag if it's a full sync to fetch prices from invoiceitems - if not self.stream_state.get("replication_key"): - self.from_invoice_items = True - return super().request_records(context) - - def get_next_page_token(self, response, previous_token): - next_page_token = super().get_next_page_token(response, previous_token) - # get a dummy next page token to iterate first through invoice_items and then through prices - if self.from_invoice_items and not next_page_token: - next_page_token = 1 - self.from_invoice_items = False - return next_page_token - - def get_url_params(self, context, next_page_token): + def get_url_params( + self, context: Optional[dict], next_page_token: Optional[Any] + ) -> Dict[str, Any]: """Return a dictionary of values to be used in URL parameterization.""" params = super().get_url_params(context, next_page_token) - # delete the dummy next page token to avoid errors - if not self.from_invoice_items and next_page_token == 1: - del params["starting_after"] + if context and "concurrent_params" in context: + params.update(context["concurrent_params"]) return params - - -class ConcurrentStream(stripeStream): - """Class for the streams that need to get data from invoice items in full sync and more than one event in incremental syncs """ - - def prepare_request( - self, context: Optional[dict], next_page_token: Optional[Any] - ) -> requests.PreparedRequest: - http_method = self.rest_method - url: str = self.get_url(context) - # update params with the date chunks for concurrent requests - params: dict = self.get_url_params(context, next_page_token) - if context: - params.update(context) - - request_data = self.prepare_request_payload(context, next_page_token) - headers = self.http_headers - - authenticator = self.authenticator - if authenticator: - headers.update(authenticator.auth_headers or {}) 
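With the custom `prepare_request` override removed, each worker's date window now travels in `context` and is merged into the query string by the new `get_url_params` above. A minimal, self-contained sketch of that flow, with illustrative timestamps and page size:

```python
# One window, as produced by get_concurrent_params(): 2024-01-01 to 2024-02-01 UTC.
context = {"concurrent_params": {"created[gte]": 1704067200, "created[lt]": 1706745600}}

def get_url_params(context, next_page_token):
    params = {"limit": 100}
    if next_page_token:
        params["starting_after"] = next_page_token
    if context and "concurrent_params" in context:
        # Scope this worker's requests to its created[...] window.
        params.update(context["concurrent_params"])
    return params

assert get_url_params(context, None)["created[gte]"] == 1704067200
```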
- params.update(authenticator.auth_params or {}) - - request = cast( - requests.PreparedRequest, - self.requests_session.prepare_request( - requests.Request( - method=http_method, - url=url, - params=params, - headers=headers, - json=request_data, - ), - ), - ) - return request def concurrent_request(self, context): next_page_token: Any = None @@ -391,6 +343,10 @@ def concurrent_request(self, context): consolidated_response = [] + start_date = datetime.fromtimestamp(context["concurrent_params"]["created[gte]"]).isoformat() + end_date = datetime.fromtimestamp(context["concurrent_params"]["created[lt]"]).isoformat() + self.logger.info(f"Fetching data concurrently from {self.name} from {start_date} to {end_date}") + while not finished: prepared_request = self.prepare_request( context, next_page_token=next_page_token @@ -411,23 +367,25 @@ def concurrent_request(self, context): return consolidated_response def get_concurrent_params(self, context, max_requests): - base_params = self.get_url_params(context, None) - start_date = self.get_starting_time(context) start_date_timestamp = int(start_date.timestamp()) end_date_timestamp = int(datetime.utcnow().timestamp()) current_start = start_date_timestamp - partition_size = round((end_date_timestamp - start_date_timestamp) / max_requests) + default_partition_size = 2592000 # one month as default + partition_size = min(math.ceil((end_date_timestamp - start_date_timestamp) / max_requests), default_partition_size) concurrent_params = [] - for _ in range(max_requests): + chunks = math.ceil((end_date_timestamp - start_date_timestamp)/partition_size) + for i in range(chunks): + params = {} current_end = current_start + partition_size - params = base_params.copy() params["created[gte]"] = current_start - params["created[lt]"] = current_end - concurrent_params.append(params) + # don't put an end_date at the final chunk + if i < chunks-1: + params["created[lt]"] = current_end + concurrent_params.append({"concurrent_params": params}) current_start = current_end return concurrent_params @@ -438,20 +396,41 @@ def request_records(self, context: Optional[dict]) -> Iterable[dict]: requests_params = [] # use sequential requests for incremental syncs - if self.stream_state.get("replication_key_value") or not self.replication_key: - max_requests = 20 + if not self.stream_state.get("replication_key_value") or not self.replication_key: + max_requests = 25 if "live" in self.config.get("client_secret") else 10 requests_params = self.get_concurrent_params(context, max_requests) + + if len(requests_params): + for i in range(0, len(requests_params), max_requests): + req_params = requests_params[i: i + max_requests] + + with concurrent.futures.ThreadPoolExecutor( + max_workers=max_requests + ) as executor: + futures = { + executor.submit(self.concurrent_request, x): x for x in req_params + } + + # Process each future as it completes + for future in concurrent.futures.as_completed(futures): + context = futures[future] + # Yield records + for result in future.result(): + yield result + else: + # keep original behaviour for incremental syncs + yield from super().request_records(context) + + +class StripeStreamV2(ConcurrentStream): + """Class for the streams that need to get data from invoice items in full sync and more than one event in incremental syncs """ - with concurrent.futures.ThreadPoolExecutor( - max_workers=max_requests - ) as executor: - futures = { - executor.submit(self.concurrent_request, x): x for x in requests_params - } - - # Process each future as it completes - 
for future in concurrent.futures.as_completed(futures): - context = futures[future] - # Yield records - for result in future.result(): - yield result \ No newline at end of file + def request_records(self, context: Optional[dict]) -> Iterable[dict]: + # activate flag if it's a full sync to fetch prices from invoiceitems + if not self.stream_state.get("replication_key"): + # get data from invoice items + self.from_invoice_items = True + yield from super().request_records(context) + # get data from normal enpoint + self.from_invoice_items = False + yield from super().request_records(context) \ No newline at end of file From 3b7f64c4da9fbb20a1eecc039424f3d3cf226ea7 Mon Sep 17 00:00:00 2001 From: Key Date: Fri, 25 Oct 2024 15:05:37 -0500 Subject: [PATCH 3/6] fix logs --- tap_stripe/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tap_stripe/client.py b/tap_stripe/client.py index e492e42..8026111 100644 --- a/tap_stripe/client.py +++ b/tap_stripe/client.py @@ -344,7 +344,7 @@ def concurrent_request(self, context): consolidated_response = [] start_date = datetime.fromtimestamp(context["concurrent_params"]["created[gte]"]).isoformat() - end_date = datetime.fromtimestamp(context["concurrent_params"]["created[lt]"]).isoformat() + end_date = datetime.fromtimestamp(context["concurrent_params"].get("created[lt]", datetime.utcnow().timestamp())).isoformat() self.logger.info(f"Fetching data concurrently from {self.name} from {start_date} to {end_date}") while not finished: From b2e6d8c3188c4bdb2927516c0e55746d2d43474f Mon Sep 17 00:00:00 2001 From: Key Date: Wed, 30 Oct 2024 16:52:31 -0500 Subject: [PATCH 4/6] add concurrent requests for incremental syncs and child streams --- tap_stripe/client.py | 379 ++++++++++++++++++++++++++++++++---------- tap_stripe/streams.py | 1 + 2 files changed, 292 insertions(+), 88 deletions(-) diff --git a/tap_stripe/client.py b/tap_stripe/client.py index 8026111..679f1fc 100644 --- a/tap_stripe/client.py +++ b/tap_stripe/client.py @@ -1,7 +1,7 @@ """REST client handling, including stripeStream base class.""" from datetime import datetime -from typing import Any, Dict, Iterable, Optional, cast +from typing import Any, Dict, Iterable, Optional, cast, List import requests from requests.exceptions import JSONDecodeError @@ -13,13 +13,20 @@ from pendulum import parse from typing import Any, Callable, Dict, Iterable, Optional import backoff -from singer_sdk.exceptions import RetriableAPIError, FatalAPIError +from singer_sdk.exceptions import RetriableAPIError, FatalAPIError, InvalidStreamSortException import copy import concurrent.futures import math import singer from singer import StateMessage +import requests +from requests.adapters import HTTPAdapter + +from singer_sdk.helpers._state import ( + finalize_state_progress_markers, + log_sort_error, +) class stripeStream(RESTStream): """stripe stream class.""" @@ -79,7 +86,8 @@ def get_url_params( ) -> Dict[str, Any]: """Return a dictionary of values to be used in URL parameterization.""" params: dict = self.params.copy() - params["limit"] = self._page_size + if not self.parent_stream_type: + params["limit"] = self._page_size if next_page_token: params["starting_after"] = next_page_token if self.replication_key and self.path != "credit_notes": @@ -107,6 +115,9 @@ def datetime_fields(self): def post_process(self, row: dict, context: Optional[dict]) -> dict: """As needed, append or transform raw data to match expected structure.""" + # add updated as created if not in record (normally for full syncs, for 
incremental updated is added from events) + if not "updated" in row: + row["updated"] = row.get("created") for field in self.datetime_fields: if row.get(field): dt_field = datetime.utcfromtimestamp(int(row[field])) @@ -119,15 +130,11 @@ def not_sync_invoice_status(self): if not_sync_invoice_status: return not_sync_invoice_status.split(",") return ["deleted"] - - def parse_response(self, response: requests.Response) -> Iterable[dict]: - decorated_request = self.request_decorator(self._request) - base_url = "/".join(self.url_base.split("/")[:-2]) - try: - records = extract_jsonpath(self.records_jsonpath, input=response.json()) - except: - records = response - + + def clean_records_from_events(self, records): + """ + Clean list of records fetched from events for incremental syncs + """ if self.name == "plans" and self.path == "events": records = list(records) # for plans get all prices, including the updated ones from subscriptions @@ -155,79 +162,92 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]: [item.update({"id": item["product"]}) for item in invoiceitems_products] # Combine both sets of plans records = products + invoiceitems_products - - for record in records: - # logic for incremental syncs - if self.name != "events" and ((self.path == "events" and self.get_from_events) or self.get_data_from_id): - event_date = record["created"] + + # clean duplicates + if self.name != "events" and ((self.path == "events" and self.get_from_events) or self.get_data_from_id): + clean_records = [] + for record in records: if self.name not in ["plans", "products"]: record = record["data"]["object"] record_id = record.get("id") - if ( - not record_id - or (record_id in self.event_ids) - or ( - self.object != record["object"] - if self.object not in ["plan", "product"] - else False + if record_id not in self.event_ids: + self.event_ids.append(record_id) + clean_records.append(record) + return clean_records + + def get_lines(self, record, decorated_request): + base_url = self.url_base + if "lines" in record: + if record["lines"].get("has_more"): + next_page_token = self.get_next_page_token_lines(record["lines"]) + url = base_url + record["lines"]["url"] + lines = record["lines"].get("data", []) + while next_page_token: + params = {"limit": 100, "starting_after": next_page_token} + lines_response = decorated_request( + self.prepare_request_lines(url, params), {} ) - ): - continue - - # filter status that we need to ignore, ignore deleted status as default - if record.get("status") in self.not_sync_invoice_status: - self.logger.debug(f"{self.name} with id {record_id} skipped due to status {record.get('status')}") - continue - # using prices API instead of plans API - if self.name == "plans": - url = base_url + f"/v1/prices/{record_id}" - # discounts is a synthetic stream, it uses data from invoices - elif self.name == "discounts": - url = base_url + f"/v1/invoices/{record_id}" - else: - url = base_url + f"/v1/{self.name}/{record['id']}" - params = {} - if self.expand: - params["expand[]"] = self.expand - - response_obj = decorated_request( - self.prepare_request_lines(url, params), {} + response_obj = lines_response.json() + next_page_token = self.get_next_page_token_lines(response_obj) + response_data = response_obj.get("data", []) + lines.extend(response_data) + record["lines"]["data"] = lines + record["lines"]["has_more"] = False + return record + + def get_record_from_events(self, record, decorated_request): + # logic for incremental syncs + base_url = self.url_base + if 
self.name != "events" and ((self.path == "events" and self.get_from_events) or self.get_data_from_id): + event_date = record["created"] + record_id = record.get("id") + if ( + not record_id + or ( + self.object != record["object"] + if self.object not in ["plan", "product"] + else False ) - if response_obj.status_code in self.ignore_statuscode: - self.logger.debug(f"{self.name} with id {record_id} skipped") - continue - record = response_obj.json() - record["updated"] = event_date - - # add record id to event_ids to not get dupplicates in incremental syncs - self.event_ids.append(record_id) - if not record.get("updated") and "created" in record: - record["updated"] = record["created"] - - # iterate through lines pages - if "lines" in record: - if record["lines"].get("has_more"): - next_page_token = self.get_next_page_token_lines(record["lines"]) - url = base_url + record["lines"]["url"] - lines = record["lines"].get("data", []) - while next_page_token: - params = {"limit": 100, "starting_after": next_page_token} - lines_response = decorated_request( - self.prepare_request_lines(url, params), {} - ) - response_obj = lines_response.json() - next_page_token = self.get_next_page_token_lines(response_obj) - response_data = response_obj.get("data", []) - lines.extend(response_data) - record["lines"]["data"] = lines - record["lines"]["has_more"] = False - - # clean dupplicates for fullsync streams that fetch data from more than one endpoint - if hasattr(self, "from_invoice_items"): - if self.from_invoice_items: - if record["id"] in self.fullsync_ids: - continue - yield record + ): + return + + # filter status that we need to ignore, ignore deleted status as default + if record.get("status") in self.not_sync_invoice_status: + self.logger.debug(f"{self.name} with id {record_id} skipped due to status {record.get('status')}") + return + # using prices API instead of plans API + if self.name == "plans": + url = base_url + f"prices/{record_id}" + # discounts is a synthetic stream, it uses data from invoices + elif self.name == "discounts": + url = base_url + f"invoices/{record_id}" + else: + url = base_url + f"{self.name}/{record['id']}" + params = {} + if self.expand: + params["expand[]"] = self.expand + + response_obj = decorated_request( + self.prepare_request_lines(url, params), {} + ) + if response_obj.status_code in self.ignore_statuscode: + self.logger.debug(f"{self.name} with id {record_id} skipped") + return + record = response_obj.json() + record["updated"] = event_date + + # iterate through lines pages + record = self.get_lines(record, decorated_request) + return record + + def parse_response(self, response: requests.Response) -> Iterable[dict]: + decorated_request = self.request_decorator(self._request) + try: + records = extract_jsonpath(self.records_jsonpath, input=response.json()) + except: + records = response + records = self.clean_records_from_events(records) + yield from [self.get_record_from_events(record, decorated_request) for record in records] def get_next_page_token_lines(self, response: requests.Response) -> Optional[Any]: """Return a token for identifying next page or None if no more pages.""" @@ -323,10 +343,42 @@ def _write_state_message(self) -> None: singer.write_message(StateMessage(value=tap_state)) - class ConcurrentStream(stripeStream): """Class for the streams that need to get data from invoice items in full sync and more than one event in incremental syncs """ + def __init__( + self, + tap, + name = None, + schema = None, + path = None, + ) -> None: + 
super().__init__(name=name, schema=schema, tap=tap, path=path) + self.requests_session = requests.Session() + adapter = HTTPAdapter(pool_connections=90, pool_maxsize=90) + self.requests_session.mount("https://", adapter) + + + @property + def max_concurrent_requests(self): + # if stream has child streams selected use half of possible connections + # for parent and half for child to be able to do concurrent calls in both + max_requests = 2 if "live" in self.config.get("client_secret") else 20 + has_child_selected = any(getattr(obj, 'selected', False) for obj in self.child_streams) + if has_child_selected: + max_requests = max_requests/2 + return math.floor(max_requests) + + @property + def requests_session(self) -> requests.Session: + if not self._requests_session: + self._requests_session = requests.Session() + return self._requests_session + + @requests_session.setter + def requests_session(self, value): + self._requests_session = value + def get_url_params( self, context: Optional[dict], next_page_token: Optional[Any] ) -> Dict[str, Any]: @@ -389,18 +441,22 @@ def get_concurrent_params(self, context, max_requests): current_start = current_end return concurrent_params - def request_records(self, context: Optional[dict]) -> Iterable[dict]: max_requests = 1 requests_params = [] - # use sequential requests for incremental syncs - if not self.stream_state.get("replication_key_value") or not self.replication_key: - max_requests = 25 if "live" in self.config.get("client_secret") else 10 + # use concurrent requests for fullsyncs and streams that don't use events for incremental syncs + if (not self.stream_state.get("replication_key_value") or self.name in [ + "invoice_items", + "events", + "subscription_schedules", + "tax_rates", + "balance_transactions", + ]) and not self.parent_stream_type: + max_requests = self.max_concurrent_requests requests_params = self.get_concurrent_params(context, max_requests) - if len(requests_params): for i in range(0, len(requests_params), max_requests): req_params = requests_params[i: i + max_requests] @@ -418,10 +474,149 @@ def request_records(self, context: Optional[dict]) -> Iterable[dict]: for result in future.result(): yield result else: - # keep original behaviour for incremental syncs yield from super().request_records(context) + def get_inc_concurrent_params(self, records, decorated_request): + params = [] + for record in records: + params.append({"record": record, "decorated_request": decorated_request}) + return params + def parse_response(self, response: requests.Response) -> Iterable[dict]: + decorated_request = self.request_decorator(self._request) + try: + records = extract_jsonpath(self.records_jsonpath, input=response.json()) + except: + records = response + + if self.name != "events" and ((self.path == "events" and self.get_from_events) or self.get_data_from_id): + decorated_request = self.request_decorator(self._request) + records = self.clean_records_from_events(records) + # get records from "base_url/record_id" concurrently + requests_params = self.get_inc_concurrent_params(records, decorated_request) + max_requests = self.max_concurrent_requests + if len(requests_params): + for i in range(0, len(requests_params), max_requests): + req_params = requests_params[i: i + max_requests] + + with concurrent.futures.ThreadPoolExecutor( + max_workers=max_requests + ) as executor: + futures = { + executor.submit(self.get_record_from_events, x["record"], x["decorated_request"]): x for x in req_params + } + # Process each future as it completes + for future 
in concurrent.futures.as_completed(futures): + # Yield records + if future.result(): + yield future.result() + else: + for record in records: + record = self.get_lines(record, decorated_request) + yield record + + def mark_last_item(self, generator): + iterator = iter(generator) + try: + # Attempt to get the first item + previous = next(iterator) + except StopIteration: + # If generator is empty, exit the function + return + + # Iterate through remaining items + for current in iterator: + yield previous, False + previous = current + + # Yield the last item with `is_last=True` + yield previous, True + + def _sync_records( # noqa C901 # too complex + self, context: Optional[dict] = None + ) -> None: + record_count = 0 + current_context: Optional[dict] + context_list: Optional[List[dict]] + context_list = [context] if context is not None else self.partitions + selected = self.selected + + for current_context in context_list or [{}]: + partition_record_count = 0 + current_context = current_context or None + state = self.get_context_state(current_context) + state_partition_context = self._get_state_partition_context(current_context) + self._write_starting_replication_value(current_context) + child_context: Optional[dict] = ( + None if current_context is None else copy.copy(current_context) + ) + child_contexts = [] + for record_result, is_last in self.mark_last_item(self.get_records(current_context)): + if isinstance(record_result, tuple): + # Tuple items should be the record and the child context + record, child_context = record_result + else: + record = record_result + child_context = copy.copy( + self.get_child_context(record=record, context=child_context) + ) + for key, val in (state_partition_context or {}).items(): + # Add state context to records if not already present + if key not in record: + record[key] = val + + ### Modified behaviour + # Sync children concurrently, except when primary mapper filters out the record + if self.stream_maps[0].get_filter_result(record): + # invoices child stream is a synthetic stream + if self.name == "invoices": + self._sync_children(child_context) + else: + child_contexts.append(child_context) + if len(child_contexts) == self.max_concurrent_requests or is_last: + with concurrent.futures.ThreadPoolExecutor( + max_workers=self.max_concurrent_requests + ) as executor: + futures = { + executor.submit(self._sync_children, x): x for x in child_contexts + } + child_contexts = [] + + ###- + + self._check_max_record_limit(record_count) + if selected: + if (record_count - 1) % self.STATE_MSG_FREQUENCY == 0: + self._write_state_message() + self._write_record_message(record) + try: + self._increment_stream_state(record, context=current_context) + except InvalidStreamSortException as ex: + log_sort_error( + log_fn=self.logger.error, + ex=ex, + record_count=record_count + 1, + partition_record_count=partition_record_count + 1, + current_context=current_context, + state_partition_context=state_partition_context, + stream_name=self.name, + ) + raise ex + + record_count += 1 + partition_record_count += 1 + if current_context == state_partition_context: + # Finalize per-partition state only if 1:1 with context + finalize_state_progress_markers(state) + if not context: + # Finalize total stream only if we have the full full context. + # Otherwise will be finalized by tap at end of sync. 
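+            # finalize_state_progress_markers promotes the interim
+            # replication-key progress markers into the bookmark proper, so
+            # the final STATE message written below carries the finished value.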
+ finalize_state_progress_markers(self.stream_state) + self._write_record_count_log(record_count=record_count, context=context) + # Reset interim bookmarks before emitting final STATE message: + self._write_state_message() + + class StripeStreamV2(ConcurrentStream): """Class for the streams that need to get data from invoice items in full sync and more than one event in incremental syncs """ @@ -433,4 +628,12 @@ def request_records(self, context: Optional[dict]) -> Iterable[dict]: yield from super().request_records(context) # get data from normal enpoint self.from_invoice_items = False - yield from super().request_records(context) \ No newline at end of file + yield from super().request_records(context) + else: + yield from super().request_records(context) + + def parse_response(self, response) -> Iterable[dict]: + for record in super().parse_response(response): + if self.from_invoice_items: + if record["id"] in self.fullsync_ids: + return \ No newline at end of file diff --git a/tap_stripe/streams.py b/tap_stripe/streams.py index f7c291a..5562af3 100644 --- a/tap_stripe/streams.py +++ b/tap_stripe/streams.py @@ -981,6 +981,7 @@ class PaymentIntentsStream(ConcurrentStream): name = "payment_intents" object = "payment_intent" replication_key = "updated" + event_filter = "payment_intent.*" @property def path(self): From dabb5689ac3604bd38668f32404763b88abbad51 Mon Sep 17 00:00:00 2001 From: Key Date: Wed, 30 Oct 2024 16:56:15 -0500 Subject: [PATCH 5/6] fix max_concurrent_requests --- tap_stripe/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tap_stripe/client.py b/tap_stripe/client.py index 679f1fc..c625839 100644 --- a/tap_stripe/client.py +++ b/tap_stripe/client.py @@ -363,7 +363,7 @@ def __init__( def max_concurrent_requests(self): # if stream has child streams selected use half of possible connections # for parent and half for child to be able to do concurrent calls in both - max_requests = 2 if "live" in self.config.get("client_secret") else 20 + max_requests = 80 if "live" in self.config.get("client_secret") else 20 has_child_selected = any(getattr(obj, 'selected', False) for obj in self.child_streams) if has_child_selected: max_requests = max_requests/2 From 2950d6be62707368764fd90d86e828ce8c1d6ecd Mon Sep 17 00:00:00 2001 From: Keyna Rafael <95432445+keyn4@users.noreply.github.com> Date: Mon, 4 Nov 2024 13:07:47 -0500 Subject: [PATCH 6/6] fix schema for payment_intents shipping (#33) --- tap_stripe/streams.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tap_stripe/streams.py b/tap_stripe/streams.py index 5562af3..6258f96 100644 --- a/tap_stripe/streams.py +++ b/tap_stripe/streams.py @@ -1019,10 +1019,9 @@ def path(self): th.Property("receipt_email", th.StringType), th.Property("review", th.StringType), th.Property("setup_future_usage", th.StringType), - th.Property("shipping", th.StringType), + th.Property("shipping", th.CustomType({"type": ["object", "string"]})), th.Property("source", th.StringType), th.Property("statement_descriptor_suffix", th.StringType), - th.Property("statement_descriptor_suffix", th.StringType), th.Property("status", th.StringType), th.Property("transfer_data", th.CustomType({"type": ["object", "string"]})), th.Property("transfer_group", th.StringType),
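For reference, the date-window arithmetic that drives the whole series (`get_concurrent_params`) can be restated as a self-contained sketch; `ONE_MONTH` mirrors the patch's 2592000-second cap and the example dates are illustrative:

```python
import math
from datetime import datetime, timezone

ONE_MONTH = 2_592_000  # 30 days in seconds, the cap used by the patch

def date_windows(start_ts: int, end_ts: int, max_requests: int) -> list:
    """Split [start_ts, end_ts) into created[...] windows of at most a month."""
    size = min(math.ceil((end_ts - start_ts) / max_requests), ONE_MONTH)
    chunks = math.ceil((end_ts - start_ts) / size)
    windows, current = [], start_ts
    for i in range(chunks):
        params = {"created[gte]": current}
        if i < chunks - 1:  # the final window stays open-ended, as in the patch
            params["created[lt]"] = current + size
        windows.append(params)
        current += size
    return windows

start = int(datetime(2024, 1, 1, tzinfo=timezone.utc).timestamp())
print(len(date_windows(start, start + 90 * 86_400, 10)))  # ten 9-day windows
```

Likewise, the `mark_last_item` helper added in PATCH 4/6 exists so the child-context batcher can flush a final, partially filled batch; a usage sketch with a toy batch size standing in for `max_concurrent_requests`:

```python
def mark_last_item(generator):
    """Yield (item, is_last) pairs from any iterable."""
    iterator = iter(generator)
    try:
        previous = next(iterator)
    except StopIteration:
        return
    for current in iterator:
        yield previous, False
        previous = current
    yield previous, True

batch, batch_size = [], 3
for record, is_last in mark_last_item(range(7)):
    batch.append(record)
    if len(batch) == batch_size or is_last:
        print("flush", batch)  # flush [0, 1, 2] / [3, 4, 5] / [6]
        batch = []
```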