From 6030755694b5ff90809e976f86fb08259ce57c00 Mon Sep 17 00:00:00 2001 From: Adil Ahmed Date: Wed, 17 Apr 2024 22:33:14 +0500 Subject: [PATCH 1/3] Pagination fix for the duplicates. --- tap_restaurant365/streams.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/tap_restaurant365/streams.py b/tap_restaurant365/streams.py index cabe4e6..a7e07c1 100644 --- a/tap_restaurant365/streams.py +++ b/tap_restaurant365/streams.py @@ -19,6 +19,7 @@ class LimitedTimeframeStream(Restaurant365Stream): name = "vendors" path = "/Company" + previous_replications = [] # noqa: RUF012 def get_next_page_token( self, response: requests.Response, previous_token: t.Optional[t.Any] @@ -32,8 +33,17 @@ def get_next_page_token( previous_token = previous_token['token'] return {"token":previous_token,"skip":self.skip} else: - self.skip = 0 - start_date = (parser.parse(self.tap_state["bookmarks"][self.name]['starting_replication_value']) + timedelta(seconds=1)) or parser.parse(self.config.get("start_date")) + self.skip = 0 + #Pick starting value just incase progress marker is not present. + replication_key_value = self.tap_state["bookmarks"][self.name]['starting_replication_value'] + if "progress_markers" in self.tap_state["bookmarks"][self.name]: + replication_key_value = self.tap_state["bookmarks"][self.name]['progress_markers']["replication_key_value"] + # if replication_key_value in self.previous_replications: + # self.logger.warn(f"Duplicated replication_key_value: {replication_key_value}") # noqa: G004 + # self.logger.warn(f"Possible previous duplicate request. {response.request.url}") # noqa: G004 + + self.previous_replications.append(replication_key_value) + start_date = (parser.parse(replication_key_value) + timedelta(seconds=1)) or parser.parse(self.config.get("start_date")) today = datetime.today() if previous_token: if "token" in previous_token: @@ -65,6 +75,7 @@ def get_url_params( end_date = start_date + timedelta(days = self.days_delta) if self.replication_key: params["$filter"] = f"{self.replication_key} ge {start_date.strftime('%Y-%m-%dT%H:%M:%SZ')} and {self.replication_key} lt {end_date.strftime('%Y-%m-%dT%H:%M:%SZ')}" + params['$orderby'] = f"{self.replication_key}" if self.name == "journal_entries": #set a date in the stream to check later to see if we need to keep calling to the stream params["$filter"] += f" and type eq 'Journal Entry'" From 57728d19ff1e1db4282a1af88f87f4e6c675ac05 Mon Sep 17 00:00:00 2001 From: Adil Ahmed Date: Thu, 18 Apr 2024 03:51:08 +0500 Subject: [PATCH 2/3] Revised pagination logic to be incremental based on replication value. --- tap_restaurant365/streams.py | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/tap_restaurant365/streams.py b/tap_restaurant365/streams.py index a7e07c1..1db2b59 100644 --- a/tap_restaurant365/streams.py +++ b/tap_restaurant365/streams.py @@ -38,21 +38,11 @@ def get_next_page_token( replication_key_value = self.tap_state["bookmarks"][self.name]['starting_replication_value'] if "progress_markers" in self.tap_state["bookmarks"][self.name]: replication_key_value = self.tap_state["bookmarks"][self.name]['progress_markers']["replication_key_value"] - # if replication_key_value in self.previous_replications: - # self.logger.warn(f"Duplicated replication_key_value: {replication_key_value}") # noqa: G004 - # self.logger.warn(f"Possible previous duplicate request. {response.request.url}") # noqa: G004 self.previous_replications.append(replication_key_value) start_date = (parser.parse(replication_key_value) + timedelta(seconds=1)) or parser.parse(self.config.get("start_date")) today = datetime.today() - if previous_token: - if "token" in previous_token: - previous_token = previous_token['token'] - if previous_token: - #Look for records that are greater than the previous token - previous_token = previous_token + timedelta(seconds=1) - previous_token = previous_token or start_date - next_token = (previous_token + timedelta(days=self.days_delta)).replace(tzinfo=None) + next_token = start_date.replace(tzinfo=None) if (today - next_token).days < self.days_delta: self.paginate = False @@ -74,7 +64,9 @@ def get_url_params( start_date = token_date or self.get_starting_time(context) end_date = start_date + timedelta(days = self.days_delta) if self.replication_key: - params["$filter"] = f"{self.replication_key} ge {start_date.strftime('%Y-%m-%dT%H:%M:%SZ')} and {self.replication_key} lt {end_date.strftime('%Y-%m-%dT%H:%M:%SZ')}" + params["$filter"] = ( + f"{self.replication_key} ge {start_date.strftime('%Y-%m-%dT%H:%M:%SZ')} and {self.replication_key} lt {end_date.strftime('%Y-%m-%dT23:59:59Z')}" + ) params['$orderby'] = f"{self.replication_key}" if self.name == "journal_entries": #set a date in the stream to check later to see if we need to keep calling to the stream From ee617352a9ea932ad5a21f617088561a19a79bba Mon Sep 17 00:00:00 2001 From: Hassan Syyid Date: Wed, 17 Apr 2024 23:49:06 -0400 Subject: [PATCH 3/3] Update streams.py --- tap_restaurant365/streams.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tap_restaurant365/streams.py b/tap_restaurant365/streams.py index 1db2b59..9db9c76 100644 --- a/tap_restaurant365/streams.py +++ b/tap_restaurant365/streams.py @@ -19,7 +19,6 @@ class LimitedTimeframeStream(Restaurant365Stream): name = "vendors" path = "/Company" - previous_replications = [] # noqa: RUF012 def get_next_page_token( self, response: requests.Response, previous_token: t.Optional[t.Any] @@ -39,7 +38,6 @@ def get_next_page_token( if "progress_markers" in self.tap_state["bookmarks"][self.name]: replication_key_value = self.tap_state["bookmarks"][self.name]['progress_markers']["replication_key_value"] - self.previous_replications.append(replication_key_value) start_date = (parser.parse(replication_key_value) + timedelta(seconds=1)) or parser.parse(self.config.get("start_date")) today = datetime.today() next_token = start_date.replace(tzinfo=None)