Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pagination fix for non-timeframe streams #24

Merged
merged 1 commit into from
Jul 2, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions tap_restaurant365/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

from datetime import timedelta
from http import HTTPStatus
from typing import Any, Callable
from typing import Any, Callable, Optional
from urllib.parse import parse_qs, urlparse

import requests
from dateutil import parser
Expand Down Expand Up @@ -52,16 +53,20 @@ def http_headers(self) -> dict:
# headers["Private-Token"] = self.config.get("auth_token") # noqa: ERA001
return headers

def get_new_paginator(self) -> BaseAPIPaginator:

# if self.name in ['bills', 'journal_entries']:
# if parser.parse(self.started_on) >= datetime.now():
# self.logger.info(f"Synced all data until {self.started_on}")
# pass
# else:
# params

return super().get_new_paginator()
def get_next_page_token(
self, response: requests.Response, previous_token: Any | None
) -> Any | None:
data = response.json()
next_page_token = None
if "@odata.nextLink" in data:
url = data["@odata.nextLink"]
parsed_url = urlparse(url)
# Extract the query parameters
params = parse_qs(parsed_url.query)
if "$skip" in params and len(params["$skip"]) > 0 :
return int(params["$skip"][0])

return next_page_token

def get_starting_time(self, context):
start_date = self.config.get("start_date")
Expand Down Expand Up @@ -91,7 +96,8 @@ def get_url_params(
if self.replication_key:
start_date = self.get_starting_time(context).strftime("%Y-%m-%dT%H:%M:%SZ")
params["$filter"] = f"{self.replication_key} ge {start_date}"

if next_page_token:
params["$skip"] = next_page_token
return params

def validate_response(self, response: requests.Response) -> None:
Expand Down
Loading