From 8e6399c2c6a89afaee81eca1e88e0938f41e91bc Mon Sep 17 00:00:00 2001 From: xacadil <92389481+xacadil@users.noreply.github.com> Date: Fri, 19 Apr 2024 03:39:35 +0500 Subject: [PATCH] Pagination fix for fetching all of the data. (#19) --- tap_restaurant365/streams.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/tap_restaurant365/streams.py b/tap_restaurant365/streams.py index f90aedd..a7bc129 100644 --- a/tap_restaurant365/streams.py +++ b/tap_restaurant365/streams.py @@ -24,22 +24,31 @@ def get_next_page_token( self, response: requests.Response, previous_token: t.Optional[t.Any] ) -> t.Optional[t.Any]: """Return a token for identifying next page or None if no more pages.""" + # Check if pagination is enabled if self.paginate == True: data = response.json() + # Check for the presence of a next page link in the response data. nextLink is only present if there are more than 5000 records in filter response. if "@odata.nextLink" in data: + # Increment the skip counter for pagination self.skip += 5000 + # Update the previous token if it exists if previous_token: previous_token = previous_token['token'] + # Return the next page token and the updated skip value return {"token":previous_token,"skip":self.skip} else: + # Reset skip value for a new pagination sequence self.skip = 0 - #Pick starting value just incase progress marker is not present. + # Determine the starting replication value for data extraction replication_key_value = self.tap_state["bookmarks"][self.name]['starting_replication_value'] + # Update the replication key value if progress markers are present if "progress_markers" in self.tap_state["bookmarks"][self.name]: replication_key_value = self.tap_state["bookmarks"][self.name]['progress_markers']["replication_key_value"] + # Calculate the start date for data extraction start_date = (parser.parse(replication_key_value) + timedelta(seconds=1)) or parser.parse(self.config.get("start_date")) today = datetime.today() + # Adjust the start date based on the previous token if applicable (will occur if progress marker is unable to find a value in empty data response) if ( previous_token and "token" in previous_token @@ -50,10 +59,13 @@ def get_next_page_token( start_date = previous_token["token"] + timedelta(days=self.days_delta) next_token = start_date.replace(tzinfo=None) - if (today - next_token).days < self.days_delta: + # Disable pagination if the next token's date is in the future + if (today - next_token).days < 0: self.paginate = False + # Return the next token and the current skip value return {"token":next_token,'skip':self.skip} else: + # Return None if pagination is not enabled return None def get_url_params( @@ -73,6 +85,7 @@ def get_url_params( params["$filter"] = ( f"{self.replication_key} ge {start_date.strftime('%Y-%m-%dT%H:%M:%SZ')} and {self.replication_key} lt {end_date.strftime('%Y-%m-%dT23:59:59Z')}" ) + #Order by replication key so the response is consistent params['$orderby'] = f"{self.replication_key}" if self.name == "journal_entries": #set a date in the stream to check later to see if we need to keep calling to the stream