Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pagination fix for fetching all of the data. #19

Merged
merged 1 commit into from
Apr 18, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions tap_restaurant365/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,31 @@ def get_next_page_token(
self, response: requests.Response, previous_token: t.Optional[t.Any]
) -> t.Optional[t.Any]:
"""Return a token for identifying next page or None if no more pages."""
# Check if pagination is enabled
if self.paginate == True:
data = response.json()
# Check for the presence of a next page link in the response data. nextLink is only present if there are more than 5000 records in filter response.
if "@odata.nextLink" in data:
# Increment the skip counter for pagination
self.skip += 5000
# Update the previous token if it exists
if previous_token:
previous_token = previous_token['token']
# Return the next page token and the updated skip value
return {"token":previous_token,"skip":self.skip}
else:
# Reset skip value for a new pagination sequence
self.skip = 0
# Pick a starting value in case the progress marker is not present.
# Determine the starting replication value for data extraction
replication_key_value = self.tap_state["bookmarks"][self.name]['starting_replication_value']
# Update the replication key value if progress markers are present
if "progress_markers" in self.tap_state["bookmarks"][self.name]:
replication_key_value = self.tap_state["bookmarks"][self.name]['progress_markers']["replication_key_value"]

# Calculate the start date for data extraction
start_date = (parser.parse(replication_key_value) + timedelta(seconds=1)) or parser.parse(self.config.get("start_date"))
today = datetime.today()
# Adjust the start date based on the previous token if applicable (will occur if progress marker is unable to find a value in empty data response)
if (
previous_token
and "token" in previous_token
Expand All @@ -50,10 +59,13 @@ def get_next_page_token(
start_date = previous_token["token"] + timedelta(days=self.days_delta)
next_token = start_date.replace(tzinfo=None)

if (today - next_token).days < self.days_delta:
# Disable pagination if the next token's date is in the future
if (today - next_token).days < 0:
self.paginate = False
# Return the next token and the current skip value
return {"token":next_token,'skip':self.skip}
else:
# Return None if pagination is not enabled
return None

def get_url_params(
Expand All @@ -73,6 +85,7 @@ def get_url_params(
params["$filter"] = (
f"{self.replication_key} ge {start_date.strftime('%Y-%m-%dT%H:%M:%SZ')} and {self.replication_key} lt {end_date.strftime('%Y-%m-%dT23:59:59Z')}"
)
# Order by the replication key so the response ordering is consistent.
params['$orderby'] = f"{self.replication_key}"
if self.name == "journal_entries":
# Set a date in the stream to check later whether we need to keep calling the stream.
Expand Down
Loading