Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Converted TransactionDetail to a child stream due to API instability #20

Merged
merged 2 commits into from
Apr 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 44 additions & 2 deletions tap_restaurant365/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def get_next_page_token(
) -> t.Optional[t.Any]:
"""Return a token for identifying next page or None if no more pages."""
# Check if pagination is enabled
if self.paginate == True:
if self.paginate:
data = response.json()
# Check for the presence of a next page link in the response data. nextLink is only present if there are more than 5000 records in filter response.
if "@odata.nextLink" in data:
Expand Down Expand Up @@ -165,6 +165,9 @@ class TransactionsStream(LimitedTimeframeStream):
th.Property("modifiedBy", th.StringType)
).to_dict()

def get_child_context(self, record: dict, context: t.Optional[dict]) -> dict:
return {"transaction_id":record["transactionId"]}


class BillsStream(TransactionsStream):
"""Define custom stream."""
Expand Down Expand Up @@ -522,8 +525,9 @@ class TransactionDetailsStream(LimitedTimeframeStream):
name = "transaction_detail"
path = "/TransactionDetail"
primary_keys = ["transactionDetailId"]
replication_key = "modifiedOn"
replication_key = None
paginate = True
parent_stream_type = TransactionsStream
schema = th.PropertiesList(
th.Property("transactionDetailId", th.StringType),
th.Property("transactionId", th.StringType),
Expand All @@ -545,3 +549,41 @@ class TransactionDetailsStream(LimitedTimeframeStream):
th.Property("modifiedOn", th.DateTimeType),

).to_dict()

def get_next_page_token(
self, response: requests.Response, previous_token: t.Optional[t.Any]
) -> t.Optional[t.Any]:
"""Return a token for identifying next page or None if no more pages."""
# Check if pagination is enabled
data = response.json()
# Check for the presence of a next page link in the response data. nextLink is only present if there are more than 5000 records in filter response.
#This is unlikely that a single transaction will have 5k records but it is possible so leaving this code part here.
if "@odata.nextLink" in data:
# Increment the skip counter for pagination
self.skip += 5000
# Update the previous token if it exists
if previous_token:
previous_token = previous_token['token']
# Return the next page token and the updated skip value
return {"token":previous_token,"skip":self.skip}
self.skip = 0
# Return the next token and the current skip value
return None
def get_url_params(
self,
context: dict | None, # noqa: ARG002
next_page_token: Any | None, # noqa: ANN401
) -> dict[str, Any]:

params: dict = {}
token_date = None
skip = 0
if next_page_token:
token_date, skip = next_page_token["token"], next_page_token["skip"]
if context.get('transaction_id'):
params["$filter"] = (
f"transactionId eq {context.get('transaction_id')}"
)
if skip>0:
params["$skip"] = skip
return params
Loading