Skip to content

Commit

Permalink
fixed incremental streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
Jong authored and Jong committed Nov 15, 2023
1 parent 068b920 commit 1bfaa1c
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 17 deletions.
14 changes: 7 additions & 7 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 10 additions & 8 deletions tap_restaurant365/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ def get_new_paginator(self) -> BaseAPIPaginator:
return super().get_new_paginator()


def get_starting_time(self, context):
start_date = self.config.get("start_date")
if start_date:
start_date = parser.parse(self.config.get("start_date"))
rep_key = self.get_starting_timestamp(context)
return rep_key or start_date




def get_url_params(
Expand All @@ -84,14 +92,8 @@ def get_url_params(

params: dict = {}
if self.replication_key:
params["$filter"] = f"{self.replication_key} ge {self.config.get('start_date')}"
if self.name == "journal_entries":
#set a date in the stream to check later to see if we need to keep calling to the stream
params["$filter"] += f" and type eq 'Journal Entry' and {self.replication_key} lt {(parser.parse(self.config.get('start_date')) + timedelta(days=30)).isoformat()}"
# if self.name == "bills":
# #set a date in the stream to check later to see if we need to keep calling to the stream
# self.started_on = (parser.parse(self.config.get('start_date')) + timedelta(days=30)).isoformat()
# params["$filter"] += f" and type eq 'AP Invoices' and {self.replication_key} lt {self.started_on}"
start_date = self.get_starting_time(context).strftime('%Y-%m-%dT%H:%M:%SZ')
params["$filter"] = f"{self.replication_key} ge {start_date}"

return params

Expand Down
6 changes: 4 additions & 2 deletions tap_restaurant365/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,10 @@ def get_next_page_token(
) -> t.Optional[t.Any]:
"""Return a token for identifying next page or None if no more pages."""
if self.paginate == True:
# start_date = self.config.get("start_date")
start_date = parser.parse(self.tap_state["bookmarks"][self.name]['starting_replication_value']) or parser.parse(self.config.get("start_date"))
today = datetime.today()
previous_token = previous_token or parser.parse(self.config.get("start_date"))
previous_token = previous_token or start_date
next_token = (previous_token + timedelta(days=30)).replace(tzinfo=None)

if (today - next_token).days < 30:
Expand All @@ -98,7 +100,7 @@ def get_url_params(
#x.strftime('%Y-%m-%dT%H:%M:%SZ')


start_date = next_page_token or parser.parse(self.config.get("start_date"))
start_date = next_page_token or self.get_starting_time(context)
end_date = start_date + timedelta(days = 30)
if self.replication_key:
params["$filter"] = f"{self.replication_key} ge {start_date.strftime('%Y-%m-%dT%H:%M:%SZ')} and {self.replication_key} lt {end_date.strftime('%Y-%m-%dT%H:%M:%SZ')}"
Expand Down

0 comments on commit 1bfaa1c

Please sign in to comment.