From 6508195efbf259305172f944ee4ae8df92b03216 Mon Sep 17 00:00:00 2001 From: Adil Ahmed Date: Sat, 27 Apr 2024 03:05:36 +0500 Subject: [PATCH] Black formatting. --- tap_restaurant365/client.py | 11 ++-- tap_restaurant365/streams.py | 107 ++++++++++++++++++----------------- 2 files changed, 61 insertions(+), 57 deletions(-) diff --git a/tap_restaurant365/client.py b/tap_restaurant365/client.py index d78d7d0..0a3a1af 100644 --- a/tap_restaurant365/client.py +++ b/tap_restaurant365/client.py @@ -18,15 +18,16 @@ class Restaurant365Stream(RESTStream): """Restaurant365 stream class.""" + skip = 0 days_delta = 10 + @property def url_base(self) -> str: """Return the API URL root, configurable via tap settings.""" return "https://odata.restaurant365.net/api/v2/views" - records_jsonpath = "$.value[*]" - + records_jsonpath = "$.value[*]" @property def authenticator(self) -> BasicAuthenticator: @@ -60,9 +61,7 @@ def get_new_paginator(self) -> BaseAPIPaginator: # else: # params - return super().get_new_paginator() - def get_starting_time(self, context): start_date = self.config.get("start_date") @@ -88,11 +87,11 @@ def get_url_params( params: dict = {} if self.replication_key: - start_date = self.get_starting_time(context).strftime('%Y-%m-%dT%H:%M:%SZ') + start_date = self.get_starting_time(context).strftime("%Y-%m-%dT%H:%M:%SZ") params["$filter"] = f"{self.replication_key} ge {start_date}" return params - + def validate_response(self, response: requests.Response) -> None: if ( response.status_code in self.extra_retry_statuses diff --git a/tap_restaurant365/streams.py b/tap_restaurant365/streams.py index df766bd..2f1b359 100644 --- a/tap_restaurant365/streams.py +++ b/tap_restaurant365/streams.py @@ -33,37 +33,45 @@ def get_next_page_token( self.skip += 5000 # Update the previous token if it exists if previous_token: - previous_token = previous_token['token'] + previous_token = previous_token["token"] # Return the next page token and the updated skip value - return {"token":previous_token,"skip":self.skip} + return {"token": previous_token, "skip": self.skip} else: # Reset skip value for a new pagination sequence self.skip = 0 # Determine the starting replication value for data extraction - replication_key_value = self.tap_state["bookmarks"][self.name]['starting_replication_value'] + replication_key_value = self.tap_state["bookmarks"][self.name][ + "starting_replication_value" + ] # Update the replication key value if progress markers are present if "progress_markers" in self.tap_state["bookmarks"][self.name]: - replication_key_value = self.tap_state["bookmarks"][self.name]['progress_markers']["replication_key_value"] + replication_key_value = self.tap_state["bookmarks"][self.name][ + "progress_markers" + ]["replication_key_value"] # Calculate the start date for data extraction - start_date = (parser.parse(replication_key_value) + timedelta(seconds=1)) or parser.parse(self.config.get("start_date")) + start_date = ( + parser.parse(replication_key_value) + timedelta(seconds=1) + ) or parser.parse(self.config.get("start_date")) today = datetime.today() # Adjust the start date based on the previous token if applicable (will occur if progress marker is unable to find a value in empty data response) if ( previous_token and "token" in previous_token - and previous_token['token'] + and previous_token["token"] and start_date.replace(tzinfo=None) <= previous_token["token"].replace(tzinfo=None) ): - start_date = previous_token["token"] + timedelta(days=self.days_delta) + start_date = previous_token["token"] + timedelta( + days=self.days_delta + ) next_token = start_date.replace(tzinfo=None) # Disable pagination if the next token's date is in the future if (today - next_token).days < 0: self.paginate = False # Return the next token and the current skip value - return {"token":next_token,'skip':self.skip} + return {"token": next_token, "skip": self.skip} else: # Return None if pagination is not enabled return None @@ -80,15 +88,15 @@ def get_url_params( if next_page_token: token_date, skip = next_page_token["token"], next_page_token["skip"] start_date = token_date or self.get_starting_time(context) - end_date = start_date + timedelta(days = self.days_delta) + end_date = start_date + timedelta(days=self.days_delta) if self.replication_key: - params["$filter"] = ( - f"{self.replication_key} ge {start_date.strftime('%Y-%m-%dT%H:%M:%SZ')} and {self.replication_key} lt {end_date.strftime('%Y-%m-%dT23:59:59Z')}" - ) - #Order by replication key so the response is consistent - params['$orderby'] = f"{self.replication_key}" + params[ + "$filter" + ] = f"{self.replication_key} ge {start_date.strftime('%Y-%m-%dT%H:%M:%SZ')} and {self.replication_key} lt {end_date.strftime('%Y-%m-%dT23:59:59Z')}" + # Order by replication key so the response is consistent + params["$orderby"] = f"{self.replication_key}" if self.name == "journal_entries": - #set a date in the stream to check later to see if we need to keep calling to the stream + # set a date in the stream to check later to see if we need to keep calling to the stream params["$filter"] += f" and type eq 'Journal Entry'" if self.name == "bills": params["$filter"] += f" and type eq 'AP Invoice'" @@ -99,13 +107,11 @@ def get_url_params( if self.name == "bank_expenses": params["$filter"] += f" and type eq 'Bank Expense'" # - if skip>0: + if skip > 0: params["$skip"] = skip return params - - class AccountsStream(Restaurant365Stream): """Define custom stream.""" @@ -136,7 +142,7 @@ class AccountsStream(Restaurant365Stream): th.Property("createdBy", th.StringType), th.Property("createdOn", th.DateTimeType), th.Property("modifiedBy", th.StringType), - th.Property("modifiedOn", th.DateTimeType) + th.Property("modifiedOn", th.DateTimeType), ).to_dict() @@ -162,7 +168,7 @@ class TransactionsParentStream(LimitedTimeframeStream): th.Property("createdOn", th.DateTimeType), th.Property("modifiedOn", th.DateTimeType), th.Property("createdBy", th.StringType), - th.Property("modifiedBy", th.StringType) + th.Property("modifiedBy", th.StringType), ).to_dict() @@ -170,7 +176,7 @@ class BillsStream(TransactionsParentStream): """Define custom stream.""" name = "bills" - path = "/Transaction" #?$filter=type eq 'AP Invoices' + path = "/Transaction" # ?$filter=type eq 'AP Invoices' primary_keys = ["transactionId"] replication_key = "modifiedOn" paginate = True @@ -180,34 +186,37 @@ class JournalEntriesStream(TransactionsParentStream): """Define custom stream.""" name = "journal_entries" - path = "/Transaction" #?$filter=type eq 'Journal Entry and modifiedOn ge ' + path = "/Transaction" # ?$filter=type eq 'Journal Entry and modifiedOn ge ' primary_keys = ["transactionId"] replication_key = "modifiedOn" paginate = True + class CreditMemosStream(TransactionsParentStream): """Define custom stream.""" name = "credit_memos" - path = "/Transaction" #?$filter=type eq 'AP Credit memo and modifiedOn ge ' + path = "/Transaction" # ?$filter=type eq 'AP Credit memo and modifiedOn ge ' primary_keys = ["transactionId"] replication_key = "modifiedOn" paginate = True + class StockCountStream(TransactionsParentStream): """Define custom stream.""" name = "stock_count" - path = "/Transaction" #?$filter=type eq 'Stock Count and modifiedOn ge ' + path = "/Transaction" # ?$filter=type eq 'Stock Count and modifiedOn ge ' primary_keys = ["transactionId"] replication_key = "modifiedOn" paginate = True + class BankExpensesStream(TransactionsParentStream): """Define custom stream.""" name = "bank_expenses" - path = "/Transaction" #?$filter=type eq 'Bank Expense and modifiedOn ge ' + path = "/Transaction" # ?$filter=type eq 'Bank Expense and modifiedOn ge ' primary_keys = ["transactionId"] replication_key = "modifiedOn" paginate = True @@ -231,6 +240,7 @@ class VendorsStream(Restaurant365Stream): th.Property("modifiedOn", th.DateTimeType), ).to_dict() + class ItemsStream(Restaurant365Stream): """Define custom stream.""" @@ -249,7 +259,6 @@ class ItemsStream(Restaurant365Stream): th.Property("createdOn", th.DateTimeType), th.Property("modifiedBy", th.StringType), th.Property("modifiedOn", th.DateTimeType), - ).to_dict() @@ -271,9 +280,9 @@ class LocationsStream(Restaurant365Stream): th.Property("createdOn", th.DateTimeType), th.Property("modifiedBy", th.StringType), th.Property("modifiedOn", th.DateTimeType), - ).to_dict() + class EmployeesStream(Restaurant365Stream): """Define custom stream.""" @@ -309,9 +318,9 @@ class EmployeesStream(Restaurant365Stream): th.Property("createdOn", th.DateTimeType), th.Property("modifiedBy", th.StringType), th.Property("modifiedOn", th.DateTimeType), - ).to_dict() + class JobTitleStream(Restaurant365Stream): """Define custom stream.""" @@ -334,9 +343,9 @@ class JobTitleStream(Restaurant365Stream): th.Property("createdOn", th.DateTimeType), th.Property("modifiedBy", th.StringType), th.Property("modifiedOn", th.DateTimeType), - ).to_dict() + class LaborDetailStream(Restaurant365Stream): """Define custom stream.""" @@ -369,7 +378,6 @@ class LaborDetailStream(Restaurant365Stream): th.Property("createdOn", th.DateTimeType), th.Property("modifiedBy", th.StringType), th.Property("modifiedOn", th.DateTimeType), - ).to_dict() @@ -390,9 +398,9 @@ class POSEmployeeStream(Restaurant365Stream): th.Property("createdOn", th.DateTimeType), th.Property("modifiedBy", th.StringType), th.Property("modifiedOn", th.DateTimeType), - ).to_dict() + class SalesEmployeeStream(LimitedTimeframeStream): """Define custom stream.""" @@ -427,11 +435,9 @@ class SalesEmployeeStream(LimitedTimeframeStream): th.Property("modifiedBy", th.StringType), th.Property("modifiedOn", th.DateTimeType), th.Property("serviceType", th.StringType), - ).to_dict() - class SalesDetailStream(LimitedTimeframeStream): """Define custom stream.""" @@ -461,7 +467,6 @@ class SalesDetailStream(LimitedTimeframeStream): th.Property("createdOn", th.DateTimeType), th.Property("modifiedBy", th.StringType), th.Property("modifiedOn", th.DateTimeType), - ).to_dict() @@ -496,11 +501,9 @@ class SalesPaymentStream(LimitedTimeframeStream): th.Property("createdOn", th.DateTimeType), th.Property("modifiedBy", th.StringType), th.Property("modifiedOn", th.DateTimeType), - ).to_dict() - class EntityDeletedStream(Restaurant365Stream): """Define custom stream.""" @@ -513,20 +516,20 @@ class EntityDeletedStream(Restaurant365Stream): th.Property("entityName", th.StringType), th.Property("deletedOn", th.DateTimeType), th.Property("rowVersion", th.IntegerType), - ).to_dict() + class TransactionsStream(TransactionsParentStream): """Define custom stream.""" name = "transaction" - #We get node limit exceeded for number larger than this one. + # We get node limit exceeded for number larger than this one. batch_size = 20 result_count = 0 def get_child_context(self, record: dict, context: t.Optional[dict]) -> dict: return {"transaction_id": record["transactionId"]} - + def parse_response(self, response: requests.Response) -> t.Iterable[dict]: """Parse the response and return an iterator of result records. @@ -541,7 +544,7 @@ def parse_response(self, response: requests.Response) -> t.Iterable[dict]: """ data = response.json() if "value" in data: - #We cant get this number later because it breaks the generator flow. + # We cant get this number later because it breaks the generator flow. self.result_count = len(data["value"]) yield from extract_jsonpath(self.records_jsonpath, input=data) @@ -556,8 +559,10 @@ def get_records(self, context: dict | None) -> t.Iterable[dict[str, t.Any]]: # Record filtered out during post_process() continue current_batch.append(record["transactionId"]) - if i % batch_size == 0 or i == num_records: # Check if the batch is full or it's the last record - self._sync_children({"transaction_ids":current_batch}) + if ( + i % batch_size == 0 or i == num_records + ): # Check if the batch is full or it's the last record + self._sync_children({"transaction_ids": current_batch}) current_batch = [] yield transformed_record @@ -600,9 +605,8 @@ class TransactionDetailsStream(LimitedTimeframeStream): th.Property("createdOn", th.DateTimeType), th.Property("modifiedBy", th.StringType), th.Property("modifiedOn", th.DateTimeType), - ).to_dict() - + def get_next_page_token( self, response: requests.Response, previous_token: t.Optional[t.Any] ) -> t.Optional[t.Any]: @@ -610,18 +614,19 @@ def get_next_page_token( # Check if pagination is enabled data = response.json() # Check for the presence of a next page link in the response data. nextLink is only present if there are more than 5000 records in filter response. - #This is unlikely that a single transaction will have 5k records but it is possible so leaving this code part here. + # This is unlikely that a single transaction will have 5k records but it is possible so leaving this code part here. if "@odata.nextLink" in data: # Increment the skip counter for pagination self.skip += 5000 # Update the previous token if it exists if previous_token: - previous_token = previous_token['token'] + previous_token = previous_token["token"] # Return the next page token and the updated skip value - return {"token":previous_token,"skip":self.skip} + return {"token": previous_token, "skip": self.skip} self.skip = 0 # Return the next token and the current skip value return None + def get_url_params( self, context: dict | None, # noqa: ARG002 @@ -633,15 +638,15 @@ def get_url_params( skip = 0 if next_page_token: token_date, skip = next_page_token["token"], next_page_token["skip"] - if context.get('transaction_ids'): + if context.get("transaction_ids"): params["$filter"] = " or ".join( [ f"transactionId eq '{transaction_id}'" for transaction_id in context.get("transaction_ids") ], ) - #Replace ' from the populated filter to avoid errors - params["$filter"] = params["$filter"].replace("'","") - if skip>0: + # Replace ' from the populated filter to avoid errors + params["$filter"] = params["$filter"].replace("'", "") + if skip > 0: params["$skip"] = skip return params