diff --git a/tap_restaurant365/client.py b/tap_restaurant365/client.py index 2639455..0a3a1af 100644 --- a/tap_restaurant365/client.py +++ b/tap_restaurant365/client.py @@ -2,33 +2,32 @@ from __future__ import annotations -from pathlib import Path -from typing import Any, Callable, Iterable, Optional -from datetime import timedelta, datetime -from dateutil import parser - +from datetime import timedelta +from http import HTTPStatus +from typing import Any, Callable import requests +from dateutil import parser from singer_sdk.authenticators import BasicAuthenticator -from singer_sdk.helpers.jsonpath import extract_jsonpath +from singer_sdk.exceptions import FatalAPIError, RetriableAPIError from singer_sdk.pagination import BaseAPIPaginator # noqa: TCH002 from singer_sdk.streams import RESTStream -from singer_sdk.exceptions import FatalAPIError, RetriableAPIError -from http import HTTPStatus + _Auth = Callable[[requests.PreparedRequest], requests.PreparedRequest] class Restaurant365Stream(RESTStream): """Restaurant365 stream class.""" + skip = 0 days_delta = 10 + @property def url_base(self) -> str: """Return the API URL root, configurable via tap settings.""" return "https://odata.restaurant365.net/api/v2/views" - records_jsonpath = "$.value[*]" - + records_jsonpath = "$.value[*]" @property def authenticator(self) -> BasicAuthenticator: @@ -62,9 +61,7 @@ def get_new_paginator(self) -> BaseAPIPaginator: # else: # params - return super().get_new_paginator() - def get_starting_time(self, context): start_date = self.config.get("start_date") @@ -90,11 +87,11 @@ def get_url_params( params: dict = {} if self.replication_key: - start_date = self.get_starting_time(context).strftime('%Y-%m-%dT%H:%M:%SZ') + start_date = self.get_starting_time(context).strftime("%Y-%m-%dT%H:%M:%SZ") params["$filter"] = f"{self.replication_key} ge {start_date}" return params - + def validate_response(self, response: requests.Response) -> None: if ( response.status_code in self.extra_retry_statuses diff --git a/tap_restaurant365/streams.py b/tap_restaurant365/streams.py index a100cb0..2f1b359 100644 --- a/tap_restaurant365/streams.py +++ b/tap_restaurant365/streams.py @@ -3,13 +3,13 @@ from __future__ import annotations import typing as t -from pathlib import Path +from datetime import datetime, timedelta from typing import Any + import requests -from datetime import timedelta, datetime from dateutil import parser - from singer_sdk import typing as th # JSON Schema typing helpers +from singer_sdk.helpers.jsonpath import extract_jsonpath from tap_restaurant365.client import Restaurant365Stream @@ -33,37 +33,45 @@ def get_next_page_token( self.skip += 5000 # Update the previous token if it exists if previous_token: - previous_token = previous_token['token'] + previous_token = previous_token["token"] # Return the next page token and the updated skip value - return {"token":previous_token,"skip":self.skip} + return {"token": previous_token, "skip": self.skip} else: # Reset skip value for a new pagination sequence self.skip = 0 # Determine the starting replication value for data extraction - replication_key_value = self.tap_state["bookmarks"][self.name]['starting_replication_value'] + replication_key_value = self.tap_state["bookmarks"][self.name][ + "starting_replication_value" + ] # Update the replication key value if progress markers are present if "progress_markers" in self.tap_state["bookmarks"][self.name]: - replication_key_value = self.tap_state["bookmarks"][self.name]['progress_markers']["replication_key_value"] + replication_key_value = self.tap_state["bookmarks"][self.name][ + "progress_markers" + ]["replication_key_value"] # Calculate the start date for data extraction - start_date = (parser.parse(replication_key_value) + timedelta(seconds=1)) or parser.parse(self.config.get("start_date")) + start_date = ( + parser.parse(replication_key_value) + timedelta(seconds=1) + ) or parser.parse(self.config.get("start_date")) today = datetime.today() # Adjust the start date based on the previous token if applicable (will occur if progress marker is unable to find a value in empty data response) if ( previous_token and "token" in previous_token - and previous_token['token'] + and previous_token["token"] and start_date.replace(tzinfo=None) <= previous_token["token"].replace(tzinfo=None) ): - start_date = previous_token["token"] + timedelta(days=self.days_delta) + start_date = previous_token["token"] + timedelta( + days=self.days_delta + ) next_token = start_date.replace(tzinfo=None) # Disable pagination if the next token's date is in the future if (today - next_token).days < 0: self.paginate = False # Return the next token and the current skip value - return {"token":next_token,'skip':self.skip} + return {"token": next_token, "skip": self.skip} else: # Return None if pagination is not enabled return None @@ -80,15 +88,15 @@ def get_url_params( if next_page_token: token_date, skip = next_page_token["token"], next_page_token["skip"] start_date = token_date or self.get_starting_time(context) - end_date = start_date + timedelta(days = self.days_delta) + end_date = start_date + timedelta(days=self.days_delta) if self.replication_key: - params["$filter"] = ( - f"{self.replication_key} ge {start_date.strftime('%Y-%m-%dT%H:%M:%SZ')} and {self.replication_key} lt {end_date.strftime('%Y-%m-%dT23:59:59Z')}" - ) - #Order by replication key so the response is consistent - params['$orderby'] = f"{self.replication_key}" + params[ + "$filter" + ] = f"{self.replication_key} ge {start_date.strftime('%Y-%m-%dT%H:%M:%SZ')} and {self.replication_key} lt {end_date.strftime('%Y-%m-%dT23:59:59Z')}" + # Order by replication key so the response is consistent + params["$orderby"] = f"{self.replication_key}" if self.name == "journal_entries": - #set a date in the stream to check later to see if we need to keep calling to the stream + # set a date in the stream to check later to see if we need to keep calling to the stream params["$filter"] += f" and type eq 'Journal Entry'" if self.name == "bills": params["$filter"] += f" and type eq 'AP Invoice'" @@ -99,13 +107,11 @@ def get_url_params( if self.name == "bank_expenses": params["$filter"] += f" and type eq 'Bank Expense'" # - if skip>0: + if skip > 0: params["$skip"] = skip return params - - class AccountsStream(Restaurant365Stream): """Define custom stream.""" @@ -136,14 +142,14 @@ class AccountsStream(Restaurant365Stream): th.Property("createdBy", th.StringType), th.Property("createdOn", th.DateTimeType), th.Property("modifiedBy", th.StringType), - th.Property("modifiedOn", th.DateTimeType) + th.Property("modifiedOn", th.DateTimeType), ).to_dict() -class TransactionsStream(LimitedTimeframeStream): +class TransactionsParentStream(LimitedTimeframeStream): """Define custom stream.""" - name = "transaction" + name = "transaction_parent_stream" path = "/Transaction" primary_keys = ["transactionId"] replication_key = "modifiedOn" @@ -162,55 +168,55 @@ class TransactionsStream(LimitedTimeframeStream): th.Property("createdOn", th.DateTimeType), th.Property("modifiedOn", th.DateTimeType), th.Property("createdBy", th.StringType), - th.Property("modifiedBy", th.StringType) + th.Property("modifiedBy", th.StringType), ).to_dict() - def get_child_context(self, record: dict, context: t.Optional[dict]) -> dict: - return {"transaction_id":record["transactionId"]} - -class BillsStream(TransactionsStream): +class BillsStream(TransactionsParentStream): """Define custom stream.""" name = "bills" - path = "/Transaction" #?$filter=type eq 'AP Invoices' + path = "/Transaction" # ?$filter=type eq 'AP Invoices' primary_keys = ["transactionId"] replication_key = "modifiedOn" paginate = True -class JournalEntriesStream(TransactionsStream): +class JournalEntriesStream(TransactionsParentStream): """Define custom stream.""" name = "journal_entries" - path = "/Transaction" #?$filter=type eq 'Journal Entry and modifiedOn ge ' + path = "/Transaction" # ?$filter=type eq 'Journal Entry and modifiedOn ge ' primary_keys = ["transactionId"] replication_key = "modifiedOn" paginate = True -class CreditMemosStream(TransactionsStream): + +class CreditMemosStream(TransactionsParentStream): """Define custom stream.""" name = "credit_memos" - path = "/Transaction" #?$filter=type eq 'AP Credit memo and modifiedOn ge ' + path = "/Transaction" # ?$filter=type eq 'AP Credit memo and modifiedOn ge ' primary_keys = ["transactionId"] replication_key = "modifiedOn" paginate = True -class StockCountStream(TransactionsStream): + +class StockCountStream(TransactionsParentStream): """Define custom stream.""" name = "stock_count" - path = "/Transaction" #?$filter=type eq 'Stock Count and modifiedOn ge ' + path = "/Transaction" # ?$filter=type eq 'Stock Count and modifiedOn ge ' primary_keys = ["transactionId"] replication_key = "modifiedOn" paginate = True -class BankExpensesStream(TransactionsStream): + +class BankExpensesStream(TransactionsParentStream): """Define custom stream.""" name = "bank_expenses" - path = "/Transaction" #?$filter=type eq 'Bank Expense and modifiedOn ge ' + path = "/Transaction" # ?$filter=type eq 'Bank Expense and modifiedOn ge ' primary_keys = ["transactionId"] replication_key = "modifiedOn" paginate = True @@ -234,6 +240,7 @@ class VendorsStream(Restaurant365Stream): th.Property("modifiedOn", th.DateTimeType), ).to_dict() + class ItemsStream(Restaurant365Stream): """Define custom stream.""" @@ -252,7 +259,6 @@ class ItemsStream(Restaurant365Stream): th.Property("createdOn", th.DateTimeType), th.Property("modifiedBy", th.StringType), th.Property("modifiedOn", th.DateTimeType), - ).to_dict() @@ -274,9 +280,9 @@ class LocationsStream(Restaurant365Stream): th.Property("createdOn", th.DateTimeType), th.Property("modifiedBy", th.StringType), th.Property("modifiedOn", th.DateTimeType), - ).to_dict() + class EmployeesStream(Restaurant365Stream): """Define custom stream.""" @@ -312,9 +318,9 @@ class EmployeesStream(Restaurant365Stream): th.Property("createdOn", th.DateTimeType), th.Property("modifiedBy", th.StringType), th.Property("modifiedOn", th.DateTimeType), - ).to_dict() + class JobTitleStream(Restaurant365Stream): """Define custom stream.""" @@ -337,9 +343,9 @@ class JobTitleStream(Restaurant365Stream): th.Property("createdOn", th.DateTimeType), th.Property("modifiedBy", th.StringType), th.Property("modifiedOn", th.DateTimeType), - ).to_dict() + class LaborDetailStream(Restaurant365Stream): """Define custom stream.""" @@ -372,7 +378,6 @@ class LaborDetailStream(Restaurant365Stream): th.Property("createdOn", th.DateTimeType), th.Property("modifiedBy", th.StringType), th.Property("modifiedOn", th.DateTimeType), - ).to_dict() @@ -393,9 +398,9 @@ class POSEmployeeStream(Restaurant365Stream): th.Property("createdOn", th.DateTimeType), th.Property("modifiedBy", th.StringType), th.Property("modifiedOn", th.DateTimeType), - ).to_dict() + class SalesEmployeeStream(LimitedTimeframeStream): """Define custom stream.""" @@ -430,11 +435,9 @@ class SalesEmployeeStream(LimitedTimeframeStream): th.Property("modifiedBy", th.StringType), th.Property("modifiedOn", th.DateTimeType), th.Property("serviceType", th.StringType), - ).to_dict() - class SalesDetailStream(LimitedTimeframeStream): """Define custom stream.""" @@ -464,7 +467,6 @@ class SalesDetailStream(LimitedTimeframeStream): th.Property("createdOn", th.DateTimeType), th.Property("modifiedBy", th.StringType), th.Property("modifiedOn", th.DateTimeType), - ).to_dict() @@ -499,11 +501,9 @@ class SalesPaymentStream(LimitedTimeframeStream): th.Property("createdOn", th.DateTimeType), th.Property("modifiedBy", th.StringType), th.Property("modifiedOn", th.DateTimeType), - ).to_dict() - class EntityDeletedStream(Restaurant365Stream): """Define custom stream.""" @@ -516,9 +516,67 @@ class EntityDeletedStream(Restaurant365Stream): th.Property("entityName", th.StringType), th.Property("deletedOn", th.DateTimeType), th.Property("rowVersion", th.IntegerType), - ).to_dict() + +class TransactionsStream(TransactionsParentStream): + """Define custom stream.""" + + name = "transaction" + # We get node limit exceeded for number larger than this one. + batch_size = 20 + result_count = 0 + + def get_child_context(self, record: dict, context: t.Optional[dict]) -> dict: + return {"transaction_id": record["transactionId"]} + + def parse_response(self, response: requests.Response) -> t.Iterable[dict]: + """Parse the response and return an iterator of result records. + + Args: + response: A raw `requests.Response`_ object. + + Yields: + One item for every item found in the response. + + .. _requests.Response: + https://requests.readthedocs.io/en/latest/api/#requests.Response + """ + data = response.json() + if "value" in data: + # We cant get this number later because it breaks the generator flow. + self.result_count = len(data["value"]) + yield from extract_jsonpath(self.records_jsonpath, input=data) + + def get_records(self, context: dict | None) -> t.Iterable[dict[str, t.Any]]: + """Override the get records to call child stream once batch size is reached we have processed all of the records. .""" # noqa: E501 + batch_size = self.batch_size + current_batch = [] + for i, record in enumerate(self.request_records(context), 1): + num_records = self.result_count + transformed_record = self.post_process(record, context) + if transformed_record is None: + # Record filtered out during post_process() + continue + current_batch.append(record["transactionId"]) + if ( + i % batch_size == 0 or i == num_records + ): # Check if the batch is full or it's the last record + self._sync_children({"transaction_ids": current_batch}) + current_batch = [] + yield transformed_record + + def _process_record( + self, + record: dict, + child_context: dict | None = None, + partition_context: dict | None = None, + ) -> None: + """Need to override this function because we don't want to call child stream for each parent request. + Above get_records will call child stream in batches. + """ + + class TransactionDetailsStream(LimitedTimeframeStream): """Define custom stream.""" @@ -547,9 +605,8 @@ class TransactionDetailsStream(LimitedTimeframeStream): th.Property("createdOn", th.DateTimeType), th.Property("modifiedBy", th.StringType), th.Property("modifiedOn", th.DateTimeType), - ).to_dict() - + def get_next_page_token( self, response: requests.Response, previous_token: t.Optional[t.Any] ) -> t.Optional[t.Any]: @@ -557,18 +614,19 @@ def get_next_page_token( # Check if pagination is enabled data = response.json() # Check for the presence of a next page link in the response data. nextLink is only present if there are more than 5000 records in filter response. - #This is unlikely that a single transaction will have 5k records but it is possible so leaving this code part here. + # This is unlikely that a single transaction will have 5k records but it is possible so leaving this code part here. if "@odata.nextLink" in data: # Increment the skip counter for pagination self.skip += 5000 # Update the previous token if it exists if previous_token: - previous_token = previous_token['token'] + previous_token = previous_token["token"] # Return the next page token and the updated skip value - return {"token":previous_token,"skip":self.skip} + return {"token": previous_token, "skip": self.skip} self.skip = 0 # Return the next token and the current skip value return None + def get_url_params( self, context: dict | None, # noqa: ARG002 @@ -580,10 +638,15 @@ def get_url_params( skip = 0 if next_page_token: token_date, skip = next_page_token["token"], next_page_token["skip"] - if context.get('transaction_id'): - params["$filter"] = ( - f"transactionId eq {context.get('transaction_id')}" + if context.get("transaction_ids"): + params["$filter"] = " or ".join( + [ + f"transactionId eq '{transaction_id}'" + for transaction_id in context.get("transaction_ids") + ], ) - if skip>0: + # Replace ' from the populated filter to avoid errors + params["$filter"] = params["$filter"].replace("'", "") + if skip > 0: params["$skip"] = skip return params